@changedi 2017-04-18T09:09:51.000000Z

HDFS Startup

Big Data · HDFS



All of the analysis below uses a single-node installation of Hadoop 2.6.4 as the example. The steps assume the setup described in the single-node Hadoop installation guide.

Two important preliminaries:
- Assume Hadoop is installed at HADOOP_HOME.
- The key script file is hadoop-functions.sh.

Step-by-step walkthrough

Formatting the filesystem

The first step: $ bin/hdfs namenode -format

This runs the HADOOP_HOME/bin/hdfs command. Its namenode case branch sets several important variables:

  namenode)
    HADOOP_SUBCMD_SUPPORTDAEMONIZATION="true"
    HADOOP_CLASSNAME='org.apache.hadoop.hdfs.server.namenode.NameNode'
    hadoop_add_param HADOOP_OPTS hdfs.audit.logger "-Dhdfs.audit.logger=${HDFS_AUDIT_LOGGER}"
  ;;

and then finally executes:

  hadoop_java_exec "${HADOOP_SUBCMD}" "${HADOOP_CLASSNAME}" "${HADOOP_SUBCMD_ARGS[@]}"

hadoop_java_exec is a function declared in hadoop-functions.sh; its job is to launch a Java process that runs the given command.

  function hadoop_java_exec
  {
    # run a java command. this is used for
    # non-daemons
    local command=$1
    local class=$2
    shift 2

    hadoop_debug "Final CLASSPATH: ${CLASSPATH}"
    hadoop_debug "Final HADOOP_OPTS: ${HADOOP_OPTS}"
    hadoop_debug "Final JAVA_HOME: ${JAVA_HOME}"
    hadoop_debug "java: ${JAVA}"
    hadoop_debug "Class name: ${class}"
    hadoop_debug "Command line options: $*"

    export CLASSPATH
    #shellcheck disable=SC2086
    exec "${JAVA}" "-Dproc_${command}" ${HADOOP_OPTS} "${class}" "$@"
  }

So the core goal of the whole command chain is to run the main method of the org.apache.hadoop.hdfs.server.namenode.NameNode class, passing -format as the argument.

  public static void main(String argv[]) throws Exception {
    if (DFSUtil.parseHelpArgument(argv, NameNode.USAGE, System.out, true)) {
      System.exit(0);
    }

    try {
      StringUtils.startupShutdownMessage(NameNode.class, argv, LOG);
      NameNode namenode = createNameNode(argv, null);
      if (namenode != null) {
        namenode.join();
      }
    } catch (Throwable e) {
      LOG.error("Failed to start namenode.", e);
      terminate(1, e);
    }
  }

The startupShutdownMessage method prints some startup information to the console, and on Unix systems it also registers a logger for the { "TERM", "HUP", "INT" } signals so that an error-level message is logged when one of them is received. The point is that when a system signal terminates the process, the log will show why the process exited.

  if (SystemUtils.IS_OS_UNIX) {
    try {
      SignalLogger.INSTANCE.register(LOG);
    } catch (Throwable t) {
      LOG.warn("failed to register any UNIX signal loggers: ", t);
    }
  }

Next comes createNameNode. It first parses the -format argument into StartupOption.FORMAT and then runs the format method. Since no cluster id was specified, the system generates a new clusterId, something like CID-d2425dab-c066-4a67-954f-32228c22abe6.

  private static boolean format(Configuration conf, boolean force,
      boolean isInteractive) throws IOException {
    String nsId = DFSUtil.getNamenodeNameServiceId(conf);
    String namenodeId = HAUtil.getNameNodeId(conf, nsId);
    initializeGenericKeys(conf, nsId, namenodeId);
    checkAllowFormat(conf);

    if (UserGroupInformation.isSecurityEnabled()) {
      InetSocketAddress socAddr = DFSUtilClient.getNNAddress(conf);
      SecurityUtil.login(conf, DFS_NAMENODE_KEYTAB_FILE_KEY,
          DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, socAddr.getHostName());
    }

    Collection<URI> nameDirsToFormat = FSNamesystem.getNamespaceDirs(conf);
    List<URI> sharedDirs = FSNamesystem.getSharedEditsDirs(conf);
    List<URI> dirsToPrompt = new ArrayList<URI>();
    dirsToPrompt.addAll(nameDirsToFormat);
    dirsToPrompt.addAll(sharedDirs);
    List<URI> editDirsToFormat =
        FSNamesystem.getNamespaceEditsDirs(conf);

    // if clusterID is not provided - see if you can find the current one
    String clusterId = StartupOption.FORMAT.getClusterId();
    if (clusterId == null || clusterId.equals("")) {
      // Generate a new cluster id
      clusterId = NNStorage.newClusterID();
    }
    System.out.println("Formatting using clusterid: " + clusterId);

    FSImage fsImage = new FSImage(conf, nameDirsToFormat, editDirsToFormat);
    try {
      FSNamesystem fsn = new FSNamesystem(conf, fsImage);
      fsImage.getEditLog().initJournalsForWrite();

      if (!fsImage.confirmFormat(force, isInteractive)) {
        return true; // aborted
      }

      fsImage.format(fsn, clusterId);
    } catch (IOException ioe) {
      LOG.warn("Encountered exception during format: ", ioe);
      fsImage.close();
      throw ioe;
    }
    return false;
  }
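The clusterId generation above (NNStorage.newClusterID) amounts to prefixing a random type-4 UUID with "CID-", which is where ids like CID-d2425dab-c066-4a67-954f-32228c22abe6 come from. A self-contained sketch (the class name here is illustrative, not Hadoop's):

```java
import java.util.UUID;

// Sketch of how NNStorage.newClusterID builds a cluster id: the "CID-"
// prefix plus a random type-4 UUID.
class ClusterIdSketch {
    static String newClusterID() {
        return "CID-" + UUID.randomUUID().toString();
    }

    public static void main(String[] args) {
        System.out.println("Formatting using clusterid: " + newClusterID());
    }
}
```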

Next an FSImage is constructed, which sets the default checkpoint directories, sets up storage, and initializes the edit log. NNStorage manages the storage directories, and FSEditLog is the edit-log object.

  protected FSImage(Configuration conf,
                    Collection<URI> imageDirs,
                    List<URI> editsDirs)
      throws IOException {
    this.conf = conf;

    storage = new NNStorage(conf, imageDirs, editsDirs);
    if (conf.getBoolean(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_KEY,
                        DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_DEFAULT)) {
      storage.setRestoreFailedStorage(true);
    }

    this.editLog = FSEditLog.newInstance(conf, storage, editsDirs);
    archivalManager = new NNStorageRetentionManager(conf, storage, editLog);
  }

With the filesystem image in hand, the FSNamesystem can be constructed. It is the container for namespace state and carries all of the NameNode's record-keeping work. The constructor is long, so the details are omitted here; its steps are:
1. Create the KeyProvider. This example does not run in secure mode, so "no KeyProvider found" is logged.
2. Read dfs.namenode.fslock.fair and construct the FSNamesystemLock. The default is true, i.e. a fair read/write lock.
3. Set the user and permission checks.
4. Check whether HA is enabled.
5. Initialize the BlockManager and the pile of managers it delegates to, including: DatanodeManager (manages DataNode decommissioning [DecommissionManager] and other activity), HeartbeatManager (manages heartbeats received from DataNodes), BlockIdManager (allocates and manages generation stamps and block ids), and so on.
6. Construct the FSDirectory, a purely in-memory structure that works together with FSNamesystem to manage the NameNode and build INodes.
7. Initialize the CacheManager, which manages DataNode caching.
8. Initialize the RetryCache, which caches non-idempotent requests already handled successfully by the RPC server, in order to deal with retries.
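Step 2 above can be sketched with the JDK primitive that FSNamesystemLock is built around: a ReentrantReadWriteLock whose fairness flag comes from dfs.namenode.fslock.fair. The wrapper below is an illustrative sketch, not Hadoop's actual FSNamesystemLock:

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Sketch: the namesystem's coarse-grained lock is a read/write lock
// whose fairness is taken from the dfs.namenode.fslock.fair setting
// (default true, i.e. waiting threads acquire the lock in FIFO order).
class NamesystemLockSketch {
    private final ReentrantReadWriteLock coarseLock;

    NamesystemLockSketch(boolean fair) {
        this.coarseLock = new ReentrantReadWriteLock(fair);
    }

    void readLock()    { coarseLock.readLock().lock(); }
    void readUnlock()  { coarseLock.readLock().unlock(); }
    void writeLock()   { coarseLock.writeLock().lock(); }
    void writeUnlock() { coarseLock.writeLock().unlock(); }

    boolean isFair() { return coarseLock.isFair(); }
}
```

A fair lock trades some throughput for predictable ordering, which matters for a lock as contended as the namesystem's.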

At this point FSNamesystem initialization is complete. Finally, FSImage's format method performs the actual formatting, and then the NameNode shuts down.

Starting the NameNode and DataNode processes

The second step is starting the NameNode and DataNode, with the following script:

  $ sbin/start-dfs.sh

NameNode startup

The core of the script:

  #---------------------------------------------------------
  # namenodes

  NAMENODES=$("${HADOOP_HDFS_HOME}/bin/hdfs" getconf -namenodes 2>/dev/null)

  if [[ -z "${NAMENODES}" ]]; then
    NAMENODES=$(hostname)
  fi

  echo "Starting namenodes on [${NAMENODES}]"
  hadoop_uservar_su hdfs namenode "${HADOOP_HDFS_HOME}/bin/hdfs" \
      --workers \
      --config "${HADOOP_CONF_DIR}" \
      --hostnames "${NAMENODES}" \
      --daemon start \
      namenode ${nameStartOpt}
  HADOOP_JUMBO_RETCOUNTER=$?

That is, it first runs hdfs getconf -namenodes to read the configuration and list all NameNodes, then runs hdfs namenode to start each one. From the earlier analysis we know the hdfs script launches the Java process for the given subcommand, and the namenode subcommand again maps to the main method of the NameNode class. The execution path is the same; only createNameNode behaves differently because the arguments differ. Since the startup script passes no extra arguments to namenode, the default branch runs:

  default: {
    DefaultMetricsSystem.initialize("NameNode");
    return new NameNode(conf);
  }

The core is the NameNode constructor. It first calls setClientNamenodeAddress to set the NameNode's address, which by default is the value of the fs.defaultFS setting, hdfs://localhost:9000.
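The address derivation can be illustrated with plain java.net.URI: the client-facing address is essentially the authority (host:port) of the fs.defaultFS URI. The helper below is an illustrative sketch, not the real setClientNamenodeAddress:

```java
import java.net.URI;

// Sketch: clients address the NameNode by the authority component of
// the fs.defaultFS URI. The property name is real Hadoop configuration;
// this helper class is only illustrative.
class ClientAddressSketch {
    static String clientAddressFrom(String fsDefaultFS) {
        return URI.create(fsDefaultFS).getAuthority();
    }

    public static void main(String[] args) {
        System.out.println(clientAddressFrom("hdfs://localhost:9000"));
    }
}
```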

Then the NameNode is initialized:

  protected void initialize(Configuration conf) throws IOException {
    if (conf.get(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS) == null) {
      String intervals = conf.get(DFS_METRICS_PERCENTILES_INTERVALS_KEY);
      if (intervals != null) {
        conf.set(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS,
                 intervals);
      }
    }

    UserGroupInformation.setConfiguration(conf);
    loginAsNameNodeUser(conf);

    NameNode.initMetrics(conf, this.getRole());
    StartupProgressMetrics.register(startupProgress);

    pauseMonitor = new JvmPauseMonitor();
    pauseMonitor.init(conf);
    pauseMonitor.start();
    metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);

    if (NamenodeRole.NAMENODE == role) {
      startHttpServer(conf);
    }

    loadNamesystem(conf);

    rpcServer = createRpcServer(conf);

    initReconfigurableBackoffKey();

    if (clientNamenodeAddress == null) {
      // This is expected for MiniDFSCluster. Set it now using
      // the RPC server's bind address.
      clientNamenodeAddress =
          NetUtils.getHostPortString(getNameNodeAddress());
      LOG.info("Clients are to use " + clientNamenodeAddress + " to access"
          + " this namenode/service.");
    }

    if (NamenodeRole.NAMENODE == role) {
      httpServer.setNameNodeAddress(getNameNodeAddress());
      httpServer.setFSImage(getFSImage());
    }

    startCommonServices(conf);
    startMetricsLogger(conf);
  }

A few of these steps matter most. startHttpServer starts an HTTP server, at http://0.0.0.0:50070 by default. HDFS's default HTTP server is a Jetty server; once it is up, opening the page shows the monitoring status of the whole HDFS. Then the Namesystem is loaded, with parameters checked first. On a local single-node start, two warnings appear:

  2017-02-11 21:59:28,765 WARN org.apache.hadoop.hdfs.server.namenode.FSNamesystem: Only one image storage directory (dfs.namenode.name.dir) configured. Beware of data loss due to lack of redundant storage directories!
  2017-02-11 21:59:28,765 WARN org.apache.hadoop.hdfs.server.namenode.FSNamesystem: Only one namespace edits storage directory (dfs.namenode.edits.dir) configured. Beware of data loss due to lack of redundant storage directories!

Setting aside the single-directory warnings for image and edit-log storage, the next part mirrors the format logic: construct the FSNamesystem, then loadFSImage. After the FSImage is loaded, whether it needs to be saved again is decided by:

  final boolean needToSave = staleImage && !haEnabled && !isRollingUpgrade();

In single-node mode the freshly loaded image is not stale (and there is no HA or rolling upgrade), so needToSave evaluates to false and FSImage's saveNamespace method is not invoked.

When it finishes, a log line appears:

  2017-02-11 21:59:29,472 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem: Finished loading FSImage in 349 msecs

indicating that the FSImage has been loaded.

After that the RPC server is initialized. The corresponding class is RPC.Server, a Protobuf-based RPC server that serves client requests.

In the method's final two lines, startCommonServices starts all the *Managers, the httpServer, and the rpcServer, plus every configured ServicePlugin, if any; startMetricsLogger turns on metrics logging.
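The ServicePlugin startup just described can be sketched as a loop that starts each configured plugin and logs failures rather than letting them abort startup. The interface shape mirrors org.apache.hadoop.util.ServicePlugin, but this class and its method names are illustrative:

```java
import java.util.List;

// Sketch of the plugin-start loop in startCommonServices: each
// configured plugin gets start(service) called; a plugin that throws
// is logged and skipped, so one bad plugin cannot block startup.
class PluginStartSketch {
    interface ServicePlugin {
        void start(Object service);
    }

    static int startPlugins(List<ServicePlugin> plugins, Object service) {
        int started = 0;
        for (ServicePlugin p : plugins) {
            try {
                p.start(service);
                started++;
            } catch (Throwable t) {
                System.err.println("ServicePlugin " + p + " could not be started: " + t);
            }
        }
        return started;
    }
}
```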

DataNode startup

The startup script:

  #---------------------------------------------------------
  # datanodes (using default workers file)

  echo "Starting datanodes"
  hadoop_uservar_su hdfs datanode "${HADOOP_HDFS_HOME}/bin/hdfs" \
      --workers \
      --config "${HADOOP_CONF_DIR}" \
      --daemon start \
      datanode ${dataStartOpt}
  (( HADOOP_JUMBO_RETCOUNTER=HADOOP_JUMBO_RETCOUNTER + $? ))

This runs hdfs datanode with no arguments. A DataNode stores a series of blocks holding the actual file data. It communicates with the NameNode, and also with other DataNodes and even clients. The DataNode itself maintains only one relationship: the mapping from a block to its byte stream.
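That block-to-bytes mapping can be pictured with a toy in-memory store. The real DataNode keeps blocks as files on local disks, managed by its dataset implementation; the map below is only a conceptual sketch:

```java
import java.util.HashMap;
import java.util.Map;

// Toy illustration of the DataNode's core abstraction: a mapping from
// block ids to the bytes of that block. Purely conceptual; the real
// DataNode stores block data and metadata as files on disk.
class BlockStoreSketch {
    private final Map<Long, byte[]> blocks = new HashMap<>();

    void writeBlock(long blockId, byte[] data) {
        blocks.put(blockId, data.clone()); // defensive copy
    }

    byte[] readBlock(long blockId) {
        byte[] data = blocks.get(blockId);
        return data == null ? null : data.clone();
    }
}
```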

For DataNode initialization, the MetricsSystem is initialized first. Then comes the core code segment, the DataNode constructor:

  DataNode(final Configuration conf,
           final List<StorageLocation> dataDirs,
           final StorageLocationChecker storageLocationChecker,
           final SecureResources resources) throws IOException {
    super(conf);
    this.tracer = createTracer(conf);
    this.tracerConfigurationManager =
        new TracerConfigurationManager(DATANODE_HTRACE_PREFIX, conf);
    this.fileIoProvider = new FileIoProvider(conf, this);
    this.blockScanner = new BlockScanner(this);
    this.lastDiskErrorCheck = 0;
    this.maxNumberOfBlocksToLog = conf.getLong(DFS_MAX_NUM_BLOCKS_TO_LOG_KEY,
        DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT);

    this.usersWithLocalPathAccess = Arrays.asList(
        conf.getTrimmedStrings(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY));
    this.connectToDnViaHostname = conf.getBoolean(
        DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
        DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
    this.supergroup = conf.get(DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY,
        DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
    this.isPermissionEnabled = conf.getBoolean(
        DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY,
        DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT);
    this.pipelineSupportECN = conf.getBoolean(
        DFSConfigKeys.DFS_PIPELINE_ECN_ENABLED,
        DFSConfigKeys.DFS_PIPELINE_ECN_ENABLED_DEFAULT);

    confVersion = "core-" +
        conf.get("hadoop.common.configuration.version", "UNSPECIFIED") +
        ",hdfs-" +
        conf.get("hadoop.hdfs.configuration.version", "UNSPECIFIED");

    this.volumeChecker = new DatasetVolumeChecker(conf, new Timer());

    // Determine whether we should try to pass file descriptors to clients.
    if (conf.getBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY,
        HdfsClientConfigKeys.Read.ShortCircuit.DEFAULT)) {
      String reason = DomainSocket.getLoadingFailureReason();
      if (reason != null) {
        LOG.warn("File descriptor passing is disabled because " + reason);
        this.fileDescriptorPassingDisabledReason = reason;
      } else {
        LOG.info("File descriptor passing is enabled.");
        this.fileDescriptorPassingDisabledReason = null;
      }
    } else {
      this.fileDescriptorPassingDisabledReason =
          "File descriptor passing was not configured.";
      LOG.debug(this.fileDescriptorPassingDisabledReason);
    }

    this.socketFactory = NetUtils.getDefaultSocketFactory(conf);

    try {
      hostName = getHostName(conf);
      LOG.info("Configured hostname is " + hostName);
      startDataNode(dataDirs, resources);
    } catch (IOException ie) {
      shutdown();
      throw ie;
    }

    final int dncCacheMaxSize =
        conf.getInt(DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY,
            DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT);
    datanodeNetworkCounts =
        CacheBuilder.newBuilder()
            .maximumSize(dncCacheMaxSize)
            .build(new CacheLoader<String, Map<String, Long>>() {
              @Override
              public Map<String, Long> load(String key) throws Exception {
                final Map<String, Long> ret = new HashMap<String, Long>();
                ret.put("networkErrors", 0L);
                return ret;
              }
            });

    initOOBTimeout();
    this.storageLocationChecker = storageLocationChecker;
  }

The most important call is startDataNode. Its core steps:
1. Register the MBean.
2. Create a TcpPeerServer listening on port 50010. This server handles communication with clients and with other DataNodes, and does not use Hadoop's IPC mechanism.
3. Start the JvmPauseMonitor, which records JVM pauses and logs a line whenever one is detected.
4. Initialize the IpcServer, listening on port 50020.
5. Construct a BPOfferService and start its BPServiceActor thread. BPServiceActor is a thread that first handshakes with the NameNode as pre-registration, then registers the DataNode with the NameNode, and then periodically sends heartbeats to the NameNode and handles the commands received in response.

Concretely, step 5 is the following code:

  public void run() {
    LOG.info(this + " starting to offer service");

    try {
      while (true) {
        // init stuff
        try {
          // setup storage
          connectToNNAndHandshake();
          break;
        } catch (IOException ioe) {
          // Initial handshake, storage recovery or registration failed
          runningState = RunningState.INIT_FAILED;
          if (shouldRetryInit()) {
            // Retry until all namenodes of BPOS failed initialization
            LOG.error("Initialization failed for " + this + " "
                + ioe.getLocalizedMessage());
            sleepAndLogInterrupts(5000, "initializing");
          } else {
            runningState = RunningState.FAILED;
            LOG.error("Initialization failed for " + this + ". Exiting. ", ioe);
            return;
          }
        }
      }

      runningState = RunningState.RUNNING;
      if (initialRegistrationComplete != null) {
        initialRegistrationComplete.countDown();
      }

      while (shouldRun()) {
        try {
          offerService();
        } catch (Exception ex) {
          LOG.error("Exception in BPOfferService for " + this, ex);
          sleepAndLogInterrupts(5000, "offering service");
        }
      }
      runningState = RunningState.EXITED;
    } catch (Throwable ex) {
      LOG.warn("Unexpected exception in block pool " + this, ex);
      runningState = RunningState.FAILED;
    } finally {
      LOG.warn("Ending block pool service for: " + this);
      cleanUp();
    }
  }

Let's look at what the BPServiceActor thread actually does:
1. Send a versionRequest to the NameNode to obtain its namespace and version information; the response is a NamespaceInfo.
2. Use the NamespaceInfo to initialize the storage, formatting it first. After initialization a UUID is generated, as the following log shows:

2017-02-11 21:59:33,901 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: Setting up storage: nsid=537369943;bpid=BP-503975772-192.168.0.109-1486821555429;lv=-56;nsInfo=lv=-60;cid=CID-c79cc043-b282-435c-a0f6-d5a55b23e87e;nsid=537369943;c=0;bpid=BP-503975772-192.168.0.109-1486821555429;dnuuid=null
2017-02-11 21:59:33,902 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: Generated and persisted new Datanode UUID 43ed99d1-20c6-4d71-919c-e9a70cb75c6e

3. The real handshake: send a registerDatanode request to the NameNode. The NameNode handles this request through its DatanodeManager's registerDatanode. The NameNode log then shows:

2017-02-11 21:59:34,090 INFO org.apache.hadoop.hdfs.StateChange: BLOCK* registerDatanode: from DatanodeRegistration(127.0.0.1, datanodeUuid=43ed99d1-20c6-4d71-919c-e9a70cb75c6e, infoPort=50075, ipcPort=50020, storageInfo=lv=-56;cid=CID-c79cc043-b282-435c-a0f6-d5a55b23e87e;nsid=537369943;c=0) storage 43ed99d1-20c6-4d71-919c-e9a70cb75c6e
2017-02-11 21:59:34,099 INFO org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor: Number of failed storage changes from 0 to 0
2017-02-11 21:59:34,100 INFO org.apache.hadoop.net.NetworkTopology: Adding a new node: /default-rack/127.0.0.1:50010
2017-02-11 21:59:34,189 INFO org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor: Number of failed storage changes from 0 to 0
2017-02-11 21:59:34,189 INFO org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor: Adding new storage ID DS-7d302778-acd6-4366-be5e-9dbf7ad22c4d for DN 127.0.0.1:50010

4. Call offerService and start sending heartbeats periodically. Each heartbeat contains a few things: the DataNode's name, its data-transfer port, its total capacity, and its remaining bytes. On receiving a heartbeat, the NameNode processes it in handleHeartbeat.
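The heartbeat payload described in step 4 can be sketched as a simple value object. The field names below are illustrative; the real protocol sends protobuf messages built from the DataNode's registration plus storage reports:

```java
// Sketch of a heartbeat payload: DataNode name, data-transfer port,
// total capacity and remaining bytes. Field names are illustrative,
// not Hadoop's wire format.
class HeartbeatSketch {
    final String datanodeName;
    final int transferPort;
    final long capacityBytes;
    final long remainingBytes;

    HeartbeatSketch(String name, int port, long capacity, long remaining) {
        this.datanodeName = name;
        this.transferPort = port;
        this.capacityBytes = capacity;
        this.remainingBytes = remaining;
    }

    // Derived value the NameNode can compute from each report.
    long usedBytes() { return capacityBytes - remainingBytes; }
}
```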

At this point both the NameNode and the DataNode are working normally, and the HDFS startup is complete.
