Learning Spark 中文版 -- 第五章 -- 加载保存数据(2)

本贴最后更新于 2188 天前,其中的信息可能已经沧海桑田

SequenceFiles(序列文件)

  SequenceFile 是 Hadoop 的一种由键值对小文件组成的流行的格式。SequenceFIle 有同步标记,Spark 可以寻找标记点,然后与记录边界重新同步。Spark 还可以从多个节点并行高效地读取 SequenceFile。SequenceFile 也是 Hadoop MapReduce 中 job 的常用输入输出格式,如果你正使用着 Hadoop 系统,数据很有可能就是 SequenceFile 格式的。

  SequenceFile 是由实现 Hadoop Writabble 接口的元素组成的,Hadoop 使用的是自定义的序列化框架。表 5-2 列出了一些常用的类型和他们对应的 Writable 类。有个小经验,在你的类名后面加上 Wratable,然后检查这个类是否属于 org.apache.hadoop.io.Writable 包下的子类。如果你的输入数据不属于 Writable 子类,你可以继承 org.apache.hadoop.io.Writable 类并重写 readFieldswrite 方法就可以了。

Hadoop 的 RecordReader 为每条记录重用相同的对象,因此直接调用您读取 RDD 上的缓存可能会失败;相反,添加一个 map() 操作并缓存其结果。除此之外,很多 Hadoop 的 Writable 类没有实现 java.io.Serializable,所以,为了让他们可以使用我们需要用 map() 转换他们。

image

  在 Spark1.0 或更早的版本,只有 Java 和 Scala 可以使用 SequenceFile,但是从 Spark1.1 开始,添加了 Python 也能加载保存 SequenceFile 的功能。注意一点,你在使用 Java 和 Scala 时需要自定义 Writable 类型。Python Spark API 只知道如何将 Hadoop 中可用的基本 Writable 转换为 Python 所使用,对于其他类,API 基于可用的 getter 方法尽最大努力使其正常运行。

Loading SequenceFiles(加载 SequenceFile)

  对于 SequenceFile,Spark 定制了一套 API。我们可以在 SparkContext 上调用 sequenceFile(path, keyClass, valueClass, minPartitions)。之前提到过,SequenceFile 必须结合 Writable 类使用,所以我们键和值的类必须都是正确的 Writable 类。想象一个从 SequenceFile 加载人和他们看过熊猫数量的程序,这个例子中,键的类是 Text,值的类是 IntWritableVIntWritable,为了简化工作,就使用 IntWritable 。示例如下:

Example 5-20. Loading a SequenceFile in Python

val data = sc.sequenceFile(inFile,
    "org.apache.hadoop.io.Text", "org.apache.hadoop.io.IntWritable")
    
Example 5-21. Loading a SequenceFile in Scala

val data = sc.sequenceFile(inFile, classOf[Text], classOf[IntWritable]).
    map{case (x, y) => (x.toString, y.get())}

Example 5-22. Loading a SequenceFile in Java

public static class ConvertToNativeTypes implements
    PairFunction<Tuple2<Text, IntWritable>, String, Integer> {
    public Tuple2<String, Integer> call(Tuple2<Text, IntWritable> record) {
        return new Tuple2(record._1.toString(), record._2.get());
    }
}
JavaPairRDD<Text, IntWritable> input = sc.sequenceFile(fileName, Text.class,IntWritable.class);
JavaPairRDD<String, Integer> result = input.mapToPair(
    new ConvertToNativeTypes());

在 Scala 中,有一个很方便的函数能把 Writable 转换成 Scala 中对应的类型。而不是如样例那样标明键类型和值类型,我们可以调用 SequenceFile[Key,Value](path,minPartitions),这回返回一个 Scala 原生类型的 RDD。

Saving SequenceFiles(保存 SequenceFile)

  把数据写入一个 SequenceFile 也很类似。首先,因为 SequenceFile 是键值对类型的,我们需要一个键值对 RDD 便于 SequenceFile 写入数据。对于 Scala 很多本地类型,存在在 Scala 和 Hadoop Writable 类型之间的隐式转换,所以如果你正写出一个 Scala 原生类型数据你可以通过调用 saveAsSequenceFile(path) 直接保存你的键值对 RDD。如果我们使用的键和值不能自动转换成 Wratble,或者我们想使用变长类型(如,VIntWritable),我们可以在保存之前遍历数据对其进行映射成 Writable 类型。思考一下之前的例子(人和看熊猫数量的例子)。示例如下:

Example 5-23. Saving a SequenceFile in Scala
val data = sc.parallelize(List(("Panda", 3), ("Kay", 6), ("Snail", 2)))
data.saveAsSequenceFile(outputFile)

  在 Java 中保存 SequenceFile 更麻烦一点,因为 Java 的键值对 RDD 没有 saveAsSequenceFile() 方法。我们需要使用 Spark 的特性去保存定制的 Hadoop 输出格式,我们会在 84 页“Hadoop 输入输出格式中”展示如何在 Java 中保存 SequenceFile。

Object Files(对象文件)

  对象文件看起来是对 SequenceFile 简单地包装,它允许我们保存只包含值的 RDD。和 SequenceFile 不同,对象文件值的写出是利用了 Java 的序列化。

如果你改变了类,例如添加或删除了字段,原来的对象文件就不可读了。对象文件使用序列化有一个好处,它可以跨类的版本保持兼容性,但是需要程序员做一些工作。

  和 SequenceFile 不同,同一个对象,对象文件的输出结果可能和 Hadoop 的输出结果不同。和其他输出格式也不同,对象文件最主要的用处就是用于 Spark job 之间的通信。而且 Java 的序列化也可能非常慢。

  保存对象文件很简单,直接调用 RDD 的 saveAsObjectFile 就可以了。读取对象文件也很简单:SparkContext 有一个函数 objectFile(),它需要接收一个路径参数,返回一个 RDD。

  上面讲了很多对象文件的弊病,你可能会好奇为什么会有人使用它们。一个主要原因就是它几乎不用费任何操作就可以保存任意的对象。

  对象文件在 Python 中无法使用,但是 Python 的 RDD 和 SparkContext 支持名为 saveAsPickleFile()pickleFile() 的方法。这两个方法是用了 Python 的 pickle 序列化库。pickle 文件和对象文件的弊病是相同的,pickle 库很慢,并且如果对类做了改动,原来的类可能无法读取。

Hadoop Input and Output Format(Hadoop 输入输出格式)

  除了 Spark 包装的格式,我们还可以与所有 Hadoop 支持的格式进行交互。Spark 支持新版和老版的 Hadoop 文件 API,提供了很高的灵活性。

loading with other Hadoop input formats(加载其他 Hadoop 格式)

  为了使用 Hadoop 的新版 API 读取文件我们需要告诉 Spark 一些事情。newAPIHadoopFile 需要接收一个路径,和三个类作为参数。第一个类是“格式”类,用来代表我们输入的格式。有一个类似的函数 hadoopFile(),用来处理旧版 API 实现的 Hadoop 输入格式。第二个类是键的类,第三个类是值的类。如果我们需要指定其他的 Hadoop 配置文件属性,我们也可以传入一个 conf 对象。

  一个最简单的 Hadoop 输入格式就是 KeyValueTextInputFormat,可以用来从文本文件中读取键值对数据(Example5-24 中有示例)。每一行都是单独处理的,键和值用 tab 键隔开。这种格式由 Hadoop 提供,所以我们项目中不必添加额外的依赖。

Example 5-24. Loading KeyValueTextInputFormat() with old-style API in Scala
val input = sc.hadoopFile[Text, Text, KeyValueTextInputFormat](inputFile).map{
    case (x, y) => (x.toString, y.toString)
}

  之前看到的加载 JSON 是通过加载文本文件然后再对其转换,但是我们也可以使用 Hadoop 的输入格式加载 JSON。下面这个例子需要对要文件做一点特别设置,你选择跳过也没关系。Twitter 的 Elephant Bird 库支持很多数据格式,包括 JSON,Lucene,Protocol Buffer 相关的格式等等。这个库可以使用新版和老版 API。为了展示如何而在 Spark 中使用新版 API,我们在下面的示例中演示使用 jsonInputFormat 加载 LZO-compressed JSON 数据:

Example 5-25. Loading LZO-compressed JSON with Elephant Bird in Scala

val input = sc.newAPIHadoopFile(inputFile, classOf[LzoJsonInputFormat],
classOf[LongWritable], classOf[MapWritable], conf)
// Each MapWritable in "input" represents a JSON object
//每一个"输出"中的MapWritable都代表一个JSON对象

你需要下载 hadoop-lzo 包并且设置 Spark 的本地库才能使用 LZO 包。如果你下载了 Debian 包,在 Spark 的 submit 调用中加入 --driver-library-path /usr/lib/hadoop/lib/native/ --driver-class-path /usr/lib/hadoop/lib/ 就能做到这一点。

  从使用的角度来看,使用旧版 Hadoop 的 API 读取文件几乎是没有区别,除了我们提供的那个旧式 inputFormat 类。Spark 很多内置便捷的函数(如 sequenceFile())都是实现的旧版 Hadoop API。

Saving with Hadoop output formats(使用 Hadoop 输出格式保存)

  我们已经在某种程度上检验了 SequenceFIle,但是在 Java 中我们没有同样方便的函数来保存键值对 RDD。我们会在 Example5-26 中演示如何使用老版 Hadoop 格式 API,新版本的(saveAsNewAPIHadoopFile)调用也一样。

Example 5-26. Saving a SequenceFile in Java

public static class ConvertToWritableTypes implements
    PairFunction<Tuple2<String, Integer>, Text, IntWritable> {
    public Tuple2<Text, IntWritable> call(Tuple2<String, Integer> record) {
        return new Tuple2(new Text(record._1), new IntWritable(record._2));
    }
}
JavaPairRDD<String, Integer> rdd = sc.parallelizePairs(input);
JavaPairRDD<Text, IntWritable> result = rdd.mapToPair(new ConvertToWritableTypes());
result.saveAsHadoopFile(fileName, Text.class, IntWritable.class,
    SequenceFileOutputFormat.class);

Non-filesystem data sources(非文件系统数据源)

  除了 hadoopFile()saveAsHadoopFile() 一系列函数,你可以使用 hadoopDataset/saveAsHadoopDataSetnewAPIHadoopDataset/saveAsNewAPIHadoopDataset 来访问 Hadoop 支持的非文件系统的存储格式。举例来讲,很多键值对存储系统(如 HBase 和 MongoDB)提供了直接从键值对存储中读取的 Hadoop 输入格式。在 Spark 中使用也很简单。

  hadoopDataset() 系列函数只需要一个 Configuration 对象参数,你可以设置其访问数据源所需要的 Hadoop 属性。你按照执行 Hadoop MapReduce job 的配置配置即可,所以你可以按照说明访问 MapReduce 的数据源,然后将对象传递给 Spark。在 96 页的“HBase”中展示了如何使用 newAPIHadoopDataset 加载数据。

Example:Protocol buffers (例子:协议缓冲区)

  Protocol buffers 是由 Google 首先开发用来内部的远程过程调用(RPC)并且此后开源了。Protocol buffers(PBs)是结构化数据,其字段和字段类型是明确定义的。他们被优化地便于编码和解码并且占用空间很小。与 XML 相比,PBs 占用空间小 3 到 10 倍,编码解码速度快 20 到 100 倍。尽管 PB 有一致的编码,但是有多种方式可以创建一个由很多 PB 消息组成的文件。

  Protocol buffers 是由特定领域语言定义的,并且 protocol buffer 编译器可以用多种语言来生成访问器方法(Spark 支持的所有语言都可以)。由于 PB 以最小化占用空间为目标,所以它不是"自我解释型的",因为对数据描述编码会占用额外的空间。这意味着解析 PB 格式的数据,我们需要 protocol buffer 的定义来理解它。

  PB 包含 optional(可选的),required(必须的)或者 repeated(重复的)字段。当你解析数据的时候如果丢失了可选的字段不会造成解析失败,但是如果丢失了必须的字段会导致失败。所以,当你在已有的 protocol buffer 中添加新字段,把新字段设为可选是一个好习惯。因为不是每个人都会同时升级(即使他们这样做了,你也可能需要读取老版的数据)。

  PB 字段可能是预定义的类型,也可能是其他的 PB 消息。包括 String,int32,enums 等等类型。这只是对 protocol buffer 做了很简单的介绍,所以如果你感兴趣,最好去浏览一下 Protocol Buffer 的网站。

  在 Example5-27 中,我们展示了从 protocol buffer 格式中加载一些 VenueResponse 对象。例子中 VenueResponse 对象是一个具有单个重复字段的简单格式,而且还包含另一个有 required(必须),optional(可选)和 enumeration(枚举)字段的消息。

Example 5-27. Sample protocol buffer definition

message Venue {
    required int32 id = 1;
    required string name = 2;
    required VenueType type = 3;
    optional string address = 4;
    enum VenueType {
        COFFEESHOP = 0;
        WORKPLACE = 1;
        CLUB = 2;
        OMNOMNOM = 3;
        OTHER = 4;
    }
}
message VenueResponse {
    repeated Venue results = 1;
}

  我们在之前章节用来加载 JSON 数据的推特的 Elephant Bird 库,也支持加载保存 protocol buffer 格式的数据。下面例子演示将 Venues 写出。

Example 5-28. Elephant Bird protocol buffer writeout in Scala

val job = new Job()
val conf = job.getConfiguration
LzoProtobufBlockOutputFormat.setClassConf(classOf[Places.Venue], conf);
val dnaLounge = Places.Venue.newBuilder()
dnaLounge.setId(1);
dnaLounge.setName("DNA Lounge")
dnaLounge.setType(Places.Venue.VenueType.CLUB)
val data = sc.parallelize(List(dnaLounge.build()))
val outputData = data.map{ pb =>
    val protoWritable = ProtobufWritable.newInstance(classOf[Places.Venue]);
    protoWritable.set(pb)
    (null, protoWritable)
}
outputData.saveAsNewAPIHadoopFile(outputFile, classOf[Text],
    classOf[ProtobufWritable[Places.Venue]],
    classOf[LzoProtobufBlockOutputFormat[ProtobufWritable[Places.Venue]]], conf)

  该例代码的完整版本在本书的源码中。(本书 github 地址 databricks/learning-spark)

当你构建项目时,确保使用的 Protocol buffer 库版本与 Spark 相同,撰写本书时,版本是 2.5

package com.oreilly.learningsparkexamples.proto;

message Venue {
  required int32 id = 1;
  required string name = 2;
  required VenueType type = 3;
  optional string address = 4;

  enum VenueType {
    COFFEESHOP = 0;
    WORKPLACE = 1;
    CLUB = 2;
    OMNOMNOM = 3;
    OTHER = 4;
  }
}

message VenueResponse {
  repeated Venue results = 1;
}
/**
 * Saves a sequence file of people and how many pandas they have seen.
 */
package com.oreilly.learningsparkexamples.scala

import com.oreilly.learningsparkexamples.proto.Places

import org.apache.spark._
import org.apache.spark.SparkContext._

import org.apache.hadoop.io.Text
import com.twitter.elephantbird.mapreduce.io.ProtobufWritable
import com.twitter.elephantbird.mapreduce.output.LzoProtobufBlockOutputFormat
import org.apache.hadoop.conf.Configuration

object BasicSaveProtoBuf {
    def main(args: Array[String]) {
      val master = args(0)
      val outputFile = args(1)
      val sc = new SparkContext(master, "BasicSaveProtoBuf", System.getenv("SPARK_HOME"))
      val conf = new Configuration()
      LzoProtobufBlockOutputFormat.setClassConf(classOf[Places.Venue], conf);
      val dnaLounge = Places.Venue.newBuilder()
      dnaLounge.setId(1);
      dnaLounge.setName("DNA Lounge")
      dnaLounge.setType(Places.Venue.VenueType.CLUB)
      val data = sc.parallelize(List(dnaLounge.build()))
      val outputData = data.map{ pb =>
        val protoWritable = ProtobufWritable.newInstance(classOf[Places.Venue]);
        protoWritable.set(pb)
        (null, protoWritable)
      }
      outputData.saveAsNewAPIHadoopFile(outputFile, classOf[Text], classOf[ProtobufWritable[Places.Venue]],
        classOf[LzoProtobufBlockOutputFormat[ProtobufWritable[Places.Venue]]], conf)
    }
}

File Compression(文件压缩)

  当我们处理大数据时,我们经常发现需要压缩数据来节省存储空间和网络开销。对于大多数大数据格式,我们可以制定压缩编码器来压缩数据。我们之前已经见识到,Spark 原生的输入格式可以自动为我们处理压缩类型。当你读取压缩文件时,有些压缩编码器可以自动判断压缩类型。

  这些压缩选项仅适用于支持压缩的 Hadoop 格式,即写入文件系统的格式。数据库 Hadoop 格式通常不支持压缩,或者它们在数据库自身配置了对数据的压缩。

  选择一个输出压缩编码器可能会对以后的数据使用人员造成很大的影响。像 Spark 这种分布式系统,我们通常会从多个不同的机器中读取数据。为了满足分布式的需求,每个工作节点需要找到新数据记录的开始点。有些压缩格式让人无从下手,因为如果需要单个节点读取数据,就很容易导致性能瓶颈。能够很容易从多个机器中读取的数据叫做“可分割性(splitable)”。表 5-3 罗列了可用的压缩选择。

格式 可分割 平均压缩速度 文本效率 Hadoop 压缩编码器
JAVA
原生 评价
gzip N org.apache.hadoop
.io.compress.GzipCodec
lzo Y 非常快 中等 com.hadoop.compression
.lzo.LzoCodec
每个工作节点都
需要下载 LZO 库
bzip2 Y 很快 org.apache.hadoop.io.compress
.BZip2Codec
对于 java
使用可分割版本
zlib N 中等 org.apache.hadoop.io.compress
.DefaultCodec
hadoop 的
默认压缩编码器
Snappy N 非常快 org.apache.hadoop.io.compress
.SnappyCodec
Snappy 有一个
纯 Java 端口,但在
Spark 和 Hadoop 中尚不可用

Spark 的 textFile() 方法可以处理压缩的输入,即使在输入被压缩以致其可以以分割的形式读取的情况下,它也会自动禁用可分割性。如果你发现你需要读取一个很大的单个压缩输入数据,最好跳过 Spark 的包装,使用 newAPIHadoopFile 或者 hadoopFIle 并且明确指定正确的压缩编码器。

  有些输入格式(如 SequenceFile)允许我们压缩键值对数据的值,这样可以在查找时提高效率。其他输入格式有自己对压缩控制的方式:例如,推特的 Elephant Bird 包中的许多格式使用 LZO 压缩数据。

Filesystems(文件系统)

  Spark 支持很多文件系统的读取写出,我们可以使用我们想用的各种文件格式。

Local/"Regular" FS(本地或常规文件系统)

  尽管 Spark 支持从本地文件系统加载文件,但需要文件在集群所有节点的路径都相同才能够使用。

  一些网络文件系统,如 NFS,AFS 和 MapR 的 NFS 层,作为常规文件系统向用户公开。如果你的数据已经在上述某个文件系统中,指定一个路径

Example 5-29. Loading a compressed text file from the local filesystem in Scala

val rdd = sc.textFile("file:///home/holden/happypandas.gz")

  如果文件还没有在集群的所有节点上,你可以不同通过 Spark 在驱动程序上本地加载,然后调用 parallelize 将内容分发给 worker。这种方式速度很慢,所以我们建议把文件放在 HDFS,NFS 或 S3 等共享文件系统中。

Amazon S3(亚马逊的 s3)

  Amazon S3 是一个日渐流行的存储大数据的选择。当您的计算节点位于 Amazon EC2 内时,S3 非常快速,但是如果你必须在公网上传输,那可能会有很糟糕的表现。

  要访问 Spark 中的 S3,您应该首先将 AWS_ACCESS_KEY_IDAWS_SECRET_ACCESS_KEY 环境变量设置为您的 S3 凭证。你可以从 Amazon Web Service 控制台创建这些凭证。然后传递一个以 s3n://开头的路径给 Spark 的读取文件方法,格式为 s3n://bucket/pathwithin-bucket。和其他文件系统一样,Spark 支持 S3 的通配符路径,如

  如果你从 Amazon 获得 S3 访问权限错误,请确保为其指定了访问密钥的帐户同时具有“read”和“list”权限。Spark 需要能够列出存储 bucket 中的对象以识别您想要读取的对象。

HDFS

  HDFS 是非常流行的分布式文件系统,Spark 与其对接的效果也很好。HDFS 被设计为在商用硬件上工作的系统,并且在提供高吞吐量的情况下还能保证对故障处理的弹性。Spark 和 HDFS 可以布置在同一台机器上,并且 Spark 可以借此避免网络通信开销。

  对输入输出的数据设置 port/path 就可以在 Spark 上使用 HDFS 了,非常简单。

HDFS 的协议在 Hadoop 不同版本之间存在差异,所以如果你运行一个针对不同版本编译的 Spark 版本,会导致失败。默认情况下,Spark 是针对
Hadoop 1.0.4 构建的。如果从源代码构建,则可以设置 SPARK_HADOOP_VERSION =指定为环境变量,以针对不同的版本构建;或者你可以下载一个不同的 Spark 预编译版本。你可以在运行时决定 hadoop 的版本。

Structured Data with Spark SQL(Spark SQL 处理结构化数据)

  Spark SQL 是 Spark1.0 加入的组件,很快变成 Spark 处理结构化和半结构化数据最受欢迎的方式。对于结构化数据,我们意思是固定模式的数据,就是数据记录之间具有一致的字段集。Spark SQL 支持多个结构化数据源作为输入,并且因其能够理解他们的格式,所以可以高效地读取数据源必须的字段。第九章我们会详细介绍 Spark SQL,现在我们会简要介绍少量常用数据源的使用。

  在所有的例子中,我们让 Spark SQL 在数据源上运行一个 SQL 查询(选择一些字段或字段的函数),并且会返回给我们 Row 对象的 RDD,每条记录一个。在 Java 和 Scala 中,允许 Row 对象基于列号进行访问。每个 Row 象都有一个 get() 方法,用来返回我们可以转换的一般类型,并且有对于常见的基本类型特殊的 get() 方法(如, getFloat(), getInt(), getLong(), getString(), getShort(), and getBoolean())。在 Python 中我们直接使用 row[column_number]row.column_name 访问元素。

Apache Hive(Apache 的 Hive)

  Hadoop 中常用的一个结构数据就是 Apache 的 Hive。Hive 可以以各种格式存储表格,在 HDFS 或这其他存储系统中,从纯文本到列向格式的格式都可以。Spark SQL 可以支持任何 Hive 支持的表格。

  要将 Spark SQL 连接现有的 Hive,你需要提供 Hive 的配置。你可以通过复制你的 hive-site.xml 到 Spark 的./conf/directory 中进行连接。一旦你这样做了,你就创建了个 HiveContext 对象,它是 Spark SQL 的入口点,你可以写 Hive 的查询语句(HQL)来查询数据表,会返回一个行 RDD。示例如下:

Example 5-30. Creating a HiveContext and selecting data in Python

from pyspark.sql import HiveContext

hiveCtx = HiveContext(sc)
rows = hiveCtx.sql("SELECT name, age FROM users")
firstRow = rows.first()
print firstRow.name

Example 5-31. Creating a HiveContext and selecting data in Scala

import org.apache.spark.sql.hive.HiveContext

val hiveCtx = new org.apache.spark.sql.hive.HiveContext(sc)
val rows = hiveCtx.sql("SELECT name, age FROM users")
val firstRow = rows.first()
println(firstRow.getString(0)) // Field 0 is the name

Example 5-32. Creating a HiveContext and selecting data in Java

import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SchemaRDD;

HiveContext hiveCtx = new HiveContext(sc);
SchemaRDD rows = hiveCtx.sql("SELECT name, age FROM users");
Row firstRow = rows.first();
System.out.println(firstRow.getString(0)); // Field 0 is the name

170 页“Apache Hive”会详细介绍 Hive 如何加载数据。

JSON

  如果你数据记录的 JSON 数据结构一致,Spark SQL 可以推断它们的结构并且加载数据为行,使你可以非常容易地获取需要的字段。为了加载 JSON 数据,首先在需要使用 Hive 的时候创建一个 HiveContext 对象。(在这种情况下,不需要安装 Hive,也就是说,您不需要 hivesite.xml 文件。)然后使用 HiveContext.jsonFIle 方法来获得整个文件的 Row 对象 RDD。除了使用整个 Row 对象之外,你也可以将这个 RDD 注册为一个表并从中搜索特定字段。举例来讲,假如我们有一个 JSON 文件格式如下,并且每行一条数据:

Example 5-33. Sample tweets in JSON
{"user": {"name": "Holden", "location": "San Francisco"}, "text": "Nice day out today"}
{"user": {"name": "Matei", "location": "Berkeley"}, "text": "Even nicer here :)"}

  我们可以加载这段数据并且查询 username 和 text 字段,示例如下:

Example 5-34. JSON loading with Spark SQL in Python

tweets = hiveCtx.jsonFile("tweets.json")
tweets.registerTempTable("tweets")
results = hiveCtx.sql("SELECT user.name, text FROM tweets")

Example 5-35. JSON loading with Spark SQL in Scala

val tweets = hiveCtx.jsonFile("tweets.json")
tweets.registerTempTable("tweets")
val results = hiveCtx.sql("SELECT user.name, text FROM tweets")

Example 5-36. JSON loading with Spark SQL in Java

SchemaRDD tweets = hiveCtx.jsonFile(jsonFile);
tweets.registerTempTable("tweets");
SchemaRDD results = hiveCtx.sql("SELECT user.name, text FROM tweets");

  我们会在 172 页的“JSON”中详细讨论如何使用 Spark SQL 加载 JSON 数据和访问其模式。另外,
Spark SQL 支持的数据不仅仅是加载数据,包括查询数据,以更复杂的方式与 RDD 结合,以及在其上运行自定义函数,我们将在第 9 章中介绍。

Databases (数据库)

  Spark 可以使用他们的 Hadoop 连接器或自定义的 Spark 连接器访问多种流行的数据库。 在本节中,我们将展示四个通用连接器。

Java Database Connectivity(Java 数据库连接)

  Spark 可以从任何支持 Java 数据库连接(JDBC)的关系型数据库加载数据,包括 MySQL, Postgres 和其他系统。为了访问这些数据,我们构建一个 org.apache.spark.rdd.JdbcRDD 并为其提供了我们的 SparkContext 和其他参数。Example5-37 展示了使用 JdbcRDD 连接 MySQL 数据库。

Example 5-37. JdbcRDD in Scala

def createConnection() = {
    Class.forName("com.mysql.jdbc.Driver").newInstance();
    DriverManager.getConnection("jdbc:mysql://localhost/test?user=holden");
}

def extractValues(r: ResultSet) = {
    (r.getInt(1), r.getString(2))
}

val data = new JdbcRDD(sc,
    createConnection, "SELECT * FROM panda WHERE ? <= id AND id <= ?",
    lowerBound = 1, upperBound = 3, numPartitions = 2, mapRow = extractValues)
println(data.collect().toList)

  JdbcRDD 接收的几个参数:

  • 首先,我们提供一个函数来建立数据库连接。这使得每个节点在执行任何连接所需的配置后,都可以创建自己的连接来加载数据。
  • 然后我们提供一个读取一定范围数据的查询,以及此查询参数的上限和下限。这些参数允许 Spark 在不同机器上查询不同范围的数据,所以我们不会因在单个节点上加载所有数据而产生性能瓶颈。(如果您不知道有多少记录,则可以先手动进行计数查询并使用其结果来确定 upperBound 和 lowerBound。)
  • 最后一个参数是一个函数,用来将 java.sql.ResultSet 的每行输出转换为对操作数据有用的格式。在 Example5-37 中,我们会拿到(Int,String)键值对。如果省略此参数,Spark 会自动将每行转换为一个对象数组。

  如其他的数据源,当使用 JdbcRDD 时,确保你的数据库可以处理从 Spark 并行读取的负载。如果你想离线查询数据而不是实时的数据库,你可以使用数据库的导出功能导出文本文件。

Cassandra

  随着从 DataStax 引入开源 Spark Cassandra 连接器,Spark 对 Cassandra 的支持已经大大改善。由于连接器目前不是 Spark 的一部分,所以你需要在项目中引入额外的依赖。Cassandra 目前还不能使用 Spark SQL,但是可以返回 CassandraRow 对象,它和 Spark SQL 的 Row 对象具有一些相同的方法,Eample5-38 和 5-39 展示了用法。Spark Cassandra 连接器目前只能在 Java 和 Scala 中使用。

Example 5-38. sbt requirements for Cassandra connector
//sbt配置
"com.datastax.spark" %% "spark-cassandra-connector" % "1.0.0-rc5",
"com.datastax.spark" %% "spark-cassandra-connector-java" % "1.0.0-rc5"

Example 5-39. Maven requirements for Cassandra connector
//maven配置
<dependency> <!-- Cassandra -->
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector</artifactId>
    <version>1.0.0-rc5</version>
</dependency>
<dependency> <!-- Cassandra -->
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector-java</artifactId>
    <version>1.0.0-rc5</version>
</dependency>

  很像 Elasticsearch,Cassandra 连接器读取一个 job 的属性然后确定连接哪个集群。通过设置 spark.cassandra.connection.host 来连接 Cassandra 集群并且如果有用户名和密码的话可以通过 spark.cassandra.auth.usernamespark.cassandra.auth.password 设置。假如你只有一个可供连接的 Cassandra 集群,我们可以在创建 SparkContext 的时候设置。示例如下:

Example 5-40. Setting the Cassandra property in Scala

val conf = new SparkConf(true)
        .set("spark.cassandra.connection.host", "hostname")

val sc = new SparkContext(conf)


Example 5-41. Setting the Cassandra property in Java

SparkConf conf = new SparkConf(true)
    .set("spark.cassandra.connection.host", cassandraHost);
JavaSparkContext sc = new JavaSparkContext(
    sparkMaster, "basicquerycassandra", conf);

  Datastax Cassandra 连接器使用 Scala 的隐式转换在 SparkContext 和 RDD 上提供额外的函数,来引入一些隐式转换并加载一些数据(Example5-42)。

Example 5-42. Loading the entire table as an RDD with key/value data in Scala

// Implicits that add functions to the SparkContext & RDDs.
//导入隐式转换使用额外函数
import com.datastax.spark.connector._

// Read entire table as an RDD. Assumes your table test was created as
// CREATE TABLE test.kv(key text PRIMARY KEY, value int);
//引入整个表作为RDD。假设你创建的测试表结构是(key text PRIMARY KEY, value int)
val data = sc.cassandraTable("test" , "kv")
// Print some basic stats on the value field.
//打印对值字段的基本统计
data.map(row => row.getInt("value")).stats()

  在 Java 中没有隐式转换,所以我们需要显式地将我们的 SparkContext 和 RDD 转换为这个功能。示例如下:

Example 5-43. Loading the entire table as an RDD with key/value data in Java

import com.datastax.spark.connector.CassandraRow;
import static com.datastax.spark.connector.CassandraJavaUtil.javaFunctions;

// Read entire table as an RDD. Assumes your table test was created as
// CREATE TABLE test.kv(key text PRIMARY KEY, value int);
JavaRDD<CassandraRow> data = javaFunctions(sc).cassandraTable("test" , "kv");

// Print some basic stats.
System.out.println(data.mapToDouble(new DoubleFunction<CassandraRow>() {
    public double call(CassandraRow row) { return row.getInt("value"); }
}).stats());

  除了加载整个表,我们还可以查询数据的子集。我们可以通过在 cassandraTable() 调用中添加 where 子句来限制数据,例如,sc.cassandraTable(…).where("key=?", "panda")

  Cassandra 连接器支持保存不同类型的 RDD。我们可以直接保存 CassandraRow 对象 RDD,这在表之间复制数据很有用。通过指定列映射,我们可以将非行形式的 RDD 保存为元组和列表。示例如下:

Example 5-44. Saving to Cassandra in Scala

val rdd = sc.parallelize(List(Seq("moremagic", 1)))
rdd.saveToCassandra("test" , "kv", SomeColumns("key", "value"))

  这一部分只是很简要地介绍了 Cassandra 的连接器。如果想更深入地了解,可以查看连接器的 GitHub。

HBase

  Spakr 可以在实现了 org.apache.hadoop.hbase.mapreduce.TableInputFormat 的类中通过 Hadoop 的输入格式访问 HBase。这种输入格式返回键值对,键是 org.apache.hadoop.hbase.io.ImmutableBytesWritable 类型,值是 org.apache.hadoop.hbase.client.Result 类型。如 API 文档中所述,Result 类包含根据列族获取值的各种方法。

  为了在 Spark 中使用 HBase,你可以对正确的输入格式调用 SparkContext.newAPIHadoopRDD。Scala 示例如下:

Example 5-45. Scala example of reading from HBase

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, "tablename") // which table to scan

val rdd = sc.newAPIHadoopRDD(
conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])

  为了优化 HBase 的读取,TableInputFormat 包含多个设置,例如将扫描限制为只有一组列,并限制扫描的时间范围。你可以在 TableInputFormat API 文档中找到这些选项,并在将它们传递给 Spark 之前,对 HBaseConfiguration 设置。

Elasticsearch

  Spark 可以使用 Elasticsearch-Hadoop 从 Elasticsearch 读取和写入数据。Elasticsearch 是基于 Lucene 的开源搜索系统。

  Elasticsearch 连接器和我们之前介绍的连接器有些不同,因为它不需要提供路径信息,而是取决于 SparkContext 设置的配置信息。Elasticsearch 输出格式连接器也不需要使用 Spark 的包装,我们使用 saveAsHadoopDataSet,这意味着我们需要手动设置更多参数。让我们看看在 Elasticsearch 中读取写入简单数据的例子。

最新的 Elasticsearch Spark 连接器用起来很简单,支持返回 Spark SQL 行。之所以还介绍了这个连接器,因为行转换还不支持 Elasticsearch 中所有的原生类型。

Example 5-46. Elasticsearch output in Scala

val jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.set("mapred.output.format.class", "org.elasticsearch.hadoop.mr.EsOutputFormat")
jobConf.setOutputCommitter(classOf[FileOutputCommitter])
jobConf.set(ConfigurationOptions.ES_RESOURCE_WRITE, "twitter/tweets")
jobConf.set(ConfigurationOptions.ES_NODES, "localhost")
FileOutputFormat.setOutputPath(jobConf, new Path("-"))
output.saveAsHadoopDataset(jobConf)

Example 5-47. Elasticsearch input in Scala

def mapWritableToInput(in: MapWritable): Map[String, String] = {
    in.map{case (k, v) => (k.toString, v.toString)}.toMap
}
val jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.set(ConfigurationOptions.ES_RESOURCE_READ, args(1))
jobConf.set(ConfigurationOptions.ES_NODES, args(2))
val currentTweets = sc.hadoopRDD(jobConf,
    classOf[EsInputFormat[Object, MapWritable]], classOf[Object],
    classOf[MapWritable])
// Extract only the map

// Convert the MapWritable[Text, Text] to Map[String, String]
val tweets = currentTweets.map{ case (key, value) => mapWritableToInput(value) }

  与其他连接器相比,Elasticsearch 连接器稍微有些费解,但它对于如何使用这种类型的连接器提供了有用的参考。

在写出方面,Elasticsearch 可以进行映射推断,但这可能偶尔会错误推断,所以如果要存储除字符串以外的数据类型,则最好明确设置映射。

Conclusion(总结)

  随着本章的结尾,你现在应该能够将数据存入 Spark 中,并以对你有用的格式存储计算结果。我们分析了可用于数据的多种不同格式,以及压缩选项及其对使用数据方式的影响。随后的章节将研究如何编写更高效,更强大的 Spark 程序,以便我们可以加载和保存大型数据集。

  • Spark

    Spark 是 UC Berkeley AMP lab 所开源的类 Hadoop MapReduce 的通用并行框架。Spark 拥有 Hadoop MapReduce 所具有的优点;但不同于 MapReduce 的是 Job 中间输出结果可以保存在内存中,从而不再需要读写 HDFS,因此 Spark 能更好地适用于数据挖掘与机器学习等需要迭代的 MapReduce 的算法。

    74 引用 • 46 回帖 • 556 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...