Spark Operators in Detail: Action Operators


I. Operators with no output

1. The foreach operator

Function: applies the function f to every element of the RDD; it returns no value.
Source:

> /**
>  * Applies a function f to all elements of this RDD.
>  */
> def foreach(f: T => Unit): Unit = withScope {
>   val cleanF = sc.clean(f)
>   sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
> }
Example:

> scala> val rdd1 = sc.parallelize(1 to 9)
> rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[20] at parallelize at <console>:24
>
> scala> rdd1.foreach(x => printf("%d ", x))
> 1 2 3 4 5 6 7 8 9

2. The foreachPartition operator

Function: similar to foreach. The difference is that foreach runs the foreach operation directly on each partition's iterator, so the supplied function is only applied element by element inside that loop, whereas foreachPartition hands each partition's whole iterator to the supplied function and lets the function process the iterator itself. This can help avoid out-of-memory problems and lets per-partition setup (such as opening a connection) happen only once per partition. A usage sketch follows below.

> In short, foreach iterates over the elements of the RDD, while foreachPartition works on the partitions themselves.
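As a quick illustration of the difference, here is a minimal spark-shell sketch (the partition count and the println are only for demonstration; on a cluster, println output lands in the executor logs rather than on the driver console):

> // The supplied function receives each partition's Iterator[Int] as a whole,
> // so anything done at the top of the function runs once per partition,
> // not once per element (e.g. opening a database connection).
> val rdd1 = sc.parallelize(1 to 9, 3)
> rdd1.foreachPartition { iter =>
>   val prefix = "partition contents: "      // stand-in for expensive per-partition setup
>   println(prefix + iter.mkString(", "))    // the iterator is consumed lazily here
> }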
II. Operators that write to HDFS and other file systems

1. The saveAsTextFile operator

Function: writes the RDD out as a text file to the local file system, HDFS, or another supported file system. Spark calls toString on each element to turn it into one line of the output. Note that when saving to the local file system, the files end up in the local directories of the machines the executors run on. (A usage sketch for this operator and saveAsSequenceFile follows at the end of this section.)

2. The saveAsObjectFile operator

Function: writes the RDD to the local file system, HDFS, or another supported file system as a file of serialized objects (under the hood, a SequenceFile of serialized batches).

Source:

> /**
>  * Save this RDD as a SequenceFile of serialized objects.
>  */
> def saveAsObjectFile(path: String): Unit = withScope {
>   this.mapPartitions(iter => iter.grouped(10).map(_.toArray))
>     .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))))
>     .saveAsSequenceFile(path)
> }

Example:

> scala> val rdd1 = sc.parallelize(Array(("a", 1), ("b", 2), ("c", 3), ("d", 5), ("a", 4)), 2)
> rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[40] at parallelize at <console>:24
>
> scala> rdd1.saveAsObjectFile("file:///opt/app/test/saveAsObjectFileTest.txt")

3. The saveAsHadoopFile operator

Function: stores the RDD in files on HDFS using the old Hadoop OutputFormat API. You can specify the outputKeyClass, outputValueClass, and a compression codec; each partition is written out as one file.

4. The saveAsSequenceFile operator

Function: writes the RDD as a Hadoop SequenceFile to the local file system, HDFS, or another supported file system.

5. The saveAsHadoopDataset operator

Function: outputs the RDD to any Hadoop-supported storage system (for example HBase) using the old Hadoop API, configured through a Hadoop JobConf object for that storage system.

Source:

> /**
>  * Output the RDD to any Hadoop-supported storage system, using a Hadoop JobConf object for
>  * that storage system. The JobConf should set an OutputFormat and any output paths required
>  * (e.g. a table name to write to) in the same way as it would be configured for a Hadoop
>  * MapReduce job.
>  */
> def saveAsHadoopDataset(conf: JobConf): Unit = self.withScope {
>   val config = new HadoopMapRedWriteConfigUtil[K, V](new SerializableJobConf(conf))
>   SparkHadoopWriter.write(
>     rdd = self,
>     config = config)
> }

Example:

> import org.apache.hadoop.io.{IntWritable, Text}
> import org.apache.hadoop.mapred.{JobConf, TextOutputFormat}
>
> val rdd1 = sc.parallelize(Array(("a", 1), ("b", 2), ("c", 3), ("d", 5), ("a", 4)), 2)
> val jobConf = new JobConf()
> jobConf.setOutputFormat(classOf[TextOutputFormat[Text, IntWritable]])
> jobConf.setOutputKeyClass(classOf[Text])
> jobConf.setOutputValueClass(classOf[IntWritable])
> jobConf.set("mapred.output.dir", "/test/")
> rdd1.saveAsHadoopDataset(jobConf)

6. The saveAsNewAPIHadoopFile operator

Function: saves the RDD to HDFS using the new Hadoop API. Usage is essentially the same as saveAsHadoopFile.

7. The saveAsNewAPIHadoopDataset operator

Function: outputs the RDD to any Hadoop-supported storage system (for example HBase) using the new Hadoop API, configured through a Hadoop Configuration object for that storage system. The Configuration should set an OutputFormat and any required output paths (such as a table name to write to), just as it would be configured for a Hadoop MapReduce job.

Source:

> /**
>  * Output the RDD to any Hadoop-supported storage system with new Hadoop API, using a Hadoop
>  * Configuration object for that storage system. The Conf should set an OutputFormat and any
>  * output paths required (e.g. a table name to write to) in the same way as it would be
>  * configured for a Hadoop MapReduce job.
>  *
>  * @note We should make sure our tasks are idempotent when speculation is enabled, i.e. do
>  * not use output committer that writes data directly.
>  * There is an example in https://issues.apache.org/jira/browse/SPARK-10063 to show the bad
>  * result of using direct output committer with speculation enabled.
>  */
> def saveAsNewAPIHadoopDataset(conf: Configuration): Unit = self.withScope {
>   val config = new HadoopMapReduceWriteConfigUtil[K, V](new SerializableConfiguration(conf))
>   SparkHadoopWriter.write(
>     rdd = self,
>     config = config)
> }

Example (note that the new API expects a Configuration and the new-API TextOutputFormat from mapreduce.lib.output, not a JobConf configured with the old mapred classes):

> import org.apache.hadoop.fs.Path
> import org.apache.hadoop.io.{IntWritable, Text}
> import org.apache.hadoop.mapreduce.Job
> import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}
>
> val rdd1 = sc.parallelize(Array(("a", 1), ("b", 2), ("c", 3), ("d", 5), ("a", 4)), 2)
> val job = Job.getInstance(sc.hadoopConfiguration)
> job.setOutputFormatClass(classOf[TextOutputFormat[Text, IntWritable]])
> job.setOutputKeyClass(classOf[Text])
> job.setOutputValueClass(classOf[IntWritable])
> FileOutputFormat.setOutputPath(job, new Path("/test/"))
> rdd1.saveAsNewAPIHadoopDataset(job.getConfiguration)
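As a quick illustration of the simpler save operators above, here is a minimal spark-shell sketch; the file:///tmp output paths are placeholders, and the directories must not already exist:

> // saveAsTextFile: each element becomes one line (via toString),
> // with one part-NNNNN file per partition.
> val pairs = sc.parallelize(Array(("a", 1), ("b", 2), ("c", 3)), 2)
> pairs.saveAsTextFile("file:///tmp/saveAsTextFileDemo")
>
> // saveAsSequenceFile: requires a pair RDD whose key and value types can be
> // converted to Hadoop Writables (here Text keys and IntWritable values).
> pairs.saveAsSequenceFile("file:///tmp/saveAsSequenceFileDemo")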
III. Operators that return Scala collections and values

1. The first operator

Function: returns the first element of the RDD, without sorting.

Source:

> /**
>  * Return the first element in this RDD.
>  */
> def first(): T = withScope {
>   take(1) match {
>     case Array(t) => t
>     case _ => throw new UnsupportedOperationException("empty collection")
>   }
> }

Example:

> scala> val rdd1 = sc.parallelize(1 to 9)
> rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
>
> scala> val rdd2 = rdd1.first()
> rdd2: Int = 1
>
> scala> print(rdd2)
> 1

2. The count operator

Function: returns the number of elements in the RDD.

Source:

> /**
>  * Return the number of elements in the RDD.
>  */
> def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

Example:

> scala> val rdd1 = sc.parallelize(1 to 9)
> rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24
>
> scala> println(rdd1.count())
> 9

3. The reduce operator

Function: passes the elements of the RDD to the supplied function two at a time: the function produces a new value, that value and the next element are passed to the function again, and so on until a single value remains. The function must be a commutative and associative binary operator.

Source:

> /**
>  * Reduces the elements of this RDD using the specified commutative and
>  * associative binary operator.
>  */
> def reduce(f: (T, T) => T): T = withScope {
>   val cleanF = sc.clean(f)
>   val reducePartition: Iterator[T] => Option[T] = iter => {
>     if (iter.hasNext) {
>       Some(iter.reduceLeft(cleanF))
>     } else {
>       None
>     }
>   }
>   var jobResult: Option[T] = None
>   val mergeResult = (index: Int, taskResult: Option[T]) => {
>     if (taskResult.isDefined) {
>       jobResult = jobResult match {
>         case Some(value) => Some(f(value, taskResult.get))
>         case None => taskResult
>       }
>     }
>   }
>   sc.runJob(this, reducePartition, mergeResult)
>   // Get the final result out of our Option, or throw an exception if the RDD was empty
>   jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
> }

Example:

> scala> val rdd1 = sc.parallelize(1 to 9)
> rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24
>
> scala> val rdd2 = rdd1.reduce((x, y) => x + y)
> rdd2: Int = 45

4. The collect operator

Function: returns all elements of the RDD to the driver as a single Array. It should only be used when the result is expected to be small, because the whole dataset is loaded into the driver's memory.

Source:

> /**
>  * Return an array that contains all of the elements in this RDD.
>  *
>  * @note This method should only be used if the resulting array is expected to be small, as
>  * all the data is loaded into the driver's memory.
>  */
> def collect(): Array[T] = withScope {
>   val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
>   Array.concat(results: _*)
> }

Example:

> scala> val rdd1 = sc.parallelize(1 to 9)
> rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24
>
> scala> rdd1.collect
> res3: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)

5. The take operator

Function: returns an array containing the first n elements of the dataset (the elements at indices 0 through n-1), without sorting. A sketch follows below.
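Since take has no example above, here is a minimal sketch; the elements come back in partition order and no sorting is applied:

> val rdd1 = sc.parallelize(1 to 9)
> rdd1.take(3)                 // Array(1, 2, 3)
>
> val unordered = sc.parallelize(Seq(5, 1, 4, 2, 3))
> unordered.take(2)            // Array(5, 1) -- the first two elements, not the two smallest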
6. The top operator

Function: returns the top n (largest) elements of the RDD as an array, in descending order as defined by the implicit Ordering[T] (or by an optional key function).

Source:

> /**
>  * Returns the top k (largest) elements from this RDD as defined by the specified
>  * implicit Ordering[T] and maintains the ordering. This does the opposite of
>  * [[takeOrdered]]. For example:
>  * {{{
>  *   sc.parallelize(Seq(10, 4, 2, 12, 3)).top(1)
>  *   // returns Array(12)
>  *
>  *   sc.parallelize(Seq(2, 3, 4, 5, 6)).top(2)
>  *   // returns Array(6, 5)
>  * }}}
>  *
>  * @note This method should only be used if the resulting array is expected to be small, as
>  * all the data is loaded into the driver's memory.
>  *
>  * @param num k, the number of top elements to return
>  * @param ord the implicit ordering for T
>  * @return an array of top elements
>  */
> def top(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
>   takeOrdered(num)(ord.reverse)
> }

Example:

> scala> val rdd1 = sc.parallelize(1 to 9)
> rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:24
>
> scala> val rdd2 = rdd1.top(3)
> rdd2: Array[Int] = Array(9, 8, 7)

7. The takeOrdered operator

Function: returns the first n elements of the RDD, sorted either in the default ascending order or by a custom ordering.

Source:

> /**
>  * Returns the first k (smallest) elements from this RDD as defined by the specified
>  * implicit Ordering[T] and maintains the ordering. This does the opposite of [[top]].
>  * For example:
>  * {{{
>  *   sc.parallelize(Seq(10, 4, 2, 12, 3)).takeOrdered(1)
>  *   // returns Array(2)
>  *
>  *   sc.parallelize(Seq(2, 3, 4, 5, 6)).takeOrdered(2)
>  *   // returns Array(2, 3)
>  * }}}
>  *
>  * @note This method should only be used if the resulting array is expected to be small, as
>  * all the data is loaded into the driver's memory.
>  *
>  * @param num k, the number of elements to return
>  * @param ord the implicit ordering for T
>  * @return an array of top elements
>  */
> def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
>   if (num == 0) {
>     Array.empty
>   } else {
>     val mapRDDs = mapPartitions { items =>
>       // Priority keeps the largest elements, so let's reverse the ordering.
>       val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
>       queue ++= collectionUtils.takeOrdered(items, num)(ord)
>       Iterator.single(queue)
>     }
>     if (mapRDDs.partitions.length == 0) {
>       Array.empty
>     } else {
>       mapRDDs.reduce { (queue1, queue2) =>
>         queue1 ++= queue2
>         queue1
>       }.toArray.sorted(ord)
>     }
>   }
> }

Example:

> scala> val rdd1 = sc.makeRDD(Seq(5, 4, 2, 1, 3, 6))
> rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at makeRDD at <console>:24
>
> scala> val rdd2 = rdd1.takeOrdered(3)
> rdd2: Array[Int] = Array(1, 2, 3)

8. The aggregate operator

Function: folds the elements within each partition using seqOp together with the initial value zeroValue, then merges the per-partition results with the combOp function (again starting from zeroValue). The final return type does not have to match the element type of the RDD. A sketch follows at the end of this section.

9. The fold operator

Function: aggregates the elements within each partition, and then the partition results, using a single function op. The function takes two arguments, and the first argument starts out as zeroValue (of type T, the element type of the RDD). It behaves like an aggregate whose seqOp and combOp are the same function. See the sketch at the end of this section.

10. The lookup operator

Function: operates on an RDD of (Key, Value) pairs and returns a Seq of the values associated with the given key. The optimization is that if the RDD has a partitioner, only the partition the key maps to is scanned; if it has no partitioner, the whole RDD must be scanned to find the matching elements.

11. The countByKey operator

Function: counts the number of elements for each key in an RDD[K, V] and returns a Map of (key, count) pairs.

Source:

> /**
>  * Count the number of elements for each key, collecting the results to a local Map.
>  *
>  * @note This method should only be used if the resulting map is expected to be small, as
>  * the whole thing is loaded into the driver's memory.
>  * To handle very large results, consider using rdd.mapValues(_ => 1L).reduceByKey(_ + _), which
>  * returns an RDD[T, Long] instead of a map.
>  */
> def countByKey(): Map[K, Long] = self.withScope {
>   self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap
> }

Example:

> scala> val rdd1 = sc.parallelize(Array(("a", 1), ("b", 2), ("c", 3), ("d", 4), ("a", 5)), 2)
> rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[17] at parallelize at <console>:24
>
> scala> val rdd2 = rdd1.countByKey()
> rdd2: scala.collection.Map[String,Long] = Map(d -> 1, b -> 1, a -> 2, c -> 1)
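As referenced above, here is a minimal sketch of aggregate and fold on the same data; the (sum, count) accumulator is just an illustrative choice, and zeroValue should normally be a neutral element because it is applied once per partition and once more when the partition results are merged:

> val nums = sc.parallelize(1 to 9, 3)
>
> // aggregate: seqOp folds elements into a per-partition accumulator,
> // combOp merges the per-partition accumulators; the result type (Int, Int)
> // differs from the element type Int.
> val (sum, count) = nums.aggregate((0, 0))(
>   (acc, x) => (acc._1 + x, acc._2 + 1),
>   (a, b) => (a._1 + b._1, a._2 + b._2)
> )
> // sum = 45, count = 9
>
> // fold: one function is used both within and across partitions,
> // so the result type must be the element type.
> val total = nums.fold(0)(_ + _)   // 45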