在Hadoop中处理输入的CSV文件

本贴最后更新于 3423 天前,其中的信息可能已经东海扬尘

在Hadoop中,InputFormat类用来生成可供Mapper处理的<key, value>键值对。当数据传送给Mapper时,Mapper会将输入分片传送到InputFormat上,InputFormat调用getRecordReader()方法生成RecordReader,RecordReader再创建可供map函数处理的键值对<K1, V1>。

Hadoop预定义了多种方法将不同类型的输入数据转化为map能够处理的键值对。比如,TextInputFormat,Hadoop中默认的输入方法,会将每行数据生成一条记录,其中key值为每条记录在分片中的字节偏移量,value则为每行的内容。

在Hadoop预定义的InputFormat中,并没有处理CSV文件的方法。CSV文件的本质其实是用逗号分隔开的文本文件。一种很直观的处理方法是:将CSV文件作为文本文件处理,使用TextInputFormat将文件按行传入map函数,在map函数中再按照CSV文件的格式进行处理。但这样很容易将数据格式的处理逻辑与业务处理逻辑混淆在一起,并且出现很多copy-and-pasted的代码。

实际上,可以写一个自己的InputFormat以及RecordReader类,专门用来处理CSV文件的输入,直接传递给map函数解析后的数据。

1 数据结构

我们传递给map函数一个ArrayWritable(A Writable for arrays containing instances of a class),元素类型为Text,即CSV文件每一行各个字段的数据。数据结构如下:

代码1:TextArrayWritable.java

public class TextArrayWritable extends ArrayWritable {
    public TextArrayWritable() {
        super(Text.class);
    }
public TextArrayWritable(Text[] strings) {
    super(Text.class, strings);
}

}

2 CSVInputFormat

FileInputFormat是所有使用文件作为其数据源的InputFormat实现的基类。它提供了两个功能:一是定义哪些文件包含在一个作业的输入中,另一个是为输入文件生成分片(Input Splits)。而把分片分割成记录的事情交由其子类来完成。所以CSVInputFormat类的实现上,同样是继承InputFormat类,并只需要简单的重写createRecordReader和isSplitable即可。

代码2:CSVInputFormat.java

public class CSVInputFormat extends FileInputFormat<LongWritable, TextArrayWritable> {
    public static final String CSV_TOKEN_SEPARATOR_CONFIG = "csvinputformat.token.delimiter";
@Override
protected boolean isSplitable(JobContext context, Path filename) {
    CompressionCodec codec = new CompressionCodecFactory(context.getConfiguration()).getCodec(filename);

    return codec == null;
}

@Override
public RecordReader&lt;LongWritable, TextArrayWritable&gt; createRecordReader(
    InputSplit split, TaskAttemptContext context)
    throws IOException, InterruptedException {
    String csvDelimiter = context.getConfiguration()
                                 .get(CSV_TOKEN_SEPARATOR_CONFIG);
    Character separator = null;

    if ((csvDelimiter != null) &amp;&amp; (csvDelimiter.length() == 1)) {
        separator = csvDelimiter.charAt(0);
    }

    return new CSVRecordReader(separator);
}

}

其中csvinputformat.token.delimiter是可在配置文件中配置的CSV输入文件分隔符,createRecordReader完成的工作只是从配置文件中得到分隔符,调用真正对CSV文件分片进行处理,并生成键值对的CSVRecordReader函数,并返回RecordReader对象。

3 CSVRecordReader

对于CSVRecordReader,要实现的功能无非就是将CSV文件中每一行的各字段提取出来,并将各字段作为TextArrayWritable类型的数据结构传递给map函数。

在Hadoop中有一个LineRecordReader类,它将文本文件每一行的内容作为值返回,类型为Text。所以可以直接在CSVRecordReader中使用LineRecordReader,将LineRecordReader返回的每一行再次进行处理。在CSV文件的处理上,这里用到了OpenCSV对CSV文件的每一行进行解析,具体可参见这里。

下面是CSVRecordReader的实现代码。除了CSV文件的解析、nextKeyValue()方法和getCurrentValue()方法外,大部分方法都直接调用LineRecordReader实例的相应方法。毕竟我们是踩在巨人的肩膀上继续前进嘛。O(∩_∩)O~

代码3:CSVRecordReader.java

public class CSVRecordReader extends RecordReader<LongWritable, TextArrayWritable> {
    private LineRecordReader lineReader;
    private TextArrayWritable value;
    private CSVParser parser;
// 新建CSVParser实例,用来解析每一行CSV文件的每一行
public CSVRecordReader(Character delimiter) {
    this.lineReader = new LineRecordReader();

    if (delimiter == null) {
        this.parser = new CSVParser();
    } else {
        this.parser = new CSVParser(delimiter);
    }
}

// 调用LineRecordReader的初始化方法,寻找分片的开始位置
@Override
public void initialize(InputSplit split, TaskAttemptContext context)
    throws IOException, InterruptedException {
    lineReader.initialize(split, context);
}

// 使用LineRecordReader来得到下一条记录(即下一行)。
// 如果到了分片(Input Split)的尾部,nextKeyValue将返回NULL
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
    if (lineReader.nextKeyValue()) {
        //如果有新记录,则进行处理
        loadCSV();

        return true;
    } else {
        value = null;

        return false;
    }
}

@Override
public LongWritable getCurrentKey()
    throws IOException, InterruptedException {
    return lineReader.getCurrentKey();
}

@Override
public TextArrayWritable getCurrentValue()
    throws IOException, InterruptedException {
    return value;
}

@Override
public float getProgress() throws IOException, InterruptedException {
    return lineReader.getProgress();
}

@Override
public void close() throws IOException {
    lineReader.close();
}

// 对CSV文件的每一行进行处理
private void loadCSV() throws IOException {
    String line = lineReader.getCurrentValue().toString();

    // 通过OpenCSV将解析每一行的各字段
    String[] tokens = parser.parseLine(line);
    value = new TextArrayWritable(convert(tokens));
}

// 将字符串数组批量处理为Text数组
private Text[] convert(String[] tokens) {
    Text[] t = new Text[tokens.length];

    for (int i = 0; i &lt; t.length; i++) {
        t[i] = new Text(tokens[i]);
    }

    return t;
}

}

4 简单的应用

用于处理CSV文件输入的InputFormat已经写完了,现在构造一个简单的应用场景,来试验下这个CSVInputFormat。

假设有这样一些数据,每一列第一个字段为一个标识,后面为随机产生的数字,标识各不相同,求每一行标识后的数字之和并输出,输出格式为:每一行为标识和数字和。

 

由于标识没有重复,并且逻辑比较简单,这里只写一个Mapper即可,不需要Reducer。

代码4:CSVMapper.java

public class CSVMapper extends Mapper<LongWritable, TextArrayWritable, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, TextArrayWritable value,
        Context context) throws IOException, InterruptedException {
        String[] values = value.toStrings();
        int sum = 0;
        Text resultKey = new Text(values[0]);
    for (int i = 1; i &lt; values.length; i++) {
        sum = sum + Integer.valueOf(values[i].trim());
    }

    IntWritable resultValue = new IntWritable(sum);
    context.write(resultKey, resultValue);
}

}

在作业的提交部分,由于没有Reducer,所以将ReduceTask设置为了0

代码5:JustRun.java

public class JustRun extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
    Job job = new Job(conf);
    job.setJobName("CSVTest");
    job.setJarByClass(JustRun.class);

    job.setMapperClass(CSVMapper.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setInputFormatClass(CSVInputFormat.class);

    job.setNumReduceTasks(0);

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    return job.waitForCompletion(true) ? 0 : 1;
}

public static void main(String[] args) throws Exception {
    int ret = ToolRunner.run(new JustRun(), args);
    System.exit(ret);
}

}

执行完毕后,输出如下,跟预想是一致的。

好了,这就是利用InputFormat对CSV文件的处理过程。除了CSV文件,还可根据处理数据的类型,写出更多的InputFormat。同时,我们还可以利用OutputFormat输出需要的格式。

  • Hadoop

    Hadoop 是由 Apache 基金会所开发的一个分布式系统基础架构。用户可以在不了解分布式底层细节的情况下,开发分布式程序。充分利用集群的威力进行高速运算和存储。

    95 引用 • 122 回帖 • 634 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...

推荐标签 标签

  • Angular

    AngularAngularJS 的新版本。

    26 引用 • 66 回帖 • 578 关注
  • AWS
    11 引用 • 28 回帖 • 2 关注
  • Follow
    4 引用 • 13 回帖 • 19 关注
  • webpack

    webpack 是一个用于前端开发的模块加载器和打包工具,它能把各种资源,例如 JS、CSS(less/sass)、图片等都作为模块来使用和处理。

    43 引用 • 130 回帖 • 259 关注
  • CodeMirror
    2 引用 • 17 回帖 • 197 关注
  • 叶归
    25 引用 • 100 回帖 • 37 关注
  • Firefox

    Mozilla Firefox 中文俗称“火狐”(正式缩写为 Fx 或 fx,非正式缩写为 FF),是一个开源的网页浏览器,使用 Gecko 排版引擎,支持多种操作系统,如 Windows、OSX 及 Linux 等。

    7 引用 • 30 回帖 • 367 关注
  • HBase

    HBase 是一个分布式的、面向列的开源数据库,该技术来源于 Fay Chang 所撰写的 Google 论文 “Bigtable:一个结构化数据的分布式存储系统”。就像 Bigtable 利用了 Google 文件系统所提供的分布式数据存储一样,HBase 在 Hadoop 之上提供了类似于 Bigtable 的能力。

    17 引用 • 6 回帖 • 72 关注
  • WordPress

    WordPress 是一个使用 PHP 语言开发的博客平台,用户可以在支持 PHP 和 MySQL 数据库的服务器上架设自己的博客。也可以把 WordPress 当作一个内容管理系统(CMS)来使用。WordPress 是一个免费的开源项目,在 GNU 通用公共许可证(GPLv2)下授权发布。

    46 引用 • 114 回帖 • 139 关注
  • Q&A

    提问之前请先看《提问的智慧》,好的问题比好的答案更有价值。

    11154 引用 • 50649 回帖 • 52 关注
  • Solidity

    Solidity 是一种智能合约高级语言,运行在 [以太坊] 虚拟机(EVM)之上。它的语法接近于 JavaScript,是一种面向对象的语言。

    3 引用 • 18 回帖 • 456 关注
  • SVN

    SVN 是 Subversion 的简称,是一个开放源代码的版本控制系统,相较于 RCS、CVS,它采用了分支管理系统,它的设计目标就是取代 CVS。

    29 引用 • 98 回帖 • 694 关注
  • 尊园地产

    昆明尊园房地产经纪有限公司,即:Kunming Zunyuan Property Agency Company Limited(简称“尊园地产”)于 2007 年 6 月开始筹备,2007 年 8 月 18 日正式成立,注册资本 200 万元,公司性质为股份经纪有限公司,主营业务为:代租、代售、代办产权过户、办理银行按揭、担保、抵押、评估等。

    1 引用 • 22 回帖 • 838 关注
  • Windows

    Microsoft Windows 是美国微软公司研发的一套操作系统,它问世于 1985 年,起初仅仅是 Microsoft-DOS 模拟环境,后续的系统版本由于微软不断的更新升级,不但易用,也慢慢的成为家家户户人们最喜爱的操作系统。

    232 引用 • 484 回帖 • 2 关注
  • golang

    Go 语言是 Google 推出的一种全新的编程语言,可以在不损失应用程序性能的情况下降低代码的复杂性。谷歌首席软件工程师罗布派克(Rob Pike)说:我们之所以开发 Go,是因为过去 10 多年间软件开发的难度令人沮丧。Go 是谷歌 2009 发布的第二款编程语言。

    502 引用 • 1397 回帖 • 239 关注
  • Vditor

    Vditor 是一款浏览器端的 Markdown 编辑器,支持所见即所得、即时渲染(类似 Typora)和分屏预览模式。它使用 TypeScript 实现,支持原生 JavaScript、Vue、React 和 Angular。

    386 引用 • 1892 回帖 • 1 关注
  • App

    App(应用程序,Application 的缩写)一般指手机软件。

    91 引用 • 384 回帖
  • MyBatis

    MyBatis 本是 Apache 软件基金会 的一个开源项目 iBatis,2010 年这个项目由 Apache 软件基金会迁移到了 google code,并且改名为 MyBatis ,2013 年 11 月再次迁移到了 GitHub。

    174 引用 • 414 回帖 • 343 关注
  • 区块链

    区块链是分布式数据存储、点对点传输、共识机制、加密算法等计算机技术的新型应用模式。所谓共识机制是区块链系统中实现不同节点之间建立信任、获取权益的数学算法 。

    92 引用 • 752 回帖
  • SQLite

    SQLite 是一个进程内的库,实现了自给自足的、无服务器的、零配置的、事务性的 SQL 数据库引擎。SQLite 是全世界使用最为广泛的数据库引擎。

    4 引用 • 7 回帖
  • Google

    Google(Google Inc.,NASDAQ:GOOG)是一家美国上市公司(公有股份公司),于 1998 年 9 月 7 日以私有股份公司的形式创立,设计并管理一个互联网搜索引擎。Google 公司的总部称作“Googleplex”,它位于加利福尼亚山景城。Google 目前被公认为是全球规模最大的搜索引擎,它提供了简单易用的免费服务。不作恶(Don't be evil)是谷歌公司的一项非正式的公司口号。

    51 引用 • 200 回帖 • 2 关注
  • 小薇

    小薇是一个用 Java 写的 QQ 聊天机器人 Web 服务,可以用于社群互动。

    由于 Smart QQ 从 2019 年 1 月 1 日起停止服务,所以该项目也已经停止维护了!

    35 引用 • 468 回帖 • 768 关注
  • BookxNote

    BookxNote 是一款全新的电子书学习工具,助力您的学习与思考,让您的大脑更高效的记忆。

    笔记整理交给我,一心只读圣贤书。

    1 引用 • 1 回帖
  • 资讯

    资讯是用户因为及时地获得它并利用它而能够在相对短的时间内给自己带来价值的信息,资讯有时效性和地域性。

    56 引用 • 85 回帖
  • 学习

    “梦想从学习开始,事业从实践起步” —— 习近平

    176 引用 • 544 回帖
  • 程序员

    程序员是从事程序开发、程序维护的专业人员。

    599 引用 • 3541 回帖
  • WiFiDog

    WiFiDog 是一套开源的无线热点认证管理工具,主要功能包括:位置相关的内容递送;用户认证和授权;集中式网络监控。

    1 引用 • 7 回帖 • 633 关注