二、Spark 算子和 RDD

本贴最后更新于 2268 天前,其中的信息可能已经时移世易

二、Spark 算子和 RDD


Spark 的算子分为两类:

Transformation:转换 。Spark 所有 Transformation
Action : 动作 。Spark 所有 Action
Transformation 延迟执行,Transformation 会记录元数据信息,当计算任务触发 Action 时,才会被真正计算。


创建 RDD 的两种方式

1.通过 HDFS 支持的文件系统创建 RDD,创建后 RDD 里并没有真正要计算的数据(懒加载),只记录了元数据(数据路径)

  1. 通过 Scala 集合或数组以并行化的方式创建 RDD。sc.parallelize(Array(1,2,3,4,5))
    RDD.partitions.length 可以查看 RDD 的分区数量

RDD 的特点

  • 具有一个分区列表,一个分区肯定在某个机器上,但一个机器可能有多个分区
  • 函数会作用于每一个分区上,同一个操作会在每个分区上都执行
  • RDD 之间有一系列的依赖,算子是链式调用,每个算子都生成一个新的 RDD。后一个 RDD 会记录前一个 RDD 的某些信息,所有当计算过程中前面的数据丢失,后面的 RDD 根据依赖关系重新恢复丢掉的数据。
  • 如果是 Key-Value 类型,会有一个分区器(Partitioner)。默认是 hash-Partitioner。可以自定义分区器。
  • 记录 RDD 中每个分区位置的列表,为了移动计算,而不移动数据。例如,HDFS 文件的块位置。

  • Internally, each RDD is characterized by five main properties:
    • A list of partitions
    • A function for computing each split
    • A list of dependencies on other RDDs
    • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
    • Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)

提交任务到集群

还是 WordCount 的例子。这次不使用 spark-shell。在本地写好后,打出 jar 包,传到 spark 集群,然后执行。

import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by Sweeney on 2017/11/7.
  */
object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WC")
    val sc = new SparkContext(conf) // 创建一个SparkContext
    sc.textFile(args(0))            // 从路径中读取数据
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, false)
      .saveAsTextFile(args(1))      // 将结果报存到一个目录

    sc.stop()
  }
}

打出 jar 包后传到 Master

shiwei@ubuntu1:~/bigdata/jars$ pwd
/home/shiwei/bigdata/jars
shiwei@ubuntu1:~/bigdata/jars$ ls
WC-assembly-1.0.jar

通过 spark-submit 来提交任务。

./bin/spark-submit --class WordCount --master spark://ubuntu1:7077 /home/shiwei/bigdata/jars/WC-assembly-1.0.jar ./README.md ~/bigdata/results/WC.out

最终结果保存在 worker 机器上的 ~/bigdata/results/WC.out 目录下。而在 Master 上是不会有的,因为真正执行操作的是 worker 机器。所以这里使用 HDFS 要好一点,可以指定输出结果到哪个节点上。


踩的坑

第一个坑是我在 WordCount 项目中引入了 2.11 的 Spark-core,但是项目 Scala SDK 使用的是 2.12。所以在上传到集群提交执行的时候报错了。最后将 SDK 版本调整为 2.11 才解决问题!!!
报错如下:

Caused by: java.lang.NoSuchMethodError: scala.Predef$.refArrayOps([Ljava/lang/Object;)[Ljava/lang/Object;
	at WordCount$.$anonfun$main$1(WordCount.scala:10)
	at WordCount$.$anonfun$main$1$adapted(WordCount.scala:10)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:191)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

第二个坑是提交执行的时候,给输出结果指定目录。指定的是 ~/bigdata/results/WC.out,成功运行完毕后再 Master 上这个目录下找不到输出结果,重试了几次,鬼斧神差的去 worker 上这个目录看了下,果然每个 worker 的这个目录都有结果。唯独执行提交的 master 上没有。后来想了想,这样应该是正常的,因为任务都是在 worker 上执行的。

  • Simon
    20 引用 • 10 回帖
  • Spark

    Spark 是 UC Berkeley AMP lab 所开源的类 Hadoop MapReduce 的通用并行框架。Spark 拥有 Hadoop MapReduce 所具有的优点;但不同于 MapReduce 的是 Job 中间输出结果可以保存在内存中,从而不再需要读写 HDFS,因此 Spark 能更好地适用于数据挖掘与机器学习等需要迭代的 MapReduce 的算法。

    74 引用 • 46 回帖 • 561 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...