[spark] Checkpoint 源码解析

本贴最后更新于 2601 天前,其中的信息可能已经斗转星移

前言

在 spark 应用程序中,常常会遇到运算量很大经过很复杂的 Transformation 才能得到的 RDD 即 Lineage 链较长、宽依赖的 RDD,此时我们可以考虑将这个 RDD 持久化。

cache 也是可以持久化到磁盘,只不过是直接将 partition 的输出数据写到磁盘,而 checkpoint 是在逻辑 job 完成后,若有需要 checkpoint 的 RDD,再单独启动一个 job 去完成 checkpoint,这样该 RDD 就被计算了两次,所以建议在有 checkpoint 的时候先将该 RDD cache 到内存,到时候直接写到磁盘就行了。

checkpoint 的实现

需要使用 checkpoint 都需要通过 sparkcontext 的 setCheckpointDir 方法设置一个目录以存 checkpoint 的各种信息数据,下面我们来看看该方法:

def setCheckpointDir(directory: String) {
    if (!isLocal && Utils.nonLocalPaths(directory).isEmpty) {
      logWarning("Spark is not running in local mode, therefore the checkpoint directory " +
        s"must not be on the local filesystem. Directory '$directory' " +
        "appears to be on the local filesystem.")
    }
    checkpointDir = Option(directory).map { dir =>
      val path = new Path(dir, UUID.randomUUID().toString)
      val fs = path.getFileSystem(hadoopConfiguration)
      fs.mkdirs(path)
      fs.getFileStatus(path).getPath.toString
    }
  }

在非 local 模式下,directory 必须是 HDFS 的目录;在该目录下创建一个以 UUID 生成的一个唯一的目录名的目录。
通过 rdd.checkpoint()即可 checkpoint 此 RDD

def checkpoint(): Unit = RDDCheckpointData.synchronized { 
    if (context.checkpointDir.isEmpty) {
      throw new SparkException("Checkpoint directory has not been set in the SparkContext")
    } else if (checkpointData.isEmpty) {
      checkpointData = Some(new ReliableRDDCheckpointData(this))
    }
  }

先判断是否设置了 checkpointDir,再判断 checkpointData.isEmpty 是否成立,checkpointData 的定义是这样的:

private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None

RDDCheckpointData 和 RDD 一一对应,保存着和 checkpoint 相关的信息。这里通过 new ReliableRDDCheckpointData(this)实例化了 checkpointData ,ReliableRDDCheckpointData 是其子类,这里相当于是 checkpoint 的一个标记,并没有真正执行 checkpoint。

什么时候 checkpoint

在有 action 动作时,会触发 sparkcontext 对 runJob 的调用:

def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {
    if (stopped.get()) {
      throw new IllegalStateException("SparkContext has been shutdown")
    }
    val callSite = getCallSite
    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite.shortForm)
    if (conf.getBoolean("spark.logLineage", false)) {
      logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
    }
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
    progressBar.foreach(_.finishAll())
    rdd.doCheckpoint()
  }

我们可以看到在执行完 job 后会执行 rdd.doCheckpoint(),这里就是对前面标记了的 RDD 的 checkpoint,我们继续看这个方法:

private[spark] def doCheckpoint(): Unit = {
    RDDOperationScope.withScope(sc, "checkpoint", allowNesting = false, ignoreParent = true) {
      if (!doCheckpointCalled) {
        doCheckpointCalled = true
        if (checkpointData.isDefined) {
          if (checkpointAllMarkedAncestors) {
              dependencies.foreach(_.rdd.doCheckpoint())
          }
          checkpointData.get.checkpoint()
        } else {
      dependencies.foreach(_.rdd.doCheckpoint())
        }
      }
    }
  }

先判断是否已经被处理过 checkpoint,没有才执行,并将 doCheckpointCalled 设为 true,因为前面已经初始化过了 checkpointData,所以 checkpointData.isDefined 也满足,若想要把 checkpointData 定义过的 RDD 的 parents 也进行 checkpoint 的话,那么我们需要先对 parents checkpoint。因为,如果 RDD 把自己 checkpoint 了,那么它就将 lineage 中它的 parents 给切除了。继续跟进 checkpointData.get.checkpoint()

final def checkpoint(): Unit = {
    // Guard against multiple threads checkpointing the same RDD by
    // atomically flipping the state of this RDDCheckpointData
    RDDCheckpointData.synchronized {
      if (cpState == Initialized) {
        cpState = CheckpointingInProgress
      } else {
        return
      }
    }

    val newRDD = doCheckpoint()

    // Update our state and truncate the RDD lineage
    RDDCheckpointData.synchronized {
      cpRDD = Some(newRDD)
      cpState = Checkpointed
      rdd.markCheckpointed()
    }
  }

先将 checkpoint 的状态改为 CheckpointingInProgress,再执行 doCheckpoint,返回一个 newRDD,看 doCheckpoint 做了什么:

protected override def doCheckpoint(): CheckpointRDD[T] = {
    val newRDD = ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir)
    if (rdd.conf.getBoolean("spark.cleaner.referenceTracking.cleanCheckpoints", false)) {
      rdd.context.cleaner.foreach { cleaner =>
        cleaner.registerRDDCheckpointDataForCleanup(newRDD, rdd.id)
      }
    }
    logInfo(s"Done checkpointing RDD ${rdd.id} to $cpDir, new parent is RDD ${newRDD.id}")
    newRDD
  }

ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir),将一个 RDD 写入到多个 checkpoint 文件,并返回一个 ReliableCheckpointRDD 来代表这个 RDD

def writeRDDToCheckpointDirectory[T: ClassTag](
      originalRDD: RDD[T],
      checkpointDir: String,
      blockSize: Int = -1): ReliableCheckpointRDD[T] = {
    val sc = originalRDD.sparkContext
    // Create the output path for the checkpoint
    val checkpointDirPath = new Path(checkpointDir)
    val fs = checkpointDirPath.getFileSystem(sc.hadoopConfiguration)
    if (!fs.mkdirs(checkpointDirPath)) {
      throw new SparkException(s"Failed to create checkpoint path $checkpointDirPath")
    }
    // Save to file, and reload it as an RDD
    val broadcastedConf = sc.broadcast(
      new SerializableConfiguration(sc.hadoopConfiguration))
    // TODO: This is expensive because it computes the RDD again unnecessarily (SPARK-8582)
    sc.runJob(originalRDD,
      writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _)
    if (originalRDD.partitioner.nonEmpty) {
      writePartitionerToCheckpointDir(sc, originalRDD.partitioner.get, checkpointDirPath)
    }
    val newRDD = new ReliableCheckpointRDD[T](
      sc, checkpointDirPath.toString, originalRDD.partitioner)
    if (newRDD.partitions.length != originalRDD.partitions.length) {
      throw new SparkException(
        s"Checkpoint RDD $newRDD(${newRDD.partitions.length}) has different " +
          s"number of partitions from original RDD $originalRDD(${originalRDD.partitions.length})")
    }
    newRDD
  }

获取一些配置信息广播输出等操作,然后启动一个 Job 去写 Checkpint 文件,主要由 ReliableCheckpointRDD.writeCheckpointFile 来实现写操作,写完 checkpoint 后 new 一个 ReliableCheckpointRDD 实例返回,看看具体的 writePartitionToCheckpointFile 实现:

def writePartitionToCheckpointFile[T: ClassTag](
      path: String,
      broadcastedConf: Broadcast[SerializableConfiguration],
      blockSize: Int = -1)(ctx: TaskContext, iterator: Iterator[T]) {
    val env = SparkEnv.get
    val outputDir = new Path(path)
    val fs = outputDir.getFileSystem(broadcastedConf.value.value)

    val finalOutputName = ReliableCheckpointRDD.checkpointFileName(ctx.partitionId())
    val finalOutputPath = new Path(outputDir, finalOutputName)
    val tempOutputPath =
      new Path(outputDir, s".$finalOutputName-attempt-${ctx.attemptNumber()}")

    if (fs.exists(tempOutputPath)) {
      throw new IOException(s"Checkpoint failed: temporary path $tempOutputPath already exists")
    }
    val bufferSize = env.conf.getInt("spark.buffer.size", 65536)

    val fileOutputStream = if (blockSize < 0) {
      fs.create(tempOutputPath, false, bufferSize)
    } else {
      // This is mainly for testing purpose
      fs.create(tempOutputPath, false, bufferSize,
        fs.getDefaultReplication(fs.getWorkingDirectory), blockSize)
    }
    val serializer = env.serializer.newInstance()
    val serializeStream = serializer.serializeStream(fileOutputStream)
    Utils.tryWithSafeFinally {
      serializeStream.writeAll(iterator)
    } {
      serializeStream.close()
    }

    if (!fs.rename(tempOutputPath, finalOutputPath)) {
      if (!fs.exists(finalOutputPath)) {
        logInfo(s"Deleting tempOutputPath $tempOutputPath")
        fs.delete(tempOutputPath, false)
        throw new IOException("Checkpoint failed: failed to save output of task: " +
          s"${ctx.attemptNumber()} and final output path does not exist: $finalOutputPath")
      } else {
        // Some other copy of this task must've finished before us and renamed it
        logInfo(s"Final output path $finalOutputPath already exists; not overwriting it")
        if (!fs.delete(tempOutputPath, false)) {
          logWarning(s"Error deleting ${tempOutputPath}")
        }
      }
    }
  }

这里的代码就是普通的对 HDFS 写文件的操作,将一个 RDD partition 的数据写到 checkpoint 目录下。

doCheckpoint()操作已经完成,返回了一个 new RDD:ReliableCheckpointRDD 引用给 cpRDD,接着标记 checkpoint 的状态为 Checkpointed,rdd.markCheckpointed()干了什么呢?

private[spark] def markCheckpointed(): Unit = {
    clearDependencies()
    partitions_ = null
    deps = null    // Forget the constructor argument for dependencies too
  }

最后再清除 RDD 的所有依赖。

写 checkpoint 总结

  • Initialized
  • marked for checkpointing
  • checkpointing in progress
  • checkpointed

什么时候读 checkpoint

在需要读取一个 partition 的数据时,会通过 rdd.iterator() 去计算该 rdd 的 partition 的,我们来看 RDD 的 iterator()实现:

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    if (storageLevel != StorageLevel.NONE) {
      getOrCompute(split, context)
    } else {
      computeOrReadCheckpoint(split, context)
    }
  }

在 cache 中没有读到数据时再判断该 RDD 是否被 checkpoint 过,isCheckpointedAndMaterialized 就是在 checkpoint 成功时的一个状态标记:cpState = Checkpointed。

private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
  {
    if (isCheckpointedAndMaterialized) {
      firstParent[T].iterator(split, context)
    } else {
      compute(split, context)
    }
  }

当该 RDD 被成功 checkpoint 了,直接使用 parent rdd 的 iterator() 也就是 CheckpointRDD.iterator(),否则直接调用该 RDD 的 compute 方法。

final def dependencies: Seq[Dependency[_]] = {
    checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
      if (dependencies_ == null) {
        dependencies_ = getDependencies
      }
      dependencies_
    }
  }

获取 RDD 的依赖时,会先尝试从 checkpointRDD 中获取依赖,若成功则返回被 OneToOneDependency 包装过的 ReliableCheckpointRDD 对象,否则获取真正的依赖。

  • Spark

    Spark 是 UC Berkeley AMP lab 所开源的类 Hadoop MapReduce 的通用并行框架。Spark 拥有 Hadoop MapReduce 所具有的优点;但不同于 MapReduce 的是 Job 中间输出结果可以保存在内存中,从而不再需要读写 HDFS,因此 Spark 能更好地适用于数据挖掘与机器学习等需要迭代的 MapReduce 的算法。

    74 引用 • 46 回帖 • 552 关注
  • checkpoint
    3 引用
  • 代码
    466 引用 • 631 回帖 • 9 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...