MapReduce 简介:
Hadoop MapReduce 是一个很容易在平行大集群(数千个节点)以一个可靠的商品硬件容错的方式执行大数据应用程序的框架。
一个 MapReduce Job 通常会将输入数据集分成多个任务快,这些任务块由 Map 以完全并行的方式执行。MapReduce 框架对于 Map 的输出进行排序,并把结果输入给 Reduce 操作。通常输入输出结果都存储在 HDFS 文件系统中,MapReduce 负责任务的调度和监控,并负责重启失败的任务。
通常情况下,计算节点和存储结点是相同的,这也符合Yarn的设计目标“移动计算比移动数据好”,避免了数据传输的性能损耗。
即是 MapReduce 和 HDFS 运行在同一组节点上。
MapReduce 框架由唯一的一个 Master RecourseManager 和每个 Cluster Node 上的一个 Worker NodeManager 和每个应用上的 MRAppMaster
组成。
一个应用至少通过特定的接口或者抽象类提供 Map,Reduce 函数并指定输入输出的位置,以及 Job 的相关参数和配置。
Hadoop 的 Job client 提交 Job 之后,由 ResourceManager 提供程序分发,和分配 worker 的职责,并且进行任务调度和监控工作状态和诊断信息给 Job client。
虽然 Hadoop 框架是™️java 实现,MapReduce 应用程序不一定需要用 java 写。
Hadoop Streming 是一种实用工具,允许用户创建和运行任何可执行程序的工作(例如 shell 实用程序)的 Mapper 和 Reducer。
Hadoop 是一个 Pipes 兼容 C++ API 实现的 MapReduce 应用程序(非 JNI™️为主)
MapReduce INput 和 Output
MapReduce 框架只在 <K、V> 对上操作,也就是说,框架将作业的输入视为一组 <k、V> 对,并生成一组 <K、V> 对作为作业的输出,可以想象为不同的类型。K,V 的 class 必须被序列化的,因此需要实现 Writable 接口。此外,重点是 K 必须要实现 writablecomparable 接口方便的排序。
一个 MapReduce 作业的输入,输出类型:
(input)< K1,V1> -> map -> < K2,V> ->combine-> < K2,V2 > ->reduce-> < K3,V3 >(output)
由此可见,一个 MapReduce 程序有三个过程 map,combine,output.
Example:MapReduce 的 WordCount
通过 WordCount(统计单词出现的次数)这样一个示例先来了解 MapReduce 程序的执行过程和编写流程。下面是官方源代码:
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
public static class TokenizerMapper
extends Mapper{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//args[0]输入的文件路径
FileInputFormat.addInputPath(job, new Path(args[0]));
//args[1]输出的文件路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
我们将这个程序保存为 wordcount.java,拥有了一个源文件如何编译运行它呢?
Compile WordCount.java 并且打成 Jar 包:
//使用javac编译成字节码文件(wordcount.class)
$ bin/hadoop com.sun.tools.javac.Main WordCount.java
//使用jar打包class为Jar.
$ jar cf wc.jar WordCount*.class
如此我们就得到了一个打包好的 Jar 文件,然后就可以运行它.
创建输入文件及内容
源代码中可以看到他需要从参数中输入两个路径,args[0]输入的文件路径,args[1]输出的结果文件路径,因此首先我们在需要在 HDFS 中创建两个文件,或者上传两个文件如:
- /user/joe/wordcount/input - input directory in HDFS
- /user/joe/wordcount/output - output directory in HDFS
文件中的内容可以自行填写,以及文件路径也可自行创建,如下:
//在/user/joe/wordcount/input/这个路径下放了两个文件file01,file02.
$ bin/hadoop fs -ls /user/joe/wordcount/input/
/user/joe/wordcount/input/file01
/user/joe/wordcount/input/file02
// cat查看file01
$ bin/hadoop fs -cat /user/joe/wordcount/input/file01
//file01的内容如下
Hello World Bye World
// cat查看file02
$ bin/hadoop fs -cat /user/joe/wordcount/input/file02
//file02的内容如下
Hello Hadoop Goodbye Hadoop
运行 wc.Jar
//使用Hadoop的jar命令运行wc.jar
//第一个参数:WordCount是程序的类名称,
//第二个参数是:/user/joe/wordcount/input输入的文件目录路径,
//第三个参数:/user/joe/wordcount/output输出结果存放的目录路径,因为存放的都是HDFS文件,会自己生成文件名称。
$ bin/hadoop jar wc.jar WordCount /user/joe/wordcount/input /user/joe/wordcount/output
去对应目录查看输出结果:
//查看output目录下是否生成了文件(应该是生成了三个,一个是结果)
$ bin/hadoop fs -ls /user/joe/wordcount/output/
//cat查看结果的信息如下
$ bin/hadoop fs -cat /user/joe/wordcount/output/part-r-00000
Bye 1
Goodbye 1
Hadoop 2
Hello 2
World 2
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于