本章介绍了 Spark 用于数据处理的核心抽象概念,具有弹性的分布式数据集(RDD)。一个 RDD 仅仅是一个分布式的元素集合。在 Spark 中,所有工作都表示为创建新的 RDDs、转换现有的 RDD,或者调用 RDD 上的操作来计算结果。在底层,Spark 自动将数据中包含的数据分发到你的集群中,并将你对它们执行的操作进行并行化。数据科学家和工程师都应该阅读这一章,因为 RDD 是 Spark 的核心概念。我们强烈建议你在这些例子中尝试一些
交互式 shell(参见“Spark 的 Python 和 Scala shell 的介绍”在第 11 页)。
此外,本章中的所有代码都可以在该书的 GitHub 库中找到
RDD Basics(基本 RDD)
在 Spark 中,一个 RDD 仅仅是一个不可变的分布式对象集合.每个 RDD 被切分成多个可以在不同集群节点上进行计算的分区(patition)。RDD 可以包含 Python,Java 和 Scala 任何类型的对象,包括用户自定义的 class 文件.
>>> lines = sc.textFile("README.md")
用户创建 RDD 的两种方法:通过加载外部数据集或者通过在运行的程序中分配一个对象集合。我们在样例 3-1 中已经见识到了这种方法,通过使用 SparkContext.textFile()
加载文本文件生成字符串 RDD。
一旦创建,RDD 提供两种操作类型:transformation(转换)和 action(开工)。Transformation 会根据之前的 RDD 构造一个新的 RDD。比如,一个常见转换是过滤匹配断言函数的数据。在 3-2 展示的文本文件例子中,我们通过 filter 创建了一个新的只保存那些包含 Python 单词的行的 RDD.
>>> pythonLines = lines.filter(lambda line: "Python" in line)
另一方面,Action 是基于 RDD 计算结果的,要么将其返回给驱动程序(main 函数或 shell),要么将其保存到外部存储系统中(例如,HDFS).我们前面提到的一个 action 是 first()
,如示例 3-3 中演示的那样,他返回一个 RDD 中的第一个元素.
>>> pythonLines.first()
u'## Interactive Python Shell'
Transformation 和 action 的区别在于 Spark 计算 RDD 的方式。尽管你可以随时定义新的 RDD,但 Spark 是以一种懒惰的方式计算他们--就是只在 RDD 第一次被 action 使用的时候进行计算。这种方法看起来很不常见,但在大数据处理中意义非凡。举个例子,思考一下 Example3-2 和 3-3,我们定义了一个文本文件并且过滤掉不包含 Python 的每一行。如果在我们写 lines = sc.textFile()时 Spark 就加载了文件的所有行,那么将会浪费大量的存储空间而且我们马上会过滤掉很多行。相反,一旦 Spark 洞览整个转换链,他可以计算需要作为结果的数据。实际上,对于 first()
action,Spark 只扫描文件直到找到第一个匹配的行,而不会读取整个文件。
最终,每当你在 Spark 的 RDD 上运行一个 action,默认会重新计算。如果你想在多个 action 上重复使用一个 RDD,RDD.persist()
方法可以进行保存。我们可以让 Spark 在许多不同的地方持久化我们的数据,这些将在表 3-6 中讨论。在第一次计算之后,Spark 会把 RDD 内容存储在内存中,并且可以在以后的 action 中复用他们。在硬盘上持久化 RDD 也是可以的。默认不持久化的行为看起来可能很不寻常,但是这对大数据意义重大:如果你不重新使用 RDD,并且数据流通过 Spark 一次就计算出了结果,这样看来浪费存储空间没什么必要。
实际上,你会经常使用 persist()
来加载数据的子集到内存中用来反复查询。举个例子,如果我们知道想要计算包含 Python 单词的 README 文本行中的多个结果,我们可以写个 Example3-4 中展示的脚本。
Example3-4.Persisting an RDD in memory
>>> pythonLines.persist
>>> pythonLines.count()
2
>>> pythonLines.first()
u'## Interactive Python Shell'
总的来讲,每个 Spark 项目或者 shell 对话都包含以下步骤:
- 1.从外部数据创建一些输入 RDD
- 2.通过使用 transformation 如
filter()
来转换 RDD 成为新的 RDD - 3.使用
persist()
来持久化任何需要被重用的中间 RDD - 4.启动
action
如count()
和first()
来启动并行计算,然后 Spark 会对这些计算进行优化和执行
在本章剩下的部分,我们会详细讲解这些步骤的细节,并且会包含一些 Spark 中最常用的 RDD 操作。
Creating RDDs(创建 RDD)
Spark 提供了创建 RDD 的两种方式:加载外部数据和在驱动程序中将集合并行化。
最简单的创建 RDD 的方式是在程序中获取一个存在的集合并且将它传入 SparkContext
中的 parallelize()
方法,Example3-5 到 3-7 都展示了这种方法。这种方法在你学习 Spark 时非常有用,因为你可以快速在 shell 上创建你的 RDD 并且在自创的 RDD 上演示一些操作。需要注意的是,在样本和测试之外,这种方式并没有大量使用,因为它需要你将整个数据集保存在一台机器的内存中。
Example 3-5. parallelize() method in Python
lines = sc.parallelize(["pandas", "i like pandas"])
Example 3-6. parallelize() method in Scala
val lines = sc.parallelize(List("pandas", "i like pandas"))
Example 3-7. parallelize() method in Java
JavaRDD<String> lines = sc.parallelize(Arrays.asList("pandas", "i like pandas"));
更常用的创建 RDD 的方法是加载外部数据。加载外部数据在第五章中有详细介绍。我们早就看到了加载文本文件作为字符串 RDD 的方法,SparkContext.textFile(),Example3-8 到 3-10 中展示了:
Example 3-8. textFile() method in Python
lines = sc.textFile("/path/to/README.md")
Example 3-9. textFile() method in Scala
val lines = sc.textFile("/path/to/README.md")
Example 3-10. textFile() method in Java
JavaRDD<String> lines = sc.textFile("/path/to/README.md");
RDD Operations (RDD 操作)
我们之前讨论过,RDD 支持两种操作:transformation(转换)和 action(开工)。Transformation 会返回一个新的 RDD,如 map()
和 filter()
。Action 是返回一个结果给驱动程序或者把结果写入存储并且驱动一个计算的操作,如 count()
和 first()
。Spark 对待 transformation 和 action 很不同,所以理解你使用的操作是哪种类型非常重要。如果你曾对给定的函数是 transformation 或 action 感到迷惑,你可以看他们的返回类型:transformation 返回 RDD,actions 返回其他数据类型。
Transformations (转换)
Transformation(转换)是使用 RDD 时返回一个新 RDD 的操作。如在 29 页“Lazy Evaluation(惰性求值)”中讨论的,转换的 RDD 是延迟计算的,只有在 action 中使用他们的时候才进行真正的计算。许多 transformation 是 element-wize(逐元素的),也就是说,他们一次处理一个元素,但是不是所有的 transformation(转换)都这样。
举个例子,假如我们有一个记录一定数量信息的日志文件,log.txt。我们想挑选出错误的信息。我们可以使用之前看到的 filter()进行转换。下面是展示:
Example 3-11. filter() transformation in Python
inputRDD = sc.textFile("log.txt")
errorsRDD = inputRDD.filter(lambda x: "error" in x)
Example 3-12. filter() transformation in Scala
val inputRDD = sc.textFile("log.txt")
val errorsRDD = inputRDD.filter(line => line.contains("error"))
Example 3-13. filter() transformation in Java
JavaRDD<String> inputRDD = sc.textFile("log.txt");
JavaRDD<String> errorsRDD = inputRDD.filter(
new Function<String, Boolean>() {
public Boolean call(String x) { return x.contains("error"); }
}
});
注意,filter()不会改变现有的输入 RDD。相反,他会返回一个指向全新 RDD 的指针。输入 RDD 仍然可以在程序后面的处理中重复使用,比如使用这个 RDD 搜索其他单词。例如,我们使用这个 RDD 搜索 warning 单词,然后使用另一个 transformation(转换),union(),将包含 error 和 warning 的内容结合进行输出。下面有示例:
Example 3-14. union() transformation in Python
errorsRDD = inputRDD.filter(lambda x: "error" in x)
warningsRDD = inputRDD.filter(lambda x: "warning" in x)
badLinesRDD = errorsRDD.union(warningsRDD)
union()方法和 filter 方法有些不同,它操作两个 RDD。Transformation(转换)实际上可以对任意数量的输入 RDD 进行操作。
实际上 Example3-14 更好的完成方式是直接用 filter()一次过滤包含 error 或 warning 的行。
最后,当你使用 transformation(转换)派生出新的 RDD,Spark 会跟踪不同的 RDD 之间的依赖关系集,称为依赖关系图(lineage graph)。使用依赖关系图的信息来计算每个需要的 RDD 并且可以恢复那些丢失的持久化 RDD 信息。下图就是 Example3-14 的依赖关系图
Actions(开工)
我们了解了怎么使用 transformation 来创建 RDD,但是某些情况,我们想通过数据集合做一些事情。Action 是第二种 RDD 操作。Action 会返回一个最终的值给驱动程序或者将数据写入外部存储系统。Action 强制要求对 RDD 所需要的 transformation(转换)进行求值,因为 action 需要实际地生成输出。
回到之前的日志例子,我们可能想要打印一些关于 badLinesRDD 的信息。为此,我们将使用两个 action,count()
用来返回总数,take(num)
用来收集 RDD 中(参数)指定个数的数据(take(2)
收集两个)。下面有示例:
Example 3-15. Python error count using actions
print "Input had " + badLinesRDD.count() + " concerning lines"
print "Here are 10 examples:"
for line in badLinesRDD.take(10):
print line
Example 3-16. Scala error count using actions
println("Input had " + badLinesRDD.count() + " concerning lines")
println("Here are 10 examples:")
badLinesRDD.take(10).foreach(println)
Example 3-17. Java error count using actions
System.out.println("Input had " + badLinesRDD.count() + " concerning lines")
System.out.println("Here are 10 examples:")
for (String line: badLinesRDD.take(10)) {
System.out.println(line);
}
在这个例子中,我们在驱动程序使用 take()
来取得一些 RDD 中的元素。然后本地遍历他们并在驱动程序打印一些信息。RDD 还有 collect()
方法来收集整个 RDD。如果你的程序把 RDD 筛选到很小的范围,并且你希望本地处理它,collect()
就很有用了。记住一点,collect()
方法会使你的整个数据集放在一台机器上,所以不要在大数据集中使用 collect()
。
大多数情况下的 RDD 不能直接在驱动程序中被 collect()
,因为他们太大了。这些情况下,把数据写入分布式存储系统是比较常见的,如 HDFS 和亚马逊的 S3。你可以使用 saveAsTextFile
,saveAsSequenceFile()
,或者各式各样的内置格式的 action 来保存 RDD 的内容。我们会在第五章介绍导出数据的不同选择。
需要注意的是,每次调用一个新的 action,整个 RDD 必须从头开始计算。为了避免这种低效率操作,可以持久化中间结果,44 页的“Persistence(Caching)”会详细介绍。
Lazy Evaluation (惰性求值)
如你之前读到的,RDD 上的 transformation(转换)是惰性求值,意味着 Spark 除非看到一个 action(开工),否则不会开始开工。对于新的使用者来说这可能有点违反直觉,但是对于那些使用过函数式语言(如 Haskell)或 LINQ 之类的数据处理框架的用户会非常熟悉。
惰性求值意味着当我们在 RDD 上调用一个 transformation(转换)(如调用 map()),这个操作不是立即执行。相反,Spark 内部记录了元数据来标明这个操作被请求过。与其把 RDD 当做一个包含特定数据的集合,不如把他看做一个通过一系列 transformation 来计算数据的指令集合。惰性求值把 RDD 打造成了指令而不是集合,action 按照顺序执行这些指令,得到最终的结果。把数据加载到 RDD 中和 transformation 的惰性求值是一样的。所以,你可能明白了,当我们调用 sc.textFile()
,数据没有真正加载直到必要的时候。与 transformation(转换)一样,(在本例中,读取数据的操作)可以多次发生。
尽管 transformation(转换)是惰性的,你可以强制 Spark 随时通过运行 action(动作)去执行他们,如 count()。这是很便捷的测试你程序一部分的方法。
Spark 使用惰性求值来减少通过把操作分组处理数据传递的次数。像 Hadoop 的 MapReduce 系统,开发者经常花费大量时间思考怎么把操作分组来最小化 MapReduce 的次数。在 Spark 中,把数据处理编写成一个复杂的 map(Hadoop 中的计算组件)不如写成多个简单操作并将其串联在一起。因此,用户可以自由地将他们的程序组织成更小,更易于管理的操作。(这段话挺不好理解的,大体解释一下,因为 hadoop 中,你写的 map 函数是类似加工站的东西,数据通过它必须受到它的逻辑处理,即时求值。有时候我们不想写太多 map 类,我们会把一个 map 写的非常复杂,并且这种复杂的 map 可能复用性比较差,所以我们会去思考如何把某些操作归纳到一个 map 提高他的复用性,然后把 map 进行分组,是一件很麻烦的事情,但是 transformation(转换)特性是惰性求值,我们只表示出这个 RDD 会怎么操作数据,但是只有在 action 的时候才会真正处理数据,并且 filter()之类的函数高度抽象,我们不用将 RDD 进行分组,只需要根据逻辑去组合 map()、filter()这些提供好的函数,这样会大大减少了对操作进行分组的编码任务)
Passing Functions to Spark (把函数传递给 Spark)
大多数 Spark 的 transformation(转换)和一些 action(动作)依赖于使用函数来计算数据。每个核心语言都有一种稍微不同的机制来将函数传递给 Spark。
Python
在 Python 中,我们有三种操作来将函数传递给 Spark。对于短一些的函数,我们可以用 lambda 表达式,正如 Example3-2 和 3-18 中演示的那样。或者,我们可以传递顶级函数或局部定义的函数。
//Example 3-18. Passing functions in Python
word = rdd.filter(lambda s: "error" in s)
def containsError(s):
return "error" in s
word = rdd.filter(containsError)
在传递函数时有一个问题需要注意,函数中包含的对象会无意间序列化了。当你传递的函数是一个对象的成员或者包含对象中引用的字段(如 self.field),Spark 会将整个对象传递给工作节点,有可能比你需要的信息大很多。如果你的类(class)包含 Python 不知道如何 pickle(python 用来序列化的包)的对象,这可能导致你的程序运行失败。
示例:
Example 3-19. Passing a function with field references (don’t do this!)
class SearchFunctions(object):
def __init__(self, query):
self.query = query
def isMatch(self, s):
return self.query in s
def getMatchesFunctionReference(self, rdd):
# Problem: references all of "self" in "self.isMatch"
#问题:在self.isMatch方法中引用了整个self对象(就是对象自身)
return rdd.filter(self.isMatch)
def getMatchesMemberReference(self, rdd):
# Problem: references all of "self" in "self.query"
#问题:在self.iquery方法中引用了整个self对象(就是对象自身)
return rdd.filter(lambda x: self.query in x)
相反,直接把你需要的字段从对象中提取到局部变量然后再将函数传递,如 Example3-20。
Example 3-20. Python function passing without field references
class WordFunctions(object):
...
def getMatchesNoReference(self, rdd):
# Safe: extract only the field we need into a local variable
#安全:把我们需要的字段导入了局部变量
query = self.query
return rdd.filter(lambda x: query in x)
Scala
在 scala 中,我们可以将函数定义为内联函数,对方法的引用,或者静态函数,就像我们使用其他 Scala 函数 API 那样。还有一些注意事项,尽管我们需要传递的函数和数据引用是可序列化的(实现 java 的序列化接口)。而且,就像在 Python 中一样,传递一个对象的方法或字段会包含对整个对象的引用,尽管这不是很明显,因为我们不需要用 self(python 中类似 java 的 this 关键字)来编写这些引用。如 Example3-20 中的 Python 示例,我们可以将需要的字段导入局部变量中来避免传递整个对象。示例 Example3-21:
Example 3-21. Scala function passing
class SearchFunctions(val query: String) {
def isMatch(s: String): Boolean = {
s.contains(query)
}
def getMatchesFunctionReference(rdd: RDD[String]): RDD[String] = {
// Problem: "isMatch" means "this.isMatch", so we pass all of "this"
//问题:isMatch意味着this.isMatch,所以我们传递了整个this
rdd.map(isMatch)
}
def getMatchesFieldReference(rdd: RDD[String]): RDD[String] = {
// Problem: "query" means "this.query", so we pass all of "this"
//差不多意思。。不翻了
rdd.map(x => x.split(query))
}
def getMatchesNoReference(rdd: RDD[String]): RDD[String] = {
// Safe: extract just the field we need into a local variable
//安全:把需要的字段导入了局部变量
val query_ = this.query
rdd.map(x => x.split(query_))
}
}
如果在 Scala 中产生了 NotSerializableException
,通常的原因就是引用了不可序列化的类中一个方法或者字段。注意传递的局部可序列化变量和函数如果是高层对象的成员就是安全的(top-level object,怎么解释呢- -,抓瞎。就是你直接在 REPL 中定义一个函数而不是类中定义,这个函数就是 top-level 函数,类似的,top-level object 应该就是直接定义的类不是内部类之类的(不一定准确,仅供参考))。
Java
在 java 中,函数是很明确地实现了 Spark 的 org.apach.spark.api.java.function 包的 function 接口。里面有很多不同的基于函数返回类型的接口。我们在表 3-1 中展示了大多数基本函数接口,43 页“Java”中会详细介绍特别的返回数据类型如 key、value 形式其他函数接口。
Table 3-1.标准 Java 函数接口
函数名 | 实现方法 | 用法 |
---|---|---|
Function<T,R> | R call(T) |
输入一个输入返回一个输出,类似 map 和 filter 操作 |
Function<T1,T2,R> | R call(T1,T2) |
输入两个输入返回一个输出,类似 aggregate 或 fold |
FlatMapFunction<T,R> | Iterable<R> call(T) |
输入一个输入返回 0 或多个输出,类似 flatMap |
我们可以像匿名内部类一样定义我们自己的内联函数类(Example3-22)或创建一个实现类(Example3-23)
Example 3-22. Java function passing with anonymous inner class
RDD<String> errors = lines.filter(new Function<String, Boolean>() {
public Boolean call(String x) { return x.contains("error"); }
});
Example 3-23. Java function passing with named class
class ContainsError implements Function<String, Boolean>() {
public Boolean call(String x) { return x.contains("error"); }
}
RDD<String> errors = lines.filter(new ContainsError());
选择哪种方式属于个人偏好,但是我们发现匿名内部类(顶级命名函数)这种方式在大型程序中更整洁。顶级函数的另一个优点就是你可以设置构造器参数,示例:
Example 3-24. Java function class with parameters
class Contains implements Function<String, Boolean>() {
private String query;
public Contains(String query) { this.query = query; }
public Boolean call(String x) { return x.contains(query); }
}
RDD<String> errors = lines.filter(new Contains("error"));
在 Java8 中,你可以使用 lambda 表达式简介地实现函数接口。由于编写本书时,Java8 刚出没多久,所以我们的样例使用老版本的冗长的语法。但是,使用 lambda 表达式的话,我们的搜索例子会变成 Example3-25 那样:
Example 3-25. Java function passing with lambda expression in Java 8
RDD<String> errors = lines.filter(s -> s.contains("error"));
如果你对 Java8 的 lambda 表达式,可以参考 Oracle’s documen‐
tation(Oracle 的文档) 和 the Databricks blog post on how to use lambdas with Spark(Databricks(Spark 的公司)发布的如何在 Spark 中使用 lambda 的博客)。
匿名内部类和 lambda 表达式都可以引用方法中的任何 final 变量,所以你可以像在 Python 和 Scala 中一样将这些变量传递给 Spark。
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于