在Hadoop中处理输入的CSV文件

本贴最后更新于 3165 天前,其中的信息可能已经东海扬尘

在Hadoop中,InputFormat类用来生成可供Mapper处理的<key, value>键值对。当数据传送给Mapper时,Mapper会将输入分片传送到InputFormat上,InputFormat调用getRecordReader()方法生成RecordReader,RecordReader再创建可供map函数处理的键值对<K1, V1>。

Hadoop预定义了多种方法将不同类型的输入数据转化为map能够处理的键值对。比如,TextInputFormat,Hadoop中默认的输入方法,会将每行数据生成一条记录,其中key值为每条记录在分片中的字节偏移量,value则为每行的内容。

在Hadoop预定义的InputFormat中,并没有处理CSV文件的方法。CSV文件的本质其实是用逗号分隔开的文本文件。一种很直观的处理方法是:将CSV文件作为文本文件处理,使用TextInputFormat将文件按行传入map函数,在map函数中再按照CSV文件的格式进行处理。但这样很容易将数据格式的处理逻辑与业务处理逻辑混淆在一起,并且出现很多copy-and-pasted的代码。

实际上,可以写一个自己的InputFormat以及RecordReader类,专门用来处理CSV文件的输入,直接传递给map函数解析后的数据。

1 数据结构

我们传递给map函数一个ArrayWritable(A Writable for arrays containing instances of a class),元素类型为Text,即CSV文件每一行各个字段的数据。数据结构如下:

代码1:TextArrayWritable.java

public class TextArrayWritable extends ArrayWritable {
    public TextArrayWritable() {
        super(Text.class);
    }
public TextArrayWritable(Text[] strings) { super(Text.class, strings); }

}

2 CSVInputFormat

FileInputFormat是所有使用文件作为其数据源的InputFormat实现的基类。它提供了两个功能:一是定义哪些文件包含在一个作业的输入中,另一个是为输入文件生成分片(Input Splits)。而把分片分割成记录的事情交由其子类来完成。所以CSVInputFormat类的实现上,同样是继承InputFormat类,并只需要简单的重写createRecordReader和isSplitable即可。

代码2:CSVInputFormat.java

public class CSVInputFormat extends FileInputFormat<LongWritable, TextArrayWritable> {
    public static final String CSV_TOKEN_SEPARATOR_CONFIG = "csvinputformat.token.delimiter";
@Override protected boolean isSplitable(JobContext context, Path filename) { CompressionCodec codec = new CompressionCodecFactory(context.getConfiguration()).getCodec(filename); return codec == null; } @Override public RecordReader&lt;LongWritable, TextArrayWritable&gt; createRecordReader( InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { String csvDelimiter = context.getConfiguration() .get(CSV_TOKEN_SEPARATOR_CONFIG); Character separator = null; if ((csvDelimiter != null) &amp;&amp; (csvDelimiter.length() == 1)) { separator = csvDelimiter.charAt(0); } return new CSVRecordReader(separator); }

}

其中csvinputformat.token.delimiter是可在配置文件中配置的CSV输入文件分隔符,createRecordReader完成的工作只是从配置文件中得到分隔符,调用真正对CSV文件分片进行处理,并生成键值对的CSVRecordReader函数,并返回RecordReader对象。

3 CSVRecordReader

对于CSVRecordReader,要实现的功能无非就是将CSV文件中每一行的各字段提取出来,并将各字段作为TextArrayWritable类型的数据结构传递给map函数。

在Hadoop中有一个LineRecordReader类,它将文本文件每一行的内容作为值返回,类型为Text。所以可以直接在CSVRecordReader中使用LineRecordReader,将LineRecordReader返回的每一行再次进行处理。在CSV文件的处理上,这里用到了OpenCSV对CSV文件的每一行进行解析,具体可参见这里。

下面是CSVRecordReader的实现代码。除了CSV文件的解析、nextKeyValue()方法和getCurrentValue()方法外,大部分方法都直接调用LineRecordReader实例的相应方法。毕竟我们是踩在巨人的肩膀上继续前进嘛。O(∩_∩)O~

代码3:CSVRecordReader.java

public class CSVRecordReader extends RecordReader<LongWritable, TextArrayWritable> {
    private LineRecordReader lineReader;
    private TextArrayWritable value;
    private CSVParser parser;
// 新建CSVParser实例,用来解析每一行CSV文件的每一行 public CSVRecordReader(Character delimiter) { this.lineReader = new LineRecordReader(); if (delimiter == null) { this.parser = new CSVParser(); } else { this.parser = new CSVParser(delimiter); } } // 调用LineRecordReader的初始化方法,寻找分片的开始位置 @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { lineReader.initialize(split, context); } // 使用LineRecordReader来得到下一条记录(即下一行)。 // 如果到了分片(Input Split)的尾部,nextKeyValue将返回NULL @Override public boolean nextKeyValue() throws IOException, InterruptedException { if (lineReader.nextKeyValue()) { //如果有新记录,则进行处理 loadCSV(); return true; } else { value = null; return false; } } @Override public LongWritable getCurrentKey() throws IOException, InterruptedException { return lineReader.getCurrentKey(); } @Override public TextArrayWritable getCurrentValue() throws IOException, InterruptedException { return value; } @Override public float getProgress() throws IOException, InterruptedException { return lineReader.getProgress(); } @Override public void close() throws IOException { lineReader.close(); } // 对CSV文件的每一行进行处理 private void loadCSV() throws IOException { String line = lineReader.getCurrentValue().toString(); // 通过OpenCSV将解析每一行的各字段 String[] tokens = parser.parseLine(line); value = new TextArrayWritable(convert(tokens)); } // 将字符串数组批量处理为Text数组 private Text[] convert(String[] tokens) { Text[] t = new Text[tokens.length]; for (int i = 0; i &lt; t.length; i++) { t[i] = new Text(tokens[i]); } return t; }

}

4 简单的应用

用于处理CSV文件输入的InputFormat已经写完了,现在构造一个简单的应用场景,来试验下这个CSVInputFormat。

假设有这样一些数据,每一列第一个字段为一个标识,后面为随机产生的数字,标识各不相同,求每一行标识后的数字之和并输出,输出格式为:每一行为标识和数字和。

 

由于标识没有重复,并且逻辑比较简单,这里只写一个Mapper即可,不需要Reducer。

代码4:CSVMapper.java

public class CSVMapper extends Mapper<LongWritable, TextArrayWritable, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, TextArrayWritable value,
        Context context) throws IOException, InterruptedException {
        String[] values = value.toStrings();
        int sum = 0;
        Text resultKey = new Text(values[0]);
for (int i = 1; i &lt; values.length; i++) { sum = sum + Integer.valueOf(values[i].trim()); } IntWritable resultValue = new IntWritable(sum); context.write(resultKey, resultValue); }

}

在作业的提交部分,由于没有Reducer,所以将ReduceTask设置为了0

代码5:JustRun.java

public class JustRun extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
Job job = new Job(conf); job.setJobName("CSVTest"); job.setJarByClass(JustRun.class); job.setMapperClass(CSVMapper.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setInputFormatClass(CSVInputFormat.class); job.setNumReduceTasks(0); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int ret = ToolRunner.run(new JustRun(), args); System.exit(ret); }

}

执行完毕后,输出如下,跟预想是一致的。

好了,这就是利用InputFormat对CSV文件的处理过程。除了CSV文件,还可根据处理数据的类型,写出更多的InputFormat。同时,我们还可以利用OutputFormat输出需要的格式。

  • Hadoop

    Hadoop 是由 Apache 基金会所开发的一个分布式系统基础架构。用户可以在不了解分布式底层细节的情况下,开发分布式程序。充分利用集群的威力进行高速运算和存储。

    88 引用 • 122 回帖 • 619 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...

推荐标签 标签

  • Latke

    Latke 是一款以 JSON 为主的 Java Web 框架。

    71 引用 • 535 回帖 • 824 关注
  • IBM

    IBM(国际商业机器公司)或万国商业机器公司,简称 IBM(International Business Machines Corporation),总公司在纽约州阿蒙克市。1911 年托马斯·沃森创立于美国,是全球最大的信息技术和业务解决方案公司,拥有全球雇员 30 多万人,业务遍及 160 多个国家和地区。

    17 引用 • 53 回帖 • 149 关注
  • Flutter

    Flutter 是谷歌的移动 UI 框架,可以快速在 iOS 和 Android 上构建高质量的原生用户界面。 Flutter 可以与现有的代码一起工作,它正在被越来越多的开发者和组织使用,并且 Flutter 是完全免费、开源的。

    39 引用 • 92 回帖 • 2 关注
  • OAuth

    OAuth 协议为用户资源的授权提供了一个安全的、开放而又简易的标准。与以往的授权方式不同之处是 oAuth 的授权不会使第三方触及到用户的帐号信息(如用户名与密码),即第三方无需使用用户的用户名与密码就可以申请获得该用户资源的授权,因此 oAuth 是安全的。oAuth 是 Open Authorization 的简写。

    36 引用 • 103 回帖 • 29 关注
  • LeetCode

    LeetCode(力扣)是一个全球极客挚爱的高质量技术成长平台,想要学习和提升专业能力从这里开始,充足技术干货等你来啃,轻松拿下 Dream Offer!

    209 引用 • 72 回帖 • 1 关注
  • FlowUs

    FlowUs.息流 个人及团队的新一代生产力工具。

    让复杂的信息管理更轻松、自由、充满创意。

    1 引用 • 1 关注
  • 游戏

    沉迷游戏伤身,强撸灰飞烟灭。

    181 引用 • 821 回帖
  • CloudFoundry

    Cloud Foundry 是 VMware 推出的业界第一个开源 PaaS 云平台,它支持多种框架、语言、运行时环境、云平台及应用服务,使开发人员能够在几秒钟内进行应用程序的部署和扩展,无需担心任何基础架构的问题。

    5 引用 • 18 回帖 • 177 关注
  • DevOps

    DevOps(Development 和 Operations 的组合词)是一组过程、方法与系统的统称,用于促进开发(应用程序/软件工程)、技术运营和质量保障(QA)部门之间的沟通、协作与整合。

    57 引用 • 25 回帖 • 10 关注
  • golang

    Go 语言是 Google 推出的一种全新的编程语言,可以在不损失应用程序性能的情况下降低代码的复杂性。谷歌首席软件工程师罗布派克(Rob Pike)说:我们之所以开发 Go,是因为过去 10 多年间软件开发的难度令人沮丧。Go 是谷歌 2009 发布的第二款编程语言。

    498 引用 • 1395 回帖 • 254 关注
  • 快应用

    快应用 是基于手机硬件平台的新型应用形态;标准是由主流手机厂商组成的快应用联盟联合制定;快应用标准的诞生将在研发接口、能力接入、开发者服务等层面建设标准平台;以平台化的生态模式对个人开发者和企业开发者全品类开放。

    15 引用 • 127 回帖
  • Notion

    Notion - The all-in-one workspace for your notes, tasks, wikis, and databases.

    10 引用 • 76 回帖
  • 招聘

    哪里都缺人,哪里都不缺人。

    189 引用 • 1057 回帖
  • 分享

    有什么新发现就分享给大家吧!

    247 引用 • 1794 回帖 • 3 关注
  • Sandbox

    如果帖子标签含有 Sandbox ,则该帖子会被视为“测试帖”,主要用于测试社区功能,排查 bug 等,该标签下内容不定期进行清理。

    431 引用 • 1250 回帖 • 599 关注
  • Vue.js

    Vue.js(读音 /vju ː/,类似于 view)是一个构建数据驱动的 Web 界面库。Vue.js 的目标是通过尽可能简单的 API 实现响应的数据绑定和组合的视图组件。

    267 引用 • 666 回帖 • 2 关注
  • Node.js

    Node.js 是一个基于 Chrome JavaScript 运行时建立的平台, 用于方便地搭建响应速度快、易于扩展的网络应用。Node.js 使用事件驱动, 非阻塞 I/O 模型而得以轻量和高效。

    139 引用 • 269 回帖
  • Ant-Design

    Ant Design 是服务于企业级产品的设计体系,基于确定和自然的设计价值观上的模块化解决方案,让设计者和开发者专注于更好的用户体验。

    17 引用 • 23 回帖 • 1 关注
  • BookxNote

    BookxNote 是一款全新的电子书学习工具,助力您的学习与思考,让您的大脑更高效的记忆。

    笔记整理交给我,一心只读圣贤书。

    1 引用 • 1 回帖 • 1 关注
  • TensorFlow

    TensorFlow 是一个采用数据流图(data flow graphs),用于数值计算的开源软件库。节点(Nodes)在图中表示数学操作,图中的线(edges)则表示在节点间相互联系的多维数据数组,即张量(tensor)。

    20 引用 • 19 回帖
  • Laravel

    Laravel 是一套简洁、优雅的 PHP Web 开发框架。它采用 MVC 设计,是一款崇尚开发效率的全栈框架。

    20 引用 • 23 回帖 • 740 关注
  • RemNote
    2 引用 • 16 回帖 • 8 关注
  • Quicker

    Quicker 您的指尖工具箱!操作更少,收获更多!

    36 引用 • 155 回帖
  • MyBatis

    MyBatis 本是 Apache 软件基金会 的一个开源项目 iBatis,2010 年这个项目由 Apache 软件基金会迁移到了 google code,并且改名为 MyBatis ,2013 年 11 月再次迁移到了 GitHub。

    173 引用 • 414 回帖 • 368 关注
  • 电影

    这是一个不能说的秘密。

    122 引用 • 608 回帖 • 1 关注
  • VirtualBox

    VirtualBox 是一款开源虚拟机软件,最早由德国 Innotek 公司开发,由 Sun Microsystems 公司出品的软件,使用 Qt 编写,在 Sun 被 Oracle 收购后正式更名成 Oracle VM VirtualBox。

    10 引用 • 2 回帖 • 15 关注
  • HHKB

    HHKB 是富士通的 Happy Hacking 系列电容键盘。电容键盘即无接点静电电容式键盘(Capacitive Keyboard)。

    5 引用 • 74 回帖 • 495 关注