大数据实时计算框架:SparkStreaming

本贴最后更新于 2688 天前,其中的信息可能已经时异事殊

实时计算

简介

随着大数据的快速发展,业务场景越来越复杂,离线式的批处理框架 MapReduce 已经不能满足业务,大量的场景需要实时的数据处理结果来进行分析,决策。例如实时的用户推荐,在 618 这样的刺激环境下普通历史数据的推荐已经不能满足场景,就需要采集前分钟,甚至式前几秒的数据进行分析。实时计算适用于这种对历史数据依赖不强,短时间内变化较大的数据。用户行为分析,舆情分析,等等不断随环境和时间实时变化的数据都可能用到实时计算。

流程

实时的数据源:微博,微信,股票交易,银行流水,商城交易,日志等数据。

消息中间件:Kafka,作为消息队列,提供数据的缓冲功能,同时也提供容错机制。

实时处理框架(SparkStreaming):通过编写的 app 应用来拉取消息中间件的数据进行分布式的并行计算,处理和输出。

实时计算一定是基于分布式的并行计算框架的,单机对于短时间的高数据量远远达不到实时处理。

SparkStreaming

简介

SparkStreaming 是 Spark 提供的分布式的大数据实时计算框架,是基于 SparkCore(Spark 核心 API)的扩展,他提供了动态的,高吞吐量的,可容错的流式数据处理。他可以从多个数据 Kafka,Flume,Kinesis,Twitter,Tcp scokets 中获取数据,然后使用复杂的算法和高级的函数算子如:map,reduce,join,window...进行数据处理加工。最后可以将处理后的数据输出到文件系统,数据库,和可视化界面,同样也可以在数据流上使用机器学习和图形计算算法。
SparkStreaming 同 sparksql 一样在核心 RDD 上封装一种数据集 DStream,用于适应实时计算的特点,类似于 sparksql 的 Dataset 和 DataFrame 用于方便交互式查询操作。
9add1de69872485caa9a92d17ddc8741-image.png

原理

接收实时的输入数据流,将数据拆分成多个 batch,如每一秒的数据封装成一个 batch,将每个 batch 交给 Spark 进行处理。最后将结果输出,同样的输出也是按 batch 来进行划分。
345810b0db7e4406be1727635e17174b-image.png

示例 SparkStreaming 的工作

StreamingContext

1.用 conf 对象初始化 Streaming;

//scala
import org.apache.spark._
import org.apache.spark.streaming._

val conf = new SparkConf().setAppName(appName).setMaster(master)
val sc   = new StreamingContext(conf,Seconds(1))
//java
import org.apache.spark.*;
import org.apache.spark.streaming.api.java.*;

SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000));

2.用 context 对象初始化

//scala
import org.apache.spark.streaming._

val sc = ...                // existing SparkContext
val ssc = new StreamingContext(sc, Seconds(1))
//java
import org.apache.spark.streaming.api.java.*;

JavaSparkContext sc = ...   //existing JavaSparkContext
JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(1));

appName,是用来在 Spark UI 上显示的应用名称。master,是一个 Spark、Mesos 或者 Yarn 集群的 URL,或者是 local[*]运行在本地模式(用于本地测试和单元测试)。
必须根据应用程序的延迟需求和可用的集群资源来设置 批处理间隔batch interval

Discretized Streams(DStream)

Spark Streaming 提供了一种高级的抽象,叫做 DStream,英文全称为 Discretized Stream,中文翻译为“离散流”,它代表了一个持续不断的数据流。DStream 可以通过输入数据源来创建,比如 Kafka、Flume 和 Kinesis,也可以通过对其他 DStream 应用高阶函数来创建,比如 map、reduce、join、window。!
DStream 的内部,其实一系列持续不断产生的 RDD。RDD 是 Spark Core 的核心抽象,即不可变的,分布式的弹性性数据集。DStream 中的每个 RDD 都包含了一个时间段内的数据。
ea19380297544753860c68e24c49939b-image.png
对 DStream 应用的算子,比如 flatmap,其实在底层会被翻译为对 DStream 中每个 RDD 的操作。比如对一个 DStream 执行一个 flatmap 操作,会产生一个新的 DStream。但是,在底层其原理为,对输入 DStream 中每个时间段的 RDD,都应用一遍 flatmap 操作,然后生成的新的 RDD,即作为新的 DStream 中的那个时间段的一个 RDD。底层的 RDD 的 transformation 操作,其实,还是由 Spark Core 的计算引擎来实现的。Spark Streaming 对 Spark Core 进行了一层封装,隐藏了细节,然后对开发人员提供了方便易用的高层次的 API。
b781236cf5264cc5a6ded56b512d5476-image.png

实时的 WordCount 应用

我们通过经典的 wordcount 来初步的了解 SparkStreaming 是如何进行实时的流处理的。

//基本数据源:
//Tcp Socket: `socketTextStream`
//file,Hdfs: `textFileStream("hdfs://...")`
private static  void ssBSDataSource() throws InterruptedException{
  SparkConf conf = new SparkConf().setAppName("ssBSDataSource").setMaster("local[2]");
  //创建JavaStreamingContext,设置时间延迟为1秒(每次收集前一秒的数据)
  JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(1));

  /*jsc.textFileStream("input/streaming");//file
    jsc.textFileStream("hdfs://...");     //hdfs 
  */  
  //从Socket中获取数据,监听端口9999,的得到的是ReceiverInputDStream
  JavaReceiverInputDStream lines =   jsc.socketTextStream("localhost",9999);
    //接下来就是wordcount的操作了。
  JavaDStream listDstream =  lines.flatMap(line-> Arrays.asList(line.split(" ")).iterator());
  JavaPairDStream pairDStream =  listDstream.mapToPair(x->new Tuple2<>(x,1));
  JavaPairDStream wordCount =  pairDStream.reduceByKey((x1,x2)->(x1+x2));

  wordCount.print();//默认输出前10条数据
  jsc.start();
  jsc.awaitTermination();//不断的不等待一段时间间隔进行一次执行。

  //直到我们使用jsc.close();
}

创建StreamingContext->获取InputDStream->业务操作->输出->循环获取数据并操作,可见实时流处理相比普通的 RDD 操作并没有太多的不同之处,这里的 StreamingContext 相当于 SparkContext,SparkSession,DStream 相当与 RDD 和 Dataset,是操作对象。一个不同是 StreamingContext需要start()启动才能执行,另一个不同点在于 SparkStreaming 会不断的从端口获取实时数据,然后执行相同的操作。

SparkStreaming 的数据源

基础数据源:文件(file,HDFS),Socket,Akka Ator,是内置支持的数据源。

Hdfs:JavaReceiverInputDStream lines=textFileStream("hdfs://spark1:9000/fileDir")
对于文件数据源的实时处理,会不断的去检查目录,一旦有 移入或者重命名(只处理新加入目录和名称不同的新文件)的文件就会进行数据的获取和处理,处理之后即使内容修改也不会再处理。需要强调的是数据格式必须一致。

高级数据源:Kafka,Flume,Kinesis,Twitter,可以引用第三方依赖调用。

Kafka:需要配置依赖,或者直接在 jar 放到 Spark 的 jars 中一同加入 Library.Version=SparkVersion

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.1.1</version>
</dependency>

Flume 同 Kafka 配置:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-flume_2.10</artifactId>
    <version>2.1.1</version>
</dependency>

依赖可以去 Maven repository 查询。具体应用后面再讲。

自定义数据源:以自定义数据源,来决定如何接受和存储。

DStream 和 Receiver

  • InputDStream 代表从数据源接收到的输入 DStream,在上面的实例中 lines 代表从 socket 中接收到的数据流的输入 DStream,除了文件数据流之外每个InputDstream都会绑定一个Receiver对象这个对象[Receiver]用于获取/接收数据并将其存储到Spark的内存中,以备后续使用。
  • 如果想要 并行的接受多个数据流,我们可以创建多个 JavaReceierInputDStream,这样每个 InputDStream 都会创建一个 Receiver,从而并行的接收多个数据流。但是 SparkStreamingAPP 在执行的时候是个持续工作的过程,Spark 的 Work/Executor 会独占一个 CPU Core,所以说一旦 APP 运行,那么这个 CPU Core 就没法给其他 APP 使用。

所以说我们要保证一个 APP 运行,在 本地Local在起码保证要有大于2个线程【一个用于给处理 InputDStream 的 Executor 分配一个线程,一个用于 Receiver 接收数据。即一条接收数据,一条处理数据。】所以 setMaster("local[n]"),n 不能设置为 1,也不能直接 local,必须是要为 local[n] n>=2.
在集群中(不设置 Master)必须要保证单个节点 Cpu Core>1,然后给每个 Executor 分配的 CPU Core 必须 >1。这样才能保证才能保证 Executor 既可以执行 Receiver 数据的接收,又可以进行数据的处理。


因此,在我们配置 Spark 的时候必须给 Executor 配置 >1 的 CPU Core.才能满足 SparkStreaming 的单任务执行。
特例:基于 Hdfs 文件的数据源是没有绑定 Receiver 的。因此不会占用一个 CPU Core.

参考:

SparkStreming 官方文档

  • Spark

    Spark 是 UC Berkeley AMP lab 所开源的类 Hadoop MapReduce 的通用并行框架。Spark 拥有 Hadoop MapReduce 所具有的优点;但不同于 MapReduce 的是 Job 中间输出结果可以保存在内存中,从而不再需要读写 HDFS,因此 Spark 能更好地适用于数据挖掘与机器学习等需要迭代的 MapReduce 的算法。

    74 引用 • 46 回帖 • 552 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...
  • someone

    :eyes: 革命尚未成功,同志还需努力!!!!:smiling_imp: :smiling_imp: