Hadoop 总结 (第一版—MapReduce 篇)

本贴最后更新于 2617 天前,其中的信息可能已经时移世易

mapreduce在hadoop中更多的承担的是计算的角色

什么是 MapReduce?

mapreduce 源于谷歌公司为研究大规模数据处理而研发出的一种并行计算模型和方法。
它将并行编程中的难点和有相对门槛的方法进行高度封装,而给开发者一套接口,让开发者理专注于自己的业务逻辑,即可让自己的代码运行在分布式集群中,大大降低了开发一些并发程序的门槛。从字面上就可以知道,MapReduce 分为 Map(映射)、Reduce(规约)

MapReduce 的核心思想

MapReduce 主要是两种经典函数:

  • 映射(Mapping)将一个整体按某种规则映射成 N 份。并对这 N 份进行同一种操作。
  • 规约(Reducing)将 N 份文件,按某种策略进行合并。

MapReduce 的角色与动作

  MapReduce 包含四个组成部分,分别为 ClientJobTrackerTaskTrackerTask,下面我们详细介绍这四个组成部分。

  • Client:作业提交的发起者, 每一个 Job 都会在用户端通过 Client 类将应用程序以及配置参数 Configuration 打包成 JAR 文件存储在 HDFS,并把路径提交到 JobTracker 的 master 服务,然后由 master 创建每一个 Task(即 MapTask 和 ReduceTask) 将它们分发到各个 TaskTracker 服务中去执行。

  • JobTracker:初始化作业,分配作业,与 TaskTracker 通信,协调整个作业 JobTracke 负责资源监控和作业调度。JobTracker 监控所有 TaskTracker 与 job 的健康状况,一旦发现失败,就将相应的任务转移到其他节点;同时,JobTracker 会跟踪任务的执行进度、资源使用量等信息,并将这些信息告诉任务调度器,而调度器会在资源出现空闲时,选择合适的任务使用这些资源。在 Hadoop 中,任务调度器是一个可插拔的模块,用户可以根据自己的需要设计相应的调度器。

  • TaskTracker:TaskTracker 会周期性地通过 Heartbeat 将本节点上资源的使用情况和任务的运行进度汇报给 JobTracker,同时接收 JobTracker 发送过来的命令并执行相应的操作(如启动新任务、杀死任务等)。TaskTracker 使用“slot”等量划分本节点上的资源量。“slot”代表计算资源(CPU、内存等)。一个 Task 获取到一个 slot 后才有机会运行,而 Hadoop 调度器的作用就是将各个 TaskTracker 上的空闲 slot 分配给 Task 使用。slot 分为 Map slot 和 Reduce slot 两种,分别供 Map Task 和 Reduce Task 使用。TaskTracker 通过 slot 数目(可配置参数)限定 Task 的并发度。

  • Task : Task 分为 Map Task 和 Reduce Task 两种,均由 TaskTracker 启动。HDFS 以固定大小的 block 为基本单位存储数据,而对于 MapReduce 而言,其处理单位是 split。

  • Map Task 执行过程如下图 所示:由该图可知,Map Task 先将对应的 split 迭代解析成一个个 key/value 对,依次调用用户 自定义的 map() 函数进行处理,最终将临时结果存放到本地磁盘上, 其中临时数据被分成若干个 partition,每个 partition 将被一个 Reduce Task 处理。
    09d902f773ff4e30b1cf995705d4f03c-image.png

    Reduce Task 执行过程下图所示。该过程分为三个阶段
    2f9ff84d09cf424cb8a1c4d24db637c0-image.png

提交作业

  • 在作业提交之前,需要对作业进行配置
  • 程序代码,主要是自己书写的 MapReduce 程序
  • 输入输出路径
  • 其他配置,如输出压缩等
  • 配置完成后,通过 JobClient 来提交

作业的初始化

  • 客户端提交完成后,JobTracker 会将作业加入队列,然后进行调度,默认的调度方法是 FIFO 调试方式
    任务的分配
  • TaskTracker 和 JobTracker 之间的通信与任务的分配是通过心跳机制完成的。
  • TaskTracker 会主动向 JobTracker 询问是否有作业要做,如果自己能做,那么就会申请作业任务,这个任务可以使 Map,也可能是 Reduce 任务
    任务的执行
  • 申请到任务后,TaskTracker 会做如下事情:
  • 拷贝代码到本地
  • 拷贝任务的信息到本地
  • 启动 Jvm 运行任务
    状态与任务的更新
  • 任务在运行过程中,首先会将自己的状态汇报给 TaskTracker,然后由 TaskTracker 汇总报给 JobTracker
  • 任务进度是通过计数器来实现的。

作业的完成

  • JobTracker 是在接受到最后一个任务运行完成后,才会将任务标志为成功
  • 此时会做删除中间结果等善后处理工作
    8fba570574f6491c96b51698743466b2-image.png

MapReduce 任务执行流程与数据处理流程详解

就任务流程而言,上文中有些或已经涉及,有些也介绍的比较详细了。但就整体而言,不是特别具体,或这个整体性,所以在此我们再集中总结一下,即使重复内容,就也当加深印象吧。

任务执行流程详解

  1. 通过 jobClient 提交当 mapreduce 的 job 提交至 JobTracker
    ,提交的具体信息大致会有,conf 配置内容,path,相关的 Map,Reduce 函数等。(数据的切片会在 client 上完成)
  2. JobTracker 中的 Master 服务会完成创建一个 Task(MapTask 与 ReduceTask),并将这个任务加载至任务队列中,等待 TaskTracker 来获取。同进 JobTracker 会处于一种监听状态,监听所有 TaskTracker 与 Job 的健康情况。面对故障时,提供服务转移等。同时它还会记录任务的执行进度,资源的使用情况,提交至任务调度器,由调度器进行资源的调度。
  3. TaskTracker 会主动向 jobTacker 去询问是否有任务,如果有,就会申请到作业,作业可以是 map,也可以是 reduce 任务。TaskTracker 获取的不仅是任务,还有相关的处理代码也会 copy 一份至本地,然后 TaskTracker 再针对所分配的 split 进行处理。TaskTracker 还会周期性地通过 HearBeat 将本节点上的资源的使用情况和任务的进行进度汇报给 JobTracker,同时接收 JobTracker 发过来的相关命令并执行,
  4. 当 TaskTracker 中的任务完成后会上报给 JobTracker,而当最后一个 TaskTracker 完成后,JobTracker 才会将任务标志为成功,并执行一些如删除中间结果等善后工作。

从这里我们知道了mapreduce中任务执行流程,但mapreduce作为hadoop的计算框架,它对数据的处理流程我们还没明确涉及,所以接下来我们再深入了解下具体的数据处理流程

数据处理流程详解

这里的数据处理流程,主要指的是 mapreduce 执行后,Task 中对 split 的任务具体处理的这一流程,其中还包括,数据的切片,mapTask 完成后将中间结果上传等动作。下面我们来具体讨论
当 jobClient 向 JobTracker 提交了任务后,数据处理流程也随之开始

  • 在 Client 端将输入源的数据进行切片(split),具体的切片机制参考后面
  • JobTracker 中的 MRAppMaster 将每个 Split 信息计算出需要的 MapTask 的实例数量,然后向集群申请机器启动相应数量的 mapTask 进程
  • 进程启动后,根据给定的数据切片范围进行数据处理,主要流程为
    a)通过 inputFormat 来获取 RecordReader 读取数据,并形成输入的 KV 对
    b)将输入 KV 对传递给用户定义的 map()方法,做逻辑处理,并 map()方法的输出的 kv 对手机到缓存中(这里用到了缓存机制)
    简而言之 map 中输入时要做的事是
    1.反射构造 InputFormat.
    2.反射构造 InputSplit.
    3.创建 RecordReader.
    4.反射创建 MapperRunner

3a461434b0604118aad45a4d38cf3649-image.png

而 map 输出时相对复杂,主要涉及到的有 Partitioner,shuffle,sort,combiner 等概念,我们就来一一讨论。
在 map()方法执行后,map 阶段是会有处理的数据输出,正常来说,就是每个 split 对应的每一行。如上图 MapRunner 的 next 为 false 时,对输入数据的 map 完成,这时对存内中这些 map 的数据,会对其进行 sort,如果我们事先设置的有 combiner 那么,还会对 sort 完的数据执行 combiner(一个类 reduce 操作,不过是对本地数据的 reduce,使用它的原则是 combiner 的输入不会影响到 reduce 计算的最终输入,例如:如果计算只是求总数,最大值,最小值可以使用 combiner,但是做平均值计算使用 combiner 的话,最终的 reduce 计算结果就会出错。),然后开始 map 的内容 spill 到磁盘中,如果频繁的 spill 对磁盘会带来较大的损耗和效率影响。所以引入了一个写缓冲区的概念,即每一个 Map Task 都拥有一个“环形缓冲区”作为 Mapper 输出的写缓冲区。写缓冲区大小默认为 100MB(通过属性 io.sort.mb 调整),当写缓冲区的数据量达到一定的容量限额时(默认为 80%,通过属性 io.sort.spill.percent 调整),后台线程开始将写缓冲区的数据溢写到本地磁盘。在数据溢写的过程中,只要写缓冲区没有被写满,Mappper 依然可以将数据写出到缓冲区中;否则 Mapper 的执行过程将被阻塞,直到溢写结束。
溢写以循环的方式进行(即写缓冲区的数据量大致限额时就会执行溢写),可以通过属性 mapred.local.dir 指定写出的目录。
spill 结束前 溢写线程将数据最终写出到本地磁盘之前,首先根据 Reducer 的数目对这部分数据进行分区(即每一个分区中的数据会被传送至同一个 Reducer 进行处理,分区数目与 Reducer 数据保持一致)即 partition,partition 是一个类 inputSplit()的操作,即根据有多少个 ReduceTask 生成多少个 partition,并通过 jobTracker 指定给相应的 ReduceTask。
当 spill 完成后,本地磁盘中会有多个溢出文件存在。在 MapTask 结束前,这些文件会根据相应的分区进行合并,并排序,合并可能发生多次,具体由 io.sort.factor 控制一次最多合并多少个文件。

如果溢写文件个数超过 3(通过属性 min.num.spills.for.combine 设置),会对合并且分区排序后的结果执行 Combine 过程(如果 MapReduce 有设置 Combiner),而且 combine 过程在不影响最终结果的前提下可能会被执行多次;否则不会执行 Combine 过程(相对而言,Combine 开销过大)。

注意:Map Task 执行过程中,Combine 可能出现在两个地方:写缓冲区溢写过程中、溢写文件合并过程中。

注意:Mapper 的一条输出结果(由 key、value 表示)写出到写缓冲区之前,已经提前计算好相应的分区信息,即分区的过程在数据写入写缓冲区之前就已经完成,溢写过程实际是写缓冲区数据排序的过程(先按分区排序,如果分区相同时,再按键值排序)。

这里涉及到 MapReduce 的两个组件:Comparator、Partitioner。
(由于篇幅的原因,这里暂不引入对这两个组件的源码分析,和自定义方式,后面有机会则单开文章讨论)

在将 map 输出结果作为 reducTask 中的输入时,会涉及到磁盘写入,网络传输等资源的限制,所以对出于节省资源的考虑,可以在对 map 的输出结果进行压缩。默认情况下,压缩是不被开启的,可以通过属性 mapred.compress.map.outputmapred.map.output.compression.codec 进行相应设置。

当 MapTask 任务结束后,被指定的分区 ReduceTask 会立即开始执行,即开始拷贝对应 MapTask 分区中的输出结果。
Reduce Task 的这个阶段被称为“Copy Phase”。Reduce Task 拥有少量的线程用于并行地获取 Map Tasks 的输出结果,默认线程数为 5,可以通过属性 mapred.reduce.parallel.copies 进行设置。
同样:如果 Map Task 的输出结果足够小,它会被拷贝至 Reduce Task 的缓冲区中;否则拷贝至磁盘。当缓冲区中的数据达到一定量(由属性 mapred.job.shuffle.merge.percent、mapred.inmem.merge.threshold),这些数据将被合并且溢写到磁盘。如果 Combine 过程被指定,它将在合并过程被执行,用来减少需要写出到磁盘的数据量。

随着拷贝文件中磁盘上的不断积累,一个后台线程会将它们合并为更大地、有序的文件,用来节省后期的合并时间。如果 Map Tasks 的输出结合使用了压缩机制,则在合并的过程中需要对数据进行解压处理。

当 Reduce Task 的所有 Map Tasks 输出结果均完成拷贝,Reduce Task 进入“Sort Phase”(更为合适地应该被称为“Merge Phase”,排序在 Map 阶段已经被执行),该阶段在保持原有顺序的情况下进行合并。这种合并是以循环方式进行的,循环次数与合并因子(io.sort.factor)有关。
sort phase 通常不是合并成一个文件,而是略过磁盘操作,直接将数据合并输入至 Reduce 方法中 这次合并的数据可以结合内存、磁盘两部分进行操作),即“Reduce Phase”。

通常这里还有一个“Group”的阶段,这个阶段决定着哪些键值对属于同一个键。如果没有特殊设置,只有在 Map Task 输出时那些键完全一样的数据属于同一个键,但这是可以被改变的。

描述至这里终于能引用 mapreduce 中相当重要的一个概念,即 shuffle。这个词在这里该怎么定义,我暂未找到个一个比较满意的答案,但我比较喜欢有人把这个比作是搓完牌一桌子人,在下一局开始前的整个过程。
即 Shuffle 操作,涉及到数据的 partition、sort、[combine]、spill、[comress]、[merge]、copy、[combine]、merge、group,而这些操作不但决定着程序逻辑的正确性,也决定着 MapReduce 的运行效率。

shuffle 完后进入了 ReduceTask 的 reduce()方法中
在 Reduce Phase 的过程中,它处理的是所有 Map Tasks 输出结果中某一个分区中的所有数据,这些数据整体表现为一个根据键有序的输入,对于每一个键都会相应地调用一次 Reduce Function(同一个键对应的值可能有多个,这些值将作为 Reduce Function 的参数)

至此 MapReduce 的逻辑过程基本描述完成,虽然洋洋洒洒可能会有数千字,但本文的出发点就不是简析,而更多是自我概念原理部份的总结,所以力求整个流程完整详细。后面我配上一些网络图片,方便大家快速理解,结合文字加深印象。
b26c54ff10ad4e2b9832a960ef4aab90-image.png

a179f15b88cc403b8bd84d7963823762-image.png

4a5b5498dd764087ade380db394e6f84-image.png

74ed215c305747a49348430782e5636a-image.png

b55be177bd514ce79b7444c2dd3ddcfb-image.png

286bb24da172471793924b2b9b7c857c-image.png
68bbc5f114f3496d886402cbb0da8fc1-image.png

38d8bf9498274367b22856f62a8f7fcf-image.png

7b7e5da815064fdfa0d060910f8dfb9b-image.png

a4ac8a6f758b4bdfb5e1a752a02f0654-image.png

9e91f8a6b0ab488abbad0714487fe10f-image.png

87a4d55fff15408c87f94029b8fb2aea-image.png

eb099e53d6fa44a5b76410040f41f3ae-image.png

98aac8b77be64d53b276f7a112aba12d-image.png

  • Hadoop

    Hadoop 是由 Apache 基金会所开发的一个分布式系统基础架构。用户可以在不了解分布式底层细节的情况下,开发分布式程序。充分利用集群的威力进行高速运算和存储。

    86 引用 • 122 回帖 • 627 关注
  • MapReduce
    8 引用 • 1 回帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...