distributedShell 样例源码详解

简介

distributedShell 是 Yarn 自带的应用程序,和 MR 类似,当前工具可以用来对 Yarn 进行压测。

使用示例

参考命令如下:

./bin/hadoop jar ./share/hadoop/yarn/hadoop-yarn-applications-distributedshell-3.4.1.jar \ -jar ./share/hadoop/yarn/hadoop-yarn-applications-distributedshell-3.4.1.jar -shell_command \ '/bin/date' -num_containers 5

可以提交一个样例作业到 Yarn 上面。

源码阅读

当前样例的入口类是 org.apache.hadoop.yarn.applications.distributedshell.Client ,在 pom 文件里面默认定义了当前类为主类。所以在提交的时候可以不用指定主类。

<plugin> <artifactId>maven-jar-plugin</artifactId> <executions> <!-- 省略部分参数 --> </executions> <configuration> <archive> <manifest> <mainClass>org.apache.hadoop.yarn.applications.distributedshell.Client</mainClass> </manifest> </archive> </configuration> </plugin>

核心流程主要包含下面 3 个:

  • 初始化 CLient 对象
  • 初始化 Client
  • 提交作业到 yarn

其中前面两个主要在客户端,第 3 个主要是在 yarn 上面。

客户端提交核心代码

初始化

初始化阶段包括下面两部分:

  • 初始化 Client 对象,主要是创建 Yarn 的连接以及初始化支持的参数列表
  • 初始化 Client

下面是初始化 Client 对象的核心代码。

Client(String appMasterMainClass, Configuration conf) { this.conf = conf; this.conf.setBoolean( YarnConfiguration.YARN_CLIENT_LOAD_RESOURCETYPES_FROM_SERVER, true); this.appMasterMainClass = appMasterMainClass; // 创建和RM的连接 yarnClient = YarnClient.createYarnClient(); yarnClient.init(conf); opts = new Options(); // 初始化支持的参数列表 stopSignalReceived = new AtomicBoolean(false); isRunning = new AtomicBoolean(false); }

初始化 Client,在初始化 Client 阶段主要是读取命令行参数。

// 初始化Client函数入口 boolean doRun = client.init(args);

运行作业

首先还是建立和 Yarn 服务端的连接,为作业提交做准备。

isRunning.set(true); yarnClient.start();

在连接建立之后会查询并且在控制台打印 Yarn 服务端的一些信息。主要包含下面内容:

  • 当前集群 NM 的个数,通过 yarnClient.getYarnClusterMetrics() 查询到并且显示。
  • 当前集群中运行中 NM 的详细信息,通过 yarnClient.getNodeReports(NodeState.RUNNING) 查询到。
  • 当前任务提交的队列的详细信息,通过 yarnClient.getQueueInfo(this.amQueue) 查询到。
  • 当前集群的 ACL 信息,通过 yarnClient.getQueueAclsInfo() 查询。
  • 当前集群的 ResourceProfile 信息,通过 yarnClient.getResourceProfiles() 查询。

在打印完集群信息之后才是作业提交的开始。

提交作业之前,是需要先向 RM 申请 AppId 的。AppId 可以通过 YarnClientApplication app = yarnClient.createApplication(); 获取。作业提交信息一般都在 ApplicationSubmissionContext 里面,包含下面信息:

  • AM 申请资源的请求。通过 appContext.setAMContainerResourceRequests(amResourceRequests); 设置。

  • AM 的上下文信息:

    • 访问 hdfs 等所需要的 token。当前 token 会伴随着整个作业,直到作业结束才会异步销毁。
    • 需要本地话的文件。
    • AM 或者 Container 所需要的环境变量。
    • AM 的启动命令,AM 启动的类也是在这里指定的。类似于 java 运行 jar 或者某个主类。
  • App 名称。通过 appContext.setApplicationName(appName); 设置。

  • app tag 信息。

  • 资源标签信息。

  • 作业的优先级。

  • 作业提交的队列信息。

  • 日志聚合相关配置。主要是和日志归集的 Rolling 模式有关系。可以设置需要通过 rolling 的方式归集哪些日志。通过 appContext.setLogAggregationContext(logAggregationContext); 设置。

作业真正提交的代码只有一行:

yarnClient.submitApplication(appContext);

当前样例做到了作业所需要的信息可配置。是一个比较适合开发作业的样例。

AM 核心代码

AM 的核心代码是在 ApplicationMaster.java 里面的。在启动 AM 的时候会调用到当前函数的 main 函数。

在构造函数里面和 init 函数里面,主要是加载配置项以及命令行参数。真正运行的函数是 run,核心在 run 函数里面,

首先需要创建和 RM 以及 NM 的连接。

amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener); amRMClient.init(conf); amRMClient.start(); containerListener = createNMCallbackHandler(); nmClientAsync = new NMClientAsyncImpl(containerListener); nmClientAsync.init(conf); nmClientAsync.start(); startTimelineClient(conf);

在 AM 启动 OK 了第一件事就是需要去 RM 上面注册,证明当前 AM 已经启动完成了。

RegisterApplicationMasterResponse response = amRMClient .registerApplicationMaster(appMasterHostname, appMasterRpcPort, appMasterTrackingUrl, placementConstraintMap);

普通 Container 的申请是在 AM 里面处理的,类似下面代码,下面代码是异步申请的。

ContainerRequest containerAsk = setupContainerAskForRM(); amRMClient.addContainerRequest(containerAsk);

当 Container 申请好之后,可以通过下面代码获取,在样例中触发 onContainerAllocated 事件。

List<Container> allocated = response.getAllocatedContainers(); if (!allocated.isEmpty()) { handler.onContainersAllocated(allocated); }

通过下面代码启动 Container.

ContainerLaunchContext ctx = ContainerLaunchContext.newInstance( localResources, myShellEnv, commands, null, allTokens.duplicate(), null, containerRetryContext); nmClientAsync.startContainerAsync(container, ctx);

在作业结束的时候,AM 需要做下面事:

  • 停止 nmClient。
  • 从 RM 上取消 AppMaster
  • 停止 amClient。
nmClientAsync.stop(); try { amRMClient.unregisterApplicationMaster(appStatus, message, null); } catch (YarnException | IOException ex) { LOG.error("Failed to unregister application", ex); } amRMClient.stop();
  • Hadoop

    Hadoop 是由 Apache 基金会所开发的一个分布式系统基础架构。用户可以在不了解分布式底层细节的情况下,开发分布式程序。充分利用集群的威力进行高速运算和存储。

    93 引用 • 122 回帖 • 618 关注
  • Yarn
    13 引用 • 5 回帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...
zeekling
应无所住,而生其心。 --《金刚经》 吾生也有涯,而知也无涯。 --《庄子》 宝鸡