Learning Spark 中文版 -- 第三章 --RDD 编程(2)

本贴最后更新于 2311 天前,其中的信息可能已经物是人非

Common Transformations and Actions

  本章中,我们浏览了 Spark 中大多数常见的 transformation(转换)和 action(开工)。在包含特定数据类型的 RDD 上可以进行额外的操作,例如,可以对纯数字 RDD 使用统计函数,对键值对的 RDD 进行聚合操作。后面的章节我们会介绍这些特别的操作和 RDD 类型间的转换。

Basic RDD (基础 RDD)

  首先,在忽略数据的影响的前提下,我们将描述所有的 RDD 上可以执行的 transformation 和 action。

element-wise transformation(逐元素的转换)

  你可能会使用的两种最常见的转换是 map()filter()(见图 3-2)。map()转换接受一个函数参数,RDD 中的每个元素都会经过 map 中的函数处理,生成新的元素组成的 RDD。filter() 转换接受一个函数参数,并且返回一个能通过 filter() 函数的元素组成的 RDD。

image


图 3-2,输入 RDD 到 mapRDD 和 filterRDD

  map() 函数可以做太多太多事情了,如:抓取网站关联的每个 URL 放入集合,对数字进行平方。非常有用的一点是 map() 的返回值不必须和输入类型一样,这样如果你有个 String 类型的 RDD 并且 map() 函数会把字符串转换成 Double 类型返回,我们的输入 RDD 会是 RDD[String],结果 RDD 会是 RDD[Double]。

  看一个基础的例子,通过 map()对数字平方。(Example3-26 到 3-28):

Example 3-26. Python squaring the values in an RDD

nums = sc.parallelize([1, 2, 3, 4])
squared = nums.map(lambda x: x * x).collect()
for num in squared:
    print "%i " % (num)
    
Example 3-27. Scala squaring the values in an RDD

val input = sc.parallelize(List(1, 2, 3, 4))
val result = input.map(x => x * x)
println(result.collect().mkString(","))

Example 3-28. Java squaring the values in an RDD

JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
JavaRDD<Integer> result = rdd.map(new Function<Integer, Integer>() {
    public Integer call(Integer x) { return x*x; }
});
System.out.println(StringUtils.join(result.collect(), ","));

  有时我们想为每个输入元素产生多个输出元素。这种操作叫做 flatMap()。和
map() 一样,RDD 中的每个元素都会被 flatMap() 中的函数调用。这个操作不是返回一个元素,而是返回一个迭代器,其中包含要返回的值。我们没有生成迭代器的 RDD,而是返回一个由所有迭代器中元素组成的 RDD。flatMap() 一个简单的用处就是把输入的字符串切割成单词,示例:

Example 3-29. flatMap() in Python, splitting lines into words

lines = sc.parallelize(["hello world", "hi"])
words = lines.flatMap(lambda line: line.split(" "))
words.first() # returns "hello"

Example 3-30. flatMap() in Scala, splitting lines into multiple words

val lines = sc.parallelize(List("hello world", "hi"))
val words = lines.flatMap(line => line.split(" "))
words.first() // returns "hello

Example 3-31. flatMap() in Java, splitting lines into multiple words

JavaRDD<String> lines = sc.parallelize(Arrays.asList("hello world", "hi"));
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    public Iterable<String> call(String line) {
    return Arrays.asList(line.split(" "));
    }
});
words.first(); // returns "hello"


  在图 3-3 中有 map()flatMap() 差异的图解。你可以把 flatMap() 看做返回压平了的迭代器(如 Example 中展示,flatMap 中的函数把传入的字符串切割成了单词数组,但是返回的是一个个单词组成的 RDD。传入 flatMap() 的函数虽然看起来是把数据转换成列表,但列表被"压平"了,所以返回的 RDD 是列表元素组成的 RDD),所以当操作结束返回的不是一个列表 RDD,而是列表中每个元素组成的 RDD。

image

Pseudo set operations(伪集操作)

  RDD 本身不是很标准的集合,但 RDD 支持很多数学集合操作,如交集(intersection)和联合(union)。图 3-4 有展示了四个操作。很重要的一点是,所有的操作都需要使用相同类型的 RDD。

image


  我们说 RDD 不是标准的集合是因为不能满足集合的元素唯一性,因为 RDD 中经常有元素的重复。如果想要去重,我们可以使用 RDD.distinc() 这个 transformation(转换)来产生一个新的没有重复元素的 RDD。注意 distinct()的代价非常高昂,因为它会在网络上整理(shuffling)所有的数据来确保我们只收到每个元素的唯一副本。我们会在第四章来详细讨论洗牌(shuffling)和如何避免洗牌。

  最简单的集合操作是 union(other),会返回由两个源数据组成的 RDD。这在许多用例中都很有用,例如从多个来源处理日志文件。不像数学中的并集操作,如果在输入 RDD 中有重复的元素,Spark 的 union 操作会包含重复的元素(当然我们可以使用 distinct 来修正)。

  Spark 也提供了 intersection(other) 方法,会返回两个 RDD 中共有的元素。intersection() 运行时会删除所有重复的元素(包括单个 RDD 中的重复元素)。尽管 intersection()和 union()是很相似的概念,但是 intersection()性能差很多,因为他需要整理整个网络的数据去确定共有元素。

  有时候经过考虑我们需要删除一些数据。subtract(other) 函数可以从一个 RDD 中删除另一个 RDD 中包含的元素,即第一个 RDD 减去第二个 RDD 的差。与 intersection() 一样,subtract 会进行耗时耗力的数据洗牌(shuffle)。

  我们还可以计算两个 RDD 的笛卡尔积,如图 3-5。a 是一个源 RDD 中的元素,b 是另一个源 RDD 中的元素,两个 RDD 做笛卡尔积 transformation(转换)会返回所有可能的(a,b)元素对儿。当我们想要思考所有可能的元素对儿的相似性的时候笛卡尔积就很有用,比如计算每个用户对所提供价格的期望兴趣。我们还可以对 RDD 自身做笛卡尔积,来实现很有用的需求如用户相似度。需要注意的是笛卡尔积在较大数据的处理上代价非常高。

解释一下笛卡尔积,笛卡尔积就是两个集合的乘积。如果两个集合 c(数学,语文),s(Alice,Bob,Carl),一个课程集合,一个学生集合,两个集合的笛卡尔积就是((数学,Alice),(数学,Bob),(数学,Carl),(语文,Alice),(语文,Bob),(语文,Carl)),这就是学生集合所有选课的可能性。

image


表 3-2 和 3-3 总结了上述的和常用的 RDD 转换。

image


image

贴完图发现时英文的,我再手打一遍吧。

表 3-2 包含{1,2,3,3}的 RDD 的基本转换

函数名 目的 示例 结果
map() 对 RDD 中的每个元素应用一个函数,返回一个 RDD rdd.map(x => x+1) {2,3,4,4}
flatMap() 对 RDD 中的每个元素应用一个函数返回迭代器中的内容。经常用来提取单词 rdd.flatMap(x => x.to(3) {1,2,3,2,3,3,3}
filter() 返回由每个通过 filter()条件的元素组成的 RDD rdd.filter(x => x!=1) {2,3,3}
distinct() 删除重复元素 rdd.distinct() {1,2,3}
sample(withReplacement,fraction,[seed]) 对 RDD 进行替换或不替换采样 rdd.sample,0.5) Nondeterministic

Sample 是对 rdd 中的数据集进行采样,并生成一个新的 RDD,这个新的 RDD 只有原来 RDD 的部分数据,这个保留的数据集大小由 fraction 来进行控制。

参数说明

withReplacement,这个值如果是 true 时,采用 PoissonSampler 取样器(Poisson 分布),
否则使用 BernoulliSampler 的取样器。

Fraction,一个大于 0,小于或等于 1 的小数值,用于控制要读取的数据所占整个数据集的概率。

Seed,这个值如果没有传入,默认值是一个 0~Long.maxvalue 之间的整数。

表 3-2 包含{1,2,3}和{3,4,5}两个 RDD 的转换

函数名 目的 示例 结果
union() 产生由两个 RDD 元素组成的 RDD rdd.union(other) {1,2,3,3,4,5}
intersection() 产生两个 RDD 共有元素的 RDD rdd.intersection(other) {3}
subtract() 删除一个 RDD 的内容(如,删除训练数据) rdd.subtract(other) {1,2}
cartesian() 求与另一个 RDD 的笛卡尔积 rdd.cartesian(othre) {(1,3)(1,4)...(3,5)}

Actions

  在基本的 RDD 上最常见的 action 是 reduce(),它会使用一个函数操作同一 RDD 上两个的元素,然后返回一个新的和 RDD 类型相同元素。一个简单的例子就是相加函数,可以用来计算 RDD 的总和。使用 reduce()函数,我们可以非常简单的求得 RDD 上元素之和,计算元素数量,或者执行其他聚合操作。示例:

Example 3-32. reduce() in Python
sum = rdd.reduce(lambda x, y: x + y)

Example 3-33. reduce() in Scala
val sum = rdd.reduce((x, y) => x + y)

Example 3-34. reduce() in Java
Integer sum = rdd.reduce(new Function2<Integer, Integer, Integer>() {
    public Integer call(Integer x, Integer y) { return x + y; }
});

  和 reduce() 很相似的是 fold(),它和 reduce() 输入函数的签名是一样的,但是除此之外,对于每个分区的初始调用它还需要一个“初始值(zero value)”。你提供的初始值应该是你操作的标识元素,也就是说,多次应用同一函数初始值不应该改变(例如,加法初始值是 0,乘法初始值是 1,或者元素串连成列表的初始值是个空列表)。

你可以通过修改并返回两个参数中的第一个参数来最小化 fold()中的对象创建,但是,不应该修改第二个参数。

  fold()reduce() 都需要返回结果的类型和我们操作的 RDD 中元素的类型一样。这种条件对于 sum 这种操作就很有效,但是有时候我们需要返回一种不同的类型。例如,当我们计算运行平均值时,我们需要保持记录总值和总数量,最终返回一对儿结果。我们可以使用 map() 函数将每个元素转换成(元素,1),就变成了我们想要的返回类型,这样 reduce() 函数可以对成对数据进行计算,顺利输出成对数据。

  aggregate() 函数把我们从必须返回和操作 RDD 类型一样元素的约束中解脱出来。使用 aggregate(),类似 fold,我们提供一个想要返回的初始值类型。然后我们提供一个元素将 RDD 中的元素和累加器结合起来。最后,我们需要提供第二个函数将两个累加器合并,因为每个节点都在本地累加自己的结果。

  我们可以使用 aggregate 来计算一个 RDD 的平均值,避免了在 fold() 函数之前使用 map() 函数进行处理。示例:

Example 3-35. aggregate() in Python

sumCount = nums.aggregate((0, 0),
                (lambda acc, value: (acc[0] + value, acc[1] + 1),
                (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))))
return sumCount[0] / float(sumCount[1])

Example 3-36. aggregate() in Scala

val result = input.aggregate((0, 0))(
                (acc, value) => (acc._1 + value, acc._2 + 1),
                (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
val avg = result._1 / result._2.toDouble

Example 3-37. aggregate() in Java

class AvgCount implements Serializable {
    public AvgCount(int total, int num) {
        this.total = total;
        this.num = num;
    }
    public int total;
    public int num;
    public double avg() {
        return total / (double) num;
    }
}
Function2<AvgCount, Integer, AvgCount> addAndCount =
    new Function2<AvgCount, Integer, AvgCount>() {
        public AvgCount call(AvgCount a, Integer x) {
            a.total += x;
            a.num += 1;
            return a;
}
};
Function2<AvgCount, AvgCount, AvgCount> combine =
    new Function2<AvgCount, AvgCount, AvgCount>() {
    public AvgCount call(AvgCount a, AvgCount b) {
        a.total += b.total;
        a.num += b.num;
        return a;
    }
};
AvgCount initial = new AvgCount(0, 0);
AvgCount result = rdd.aggregate(initial, addAndCount, combine);
System.out.println(result.avg());

  在 RDD 上的一些 action 将一些或所有的数据以常规集合或值的形式返回给我们的驱动程序。

  最简单和常用的将数据返回给驱动程序的操作是 collect(),这将返回整个 RDD 的内容。collect() 常用在单元测试中,整个 RDD 的内容装载进内存也是在预期之中,这使得我们很容易将 RDD 的值和预期结果进行比较。collect() 受限于需要将所有的数据塞进内存,因为他需要把所有的数据拷贝到驱动程序。

  take(n) 返回 n 个 RDD 上的元素并且尽可能地最小化访问分区的数量,所以他返回的集合可能有一些偏颇。比较重要的需要注意的一点是这些操作返回的元素可能和你预期的顺序不一样。

  这些操作对于单元测试和快速 debug 来说很有用,但是当数据量过大是,可能会导致运行瓶颈。

  如果数据有规定的顺序,我们可以使用 top() 函数提取 RDD 最上面的元素。top() 会使用默认排序来提取数据,但我们也可以提供比较函数来提取顶端元素。

  有时我们需要对驱动程序中的数据进行采样。takeSample(withReplacement,num,seed) 函数允许我们对数据使用替换或非替换采样。

  有时对 RDD 中的所有元素进行 action 却不返回结果给驱动程序很有用。一个很好的例子就是把 JSON 数据发送给 web 服务器,或者将记录插入到数据库中。另一种情况,foreach() 操作允许我们对 RDD 中的每个元素执行计算,而不需要将其带回到本地。

  除了我们讲的基本 RDD 操作之外,更深入的操作的函数名可读性非常好,通过他们的名字你大体就能理解他们所表现的操作方式。count() 返回元素的总数,countByValue() 返回一个唯一值的计数组成的 map。表 3-4 总结了这些 action。

Table3-4 包含{1,2,3,3}RDD 上的基本 action

函数名 目的 示例 结果
collect() 返回 RDD 的所有元素 rdd.collect() {1,2,3,3}
count() 返回 RDD 元素数量 rdd.count 4
countByValue() 每个元素在 RDD 中出现的次数 rdd.countByValue() {(1,1),(2,1),(3,2)}
take(num) 返回 RDD 中 num 个元素 rdd.take(2) {1,2}
top(num) 返回 RDD 顶端的 num 个元素 rdd.top(2) {3,3}
takeOrdered(num)(ordering) 返回基于提供的排序的 num 个元素 rdd.takeOrdered(2)(myOrdering) {3,3}
takeSample(withReplacement,num,[seed]) 返回 num 个随机元素 rdd.takeSample(false,1) 不确定
reduce(func) 并行地把 RDD 中的元素结合在一起 rdd.reduce((x,y)=>x+y) 9
fold(zero)(func) 和 reduce 一样但是有初始值 rdd.fold(0)((x,y)=>x+y) 9
aggregate(zeroValue)(seqOp,combOp) 类似 reduce 但是用来返回一个不同类型 rdd.agregate((0,0))
((x,y)=>
(x._1+y,x._2+1),
(x,y) =>
(x._1+y._1,x._2+y._2)
(9,4)
foreach(func) RDD 中每个元素都会应用在 func 函数上 rdd.foreach(func)

Converting Betwenn RDD Types(RDD 之间的转换)

  有些函数只能在已知 RDD 类型的时候使用,如 mean()variance() 能用在数字类型的 RDD,join() 用在键值对的 RDD 上。我们在第六章会介绍数字数据,第四章介绍键值对 RDD。在 Sacla 和 Java 中,这些方法没有在标准 RDD 类中定义,所以为了使用这些额外功能我们必须确保得到了正确的专门化类。

Scala

  在 Scala 中,用特殊的函数转换 RDD(如,在 Double 类型的 RDD 上声明数字(numberic)函数),特殊的函数是指使用隐式转换自动处理。之前在 17 页提到过的“初始化 SparkContext”为了这些转换工作,我们需要添加 import org.apache.spark.SparkContext._。你可以在 SparkContext 对象文档中查看隐式转换的清单(scala 的隐式转换很好用噢)。这些隐式转换把 RDD 转换成不同的包装类,比如 DoubleRDDFunctions(包装数字数据的 RDD)和 PairRDDFunctions(包装键值对),用来提供附加功能如 mean()variance()

  隐式转换功能非常强大,但有时会让人困惑。如果你可以在 RDD 上调用一个 mean() 函数,你可能查看了 RDD 类的 Scala 文档并且注意到没有 mean() 函数。但是调用却因为 RDD[Double]隐式转换成 DoubleRDDFunctions 成功了。当在你的 RDD 的 Scala 文档中查找函数式,确定一下是否在包装类中能够使用这些函数。

Java

  在 JAVA 中转换两个特别类型的 RDD 就比较明显。特别地,对于这些类型的 RDD,有专门的类成为 JavaDoubleRDD 和 JavaPairRDD,这些类型的数据提供了额外的方法。这中方法的一个有点就是你可以很清楚的理解到底发生了什么,但是这种方式可能有点麻烦。

  要构建这些特殊类型的 RDD,而不是总是使用函数类,我们将需要使用专门的版本。如果我们想从 T 泛型的 RDD 中创建一个 DoubleRDD,我们使用 DoubleFunction<T> 而不是 Function<T,Double>。表 3-5 展示了这些特殊化函数和他们的用法。

  我们还需要在 RDD 上调用不同的函数(这样我们不能创建一个 Double 函数并传递给 map() 函数)。当我们需要一个 DoubleRDD 时,不是调用 map() 进行转换,而我们需要调用 mapToDouble(),所有其他的函数都遵循相同的模式。

表 3-5 特定类型函数的 Java 接口

函数名 等价函数 用法
DoubleFlatMapFunction<T> Function<T,Iterable<Double>> 从 flatMapToDouble 中得到 DoubleRDD
DoubleFunction<T> Function<T,double> 从 mapToDouble 中得到 DoubleRDD
PairFlatMapFunction<T,K,V> Function<T,Iterable<Tuple2<K,V>>> 从 flatMapToPair 中得到 PairRDD<K,V>
PairFunction<T,K,V> Function<T,Tuple2<K,V>> 从 mapToPair 中得到 PairRDD<K,V>

  我们可以修改一下 Example3-28,对 RDD 中的元素平方,产生一个 JavaDoubleRDD,如 Example3-38。这能够使我们访问额外的 DoubleRDD 特定的函数功能如 mean()variance()
Example 3-38. Creating DoubleRDD in Java
JavaDoubleRDD result = rdd.mapToDouble(
    new DoubleFunction<Integer>() {
        public double call(Integer x) {
            return (double) x * x;
        }
});
System.out.println(result.mean());

Python

  Python 的 API 和 Java、Scala 的结构不同。在 Python 中所有的函数都是在基本 RDD 类上实现的,但是如果 RDD 的类型不正确,运行时会失败。

Persistence(Caching)

  向我们之前讨论的,Spark 的 RDD 是惰性求值的,并且我们有时会希望多次使用同一个 RDD。如果我们天真地这样做,每当我们调用一个 action,SPark 将重新计算 RDD 和他所有的依赖项。对于迭代算法,这可能导致非常高昂的代价,因为它要多次查看数据。Example3-39 展示了另一个小例子,统计并写出相同的 RDD。

Example 3-39. Double execution in Scala
val result = input.map(x => x*x)
println(result.count())
println(result.collect().mkString(","))

  为了防止对一个 RDD 计算多次,我们可以让 Spark 持久化数据。当我们让 Spark 持久化一个 RDD 时,计算 RDD 的节点将存储他们的分区。如果一个将数据持久化了的节点失败了,Spark 会在需要时重新计算剩下数据的分区。如果我们希望处理节点失败时避免机器卡顿,可以将数据复制在多个节点上。

  Spark 针对我们的目的提供了多种可供选择的持久化级别,如表 3-6。在 Scala 和 Java 中,默认的 persist() 会如不可序列化的对象一样把数据保存在 JVM 的堆中。在 Python 中,我们总是通过序列化数据来持久化存储,所以默认的是 pickle(python 中的序列化)对象而不是保存在 JVM 堆中。当我们将数据写入磁盘或非堆存储时,这些数据总是要被序列化。

表 3-6.org.apche.spark.storage.StorageLevel 和 pyspark.StorageLevel 中的持久化级别。如果需要我们可以通过在存储级别尾部添加_2 来在两台机器上复制数据。

image

堆外缓存是实验性的并且使用的是 Tachyon。如果你对 Spark 的堆外缓存感兴趣,可以查看一下 Running Spark on Tachyon guide。

Example 3-40. persist() in Scala
val result = input.map(x => x * x)
result.persist(StorageLevel.DISK_ONLY)
println(result.count())
println(result.collect().mkString(","))

  需要注意的是我们在第一次 action 之前调用了 RDD 的 persist()persist() 调用本身不强制求值。

  如果你试图在内存中缓存太多数据,Spark 会使用最近最少使用的缓存策略来自动清除老分区。对于 MEMORY_ONLY 存储级别,它会在它们下一次访问是重新计算这些分区,对于 MEMORY_AND_DISK 级别,会把它们写入磁盘。这意味着你不用担心如果你缓存太多数据而导致 job 的崩溃。但是,这会有大量重复计算的时间。

  最后,RDD 还有一个 unpersist() 方法,可以让你手动将 RDD 从缓存中删除。

Conclusion

  这一章中,我们介绍了 RDD 的操作模型和大量的常用操作。如果你看到这,恭喜你已经学习了所有 SPark 中的核心概念。下一章中,我们将介绍关于键值对 RDD 上可使用的特殊操作集合,这些操作在并行条件下聚合和分组数据非常常用。在此之后,我们将讨论各种数据源的输入和输出,以及使用 SparkContext 的高级话题。

  • Spark

    Spark 是 UC Berkeley AMP lab 所开源的类 Hadoop MapReduce 的通用并行框架。Spark 拥有 Hadoop MapReduce 所具有的优点;但不同于 MapReduce 的是 Job 中间输出结果可以保存在内存中,从而不再需要读写 HDFS,因此 Spark 能更好地适用于数据挖掘与机器学习等需要迭代的 MapReduce 的算法。

    74 引用 • 46 回帖 • 552 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...