Spark 编程原理及 RDD 的特性与基本操作

本贴最后更新于 2682 天前,其中的信息可能已经时移世异

Spark 编程原理

1、配置初始环境:SparkConf,SparkContext

SparkConf conf = new SparkConf().setAppName("mapTest").setMaster("local");
JavaSparkContext jsc = new JavaSparkContext(conf);

2、初始化 RDD(定义 RDD 的来源,HDFS,file,hive,hbase...)
3、定义 RDD 的操作算子(Transformation,Action),对 RDD 进行各种处理,变换,统计。。。
4、重复的进行下一步的 RDD 操作,直到所有的操作完成,因为 Spark 是是内存迭代式的,可以在一次操作完成后继续下一次操作。
5、获得最终想要的结果,保存到 HDFS,file,hive,hbase...

RDD 的特性

RDD:(Resillient Distributed Dataset)弹性式分布式数据集

  • 首先它是一个数据集合,可以是本地文件,可以是 HDFS 文件,也可以是数据库文件。
  • 其次每个 RDD 都会被分成若干个 partition(分区),每个 partition 在一个节点上。
  • 然后 RDD 是存储在内存中的,当内存不够用的时候会存储在磁盘(内部机制会自动进行内存和磁盘的切换)。
  • 最后 RDD 具有很强的容错性,当某个 paitition 数据丢失后,会根据其来源进行计算得到此 paritition 的数据。

RDD 的来源:

  • 文本文件 file:
jsc.textFile("file.txt");//加载某个文件
jsc.textFile("directory/*.txt");//加载某类文件
  • HDFS 文件:
jsc.textFile("hdfs://spark:9000/filePath");//加载hdfs文件
  • sequenceFile 文件
jsc.sequenceFile("path");//加载二进制文件
  • HIVE
  • Hbase
val hbconf = HbaseConfiguration.create()
			.addResource(new Path("hbase-site.xml"))
			.set(TableInputFormat.INPUT_TABLE,tableName)
  • ...

在 Hadoop 集群中使用 path 如果是 Hdfs 应写成:“hdfs:///path”,如本地文件:“

RDD 的操作

RDD 的操作分为两个部分 Transformation 和 Action。Transformmation->Action 这两种操作分为多个算子(即操作函数)。Transformation 针对已有的 RDD 创建一个新的 RDD,主要是对数据进行映射,变换,统计,过滤。。。Action 主要是对数据进行最后的执行操作,遍历,聚合,保存等操作。在没有执行 Action 操作之前,Transformation 操作都不会被执行,因为 Transformation 算子是 Lazy(懒加载的)。

Transformations 算子

TransformationMeaning
map(func)对数据集中的每个元素进行函数映射并返回
flatMap(func)类似map,但每个输入项都可以映射到0个或多个输出项(因此函数应该返回一个序列而不是单个项)。
filter(func)过滤掉不符合条件的元素/选择符合条件的元素
mapPartitions类似map,但是运行在每个分区上
mapPartitionsWithIndex。。。
sample(withReplacement,fraction,seed)使用给定的随机数生成器种子采样数据的一小部分,不管是否替换。
union(dataset)来两个数据集进行连接(求并集)
intersection(dataset)两个数据集进行连接(求交集)
distinct对数据及去重
groupByKey通过key分组
reduceByKey通过key进行聚合
aggregateByKey。。。
sortByKey排序
join对(K,V)和(K,W)连接得到(k,(v,w)),[求交集]。外连接可以用:leftOuterJoin, rightOuterJoin, fullOuterJoin.
cogroup对(K,V)和(K,W)连接得到(k,(v,w)),[求并集,v或w可能有个为空]。
cartesian。。。
pipe。。。
coalesce(num paritition)减少RDD的分区数量
repartition对Shuflle过后的数据进行重分区
repartitionAndSortWithinPartitions重分区,并且对每个分区的数据进行按key排序
#### Actions算子
ActionMeaning
reduce(func)通过摸个函数func对dataSet进行聚合操作
Collect()返回数据集中的所有元素
count()对数据集进行数量统计
first()返回第一条数据,等同于take(1)
take(n)返回数据集中的前n条
takeSample(num,seed)返回一个数组
takeOrdered(n,[ordering])返回一个排序RDD的前n条数据,可以自定义比较器
saveAsTextFile(path)将数据写入到一个文件(本地文件,HDFS文件,任何Hadoop支持的文件)
saveAsSequenceFile(path)将数据写入到一个给定路径的Hadoop SequenceFile。
saveAsObjectFile(path)吧另存为一个java序列化文件,可用`SparkContext.objectFile()`加载这个文件.
countByKey()通过key来统计,类似于分组统计
foreach(func)遍历数据集,并对每个数据执行func函数操作

Shuffle

Spark 中的某些操作会触发一个称之为"Shuffle"的事件,Shuffle 是 Spark 用来重新分配数据,使其用来分组不同的分区的一种机制。这种机制通常涉及到 Executor机器 之间复制数据,因此 Shuffle 是一种复杂而昂贵的操作。

想要了解 Shuffle 操作发生了什么,我们可以通过 reduceByKey 这个算子实例来看。reduceBykey 操作产生一个全新的 RDD,这个 RDD 包含单个 Key 的所有 Value 的和。但是单个 key 的 Value 不移地能够在同一个 Paratition,甚至不在同一个机器上。但是他们必须共同定位才能计算结果。

在 Spark 中数据通常不会分布在同一个 Paritation,一个 Task 会操作一个 Paritation 数据,但是一个 reduceBeKey 必须组织所有的数据到几个不同的分区中读取所有 Key 和所有 Value,然后将这些不同分区的数据组合起来进行计算每个 Key 的 Value,这个过程称之为 Shuffle
Shuffle 操作的分区,元素顺序确定,但每个元素的顺序不是有序的,要想使之有序可以使用:

  • mapPartitions 对每个 Paritation 排序,例如.sorted
  • repartitionAndSortWithinPartitions
  • sortBy 对所有 RDD 数据排序

可以触发 Shuffle 操作的算子:包括 repatition算子 如:reparation 和 coalesce;"ByKey"算子(除了 countByKey)如:groupByKey,reduceByKey,join算子 如:cogroup,join.

RDD 的持久化

Spark 非常重要的一个功能特性就是可以将 RDD 持久化在内存中。当对 RDD 执行持久化操作时,每个节点都会将自己操作的 RDD 的 partition 持久化到内存中,并且在之后对该 RDD 的反复使用中,直接使用内存缓存的 partition。这样的话,对于针对一个 RDD 反复执行多个操作的场景,就只要对 RDD 计算一次即可,后面直接使用该 RDD,而不需要反复计算多次该 RDD。
##为什么要进行 RDD 持久化
对于一个 RDD 操作来说,操作执行完之后 RDD 就会被销毁,如果下一次计算跟上一次相同,需要用到同一个 RDD 那么此时这个 RDD 已经销毁,我们要得到这个 RDD 就必须重新读取文件,计算出这个 RDD。但是这样做就浪费了大量的时间做重复的计算上,尤其是当数据量特别大且 transformation 操作较多的时候,当 Actions 操作触发任务执行这个过程将会非常耗时的时候。因此对于这些 RDD 我们可以将其持久化到内存中,方便重复使用。
巧妙使用 RDD 持久化,甚至在某些场景下,可以将 spark 应用程序的性能提升 10 倍。对于迭代式算法和快速交互式应用来说,RDD 持久化,是非常重要的。

持久化的方式

只需要对 RDD 调用 cache()或者 persist()方法就可以。当 RDD 倍计算出来之后就会被还缓存到每个节点上,Spark 的持久化机制可以自动容错
当某个 paritation 的数据丢失之后可以通过其源头自动重新计算该 Paritation.

  • cache():在创建该 RDD 的时候就必须调用 cache()才有用,是简化版的 persist(),默认级别是 MEMORYONLY,调用了 persist()的无参构造。
  • persist(Leve):persist()操作,可以设置持久化的级别,选择持久化到内存还是磁盘。。。
val docsRDD = sc.textFile("input/docs").cache()
val docsRDD = sc.textFile("input/docs").persist(StorageLevels.MEMORY_ONLY);

我么可以调用 unpersist()从内存中清除缓存。
Shuffle操作的时候也会进行数据的持久化,一般都是磁盘,这样是为了防止数据丢失,导致重新计算这个过程。

  • Spark

    Spark 是 UC Berkeley AMP lab 所开源的类 Hadoop MapReduce 的通用并行框架。Spark 拥有 Hadoop MapReduce 所具有的优点;但不同于 MapReduce 的是 Job 中间输出结果可以保存在内存中,从而不再需要读写 HDFS,因此 Spark 能更好地适用于数据挖掘与机器学习等需要迭代的 MapReduce 的算法。

    74 引用 • 46 回帖 • 552 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...
rzx
此生最怕深情被辜负,最怕兄弟成陌路。对世界充满善意,同时又充满深深的恨意,我渴望天降甘霖福泽众生,又渴望灭世洪水重创世纪。 广州