一、MapReduce 任务过程
一个 MR 程序执行要经历以下五个步骤:input
=>Map
=>Shuffle
=>Reduce
=>output
其中 map 和 reduce 需要我们根据业务逻辑编写代码,在 Map 和 reduce 之间存在一种自动执行的操作 Shuffle
。同样的 Shuffle 也可以划分为 Map端的Shuffle
和 Reduce端的Shuffle
,Map 操作之后的的数据如何转换成 Reduce 的输入,这个过程和操作由 Shuffle
决定。
二、Shuffle 流程图
三、Shuffle 操作
Map 端的 Shuffle:
1、Map 处理完数据之后将数据写入到内存缓冲区(buffer in memory),这个缓冲区被称为 环形缓冲区
。环形缓冲区大小为 100M(默认,用户可自行调整).
2、当环形缓冲区的内存占用 >80%
的时候就会进行 溢写磁盘操作(spill to disk)
,即将数据写入到磁盘。
3、为什么要将数据先在 Memory 存贮?因为在写入磁盘之前要对数据进行 分区(Partition)
和 排序(Sort)
,分区的目的是将数据进行划分,以便 Reduce 可以拉取整个 partition 的数据。然后会对每个分区的数据进行排序。【内存中排序速度较快】
4、当分区和排序操作完成之后才是真正的溢写操作,在磁盘生成溢写文件。
5、多次的溢写文件会进行合并成大的文件。(合并之后会进行排序)
直到所有的 Map 端数据输出完成,此时,Map 端的 Shuffle 过程就完成了。
Reduce 端的 Shuffle:
1、去磁盘中拉取数据到内存,[从多个节点的磁盘中拉取相同的分区的数据]
进行合并。
2、将数据进行分组 Group,把相同 key 的 vlaue 存储在一起(<K,list(v1,v,2,v3...)>)
四、自定义 Shuffle 过程:
如果我们需要修改分区,排序的结果,胡总和对这个过程进行优化,可以通过自定义 Shuffle 的各个操作类,如下:
//分区:partition
job.setPartitionerClass(myParitioner.class)//myParitioner是自定义的一个分区算法的类
//排序:sort
job.setSortComParatorClass(mySortComParator.class)//mySortComParator是自定义的一个排序算法算法的类
//分组:group
job.setGroupingComParatorClass(myGroupingComParator.class)//myGroupingComParator是自定义的一个分组算法算法的类
另外一个可选项是:combiner
在 wordCount 中设置了这一项
//本地聚合 job.setCombinerClass(IntSumReducer.class);
他可以利用 reduce 的操作,在每个分区上执行 reduce 的操作,可以减少磁盘 I/O,作为一种优化策略,但是并不是大多数时候都能用。
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于