Spark Shuffle 过程及原理剖析

本贴最后更新于 2594 天前,其中的信息可能已经天翻地覆

Spark Shuffle 是什么?

Spark 中的某些操作会触发一个称之为"Shuffle"的事件,Shuffle 是 Spark 用来重新分配数据,使其用来分组到不同的分区的一种机制。这种机制通常涉及到 Executor机器 之间复制数据,因此 Shuffle 是一种复杂而昂贵的操作。

想要了解 Shuffle 操作发生了什么,我们可以通过 reduceByKey 这个算子实例来看。reduceBykey 操作产生一个全新的 RDD,这个 RDD 包含单个 Key 的所有 Value 的和。但是单个 key 的 Value 不一定能够在同一个 Paratition,甚至不在同一个机器上。但是他们必须拉去倒相同的位置才能计算结果。

在 Spark 中数据通常不会分布在同一个 Paritation,一个 Task 会操作一个 Paritation 数据,但是一个 reduceBeKey 必须组织所有的数据到几个不同的分区中读取所有 Key 和所有 Value,然后将这些不同分区的数据组合起来进行计算每个 Key 的 Value,这个过程称之为 Shuffle。--官方文档

触发 Shuffle 的操作算子有哪些?

可以触发 Shuffle 操作的算子:
repartition算子 如:repartition,repartitionAndSortWithinPartitions 和 coalesce;`

ByKey算子(除了 countByKey)如:groupByKey,reduceByKey,sortBeyKey,

join算子 如:cogroup,join.

Spark Shuffle 过程

Spark Shuffle 过程主要分为两个部分:Shuffle WriteShuffle Read

一、Shuffle Write:

我们 Shuffle 的前半部分输出叫做 Shuffle Write,前半部分的 Map 过程会将数据结果写到内存中,以供 Reduce 操作拉取数据,这个写数据的过程就是 Shuffle Write。Shuffle Write 有两种算法(hash-basedsort-based
⬜️ hash-based Shuffle:

22fe0d9b182f40a9b6eca0fa288bbcd5-image.png
假设 Executor 有两个 Core,每次调度两个 Task,每一个 Task 任务操作一个 Partition 数据,执行完都会产生 R(R 表示 rReduceTask 的数量)个文件写入到 Buffer 中,这个 Buffer 在内存中缓冲数据直到操作完成之后开始写入到本地磁盘,最初的 Buffer 并未设置限制,因此当数据量过大超过 Buffer 承受的极限的时候,就会出现 OOM(现在的版本已经对 Buffer 进行了控制)。M 个 Task 就会产生 M*R 个小文件。当这两个 Task 执行完毕之后还有其他 Task 就会继续被调度过来执行。最终生成M*R个文件。ReduceTask 执行的时候会从每个 parition 中拉取一个文件,最后进行聚合操作。

海量的小文件带来的是大量的耗时的磁盘 I/O。同时内存需要保存这些文件的元信息(大小,路径。。。)也会消耗掉相当大的内存。

⬜️ 优化后 hash-based Shuffle:
38db3847da7543e9b5b606211a9dcf33-image.png
多个 Task 产生文件追加在上一个 Task 产生的文件的末尾,这样不管有多少个 mapTask,只会产生 R(ReduceTask 的数量)个文件。
这个优化后的 HashShuffle 叫 ConsolidatedShuffle,在实际生产环境下可以调以下参数:spark.shuffle.consolidateFiles=true

虽然优化之后解决了 MapTask 过多造成的文件数量太多,但是这并不能解决 ReduceTask 数量过多造成的影响,当任务的并发度特别高的时候 ReduceTask 数量就会很大最终导致文件数量同样过多。这时候就出现了下面的 Sort-based Shuffle

⬜️ sort-based Shuffle:
fc1c935f2a9d43ccbc232dc3d96a97be-image.png
这里的每一个 mapTask 只会产生一个文件(解决了 ReduceTask 导致文件过多的问题,文件数只跟 maptask 相关)这些数据是有序的,同时为这个文件建立一个索引,他会指定相同 key 的数据的索引位置,最终每个 Reducetask 会从每个文件中读取所需要的那一片数据(即它所要聚集的 key 的数据)。

二、Shuffle Read 和 Aggregator:

不管是 Hash-based Shuffle 还是 Sort-based shuffle 他们的 shuffle Read 实现都是相同的。Shuffle Read 通过网络从每个 mapTask 产生的数据中拉取需要的部分进入自己的 Buffer 内存。在拉取的过程中会进行归并排序,所以每个 Pasrition 的数据是有序的。获取到数据之后会进行数据的聚集 Aggregator 操作。
72f9d215cdfc405c87ed1718de39d6d9-image.png
这里是以 wordcount 的 reducebykey 操作为例,通过内存的数据结构 HaashMap 实现相同 key 的映射,做函数累加操作。
f1c9aff81b754ac39e110f7f49bd416c-image.png
当这个 HashMap 存储满了之后会全部写入到磁盘,清空 HashMap,然后继续映射操作。
f2ed22b3bb6b4ac0ab66325c91a5b72c-image.png
映射完成之后将所有的数据进行归并排序,在进行全局的聚合操作。最终得到我们需要的 RDD,我们可以进行数据的输出,存储等操作。

参考文档:

小象学院《spark2.1》
彻底解密 Spark 的 HashShuffle

  • Spark

    Spark 是 UC Berkeley AMP lab 所开源的类 Hadoop MapReduce 的通用并行框架。Spark 拥有 Hadoop MapReduce 所具有的优点;但不同于 MapReduce 的是 Job 中间输出结果可以保存在内存中,从而不再需要读写 HDFS,因此 Spark 能更好地适用于数据挖掘与机器学习等需要迭代的 MapReduce 的算法。

    74 引用 • 46 回帖 • 560 关注
  • Shuffle
    2 引用

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...
rzx
此生最怕深情被辜负,最怕兄弟成陌路。对世界充满善意,同时又充满深深的恨意,我渴望天降甘霖福泽众生,又渴望灭世洪水重创世纪。 广州