在Hadoop中处理输入的CSV文件

本贴最后更新于 2820 天前,其中的信息可能已经东海扬尘

在Hadoop中,InputFormat类用来生成可供Mapper处理的<key, value>键值对。当数据传送给Mapper时,Mapper会将输入分片传送到InputFormat上,InputFormat调用getRecordReader()方法生成RecordReader,RecordReader再创建可供map函数处理的键值对<K1, V1>。

Hadoop预定义了多种方法将不同类型的输入数据转化为map能够处理的键值对。比如,TextInputFormat,Hadoop中默认的输入方法,会将每行数据生成一条记录,其中key值为每条记录在分片中的字节偏移量,value则为每行的内容。

在Hadoop预定义的InputFormat中,并没有处理CSV文件的方法。CSV文件的本质其实是用逗号分隔开的文本文件。一种很直观的处理方法是:将CSV文件作为文本文件处理,使用TextInputFormat将文件按行传入map函数,在map函数中再按照CSV文件的格式进行处理。但这样很容易将数据格式的处理逻辑与业务处理逻辑混淆在一起,并且出现很多copy-and-pasted的代码。

实际上,可以写一个自己的InputFormat以及RecordReader类,专门用来处理CSV文件的输入,直接传递给map函数解析后的数据。

1 数据结构

我们传递给map函数一个ArrayWritable(A Writable for arrays containing instances of a class),元素类型为Text,即CSV文件每一行各个字段的数据。数据结构如下:

代码1:TextArrayWritable.java

public class TextArrayWritable extends ArrayWritable {
    public TextArrayWritable() {
        super(Text.class);
    }
public TextArrayWritable(Text[] strings) {
    super(Text.class, strings);
}

}

2 CSVInputFormat

FileInputFormat是所有使用文件作为其数据源的InputFormat实现的基类。它提供了两个功能:一是定义哪些文件包含在一个作业的输入中,另一个是为输入文件生成分片(Input Splits)。而把分片分割成记录的事情交由其子类来完成。所以CSVInputFormat类的实现上,同样是继承InputFormat类,并只需要简单的重写createRecordReader和isSplitable即可。

代码2:CSVInputFormat.java

public class CSVInputFormat extends FileInputFormat<LongWritable, TextArrayWritable> {
    public static final String CSV_TOKEN_SEPARATOR_CONFIG = "csvinputformat.token.delimiter";
@Override
protected boolean isSplitable(JobContext context, Path filename) {
    CompressionCodec codec = new CompressionCodecFactory(context.getConfiguration()).getCodec(filename);

    return codec == null;
}

@Override
public RecordReader&lt;LongWritable, TextArrayWritable&gt; createRecordReader(
    InputSplit split, TaskAttemptContext context)
    throws IOException, InterruptedException {
    String csvDelimiter = context.getConfiguration()
                                 .get(CSV_TOKEN_SEPARATOR_CONFIG);
    Character separator = null;

    if ((csvDelimiter != null) &amp;&amp; (csvDelimiter.length() == 1)) {
        separator = csvDelimiter.charAt(0);
    }

    return new CSVRecordReader(separator);
}

}

其中csvinputformat.token.delimiter是可在配置文件中配置的CSV输入文件分隔符,createRecordReader完成的工作只是从配置文件中得到分隔符,调用真正对CSV文件分片进行处理,并生成键值对的CSVRecordReader函数,并返回RecordReader对象。

3 CSVRecordReader

对于CSVRecordReader,要实现的功能无非就是将CSV文件中每一行的各字段提取出来,并将各字段作为TextArrayWritable类型的数据结构传递给map函数。

在Hadoop中有一个LineRecordReader类,它将文本文件每一行的内容作为值返回,类型为Text。所以可以直接在CSVRecordReader中使用LineRecordReader,将LineRecordReader返回的每一行再次进行处理。在CSV文件的处理上,这里用到了OpenCSV对CSV文件的每一行进行解析,具体可参见这里。

下面是CSVRecordReader的实现代码。除了CSV文件的解析、nextKeyValue()方法和getCurrentValue()方法外,大部分方法都直接调用LineRecordReader实例的相应方法。毕竟我们是踩在巨人的肩膀上继续前进嘛。O(∩_∩)O~

代码3:CSVRecordReader.java

public class CSVRecordReader extends RecordReader<LongWritable, TextArrayWritable> {
    private LineRecordReader lineReader;
    private TextArrayWritable value;
    private CSVParser parser;
// 新建CSVParser实例,用来解析每一行CSV文件的每一行
public CSVRecordReader(Character delimiter) {
    this.lineReader = new LineRecordReader();

    if (delimiter == null) {
        this.parser = new CSVParser();
    } else {
        this.parser = new CSVParser(delimiter);
    }
}

// 调用LineRecordReader的初始化方法,寻找分片的开始位置
@Override
public void initialize(InputSplit split, TaskAttemptContext context)
    throws IOException, InterruptedException {
    lineReader.initialize(split, context);
}

// 使用LineRecordReader来得到下一条记录(即下一行)。
// 如果到了分片(Input Split)的尾部,nextKeyValue将返回NULL
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
    if (lineReader.nextKeyValue()) {
        //如果有新记录,则进行处理
        loadCSV();

        return true;
    } else {
        value = null;

        return false;
    }
}

@Override
public LongWritable getCurrentKey()
    throws IOException, InterruptedException {
    return lineReader.getCurrentKey();
}

@Override
public TextArrayWritable getCurrentValue()
    throws IOException, InterruptedException {
    return value;
}

@Override
public float getProgress() throws IOException, InterruptedException {
    return lineReader.getProgress();
}

@Override
public void close() throws IOException {
    lineReader.close();
}

// 对CSV文件的每一行进行处理
private void loadCSV() throws IOException {
    String line = lineReader.getCurrentValue().toString();

    // 通过OpenCSV将解析每一行的各字段
    String[] tokens = parser.parseLine(line);
    value = new TextArrayWritable(convert(tokens));
}

// 将字符串数组批量处理为Text数组
private Text[] convert(String[] tokens) {
    Text[] t = new Text[tokens.length];

    for (int i = 0; i &lt; t.length; i++) {
        t[i] = new Text(tokens[i]);
    }

    return t;
}

}

4 简单的应用

用于处理CSV文件输入的InputFormat已经写完了,现在构造一个简单的应用场景,来试验下这个CSVInputFormat。

假设有这样一些数据,每一列第一个字段为一个标识,后面为随机产生的数字,标识各不相同,求每一行标识后的数字之和并输出,输出格式为:每一行为标识和数字和。

 

由于标识没有重复,并且逻辑比较简单,这里只写一个Mapper即可,不需要Reducer。

代码4:CSVMapper.java

public class CSVMapper extends Mapper<LongWritable, TextArrayWritable, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, TextArrayWritable value,
        Context context) throws IOException, InterruptedException {
        String[] values = value.toStrings();
        int sum = 0;
        Text resultKey = new Text(values[0]);
    for (int i = 1; i &lt; values.length; i++) {
        sum = sum + Integer.valueOf(values[i].trim());
    }

    IntWritable resultValue = new IntWritable(sum);
    context.write(resultKey, resultValue);
}

}

在作业的提交部分,由于没有Reducer,所以将ReduceTask设置为了0

代码5:JustRun.java

public class JustRun extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
    Job job = new Job(conf);
    job.setJobName("CSVTest");
    job.setJarByClass(JustRun.class);

    job.setMapperClass(CSVMapper.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setInputFormatClass(CSVInputFormat.class);

    job.setNumReduceTasks(0);

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    return job.waitForCompletion(true) ? 0 : 1;
}

public static void main(String[] args) throws Exception {
    int ret = ToolRunner.run(new JustRun(), args);
    System.exit(ret);
}

}

执行完毕后,输出如下,跟预想是一致的。

好了,这就是利用InputFormat对CSV文件的处理过程。除了CSV文件,还可根据处理数据的类型,写出更多的InputFormat。同时,我们还可以利用OutputFormat输出需要的格式。

  • Hadoop

    Hadoop 是由 Apache 基金会所开发的一个分布式系统基础架构。用户可以在不了解分布式底层细节的情况下,开发分布式程序。充分利用集群的威力进行高速运算和存储。

    82 引用 • 122 回帖 • 620 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...

推荐标签 标签

  • SendCloud

    SendCloud 由搜狐武汉研发中心孵化的项目,是致力于为开发者提供高质量的触发邮件服务的云端邮件发送平台,为开发者提供便利的 API 接口来调用服务,让邮件准确迅速到达用户收件箱并获得强大的追踪数据。

    2 引用 • 8 回帖 • 440 关注
  • V2EX

    V2EX 是创意工作者们的社区。这里目前汇聚了超过 400,000 名主要来自互联网行业、游戏行业和媒体行业的创意工作者。V2EX 希望能够成为创意工作者们的生活和事业的一部分。

    17 引用 • 236 回帖 • 418 关注
  • 笔记

    好记性不如烂笔头。

    304 引用 • 777 回帖
  • 周末

    星期六到星期天晚,实行五天工作制后,指每周的最后两天。再过几年可能就是三天了。

    14 引用 • 297 回帖
  • Swagger

    Swagger 是一款非常流行的 API 开发工具,它遵循 OpenAPI Specification(这是一种通用的、和编程语言无关的 API 描述规范)。Swagger 贯穿整个 API 生命周期,如 API 的设计、编写文档、测试和部署。

    26 引用 • 35 回帖 • 13 关注
  • QQ

    1999 年 2 月腾讯正式推出“腾讯 QQ”,在线用户由 1999 年的 2 人(马化腾和张志东)到现在已经发展到上亿用户了,在线人数超过一亿,是目前使用最广泛的聊天软件之一。

    45 引用 • 557 回帖 • 221 关注
  • TextBundle

    TextBundle 文件格式旨在应用程序之间交换 Markdown 或 Fountain 之类的纯文本文件时,提供更无缝的用户体验。

    1 引用 • 2 回帖 • 48 关注
  • Redis

    Redis 是一个开源的使用 ANSI C 语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value 数据库,并提供多种语言的 API。从 2010 年 3 月 15 日起,Redis 的开发工作由 VMware 主持。从 2013 年 5 月开始,Redis 的开发由 Pivotal 赞助。

    284 引用 • 247 回帖 • 175 关注
  • 安全

    安全永远都不是一个小问题。

    189 引用 • 813 回帖 • 1 关注
  • 互联网

    互联网(Internet),又称网际网络,或音译因特网、英特网。互联网始于 1969 年美国的阿帕网,是网络与网络之间所串连成的庞大网络,这些网络以一组通用的协议相连,形成逻辑上的单一巨大国际网络。

    96 引用 • 330 回帖
  • 思源笔记

    思源笔记是一款隐私优先的个人知识管理系统,支持完全离线使用,同时也支持端到端加密同步。

    融合块、大纲和双向链接,重构你的思维。

    18722 引用 • 69932 回帖
  • JVM

    JVM(Java Virtual Machine)Java 虚拟机是一个微型操作系统,有自己的硬件构架体系,还有相应的指令系统。能够识别 Java 独特的 .class 文件(字节码),能够将这些文件中的信息读取出来,使得 Java 程序只需要生成 Java 虚拟机上的字节码后就能在不同操作系统平台上进行运行。

    180 引用 • 120 回帖 • 1 关注
  • 设计模式

    设计模式(Design pattern)代表了最佳的实践,通常被有经验的面向对象的软件开发人员所采用。设计模式是软件开发人员在软件开发过程中面临的一般问题的解决方案。这些解决方案是众多软件开发人员经过相当长的一段时间的试验和错误总结出来的。

    198 引用 • 120 回帖
  • Electron

    Electron 基于 Chromium 和 Node.js,让你可以使用 HTML、CSS 和 JavaScript 构建应用。它是一个由 GitHub 及众多贡献者组成的活跃社区共同维护的开源项目,兼容 Mac、Windows 和 Linux,它构建的应用可在这三个操作系统上面运行。

    15 引用 • 136 回帖 • 6 关注
  • App

    App(应用程序,Application 的缩写)一般指手机软件。

    90 引用 • 383 回帖 • 1 关注
  • Rust

    Rust 是一门赋予每个人构建可靠且高效软件能力的语言。Rust 由 Mozilla 开发,最早发布于 2014 年 9 月。

    57 引用 • 22 回帖 • 5 关注
  • Lute

    Lute 是一款结构化的 Markdown 引擎,支持 Go 和 JavaScript。

    25 引用 • 191 回帖 • 21 关注
  • WebComponents

    Web Components 是 W3C 定义的标准,它给了前端开发者扩展浏览器标签的能力,可以方便地定制可复用组件,更好的进行模块化开发,解放了前端开发者的生产力。

    1 引用 • 25 关注
  • Laravel

    Laravel 是一套简洁、优雅的 PHP Web 开发框架。它采用 MVC 设计,是一款崇尚开发效率的全栈框架。

    19 引用 • 23 回帖 • 686 关注
  • CloudFoundry

    Cloud Foundry 是 VMware 推出的业界第一个开源 PaaS 云平台,它支持多种框架、语言、运行时环境、云平台及应用服务,使开发人员能够在几秒钟内进行应用程序的部署和扩展,无需担心任何基础架构的问题。

    5 引用 • 18 回帖 • 153 关注
  • MongoDB

    MongoDB(来自于英文单词“Humongous”,中文含义为“庞大”)是一个基于分布式文件存储的数据库,由 C++ 语言编写。旨在为应用提供可扩展的高性能数据存储解决方案。MongoDB 是一个介于关系数据库和非关系数据库之间的产品,是非关系数据库当中功能最丰富,最像关系数据库的。它支持的数据结构非常松散,是类似 JSON 的 BSON 格式,因此可以存储比较复杂的数据类型。

    90 引用 • 59 回帖
  • Vue.js

    Vue.js(读音 /vju ː/,类似于 view)是一个构建数据驱动的 Web 界面库。Vue.js 的目标是通过尽可能简单的 API 实现响应的数据绑定和组合的视图组件。

    261 引用 • 662 回帖
  • WiFiDog

    WiFiDog 是一套开源的无线热点认证管理工具,主要功能包括:位置相关的内容递送;用户认证和授权;集中式网络监控。

    1 引用 • 7 回帖 • 545 关注
  • Swift

    Swift 是苹果于 2014 年 WWDC(苹果开发者大会)发布的开发语言,可与 Objective-C 共同运行于 Mac OS 和 iOS 平台,用于搭建基于苹果平台的应用程序。

    34 引用 • 37 回帖 • 499 关注
  • SQLite

    SQLite 是一个进程内的库,实现了自给自足的、无服务器的、零配置的、事务性的 SQL 数据库引擎。SQLite 是全世界使用最为广泛的数据库引擎。

    4 引用 • 7 回帖 • 2 关注
  • DevOps

    DevOps(Development 和 Operations 的组合词)是一组过程、方法与系统的统称,用于促进开发(应用程序/软件工程)、技术运营和质量保障(QA)部门之间的沟通、协作与整合。

    40 引用 • 24 回帖
  • 正则表达式

    正则表达式(Regular Expression)使用单个字符串来描述、匹配一系列遵循某个句法规则的字符串。

    31 引用 • 94 回帖