Spark Shuffle 是什么?
Spark 中的某些操作会触发一个称之为"Shuffle
"的事件,Shuffle 是 Spark 用来重新分配数据,使其用来分组到不同的分区的一种机制。这种机制通常涉及到 Executor
和 机器
之间复制数据,因此 Shuffle
是一种复杂而昂贵的操作。
想要了解 Shuffle
操作发生了什么,我们可以通过 reduceByKey 这个算子实例来看。reduceBykey 操作产生一个全新的 RDD,这个 RDD 包含单个 Key 的所有 Value 的和。但是单个 key 的 Value 不一定能够在同一个 Paratition,甚至不在同一个机器上。但是他们必须拉去倒相同的位置才能计算结果。
在 Spark 中数据通常不会分布在同一个 Paritation,一个 Task 会操作一个 Paritation 数据,但是一个 reduceBeKey 必须组织所有的数据到几个不同的分区中读取所有 Key 和所有 Value,然后将这些不同分区的数据组合起来进行计算每个 Key 的 Value,这个过程称之为 Shuffle
。--官方文档
触发 Shuffle 的操作算子有哪些?
可以触发 Shuffle 操作的算子:
repartition算子
如:repartition,repartitionAndSortWithinPartitions 和 coalesce;`
ByKey算子
(除了 countByKey)如:groupByKey,reduceByKey,sortBeyKey,
join算子
如:cogroup,join.
Spark Shuffle 过程
Spark Shuffle 过程主要分为两个部分:Shuffle Write
和 Shuffle Read
一、Shuffle Write:
我们 Shuffle 的前半部分输出叫做 Shuffle Write,前半部分的 Map 过程会将数据结果写到内存中,以供 Reduce 操作拉取数据,这个写数据的过程就是 Shuffle Write。Shuffle Write 有两种算法(hash-based
和 sort-based
)
⬜️ hash-based Shuffle:
假设 Executor 有两个 Core,每次调度两个 Task,每一个 Task 任务操作一个 Partition 数据,执行完都会产生 R(R 表示 rReduceTask 的数量)个文件写入到 Buffer 中,这个 Buffer 在内存中缓冲数据直到操作完成之后开始写入到本地磁盘,最初的 Buffer 并未设置限制,因此当数据量过大超过 Buffer 承受的极限的时候,就会出现 OOM(现在的版本已经对 Buffer 进行了控制)。M 个 Task 就会产生 M*R 个小文件。当这两个 Task 执行完毕之后还有其他 Task 就会继续被调度过来执行。最终生成M*R个文件
。ReduceTask 执行的时候会从每个 parition 中拉取一个文件,最后进行聚合操作。
海量的小文件带来的是大量的耗时的磁盘 I/O。同时内存需要保存这些文件的元信息(大小,路径。。。)也会消耗掉相当大的内存。
⬜️ 优化后 hash-based Shuffle:
多个 Task 产生文件追加在上一个 Task 产生的文件的末尾,这样不管有多少个 mapTask,只会产生 R(ReduceTask 的数量)个文件。
这个优化后的 HashShuffle 叫 ConsolidatedShuffle,在实际生产环境下可以调以下参数:spark.shuffle.consolidateFiles=true
虽然优化之后解决了 MapTask 过多造成的文件数量太多,但是这并不能解决 ReduceTask 数量过多造成的影响,当任务的并发度特别高的时候 ReduceTask 数量就会很大最终导致文件数量同样过多。这时候就出现了下面的 Sort-based Shuffle
⬜️ sort-based Shuffle:
这里的每一个 mapTask 只会产生一个文件(解决了 ReduceTask 导致文件过多的问题,文件数只跟 maptask 相关)这些数据是有序的,同时为这个文件建立一个索引,他会指定相同 key 的数据的索引位置,最终每个 Reducetask 会从每个文件中读取所需要的那一片数据(即它所要聚集的 key 的数据)。
二、Shuffle Read 和 Aggregator:
不管是 Hash-based Shuffle 还是 Sort-based shuffle 他们的 shuffle Read 实现都是相同的。Shuffle Read 通过网络从每个 mapTask 产生的数据中拉取需要的部分进入自己的 Buffer 内存。在拉取的过程中会进行归并排序,所以每个 Pasrition 的数据是有序的。获取到数据之后会进行数据的聚集 Aggregator 操作。
这里是以 wordcount 的 reducebykey 操作为例,通过内存的数据结构 HaashMap 实现相同 key 的映射,做函数累加操作。
当这个 HashMap 存储满了之后会全部写入到磁盘,清空 HashMap,然后继续映射操作。
映射完成之后将所有的数据进行归并排序,在进行全局的聚合操作。最终得到我们需要的 RDD,我们可以进行数据的输出,存储等操作。
参考文档:
小象学院《spark2.1》
彻底解密 Spark 的 HashShuffle
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于