Spark Operators in Detail: An Introduction to Transformation Operators


I. Transformation operators on Value-type data

1. Operators with a one-to-one mapping between input and output partitions

1.1. The map operator

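Purpose: map applies a given function to every element of an RDD to produce a new RDD. Each element of the source RDD corresponds to exactly one element of the new RDD.
Source: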
>
/**
 * Return a new RDD by applying a function to all elements of this RDD. 
 */
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
Example:
>
scala> val a = sc.parallelize(1 to 10,2)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
>
scala> val b = a.map(_ * 2)
b: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:25
>
scala> a.collect
res0: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
>
scala> b.collect
res1: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)

1.2. The flatMap operator

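Purpose: flatMap applies the function f to every element of an RDD. f returns a collection for each element, and the elements of all those collections are flattened into a single new RDD (a MapPartitionsRDD).
Source: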
>
/**
 * Return a new RDD by first applying a function to all elements of this
 *  RDD, and then flattening the results.
 */
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}
Example:
>
scala> val a = sc.parallelize(1 to 5)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24
>
scala> val b = a.flatMap(x => 1 to x)
b: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at flatMap at <console>:25
>
scala> b.collect
res2: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5)

1.3. The mapPartitions operator

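Purpose: mapPartitions is a variant of map. The function passed to map is applied to each element of the RDD, whereas the function passed to mapPartitions is applied to each partition. mapPartitions hands the function an iterator over one partition, and the function processes all elements of that partition through this iterator.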
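
The difference from map can be seen in a small sketch. It assumes the same three-partition layout as the glom example in this post; the `SparkContext.getOrCreate` line and the application name are only scaffolding so the snippet also runs outside spark-shell.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses the spark-shell context if one exists, otherwise starts a local one (assumption).
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("mapPartitions-sketch"))

// 1 to 9 in 3 partitions: (1, 2, 3), (4, 5, 6), (7, 8, 9)
val a = sc.parallelize(1 to 9, 3)

// The function receives one iterator per partition and returns one iterator,
// so it is called three times here rather than nine.
val sums = a.mapPartitions(iter => Iterator(iter.sum))

sums.collect()  // Array(6, 15, 24)
```

This shape is useful when some per-partition setup (for example, opening a connection) should happen once per partition instead of once per element.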

1.4. The mapPartitionsWithIndex operator

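Purpose: works like mapPartitions, except that the input function takes two parameters: the first is the index of the partition, and the second is the iterator over that partition's elements.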
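
A minimal sketch of the two-parameter function, tagging every element with the index of the partition it lives in. The context setup and the name `tagged` are illustrative assumptions.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses the spark-shell context if one exists, otherwise starts a local one (assumption).
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("mapPartitionsWithIndex-sketch"))

val a = sc.parallelize(1 to 9, 3)

// index is the partition number (starting at 0), iter iterates over that partition.
val tagged = a.mapPartitionsWithIndex((index, iter) => iter.map(x => (index, x)))

tagged.collect()
// Array((0,1), (0,2), (0,3), (1,4), (1,5), (1,6), (2,7), (2,8), (2,9))
```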

1.5. The glom operator

Purpose: gathers the elements of each partition into an array. The number of partitions does not change.
Source:
>
/**
 * Return an RDD created by coalescing all elements within each partition into an array. 
 */
def glom(): RDD[Array[T]] = withScope {
  new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray))
}
Example:
>
scala> val a = sc.parallelize(1 to 9, 3)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24
>
scala> a.collect
res2: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
>
scala> val b = a.glom
b: org.apache.spark.rdd.RDD[Array[Int]] = MapPartitionsRDD[3] at glom at <console>:25
>
scala> b.collect
res3: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9))

1.6. The randomSplit operator

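Purpose: splits one RDD into several RDDs according to an array of weights. The higher a split's weight, the larger the share of elements it is expected to receive.
1. The weights in the first parameter do not have to sum to 1: as the source below shows, they are normalized by their sum. They must be nonnegative, and their sum must be positive.
2. The second parameter, seed, is optional and seeds the random number generator. The same seed always produces the same random sequence, and therefore the same split of the same RDD.
Source: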
>
/**
 * Randomly splits this RDD with the provided weights.
 *
 * @param weights weights for splits, will be normalized if they don't sum to 1
 * @param seed random seed
 *
 * @return split RDDs in an array
 */
def randomSplit(
    weights: Array[Double],
    seed: Long = Utils.random.nextLong): Array[RDD[T]] = {
  require(weights.forall(_ >= 0),
    s"Weights must be nonnegative, but got ${weights.mkString("[", ",", "]")}")
  require(weights.sum > 0,
    s"Sum of weights must be positive, but got ${weights.mkString("[", ",", "]")}")

  withScope {
    val sum = weights.sum
    val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
    normalizedCumWeights.sliding(2).map { x =>
      randomSampleWithRange(x(0), x(1), seed)
    }.toArray
  }
}
Example:
>
scala> val a = sc.parallelize(1 to 9)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[26] at parallelize at <console>:24
>
scala> val b= a.randomSplit(Array(0.2,0.3,0.5))
b: Array[org.apache.spark.rdd.RDD[Int]] = Array(MapPartitionsRDD[27] at randomSplit at <console>:25, MapPartitionsRDD[28] at randomSplit at <console>:25, MapPartitionsRDD[29] at randomSplit at <console>:25)
>
scala> b.size
res20: Int = 3
>
scala> b(0).collect
res21: Array[Int] = Array(2, 3, 8)
>
scala> b(1).collect
res22: Array[Int] = Array(1, 5, 9)
>
scala> b(2).collect
res23: Array[Int] = Array(4, 6, 7)
>
> The following runs show that the same seed produces the same split, while a different seed produces a different one:
scala> val c= a.randomSplit(Array(0.2,0.8), 2)
c: Array[org.apache.spark.rdd.RDD[Int]] = Array(MapPartitionsRDD[30] at randomSplit at <console>:25, MapPartitionsRDD[31] at randomSplit at <console>:25)
>
scala> c(0).collect
res25: Array[Int] = Array(2, 3, 7)
>
scala> c(1).collect
res26: Array[Int] = Array(1, 4, 5, 6, 8, 9)
>
scala> val d= a.randomSplit(Array(0.2,0.8), 2)
d: Array[org.apache.spark.rdd.RDD[Int]] = Array(MapPartitionsRDD[32] at randomSplit at <console>:25, MapPartitionsRDD[33] at randomSplit at <console>:25)
>
scala> d(0).collect
res27: Array[Int] = Array(2, 3, 7)
>
scala> d(1).collect
res28: Array[Int] = Array(1, 4, 5, 6, 8, 9)
>
scala> val e= a.randomSplit(Array(0.2,0.8), 3)
e: Array[org.apache.spark.rdd.RDD[Int]] = Array(MapPartitionsRDD[34] at randomSplit at <console>:25, MapPartitionsRDD[35] at randomSplit at <console>:25)
>
scala> e(0).collect
res29: Array[Int] = Array(1, 5, 9)
>
scala> e(1).collect
res30: Array[Int] = Array(2, 3, 4, 6, 7, 8)
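
The normalization step in the source can also be checked directly. The sketch below passes weights that sum to 4 rather than 1; the seed value 42 and the variable names are arbitrary choices for illustration. It deliberately shows no per-split contents, since those depend on the seed.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses the spark-shell context if one exists, otherwise starts a local one (assumption).
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("randomSplit-sketch"))

val a = sc.parallelize(1 to 100)

// Weights 1.0 and 3.0 are normalized to 0.25 and 0.75.
val parts = a.randomSplit(Array(1.0, 3.0), seed = 42L)

// Every element lands in exactly one split: the splits use the same seed
// and cover adjacent ranges of the same random draw.
val total = parts.map(_.count()).sum                    // 100
val overlap = parts(0).intersection(parts(1)).count()   // 0

// Splitting again with the same seed gives the same first split.
val again = a.randomSplit(Array(1.0, 3.0), seed = 42L)
val sameSplit = again(0).collect().sameElements(parts(0).collect())  // true
```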

2. Operators with a many-to-one mapping between input and output partitions

2.1. The union operator

Purpose: returns the union of two RDDs without removing duplicates. The two RDDs must have the same element type.
Source:
>
/**
 * Return the union of this RDD and another one. Any identical elements will appear multiple 
 * times (use `.distinct()` to eliminate them).
 */
def union(other: RDD[T]): RDD[T] = withScope {
  sc.union(this, other)
}
Example:
>
scala> val a = sc.parallelize(1 to 5)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[36] at parallelize at <console>:24
>
scala> val b = sc.parallelize(3 to 8)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[37] at parallelize at <console>:24
>
scala> val c = a.union(b)
c: org.apache.spark.rdd.RDD[Int] = UnionRDD[38] at union at <console>:27
>
scala> c.collect
res31: Array[Int] = Array(1, 2, 3, 4, 5, 3, 4, 5, 6, 7, 8)

2.2. The cartesian operator

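Purpose: computes the Cartesian product of all elements of two RDDs, that is, every pair (a, b) where a comes from this RDD and b from the other. Internally, the operation returns a CartesianRDD.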
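
A short sketch of cartesian on two small RDDs. The inputs and partition counts are made up for illustration. The result is compared as a set because the order in which pairs are collected depends on how the inputs are partitioned.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses the spark-shell context if one exists, otherwise starts a local one (assumption).
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("cartesian-sketch"))

val a = sc.parallelize(1 to 3, 3)
val b = sc.parallelize(Seq("x", "y"), 2)

val c = a.cartesian(b)  // RDD[(Int, String)]

c.count()             // 6 pairs: 3 elements times 2 elements
c.getNumPartitions    // 6: one output partition per pair of input partitions
val pairs = c.collect().toSet
// Set((1,x), (1,y), (2,x), (2,y), (3,x), (3,y))
```

Because the output size is the product of the two input sizes, cartesian gets expensive quickly on large RDDs.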

3. Operators with a many-to-many mapping between input and output partitions

3.1. The groupBy operator

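Purpose: generates a key for each element by applying a function, which turns the data into key-value form, and then puts elements with the same key into one group.
Source: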
>
/**
 * Return an RDD of grouped items. Each group consists of a key and a sequence of elements 
 * mapping to that key. The ordering of elements within each group is not guaranteed, and 
 * may even differ each time the resulting RDD is evaluated. 
 * 
 * @note This operation may be very expensive. If you are grouping in order to perform an
 * aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey`
 * or `PairRDDFunctions.reduceByKey` will provide much better performance.
 */
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope {
  groupBy[K](f, defaultPartitioner(this))
}
Example:
>
scala> val rdd1 = sc.parallelize(1 to 9, 3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[39] at parallelize at <console>:24
>
scala> val rdd2 = rdd1.groupBy(x => { if (x % 2 == 0) "even" else "odd" })
rdd2: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[41] at groupBy at <console>:25
>
scala> rdd2.collect
res17: Array[(String, Iterable[Int])] = Array((even,CompactBuffer(2, 4, 6, 8)), (odd,CompactBuffer(1, 3, 5, 7, 9)))
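
The @note in the source comment recommends reduceByKey for aggregations. A hedged sketch of that advice, summing the even and odd numbers both ways; the helper `key` and the variable names are illustrative only.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses the spark-shell context if one exists, otherwise starts a local one (assumption).
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("groupBy-sketch"))

val rdd1 = sc.parallelize(1 to 9, 3)
val key = (x: Int) => if (x % 2 == 0) "even" else "odd"

// groupBy ships every element across the shuffle, then sums each group.
val viaGroupBy = rdd1.groupBy(key).mapValues(_.sum)

// reduceByKey combines values within each partition before the shuffle.
val viaReduceByKey = rdd1.map(x => (key(x), x)).reduceByKey(_ + _)

viaGroupBy.collect().toMap      // Map(even -> 20, odd -> 25)
viaReduceByKey.collect().toMap  // Map(even -> 20, odd -> 25)
```

Both give the same sums. The difference is in how much data crosses the shuffle, which matters on large inputs.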

3.2. The coalesce operator

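Purpose: changes the number of partitions of an RDD. By default it does not shuffle.
1. When the number of partitions is reduced, no shuffle happens by default, and the parent and child RDDs are linked by a narrow dependency. For example, going from 1000 partitions to 10 does not trigger a shuffle.
2. When the number of partitions is increased, for example from 100 to 1000, shuffle must be set to true. With shuffle set to false, no shuffle is performed, the dependency between parent and child stays narrow, and the number of partitions does not increase.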
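
Both cases can be observed through the resulting partition counts. The sketch below uses a 10-partition RDD instead of the larger numbers in the text; the variable names are illustrative.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses the spark-shell context if one exists, otherwise starts a local one (assumption).
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("coalesce-sketch"))

val a = sc.parallelize(1 to 100, 10)

// Case 1: fewer partitions, no shuffle needed.
val fewer = a.coalesce(2)
fewer.getNumPartitions       // 2

// Case 2: asking for more partitions without a shuffle has no effect.
val unchanged = a.coalesce(20)
unchanged.getNumPartitions   // 10

// With shuffle = true the partition count does increase.
val more = a.coalesce(20, shuffle = true)
more.getNumPartitions        // 20
```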