Overview
A Spark application only submits a job and actually computes anything when it encounters an action. The DAGScheduler builds a DAG from the dependencies between the RDDs and divides it into stages at each ShuffleDependency. A stage contains multiple tasks, one per partition of the stage's final RDD, and every task in a stage performs exactly the same computation. Once the stages are divided, the DAGScheduler wraps each stage in a TaskSet and hands it to the TaskScheduler, which is responsible for the concrete task scheduling and launches the tasks on the Worker nodes.
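To make this concrete, here is a minimal, hypothetical example (not part of the source walkthrough below): the job contains a single ShuffleDependency introduced by reduceByKey, so the DAGScheduler splits the DAG into a ShuffleMapStage and a ResultStage, and nothing runs until the count() action submits the job.

import org.apache.spark.{SparkConf, SparkContext}

object StageDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("stage-demo").setMaster("local[*]"))

    val pairs = sc.parallelize(1 to 100)
      .map(n => (n % 10, 1)) // narrow dependency: stays in the first stage
    val counts = pairs.reduceByKey(_ + _) // ShuffleDependency: stage boundary

    // Only now does the DAGScheduler build the DAG, divide it into two
    // stages, and submit the resulting TaskSets to the TaskScheduler.
    println(counts.count()) // 10 distinct keys
    sc.stop()
  }
}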
Job Submission
Taking count as an example, let's trace the steps straight through the source:
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
SparkContext#runJob (several overloads, chained)
SparkContext#runJob
SparkContext#runJob
DAGScheduler#runJob (entered via dagScheduler.runJob)
DAGScheduler#submitJob
eventProcessLoop.post(JobSubmitted(...))
eventProcessLoop is a DAGSchedulerEventProcessLoop(this) object. You can think of DAGSchedulerEventProcessLoop as the DAGScheduler's message-facing interface: it hides the internal implementation details, and messages from both inside and outside the DAGScheduler go through the same processing code, which keeps the logic clear and the handling uniform.
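The idea is easy to see in a simplified, hypothetical sketch (this is not Spark's actual EventLoop class): callers post events onto a queue from any thread, while a single daemon thread drains the queue and dispatches each event to onReceive, so all scheduler state is mutated from one thread only.

import java.util.concurrent.LinkedBlockingDeque

abstract class SimpleEventLoop[E](name: String) {
  private val eventQueue = new LinkedBlockingDeque[E]()
  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit =
      while (!Thread.currentThread().isInterrupted)
        onReceive(eventQueue.take()) // blocks until an event is posted
  }
  def start(): Unit = eventThread.start()
  def post(event: E): Unit = eventQueue.put(event) // callers never wait for processing
  protected def onReceive(event: E): Unit
}

object EventLoopDemo extends App {
  sealed trait Event
  case class JobSubmitted(jobId: Int) extends Event

  val loop = new SimpleEventLoop[Event]("demo-event-loop") {
    protected def onReceive(event: Event): Unit = event match {
      case JobSubmitted(id) => println(s"handling job $id")
    }
  }
  loop.start()
  loop.post(JobSubmitted(0))
  Thread.sleep(100) // give the daemon thread time to drain the queue
}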
eventProcessLoop receives the various messages and processes them; the dispatch logic lives in its doOnReceive method:
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
......
}
When the posted event is a JobSubmitted, it is handled by dagScheduler.handleJobSubmitted.
Stage Division
The first thing handleJobSubmitted does is divide the DAG into stages, tracing backwards from the final RDD:
private[scheduler] def handleJobSubmitted(jobId: Int,
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties) {
var finalStage: ResultStage = null
try {
// stage division walks backwards from the last stage, which is always a ResultStage
finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
} catch {
case e: Exception =>
logWarning("Creating new stage failed due to exception - job: " + jobId, e)
listener.jobFailed(e)
return
}
// create an ActiveJob for this job
val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
clearCacheLocs()
logInfo("Got job %s (%s) with %d output partitions".format(
job.jobId, callSite.shortForm, partitions.length))
logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
logInfo("Parents of final stage: " + finalStage.parents)
logInfo("Missing parents: " + getMissingParentStages(finalStage))
val jobSubmissionTime = clock.getTimeMillis()
jobIdToActiveJob(jobId) = job // record this job as active
activeJobs += job
finalStage.setActiveJob(job)
val stageIds = jobIdToStageIds(jobId).toArray
val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
listenerBus.post( // post the job-start event to the LiveListenerBus
SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
submitStage(finalStage) // submit the final stage
submitWaitingStages()
}
Stepping into newResultStage:
private def newResultStage(
rdd: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
jobId: Int,
callSite: CallSite): ResultStage = {
val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId) // get this stage's parent stages and a new stage id
val stage = new ResultStage(id, rdd, func, partitions, parentStages, jobId, callSite)
stageIdToStage(id) = stage // associate the stage with its id
updateJobIdStageIdMaps(jobId, stage) // update the set of stages belonging to the job
stage
}
This directly instantiates a ResultStage, but it needs parentStages as a parameter, so let's see what getParentStagesAndId does:
private def getParentStagesAndId(rdd: RDD[_], firstJobId: Int): (List[Stage], Int) = {
val parentStages = getParentStages(rdd, firstJobId)
val id = nextStageId.getAndIncrement()
(parentStages, id)
}
This fetches the parent stages and returns a fresh id to associate with the new stage. Because stages are generated recursively from the final RDD backwards, the stages furthest upstream are instantiated first, so the further upstream a stage sits in the DAG, the smaller its id: a parent stage's id is always smaller than its child's.
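A toy model of this ordering (purely illustrative, not Spark code): each stage asks for its parents before drawing an id of its own, so ids grow from the front of the DAG towards the final stage.

import java.util.concurrent.atomic.AtomicInteger

object StageIdOrdering {
  private val nextStageId = new AtomicInteger(0)

  case class Stage(id: Int, parents: List[Stage])

  // Create the parents first, then take an id, mirroring getParentStagesAndId.
  def newStage(numParents: Int): Stage = {
    val parents = List.fill(numParents)(newStage(0))
    Stage(nextStageId.getAndIncrement(), parents)
  }

  def main(args: Array[String]): Unit = {
    // A final stage with two parents: the parents get ids 0 and 1,
    // the final stage itself gets id 2.
    println(newStage(2))
  }
}

Continuing into getParentStages: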
private def getParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
val parents = new HashSet[Stage] // all direct parent stages of the current stage
val visited = new HashSet[RDD[_]] // RDDs that have already been visited
// We are manually maintaining a stack here to prevent StackOverflowError
// caused by recursively visiting
val waitingForVisit = new Stack[RDD[_]] // RDDs waiting to be visited
def visit(r: RDD[_]) {
if (!visited(r)) { // if not yet visited
visited += r // mark as visited
// Kind of ugly: need to register RDDs with the cache here since
// we can't do it in its constructor because # of partitions is unknown
for (dep <- r.dependencies) { // iterate over all of its dependencies
dep match {
case shufDep: ShuffleDependency[_, _, _] => // a wide (shuffle) dependency starts a new stage: a ShuffleMapStage
parents += getShuffleMapStage(shufDep, firstJobId)
case _ => // a narrow dependency stays in the current stage: push its rdd and keep walking backwards until a shuffle dependency (or no dependency) is reached
waitingForVisit.push(dep.rdd)
}
}
}
}
waitingForVisit.push(rdd) // push the starting rdd
while (waitingForVisit.nonEmpty) { // keep going while there are RDDs left to visit
visit(waitingForVisit.pop())
}
parents.toList
}
Given an RDD, this returns the set of stages it depends on. It walks over each of the RDD's dependencies: on a narrow dependency it simply keeps traversing backwards, while on a ShuffleDependency it calls getShuffleMapStage to obtain a ShuffleMapStage and adds it to the parent list. Note that parentStages here contains only the stages the current stage directly depends on (each of which has parent stages of its own), not every stage in the DAG.
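For example, in the hypothetical two-shuffle pipeline below (assuming a SparkContext sc is in scope), three stages exist in total, but getParentStages on the final RDD would report only the nearest one:

val rdd1 = sc.parallelize(1 to 100).map(n => (n % 10, 1))
val rdd2 = rdd1.reduceByKey(_ + _) // shuffle #1
val rdd3 = rdd2.map { case (k, v) => (v, k) }
val rdd4 = rdd3.reduceByKey(_ max _) // shuffle #2

// getParentStages(rdd4) returns only the ShuffleMapStage ending at
// shuffle #2; the stage for shuffle #1 is created as a parent of that
// stage one level further up the recursion.

Next, let's look at the implementation of getShuffleMapStage: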
private def getShuffleMapStage(
shuffleDep: ShuffleDependency[_, _, _],
firstJobId: Int): ShuffleMapStage = {
shuffleToMapStage.get(shuffleDep.shuffleId) match {
case Some(stage) => stage // if it already exists in shuffleToMapStage, return it directly
case None => // otherwise a new stage has to be created
// first create a ShuffleMapStage for every ancestor shuffle of the current one
getAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
if (!shuffleToMapStage.contains(dep.shuffleId)) {
shuffleToMapStage(dep.shuffleId) = newOrUsedShuffleStage(dep, firstJobId) // update the shuffleToMapStage mapping
}
}
// then create the stage for this shuffle itself
val stage = newOrUsedShuffleStage(shuffleDep, firstJobId)
shuffleToMapStage(shuffleDep.shuffleId) = stage
stage
}
}
It first looks up shuffleToMapStage by shuffleId and only computes a new stage on a miss; the first time through, the lookup always returns None.
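That lookup-then-create step is essentially a get-or-create cache keyed by shuffleId, which guarantees exactly one ShuffleMapStage per shuffle even when several jobs depend on it. A condensed, hypothetical sketch of the idiom:

import scala.collection.mutable

object ShuffleStageCache {
  case class MapStage(shuffleId: Int)

  private val shuffleToMapStage = mutable.HashMap.empty[Int, MapStage]

  // Return the cached stage for this shuffle, creating it only on a miss.
  def getShuffleMapStage(shuffleId: Int): MapStage =
    shuffleToMapStage.getOrElseUpdate(shuffleId, MapStage(shuffleId))

  def main(args: Array[String]): Unit = {
    val first = getShuffleMapStage(0)
    val second = getShuffleMapStage(0)
    println(first eq second) // true: the same stage instance is reused
  }
}

Now let's see what getAncestorShuffleDependencies actually does: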
private def getAncestorShuffleDependencies(rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = {
val parents = new Stack[ShuffleDependency[_, _, _]] // all ancestor ShuffleDependencies of this one (not just the direct ones)
val visited = new HashSet[RDD[_]] // RDDs that have already been visited
// RDDs waiting to be visited
val waitingForVisit = new Stack[RDD[_]]
def visit(r: RDD[_]) {
if (!visited(r)) { // if not yet visited
visited += r // mark as visited
for (dep <- r.dependencies) { // iterate over the direct dependencies
dep match {
case shufDep: ShuffleDependency[_, _, _] =>
if (!shuffleToMapStage.contains(shufDep.shuffleId)) { // collect the ShuffleDependency if no stage is registered for it yet
parents.push(shufDep)
}
case _ =>
}
waitingForVisit.push(dep.rdd) // keep traversing even across shuffle dependencies
}
}
}
waitingForVisit.push(rdd)
while (waitingForVisit.nonEmpty) {
visit(waitingForVisit.pop())
}
parents
}
This looks a lot like getParentStages. The difference is that it collects all ancestor ShuffleDependencies, not just the direct parent ShuffleDependencies: dep.rdd is pushed onto the stack even when the dependency is a shuffle, so the traversal walks straight through shuffle boundaries. In the hypothetical pipeline above, getAncestorShuffleDependencies(rdd3) would surface shuffle #1's dependency even though it lies behind a stage boundary.
After a ShuffleMapStage has been created for every ancestor shuffle of the current one, newOrUsedShuffleStage is called to obtain the stage for the current dependency and associate it with its shuffleId. Here is the implementation of newOrUsedShuffleStage:
private def newOrUsedShuffleStage(
shuffleDep: ShuffleDependency[_, _, _],
firstJobId: Int): ShuffleMapStage = {
val rdd = shuffleDep.rdd // the rdd on the map side of the dependency
val numTasks = rdd.partitions.length // number of partitions = number of tasks
val stage = newShuffleMapStage(rdd, numTasks, shuffleDep, firstJobId, rdd.creationSite) // create the ShuffleMapStage for this rdd
if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
// if this shuffle is already registered with the MapOutputTracker, the stage has been computed before, so recover the map outputs from the tracker
val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
(0 until locs.length).foreach { i => // record the shuffle-write output locations
if (locs(i) ne null) {
// locs(i) will be null if missing
stage.addOutputLoc(i, locs(i))
}
}
} else { // not registered yet, i.e. this shuffle has never been computed
// Kind of ugly: need to register RDDs with the cache and map output tracker here
// since we can't do it in the RDD constructor because # of partitions is unknown
logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length) // register the shuffle
}
stage
}
Continuing with newShuffleMapStage:
private def newShuffleMapStage(
rdd: RDD[_],
numTasks: Int,
shuffleDep: ShuffleDependency[_, _, _],
firstJobId: Int,
callSite: CallSite): ShuffleMapStage = {
val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, firstJobId) // get the parent stages and the stage id
val stage: ShuffleMapStage = new ShuffleMapStage(id, rdd, numTasks, parentStages,
firstJobId, callSite, shuffleDep) // instantiate a ShuffleMapStage
stageIdToStage(id) = stage // associate the stage with its id
updateJobIdStageIdMaps(firstJobId, stage) // update the job's set of stages
stage
}
Doesn't this look remarkably like newResultStage? Exactly: here a ShuffleMapStage is created instead, and the getParentStagesAndId call inside makes the whole process a recursive one.
Stages are generated by recursing backwards from the final RDD: the most upstream ShuffleMapStages are created first, and the ResultStage last. At this point, the DAGScheduler's stage division is complete.
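One way to observe the result of this division from the driver is RDD.toDebugString, which prints the lineage with each shuffle boundary as an indentation break. For the hypothetical two-shuffle pipeline from earlier (output is indicative only; the exact format and partition counts vary by Spark version):

println(rdd4.toDebugString)
// (4) ShuffledRDD[4] at reduceByKey ...                    <- ResultStage
//  +-(4) MapPartitionsRDD[3] at map ...
//    +-(4) ShuffledRDD[2] at reduceByKey ...                <- ShuffleMapStage
//      +-(4) MapPartitionsRDD[1] at map ...
//        +-(4) ParallelCollectionRDD[0] at parallelize ...  <- first ShuffleMapStage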