SparkStreaming Dstream 的 Transformation/output 算子(window,updateStateByKey)

本贴最后更新于 2851 天前,其中的信息可能已经时移世异

Ttransformation 算子

SparkStreaming Dstream 的 Transformation 算子和 SparkCore 的操作基本类似,毕竟 DStream 的底层还是 RDD.

TransformationMeaning
map对传入的每个元素返回一个新的元素
flatMap对传入的每个元素返回一个或多个元素
filter过滤掉不符合条件的元素/选择符合条件的元素
union将两个DStream合并
count返回元素的个数
reduce对所有的Values进行聚合
countByValue按值分组并统计每组的总数,返回(K,V)的格式
reduceByKey按K分组,对Values进行聚合
cogroup对两个Dstream进行链接操作,一个Key连接起来的两个RDD的数据,都会以Iterable'<'V'>'的形式出现在一个Tuple中
join对两个Dstream进行链接操作,每个连接起来的Pair作为Dstream的RDD的一个新元素
transform对Dstream进行转换,转换成RDD,Dataset.
updateStateBykey为每个Key维护一个状态,并进行更新
window对滑动窗口内的数据执行操作
可以看到大部分的算子跟RDD的操作算子基本相同,独有的算子如`transform,updateStateByKey,window`,也是SparkStreaming中比较有用的算子。

window

ac7a14b4f0604b3f9fd247a3b626aaa9-image.png

window:滑动窗口,最早在计算机网络中接触,用于做流量控制,这里的滑动窗口用于选择某一段时间内的 RDD 构成一个 Dstream 进行计算,实时流计算设置的 Batch interval 只能让我们获取一段一段的数据,每一段数据之间是不会交叉重叠的,对于数据分析来说,两组数据之间可能存在断层,阶跃。window 可以让数据之间进行平滑的过度。当然有一定的局限性,如果我们的目标仅仅里类似对数据进行一些处理,不去探寻数据见的关系的话,就没有必要,因为这会让每一台拿数据进行多次处理次数取决于 window 算子的参数。

使用:

参数 1:窗口长度
参数 2:滑动间隔
这两个参数必须是 batch inveral 的整数倍。
类似上图,窗口长度为 3 秒,时间间隔为 2 秒。每三秒获取的 inputDStream 聚合为一个窗口进行计算,间隔两秒再计算一次...

相关算子:

TransformationMeaning
window对每个滑动窗口的数据执行自定义计算
countByWindow对每个窗口的数据执行count操作
countByVaueAndWindow对每个窗口的数据执行countByValue操作
reduceByWindow对每个窗口的数据执行reduce操作
reduceBykeyAndWindow对每个窗口的数据执行reduceBykey操作
groupBykeyAndWindow对每个窗口的数据执行groupBykey操作
使用起来比较简单:只需要把wordcounnt中的Dstream执行window操作得到一个Dstream,对这个Dstream进行后续的操作即是对window的操作.
//3秒作为一个窗口,间隔为2秒 JavaPairDStream<String,Integer> pairDStreamWindows = pairDStream.window(Durations.seconds(3),Durations.seconds(2)); JavaPairDStream<String,Integer> wordCount = pairDStreamWindows.reduceByKey((x1,x2)->(x1+x2));
//scala def wordCountwindow(){ val conf = new SparkConf().setMaster("local[2]").setAppName("wordCount"); val ssc = new StreamingContext(conf,Seconds(1)) val words = ssc.socketTextStream("localhost",9999) .flatMap((line)=>line.split(" ")) .map(x=>(x,1)) //window()此函数有多种重载。 val w3window = words.window(Seconds(3),Seconds(2)) val wordcount = w3window.reduceByKey(_+_) wordcount.print() ssc.start() ssc.awaitTermination() }

如此我们从一个每一秒进行一次 wordcount 的统计就变成了每隔 2 秒统计 3 秒内的数据作为一个窗口统计一次 wordcount。

def wordCountRBwindow(){ val conf = new SparkConf().setMaster("local[2]").setAppName("wordCount"); val ssc = new StreamingContext(conf,Seconds(1)) val words = ssc.socketTextStream("localhost",9999) .flatMap((line)=>line.split(" ")) .map(x=>(x,1)) //reduceByKeyAndWindow(reduceFunc,windowDuration,slideDuration),此函数有多种参数重载。 val wordcount20 = words.reduceByKeyAndWindow((x1:Int,x2:Int)=>x1+x2,Seconds(20),Seconds(2)) wordcount20.print() ssc.start() ssc.awaitTermination() }

其他的 window 算子类似。print 算子是为了触发 job 操作。

updateStateByKey

updateStateBykey 可以为每一个 Key 维护一个 state,并在每次数据产生时对 state 进行更新。前提条件是必须开启CheckPoint机制。checkPoint 可以保证在内存中长期存贮的 state 故障丢失的时候可以得到恢复。

  • 定义一个 state,可以为任意数据类型。
  • 设置更新函数,更新函数会会执行如何来进行更新。

对于每一个 batch 的数据,spark 都会为之前已经存在的 key 更新 state[无论这个 batch 中是否有和 key 相同的数据],对于新出现的 key 也会进行 state 个更新。如果更新函数返回 none,那么 key 的 state 就会被删除。

回顾前面的 wordcount 程序,从开始的离线操作记录某个文件的 wordcount。到实时的 batch inveral 一个时间延迟内的 wordcount。再到滑动窗口的多个 batch inveral 内的 worcount。
而使用 updateStateByKey,可以实现全局的 worcount。我们用每个 key 的 state 保存 word 的 count,每次一 batch 将数据 state 进行累加,即每一次 batch 和前面所有的数据进行一次 worcount 的 reduceByKey 操作。
< 如果不借助 updateStateByKey 我们就需要将没给 batch 的 wordCount 保存/缓存起来,进行累加。>
开启 Checkpoint,并使用而使用 updateStateByKey:

private static void wordCountUpdateStateByKey() throws InterruptedException{ SparkConf conf = new SparkConf().setAppName("ssBSDataSource").setMaster("local[*]"); JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(1)); //开启checkpoint jsc.checkpoint("hdfs://spark:9000/checkPointDir"); //ReceiverInputDStream JavaReceiverInputDStream lines = jsc.socketTextStream("localhost",9999); JavaDStream listDstream = lines.flatMap(line-> Arrays.asList(line.split(" ")).iterator()); JavaPairDStream pairDStream = listDstream.mapToPair(x->new Tuple2<>(x,1)); //这里不再使用reduceByKey 而使用updateStateByKey JavaPairDStream wordCount = pairDStream.updateStateByKey( new Function2, Optional, Optional>() { public Optional call(List values,Optional state) throws Exception { int vl =0; if(state.isPresent()){ vl = state.get(); } for(Integer value : values) { vl += value; } return Optional.of(vl); } }); //这里是Lambda版本 JavaPairDStream wordCount2 = pairDStream.updateStateByKey( (values,state)->{ int vl =0; if(state.isPresent()){ vl = state.get(); } for(Integer value : values) { vl += value; } return Optional.of(vl); }); wordCount.print(); jsc.start(); jsc.awaitTermination(); jsc.close(); }

这里的 Optional 类是 Java8 中的一种防御性检查机制,用于消除空指针异常。详情请看我的《Java8 实战》拥抱变化,下载原书籍,第十章:用 Optional 取代 null。

上述的这些 Transformation 算子都是懒加载的都不能出发 job 操作。最终的操作结果需要 output 操作才能输出/保存起来。

Output 算子:

TransformationMeaning
print直接输出batch中的前十条数据,通常用来测试,在没有output操做的时候用于触发job
saveAsTextFile(prefix, [suffix])将每个batch的数据保存到文件中。每个batch的文件的命名格式为:prefix-TIME_IN_MS[.suffix]
savaAsObjectFile同上,但是将每个batch的数据以序列化对象的方式,保存到SequenceFile中。
saveAsHadoopFile同上,将数据保存到Hadoop中
foreachRDD最常用的output操作,遍历DStream中的每个产生的RDD,进行处理。可以将每个RDD中的数据写入外部存储,比如文件、数据库、缓存等。通常在其中,是针对RDD执行action操作的,比如foreach。
>Dstream中的所有的操作最终都是由output操作触发的,没有output操作spark是不会执行前面的操作逻辑。其中较为特殊的是foreachRDD,尽管是用了这个output算子也不会触发job,在其中还需要action算子才能够真正的开始触发job执行.

foreachRDD:

顾名思义,遍历 RDD,然后对每个 RDD 进行操作。多数情况下时进行持久化操作,写入到外部存储。通常需要建立一个 Connection,比如 JDBC Connection。然后通过 Connnection 将数据写入外部存储。如下:
1、在 foreach 操作的外部创建 Connection,这种方式会导致 Connection 对象被序列化后传输到每个 task 中,实际上这种 Connection 对象是不能被序列化的,所以这是一种错误的操作。

dstream.foreachRDD { rdd => val connection = createNewConnection() rdd.foreach { record => connection.send(record) } }

2、在 foreach 内部创建 Connection 对象,这种方式会让每一条数据都创建一个 Connection,非常消耗内存和性能。

dstream.foreachRDD { rdd => rdd.foreach { val connection = createNewConnection() record => connection.send(record) connection.close() } }

3、使用 foreachPartition 代替 foreach,这样只会对一个 Partition 创建一个 Connection,节省开销。

dstream.foreachRDD { rdd =>rdd.foreachPartition { partitionOfRecords => val connection = createNewConnection() partitionOfRecords.foreach(record => connection.send(record)) connection.close() } }

总结:

SparkStreaming 的操作算子也分为两类,Transformation 和 output(类似 Action),其中最实用的 window 和 updateStateByKey,以及最常用的 foreachRDD,对于 foreachRDD 需要注意写法,是一个值得优化的地方。

  • Spark

    Spark 是 UC Berkeley AMP lab 所开源的类 Hadoop MapReduce 的通用并行框架。Spark 拥有 Hadoop MapReduce 所具有的优点;但不同于 MapReduce 的是 Job 中间输出结果可以保存在内存中,从而不再需要读写 HDFS,因此 Spark 能更好地适用于数据挖掘与机器学习等需要迭代的 MapReduce 的算法。

    74 引用 • 46 回帖 • 565 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...
rzx
此生最怕深情被辜负,最怕兄弟成陌路。对世界充满善意,同时又充满深深的恨意,我渴望天降甘霖福泽众生,又渴望灭世洪水重创世纪。 广州