Apache Hadoop 核心模块 MapReduce

本贴最后更新于 2702 天前,其中的信息可能已经事过境迁

MapReduce 简介:

Hadoop MapReduce 是一个很容易在平行大集群(数千个节点)以一个可靠的商品硬件容错的方式执行大数据应用程序的框架。
一个 MapReduce Job 通常会将输入数据集分成多个任务快,这些任务块由 Map 以完全并行的方式执行。MapReduce 框架对于 Map 的输出进行排序,并把结果输入给 Reduce 操作。通常输入输出结果都存储在 HDFS 文件系统中,MapReduce 负责任务的调度和监控,并负责重启失败的任务。
通常情况下,计算节点和存储结点是相同的,这也符合Yarn的设计目标“移动计算比移动数据好”,避免了数据传输的性能损耗。 即是 MapReduce 和 HDFS 运行在同一组节点上。
MapReduce 框架由唯一的一个 Master RecourseManager 和每个 Cluster Node 上的一个 Worker NodeManager 和每个应用上的 MRAppMaster 组成。
一个应用至少通过特定的接口或者抽象类提供 Map,Reduce 函数并指定输入输出的位置,以及 Job 的相关参数和配置。
Hadoop 的 Job client 提交 Job 之后,由 ResourceManager 提供程序分发,和分配 worker 的职责,并且进行任务调度和监控工作状态和诊断信息给 Job client。
虽然 Hadoop 框架是™️java 实现,MapReduce 应用程序不一定需要用 java 写。
Hadoop Streming 是一种实用工具,允许用户创建和运行任何可执行程序的工作(例如 shell 实用程序)的 Mapper 和 Reducer。
Hadoop 是一个 Pipes 兼容 C++ API 实现的 MapReduce 应用程序(非 JNI™️为主)

MapReduce INput 和 Output

MapReduce 框架只在 <K、V> 对上操作,也就是说,框架将作业的输入视为一组 <k、V> 对,并生成一组 <K、V> 对作为作业的输出,可以想象为不同的类型。K,V 的 class 必须被序列化的,因此需要实现 Writable 接口。此外,重点是 K 必须要实现 writablecomparable 接口方便的排序。
一个 MapReduce 作业的输入,输出类型:
(input)< K1,V1> -> map -> < K2,V> ->combine-> < K2,V2 > ->reduce-> < K3,V3 >(output)
由此可见,一个 MapReduce 程序有三个过程 map,combine,output.

Example:MapReduce 的 WordCount

通过 WordCount(统计单词出现的次数)这样一个示例先来了解 MapReduce 程序的执行过程和编写流程。下面是官方源代码:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  public static class TokenizerMapper
       extends Mapper{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
	//args[0]输入的文件路径
    FileInputFormat.addInputPath(job, new Path(args[0]));
	//args[1]输出的文件路径
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

我们将这个程序保存为 wordcount.java,拥有了一个源文件如何编译运行它呢?

Compile WordCount.java 并且打成 Jar 包:

//使用javac编译成字节码文件(wordcount.class)
$ bin/hadoop com.sun.tools.javac.Main WordCount.java
//使用jar打包class为Jar.
$ jar cf wc.jar WordCount*.class

如此我们就得到了一个打包好的 Jar 文件,然后就可以运行它.

创建输入文件及内容

源代码中可以看到他需要从参数中输入两个路径,args[0]输入的文件路径,args[1]输出的结果文件路径,因此首先我们在需要在 HDFS 中创建两个文件,或者上传两个文件如:

  • /user/joe/wordcount/input - input directory in HDFS
  • /user/joe/wordcount/output - output directory in HDFS
    文件中的内容可以自行填写,以及文件路径也可自行创建,如下:
//在/user/joe/wordcount/input/这个路径下放了两个文件file01,file02.
$ bin/hadoop fs -ls /user/joe/wordcount/input/
/user/joe/wordcount/input/file01
/user/joe/wordcount/input/file02

// cat查看file01
$ bin/hadoop fs -cat /user/joe/wordcount/input/file01
//file01的内容如下
Hello World Bye World

// cat查看file02
$ bin/hadoop fs -cat /user/joe/wordcount/input/file02
//file02的内容如下
Hello Hadoop Goodbye Hadoop

运行 wc.Jar

//使用Hadoop的jar命令运行wc.jar 
//第一个参数:WordCount是程序的类名称,
//第二个参数是:/user/joe/wordcount/input输入的文件目录路径,
//第三个参数:/user/joe/wordcount/output输出结果存放的目录路径,因为存放的都是HDFS文件,会自己生成文件名称。
$ bin/hadoop jar wc.jar WordCount /user/joe/wordcount/input /user/joe/wordcount/output

去对应目录查看输出结果:

//查看output目录下是否生成了文件(应该是生成了三个,一个是结果)
$ bin/hadoop fs -ls /user/joe/wordcount/output/
//cat查看结果的信息如下
$ bin/hadoop fs -cat /user/joe/wordcount/output/part-r-00000
Bye 1
Goodbye 1
Hadoop 2
Hello 2
World 2
  • MapReduce
    8 引用 • 1 回帖
  • Hadoop

    Hadoop 是由 Apache 基金会所开发的一个分布式系统基础架构。用户可以在不了解分布式底层细节的情况下,开发分布式程序。充分利用集群的威力进行高速运算和存储。

    86 引用 • 122 回帖 • 626 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...
rzx
此生最怕深情被辜负,最怕兄弟成陌路。对世界充满善意,同时又充满深深的恨意,我渴望天降甘霖福泽众生,又渴望灭世洪水重创世纪。 广州