简介
ContainerManager 主要负责 NM 中管理所有 Container 生命周期,其主要包含启动 Container、恢复 Container、停止 Container 等功能。
主要功能由 ContainerManagerImpl 类实现,具体代码可以参考当前类。
初始化
初始化主要分为两部分:
ContainerManagerImpl 实例的构造函数和 serviceInit 函数。
构造函数
当前函数为构造函数,主要初始化必须要的一些变量等。
- dispatcher : 事件的中央调度器,主要用于管理单个 Container 的生命周期。在当前函数里面会通过
dispatcher.register()
注册支持的事件。 - containersLauncher: 主要用于启动 Container,可以通过配置项 yarn.nodemanager.containers-launcher.class 指定。默认为 containersLauncher.class
serviceInit 函数
主要是服务启动时的初始化函数,ContainerManager 在 NodeManager 内部属于一个服务。所以初始化的时候会调用这个函数初始化一些服务相关的东西。
在这个函数里面总结下来主要做了几件事:
- 继续初始化一些事件,主要包含 LogHandlerEventType、SharedCacheUploadEventType。
- 初始化 AMRMProxyService。
- 从本地的 LevelDB 恢复 Container 信息。
恢复当前 NodeManager 的所有作业信息
第一步:恢复 Application 信息
首先是从 LevelDB 里面加载 Application 信息。循环加载。
RecoveredApplicationsState appsState = stateStore.loadApplicationsState();
try (RecoveryIterator<ContainerManagerApplicationProto> rasIterator =
appsState.getIterator()) {
while (rasIterator.hasNext()) {
ContainerManagerApplicationProto proto = rasIterator.next();
LOG.debug("Recovering application with state: {}", proto);
recoverApplication(proto);
}
}
加载 Application 的时候会将 Application 的上下文信息从 LevelDB 里面读出来,通过上下文信息等初始化新的 ApplicationImpl,并且触发 ApplicationInitEvent 事件。
会根据当前作业上下文中实际的状态等信息跳转到实际的状态。
ApplicationImpl app = new ApplicationImpl(dispatcher, p.getUser(), fc,
appId, creds, context, p.getAppLogAggregationInitedTime());
context.getApplications().put(appId, app);
metrics.runningApplication();
app.handle(new ApplicationInitEvent(appId, acls, logAggregationContext));
第二步:恢复所有的 Container 信息
第二步是从 LevelDB 里面加载 Container 信息。循环加载。
try (RecoveryIterator<RecoveredContainerState> rcsIterator =
stateStore.getContainerStateIterator()) {
while (rcsIterator.hasNext()) {
RecoveredContainerState rcs = rcsIterator.next();
LOG.debug("Recovering container with state: {}", rcs);
recoverContainer(rcs);
}
}
recoverContainer 函数用于恢复单个 Container 信息。对于已经存在的 Application 对应的 Container 会通过 LevelDB 里面加载到的信息初始化 Container 对象,
将其加到所有 Container 的列表里面并且触发 ApplicationContainerInitEvent,后续会根据实际状态信息跳转到指定状态继续处理。
Container container = new ContainerImpl(getConfig(), dispatcher,
launchContext, credentials, metrics, token, context, rcs);
context.getContainers().put(token.getContainerID(), container);
containerScheduler.recoverActiveContainer(container, rcs);
app.handle(new ApplicationContainerInitEvent(container));
如果发现作业的状态为 KILL 状态,则会为当前 Container 重新触发 KILL 事件,保证 Container 已经停止。
对于 Application 找不见的 Container,认为作业已经结束了,直接标记为已经完成。
在恢复完成之后会触发事件: ContainerSchedulerEventType.RECOVERY_COMPLETED
此状态会重新拉起所有的 Container。
启动 Containers
获取 NMToken
在 Container 启动之前需要获取 NMToken,可以通过下面命令获取,一般情况下获取第一个 NMTokenIdentifier 类型的 Token。
Set<TokenIdentifier> tokenIdentifiers = remoteUgi.getTokenIdentifiers();
开始启动 Container
启动之前需要做的就是初始化 ContainerImpl 信息,方便后续启动 Container。
Container container =
new ContainerImpl(getConfig(), this.dispatcher,
launchContext, credentials, metrics, containerTokenIdentifier,
context, containerStartTime);
如果是第一次启动(满足条件:!context.getApplications().containsKey(applicationID))
,也就是 AM,会通过下面命令触发作业的启动:
context.getNMStateStore().storeApplication(applicationID,
buildAppProto(applicationID, user, credentials, appAcls,
logAggregationContext, flowContext));
dispatcher.getEventHandler().handle(new ApplicationInitEvent(
applicationID, appAcls, logAggregationContext));
满足下面条件则是恢复 Application:
containerTokenIdentifier.getContainerType() == ContainerType.APPLICATION_MASTER && context.getApplications().containsKey(applicationID))
启动 Container 主要是触发 Container 的 Init 事件:
this.context.getNMStateStore().storeContainer(containerId,
containerTokenIdentifier.getVersion(), containerStartTime, request);
dispatcher.getEventHandler().handle(
new ApplicationContainerInitEvent(container));
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于