BPServiceActor 详解

本贴最后更新于 276 天前,其中的信息可能已经东海扬尘

简介

BPServiceActor 主要在 DataNode 中用于和 NameNode 沟通的类。主要功能如下:

  • 与 namenode 进行预注册握手。
  • 向 namenode 注册。
  • 定期向 namenode 发送心跳。
  • 处理从 namenode 收到的命令。

核心功能

BPServiceActor 的入口函数为 start 函数,当前类本身为 runnable 接口的实现类,所以在 start 函数里面新建了 BPServiceActor 线程,并且将其启动,
所以其真实的启动函数为 run()

在 run 函数里面主要做了连接 NameNode 并且注册当前 DataNode 的事。

与 NameNode 握手

首先要做的就是和 NameNode 建立连接,核心代码如下:

bpNamenode = dn.connectToNN(nnAddr);

建立连接之后需要做的就是获取获取版本信息并且检查版本信息,如果单次获取失败会进行重试。失败之后每次的重试间隔为 5s。

NamespaceInfo nsInfo = retrieveNamespaceInfo(); bpos.verifyAndSetNamespaceInfo(this, nsInfo);

到此与 NameNode 之间的握手结束。开始注册当前的 DataNode 到 NameNode。

注册 DataNode

DataNode 注册核心逻辑

注册 DataNode 的核心函数是 register

void register(NamespaceInfo nsInfo) throws IOException { // The handshake() phase loaded the block pool storage // off disk - so update the bpRegistration object from that info DatanodeRegistration newBpRegistration = bpos.createRegistration(); LOG.info(this + " beginning handshake with NN"); while (shouldRun()) { try { // 向NN注册DataNodde newBpRegistration = bpNamenode.registerDatanode(newBpRegistration); newBpRegistration.setNamespaceInfo(nsInfo); bpRegistration = newBpRegistration; break; // ... 省略部分异常处理 } } if (bpRegistration == null) { throw new IOException("DN shut down before block pool registered"); } // .. 省略 .. // reset lease id whenever registered to NN. // ask for a new lease id at the next heartbeat. fullBlockReportLeaseId = 0; // random short delay - helps scatter the BR from all DatanodeRegistration // 定时汇报block块信息给NN scheduler.scheduleBlockReport(dnConf.initialBlockReportDelayMs, true); }

NameNode 处理注册信息

NameNode 处理 DataNode 的注册信息,主要接口是在 NameNodeRpcServer 的 ipc 接口 registerDatanode。
其主要的实现实在 DatanodeManager 中的函数 registerDatanode。

主要实现如下:

// update cluster map getNetworkTopology().remove(nodeS); if(shouldCountVersion(nodeS)) { decrementVersionCount(nodeS.getSoftwareVersion()); } nodeS.updateRegInfo(nodeReg); nodeS.setSoftwareVersion(nodeReg.getSoftwareVersion()); nodeS.setDisallowed(false); // Node is in the include list // resolve network location if(this.rejectUnresolvedTopologyDN) { nodeS.setNetworkLocation(resolveNetworkLocation(nodeS)); nodeS.setDependentHostNames(getNetworkDependencies(nodeS)); } else { nodeS.setNetworkLocation( resolveNetworkLocationWithFallBackToDefaultLocation(nodeS)); nodeS.setDependentHostNames( getNetworkDependenciesWithDefault(nodeS)); } getNetworkTopology().add(nodeS); resolveUpgradeDomain(nodeS); // also treat the registration message as a heartbeat heartbeatManager.register(nodeS); incrementVersionCount(nodeS.getSoftwareVersion()); startAdminOperationIfNecessary(nodeS); success = true;

主要做了下面两件事:

  • 更新节点信息,比如网络拓扑信息。
  • 在心跳管理里面注册当前节点。只要将节点信息添加到节点列表里面即可。

发送心跳

处理 NN 的命令

  • Hadoop

    Hadoop 是由 Apache 基金会所开发的一个分布式系统基础架构。用户可以在不了解分布式底层细节的情况下,开发分布式程序。充分利用集群的威力进行高速运算和存储。

    89 引用 • 122 回帖 • 617 关注
1 操作
zeekling 在 2024-07-21 23:44:57 更新了该帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...