Ttransformation 算子
SparkStreaming Dstream 的 Transformation 算子和 SparkCore 的操作基本类似,毕竟 DStream 的底层还是 RDD.
Transformation | Meaning |
---|---|
map | 对传入的每个元素返回一个新的元素 |
flatMap | 对传入的每个元素返回一个或多个元素 |
filter | 过滤掉不符合条件的元素/选择符合条件的元素 |
union | 将两个DStream合并 |
count | 返回元素的个数 |
reduce | 对所有的Values进行聚合 |
countByValue | 按值分组并统计每组的总数,返回(K,V)的格式 |
reduceByKey | 按K分组,对Values进行聚合 |
cogroup | 对两个Dstream进行链接操作,一个Key连接起来的两个RDD的数据,都会以Iterable'<'V'>'的形式出现在一个Tuple中 |
join | 对两个Dstream进行链接操作,每个连接起来的Pair作为Dstream的RDD的一个新元素 |
transform | 对Dstream进行转换,转换成RDD,Dataset. |
updateStateBykey | 为每个Key维护一个状态,并进行更新 |
window | 对滑动窗口内的数据执行操作 |
window
window:滑动窗口,最早在计算机网络中接触,用于做流量控制,这里的滑动窗口用于选择某一段时间内的 RDD 构成一个 Dstream 进行计算,实时流计算设置的 Batch interval 只能让我们获取一段一段的数据,每一段数据之间是不会交叉重叠的,对于数据分析来说,两组数据之间可能存在断层,阶跃。window 可以让数据之间进行平滑的过度。当然有一定的局限性,如果我们的目标仅仅里类似对数据进行一些处理,不去探寻数据见的关系的话,就没有必要,因为这会让每一台拿数据进行多次处理次数取决于 window 算子的参数。
使用:
参数 1:窗口长度
参数 2:滑动间隔
这两个参数必须是 batch inveral 的整数倍。
类似上图,窗口长度为 3 秒,时间间隔为 2 秒。每三秒获取的 inputDStream 聚合为一个窗口进行计算,间隔两秒再计算一次...
相关算子:
Transformation | Meaning |
---|---|
window | 对每个滑动窗口的数据执行自定义计算 |
countByWindow | 对每个窗口的数据执行count操作 |
countByVaueAndWindow | 对每个窗口的数据执行countByValue操作 |
reduceByWindow | 对每个窗口的数据执行reduce操作 |
reduceBykeyAndWindow | 对每个窗口的数据执行reduceBykey操作 |
groupBykeyAndWindow | 对每个窗口的数据执行groupBykey操作 |
//3秒作为一个窗口,间隔为2秒
JavaPairDStream<String,Integer> pairDStreamWindows = pairDStream.window(Durations.seconds(3),Durations.seconds(2));
JavaPairDStream<String,Integer> wordCount = pairDStreamWindows.reduceByKey((x1,x2)->(x1+x2));
//scala
def wordCountwindow(){
val conf = new SparkConf().setMaster("local[2]").setAppName("wordCount");
val ssc = new StreamingContext(conf,Seconds(1))
val words = ssc.socketTextStream("localhost",9999)
.flatMap((line)=>line.split(" "))
.map(x=>(x,1))
//window()此函数有多种重载。
val w3window = words.window(Seconds(3),Seconds(2))
val wordcount = w3window.reduceByKey(_+_)
wordcount.print()
ssc.start()
ssc.awaitTermination()
}
如此我们从一个每一秒进行一次 wordcount 的统计就变成了每隔 2 秒统计 3 秒内的数据作为一个窗口统计一次 wordcount。
def wordCountRBwindow(){
val conf = new SparkConf().setMaster("local[2]").setAppName("wordCount");
val ssc = new StreamingContext(conf,Seconds(1))
val words = ssc.socketTextStream("localhost",9999)
.flatMap((line)=>line.split(" "))
.map(x=>(x,1))
//reduceByKeyAndWindow(reduceFunc,windowDuration,slideDuration),此函数有多种参数重载。
val wordcount20 = words.reduceByKeyAndWindow((x1:Int,x2:Int)=>x1+x2,Seconds(20),Seconds(2))
wordcount20.print()
ssc.start()
ssc.awaitTermination()
}
其他的 window 算子类似。print 算子是为了触发 job 操作。
updateStateByKey
updateStateBykey 可以为每一个 Key 维护一个 state,并在每次数据产生时对 state 进行更新。前提条件是必须开启CheckPoint机制
。checkPoint 可以保证在内存中长期存贮的 state 故障丢失的时候可以得到恢复。
- 定义一个 state,可以为任意数据类型。
- 设置更新函数,更新函数会会执行如何来进行更新。
对于每一个 batch 的数据,spark 都会为之前已经存在的 key 更新 state[无论这个 batch 中是否有和 key 相同的数据],对于新出现的 key 也会进行 state 个更新。如果更新函数返回 none,那么 key 的 state 就会被删除。
回顾前面的 wordcount 程序,从开始的离线操作记录某个文件的 wordcount。到实时的 batch inveral 一个时间延迟内的 wordcount。再到滑动窗口的多个 batch inveral 内的 worcount。
而使用 updateStateByKey,可以实现全局的 worcount。我们用每个 key 的 state 保存 word 的 count,每次一 batch 将数据 state 进行累加,即每一次 batch 和前面所有的数据进行一次 worcount 的 reduceByKey 操作。
< 如果不借助 updateStateByKey 我们就需要将没给 batch 的 wordCount 保存/缓存起来,进行累加。>
开启 Checkpoint,并使用而使用 updateStateByKey:
private static void wordCountUpdateStateByKey() throws InterruptedException{
SparkConf conf = new SparkConf().setAppName("ssBSDataSource").setMaster("local[*]");
JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(1));
//开启checkpoint
jsc.checkpoint("hdfs://spark:9000/checkPointDir");
//ReceiverInputDStream
JavaReceiverInputDStream lines = jsc.socketTextStream("localhost",9999);
JavaDStream listDstream = lines.flatMap(line-> Arrays.asList(line.split(" ")).iterator());
JavaPairDStream pairDStream = listDstream.mapToPair(x->new Tuple2<>(x,1));
//这里不再使用reduceByKey 而使用updateStateByKey
JavaPairDStream wordCount = pairDStream.updateStateByKey(
new Function2, Optional, Optional>() {
public Optional call(List values,Optional state) throws Exception {
int vl =0;
if(state.isPresent()){
vl = state.get();
}
for(Integer value : values) {
vl += value;
}
return Optional.of(vl);
}
});
//这里是Lambda版本
JavaPairDStream wordCount2 = pairDStream.updateStateByKey(
(values,state)->{
int vl =0;
if(state.isPresent()){
vl = state.get();
}
for(Integer value : values) {
vl += value;
}
return Optional.of(vl);
});
wordCount.print();
jsc.start();
jsc.awaitTermination();
jsc.close();
}
这里的 Optional 类是 Java8 中的一种防御性检查机制,用于消除空指针异常。详情请看我的《Java8 实战》拥抱变化,下载原书籍,第十章:用 Optional 取代 null。
上述的这些 Transformation 算子都是懒加载的都不能出发 job 操作。最终的操作结果需要 output 操作才能输出/保存起来。
Output 算子:
Transformation | Meaning |
---|---|
直接输出batch中的前十条数据,通常用来测试,在没有output操做的时候用于触发job | |
saveAsTextFile(prefix, [suffix]) | 将每个batch的数据保存到文件中。每个batch的文件的命名格式为:prefix-TIME_IN_MS[.suffix] |
savaAsObjectFile | 同上,但是将每个batch的数据以序列化对象的方式,保存到SequenceFile中。 |
saveAsHadoopFile | 同上,将数据保存到Hadoop中 |
foreachRDD | 最常用的output操作,遍历DStream中的每个产生的RDD,进行处理。可以将每个RDD中的数据写入外部存储,比如文件、数据库、缓存等。通常在其中,是针对RDD执行action操作的,比如foreach。 |
foreachRDD:
顾名思义,遍历 RDD,然后对每个 RDD 进行操作。多数情况下时进行持久化操作,写入到外部存储。通常需要建立一个 Connection,比如 JDBC Connection。然后通过 Connnection 将数据写入外部存储。如下:
1、在 foreach 操作的外部创建 Connection,这种方式会导致 Connection 对象被序列化后传输到每个 task 中,实际上这种 Connection 对象是不能被序列化的,所以这是一种错误的操作。
dstream.foreachRDD { rdd =>
val connection = createNewConnection()
rdd.foreach {
record => connection.send(record)
}
}
2、在 foreach 内部创建 Connection 对象,这种方式会让每一条数据都创建一个 Connection,非常消耗内存和性能。
dstream.foreachRDD { rdd =>
rdd.foreach {
val connection = createNewConnection()
record => connection.send(record)
connection.close()
}
}
3、使用 foreachPartition 代替 foreach,这样只会对一个 Partition 创建一个 Connection,节省开销。
dstream.foreachRDD {
rdd =>rdd.foreachPartition {
partitionOfRecords =>
val connection = createNewConnection()
partitionOfRecords.foreach(record => connection.send(record))
connection.close()
}
}
总结:
SparkStreaming 的操作算子也分为两类,Transformation 和 output(类似 Action),其中最实用的 window 和 updateStateByKey,以及最常用的 foreachRDD,对于 foreachRDD 需要注意写法,是一个值得优化的地方。
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于