Spark Streaming Real-Time Statistics (An Application of Accumulators)


If any missing code keeps this from running, please leave a comment and I will fill it in ❤️

Scenario

Consume real-time data from Kafka, clean and filter it, merge and deduplicate it against the day's historical data, then aggregate the merged dataset. The aggregated result is written to HBase; when the clock rolls over to the next day, the previous day's historical data is cleared and the counting starts over.

Implementation Logic

  1. Use Spark Streaming to read the real-time data stream from Kafka and produce a DStream
  2. Filter out the qualifying records, producing a DStream[k,v] (note: k is the record's unique key, v is the detailed record)
  3. Use the mapWithState method of DStream[k,v] in Spark Streaming to produce the deduplicated dataset
  4. Call awaitTerminationOrTimeout(time) on the StreamingContext to set a termination time for the current StreamingContext, so that all of the DStream computation above stops at midnight
  5. Call stop on the StreamingContext to terminate it. By default stop also terminates the SparkContext; calling stop(stopSparkContext = false, stopGracefully = true) keeps the SparkContext alive and lets the batches the StreamingContext has already received finish processing before the StreamingContext terminates (see the sketch after this list)
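A minimal, self-contained sketch of steps 3–5, separate from the production job below: it deduplicates a keyed stream with mapWithState and shuts the StreamingContext down gracefully at midnight. The socket source, batch interval, checkpoint path, and the "latest value wins" merge rule are placeholder assumptions, not part of the original job.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.Optional;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.State;
import org.apache.spark.streaming.StateSpec;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalDateTime;

public final class DedupUntilMidnightSketch {

    /** Step 3: keep one record per key; "newest value wins" stands in for the real merge rule. */
    private static Tuple2<String, String> latestWins(String key, Optional<String> value, State<String> state) {
        if (value.isPresent()) {
            state.update(value.get());
        }
        return new Tuple2<>(key, state.get());
    }

    public static void main(String[] args) throws InterruptedException {
        JavaStreamingContext ssc = new JavaStreamingContext(
                new SparkConf().setAppName("DedupUntilMidnightSketch").setMaster("local[2]"),
                Durations.seconds(60));
        ssc.checkpoint("/tmp/dedup-sketch-checkpoint"); // mapWithState requires checkpointing

        // Placeholder source; the real job builds the DStream from Kafka instead.
        JavaPairDStream<String, String> keyed = ssc.socketTextStream("localhost", 9999)
                .mapToPair(line -> new Tuple2<String, String>(line.split(",")[0], line));

        // Step 3: a deduplicated snapshot of everything seen so far today.
        keyed.mapWithState(StateSpec.function(DedupUntilMidnightSketch::latestWins))
                .stateSnapshots()
                .print();

        // Steps 4-5: run until midnight, then stop gracefully without killing the SparkContext.
        long millisUntilMidnight = Duration.between(
                LocalDateTime.now(), LocalDate.now().plusDays(1).atStartOfDay()).toMillis();
        ssc.start();
        ssc.awaitTerminationOrTimeout(millisUntilMidnight);
        ssc.stop(false, true); // stopSparkContext = false, stopGracefully = true
    }
}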

Java Code

RealStatStreaming.java

import kafka.utils.ZKGroupTopicDirs;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function0;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.State;
import org.apache.spark.streaming.StateSpec;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.*;
import scala.Tuple2;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;

// Inferred static import for the GlobalConfig constants used below (KAFKA_ROOT_PATH, CHECK_POINT_DIR,
// CONSUMER_TOPIC_NAME, ...); the original post does not show it.
import static com.wangdian.spark.tasks.system.GlobalConfig.*;

/**
 * Real-time statistics job.
 * @date 2019-09-03 17:57:38
 * @author Ludengke
 */
public final class RealStatStreaming {

    private static final String OFFSET_DIR = KAFKA_ROOT_PATH.concat(
            new ZKGroupTopicDirs(REALSTAT_GROUP_ID, CONSUMER_TOPIC_NAME).consumerOffsetDir());
    private static SparkSession sparkSession = null;
    private static JavaStreamingContext sc = null;

    public static void main(String[] args) throws Exception {
        SparkConf sparkConf = SparkFactory.getDefaultSparkConf()
                .set("spark.sql.shuffle.partitions", "24")
                .setAppName("RealStatStreaming");
        sparkSession = SparkSession.builder()
                .config(sparkConf)
                .getOrCreate();

        /**
         * Build the StreamingContext from the Spark configuration.
         * There are two ways to create it: if the checkpoint directory has content, the context is
         * recovered from the last checkpoint; otherwise a new one is created.
         * After the code is recompiled, the checkpoint directory must be deleted.
         */
        sc = JavaStreamingContext.getOrCreate(CHECK_POINT_DIR, (Function0<JavaStreamingContext>) () -> {
            sc = new JavaStreamingContext(JavaSparkContext.fromSparkContext(sparkSession.sparkContext()),
                    Durations.seconds(REALSTAT_DURATIONS_SECOND));
            sc.checkpoint(CHECK_POINT_DIR);
            return sc;
        });

        // Kafka consumer configuration
        Map<String, Object> kafkaParams = new HashMap<>(16);
        kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_QUORUM);
        kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, REALSTAT_GROUP_ID);
        kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET);
        kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        RealStatStreaming.work(kafkaParams);

        sc.start();
        sc.awaitTerminationOrTimeout(getNeedRunTime());
        sc.stop();
    }

    /**
     * How long the real-time statistics job needs to run:
     * tomorrow 00:00 minus the current time.
     */
    private static long getNeedRunTime() throws ParseException {
        SimpleDateFormat sdfDate = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date now = new Date();
        String tomorrowMidnight = SystemUtils.getDateAddDays(sdfDate.format(now).substring(0, 10), 1) + " 00:00:00";
        Date tomorrow = sdfDate.parse(tomorrowMidnight);
        return tomorrow.getTime() - now.getTime();
    }

    private static void work(Map<String, Object> kafkaParams) {
        // Build the input stream from the Kafka configuration and the StreamingContext.
        JavaInputDStream<ConsumerRecord<String, String>> stream = RealStatStreaming.getStreaming(sc, kafkaParams);

        // Extract the value of each Kafka record.
        JavaDStream<String> lines = stream.map(ConsumerRecord::value);

        /**
         * FormatData turns the records into <key, bean> pairs and filters them.
         * mapWithState merges the current batch with the historical data and deduplicates.
         * stateSnapshots returns the whole state; without it only this batch's values are returned.
         * statAndSave aggregates the raw records and saves the result to HBase.
         *
         * PS: if the resident state needs an initial value, use
         * StateSpec.function(updateFunction).initialState(initialRDD).
         */
        SimpleDateFormat sdfDate = new SimpleDateFormat("yyyy-MM-dd");
        String date = sdfDate.format(new Date());
        RealStatStreaming.statAndSave(
                RealStatStreaming.FormatData(lines, date)
                        .mapWithState(StateSpec.function(RealStatStreaming::mappingFunc))
                        .stateSnapshots(),
                date);

        // Update the offsets stored in Zookeeper (the batch's starting offset is what gets written).
        stream.foreachRDD(rdd -> {
            OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
            for (OffsetRange o : offsetRanges) {
                ZookeeperFactory.getZkUtils().updatePersistentPath(
                        String.join(ZK_SPLIT, OFFSET_DIR, String.valueOf(o.partition())),
                        String.valueOf(o.fromOffset()),
                        ZookeeperFactory.getZkUtils().DefaultAcls()
                );
                SystemUtils.info("UPDATE OFFSET WITH [ topic :" + o.topic() + " partition :" + o.partition()
                        + " offset :" + o.fromOffset() + " ~ " + o.untilOffset() + " ]");
            }
        });
    }

    /**
     * Build the DStream from the StreamingContext and the Kafka configuration.
     */
    private static JavaInputDStream<ConsumerRecord<String, String>> getStreaming(JavaStreamingContext context,
                                                                                 Map<String, Object> kafkaParams) {
        // Make sure the offset directory exists, then read its child nodes.
        if (!ZookeeperFactory.getZkClient().exists(OFFSET_DIR)) {
            ZookeeperFactory.getZkClient().createPersistent(OFFSET_DIR, true);
        }
        List<String> children = ZookeeperFactory.getZkClient().getChildren(OFFSET_DIR);
        if (children != null && !children.isEmpty()) {
            // Offsets were found in Zookeeper; use them to create the stream.
            Map<TopicPartition, Long> fromOffsets = new HashMap<>(children.size());
            for (String child : children) {
                long offset = Long.valueOf(ZookeeperFactory.getZkClient()
                        .readData(String.join(ZK_SPLIT, OFFSET_DIR, child)));
                fromOffsets.put(new TopicPartition(CONSUMER_TOPIC_NAME, Integer.valueOf(child)), offset);
                SystemUtils.info("FOUND OFFSET IN ZOOKEEPER, USE [ partition :" + child + " offset :" + offset + " ]");
            }
            SystemUtils.info("CREATE DIRECT STREAMING WITH CUSTOMIZED OFFSET..");
            return KafkaUtils.createDirectStream(
                    context,
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.<String, String>Assign(new HashSet<>(fromOffsets.keySet()), kafkaParams, fromOffsets)
            );
        } else {
            // No offsets stored in Zookeeper; create the stream with the default offsets.
            SystemUtils.info("NO OFFSET FOUND, CREATE DIRECT STREAMING WITH DEFAULT OFFSET..");
            return KafkaUtils.createDirectStream(
                    context,
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.Subscribe(Collections.singleton(CONSUMER_TOPIC_NAME), kafkaParams)
            );
        }
    }

    /**
     * Filter the records to be counted by the given date.
     * @param lines the stream to process
     * @param date  the date used as the filter condition
     */
    private static JavaPairDStream<String, ApiTrade> FormatData(JavaDStream<String> lines, String date) {
        return lines.mapPartitionsToPair(RealStatStreaming::JsonToPairTradeBean)
                .filter((Function<Tuple2<String, ApiTrade>, Boolean>) line -> {
                    if (line._2 == null || line._2.getD() == null || line._2.getE() == null) {
                        return false;
                    }
                    return date.equals(line._2.getD().substring(0, 10))
                            || date.equals(line._2.getE().substring(0, 10));
                });
    }

    /**
     * Aggregate the resident state snapshot and write the result to HBase.
     * @param lines the snapshot of the resident state
     * @param date  the HBase row key
     */
    private static void statAndSave(JavaPairDStream<String, ApiTrade> lines, String date) {
        lines.foreachRDD(tmp -> {
            JavaRDD<ApiTrade> apiTrade = tmp.map((Function<Tuple2<String, ApiTrade>, ApiTrade>) v1 -> v1._2);
            Dataset<ApiTrade> tradeData = sparkSession.createDataset(apiTrade.rdd(), Encoders.bean(ApiTrade.class));
            tradeData.createOrReplaceTempView("data");
            // The leading comma keeps the generated SQL valid: "select b, count(1) count, ...".
            String selectSql = ", count(1) count, sum(g) as money";
            String groupSql = " group by b";
            Dataset<Row> allStatData = sparkSession.sql(String.join(" ", "select", "b", selectSql, "from data", groupSql));

            // Create the HBase table; the call checks whether the table already exists.
            HBaseFactory.createTables("daily_total_stat", 500);

            // Write the total statistics to HBase.
            allStatData.rdd().toJavaRDD().foreach(line -> {
                Put put = new Put(Bytes.toBytes(date));
                put.addColumn(Bytes.toBytes(COLUMN_FAMILY), Bytes.toBytes("count"),
                        Bytes.toBytes(line.getAs("count").toString()));
                put.addColumn(Bytes.toBytes(COLUMN_FAMILY), Bytes.toBytes("money"),
                        Bytes.toBytes(line.getAs("money").toString()));
                HBaseFactory.writeToHBase("daily_total_stat", put);
            });
        });
    }

    /**
     * Convert the JSON records of a partition into beans.
     * @param s iterator over the partition's records
     */
    private static Iterator JsonToTradeBean(Iterator<String> s) {
        ArrayList<ApiTrade> tmp = new ArrayList<>();
        while (s.hasNext()) {
            ApiTrade apiTrade = SystemUtils.LOWER_CASE_WITH_UNDERSCORES_GSON.fromJson(s.next(), ApiTrade.class);
            tmp.add(apiTrade);
        }
        return tmp.iterator();
    }

    /**
     * Convert the JSON records of a partition into <key, bean> pairs.
     * @param s iterator over the partition's records
     */
    private static Iterator<Tuple2<String, ApiTrade>> JsonToPairTradeBean(Iterator<String> s) {
        ArrayList<Tuple2<String, ApiTrade>> tmp = new ArrayList<>();
        while (s.hasNext()) {
            ApiTrade apiTrade = SystemUtils.LOWER_CASE_WITH_UNDERSCORES_GSON.fromJson(s.next(), ApiTrade.class);
            tmp.add(new Tuple2<>(MD5Utils.encode(apiTrade.getA() + apiTrade.getB() + apiTrade.getC()), apiTrade));
        }
        return tmp.iterator();
    }

    /**
     * Merge a key's current value and historical value into the new state value.
     * @param key      the record key
     * @param one      the current batch's value for the key
     * @param curState the historical value for the key
     */
    private static Tuple2<String, ApiTrade> mappingFunc(String key, Optional<ApiTrade> one, State<ApiTrade> curState) {
        // Check whether the current batch carries a value for this key.
        if (one.isPresent()) {
            ApiTrade oneTrade = one.get();
            // If no historical value exists, store the current one; otherwise decide whether to update.
            if (curState.exists()) {
                // If the historical value is empty, or the current record was modified later, update the state.
                ApiTrade curStateTrade = curState.getOption().isEmpty() ? null : curState.getOption().get();
                if (curStateTrade == null || oneTrade.getF().compareTo(curStateTrade.getF()) > 0) {
                    curState.update(oneTrade);
                }
            } else {
                curState.update(oneTrade);
            }
        }
        return new Tuple2<>(key, curState.get());
    }
}

SystemUtils.java

import com.google.gson.FieldNamingPolicy;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;

/**
 * @author ludengke
 * @date 2019/9/11
 **/
public class SystemUtils {
    /**
     * Shift a date by the given number of days.
     */
    public static String getDateAddDays(String date, Integer count) throws ParseException {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
        Calendar cal = Calendar.getInstance();
        cal.setTime(format.parse(date));
        cal.add(Calendar.DATE, count);
        return format.format(cal.getTime());
    }

    public static final Gson DEFAULT_GSON = new GsonBuilder().create();
    public static final Gson LOWER_CASE_WITH_UNDERSCORES_GSON = new GsonBuilder()
            .setFieldNamingPolicy(FieldNamingPolicy.LOWER_CASE_WITH_UNDERSCORES).create();
}

SparkFactory.java

import org.apache.spark.SparkConf;
import org.apache.spark.serializer.KryoSerializer;
import org.apache.spark.sql.Row;

/**
 * @author ludengke
 * @date 2019/9/11
 **/
public class SparkFactory {
    /**
     * Configuration shared by all jobs.
     *
     * @desc https://spark.apache.org/docs/latest/configuration.html
     */
    public static SparkConf getDefaultSparkConf() {
        return new SparkConf()
                .set("spark.shuffle.file.buffer", "1024k")
                .set("spark.reducer.maxSizeInFlight", "128m")
                .set("spark.shuffle.memoryFraction", "0.3")
                .set("spark.streaming.stopGracefullyOnShutdown", "true")
                .set("spark.streaming.kafka.maxRatePerPartition", "300")
                .set("spark.serializer", KryoSerializer.class.getCanonicalName())
                .registerKryoClasses(new Class[]{Row.class, Object.class, ApiTrade.class});
    }
}

GlobalConfig.java

/**
 * @author ludengke
 * @date 2019/9/11
 **/
public class GlobalConfig {
    /**
     * Root path of the Kafka configuration in Zookeeper.
     */
    public static final String KAFKA_ROOT_PATH = "/kafka";
    /**
     * Checkpoint output directory, on HDFS.
     */
    public static final String CHECK_POINT_DIR = "/user/hdfs/RealStatStreamingCheckpoint";
    /**
     * Consumer group id of the real-time statistics job.
     */
    public static final String REALSTAT_GROUP_ID = "realstat";
    /**
     * Streaming batch interval of the real-time statistics job, in seconds.
     */
    public static final long REALSTAT_DURATIONS_SECOND = 60L;
    /**
     * Kafka connection string.
     */
    public static final String KAFKA_QUORUM = "kafka1:9092,kafka2:9092,kafka3:9092";
    /**
     * Kafka offset reset policy.
     */
    public static final String AUTO_OFFSET_RESET = "earliest";
    /**
     * Zookeeper path separator.
     */
    public static final String ZK_SPLIT = "/";
    /**
     * Zookeeper connection string.
     */
    public static final String ZOOKEEPER_QUORUM = "kafka1:2181,kafka2:2181,kafka3:2181";
    /**
     * HBase column family name.
     */
    public static final String COLUMN_FAMILY = "default";
}

MD5Utils.java

import java.security.MessageDigest;

/**
 * @author : LiuWeidong
 * @date : 2018/12/29.
 */
public class MD5Utils {

    private static final String[] HEX_DIG_ITS = {"0", "1", "2", "3", "4", "5", "6", "7",
            "8", "9", "a", "b", "c", "d", "e", "f"};

    public static String encode(String origin) {
        String resultString = null;
        try {
            resultString = origin;
            MessageDigest md = MessageDigest.getInstance("MD5");
            resultString = byteArrayToHexString(md.digest(resultString.getBytes()));
        } catch (Exception ignored) {
        }
        return resultString;
    }

    private static String byteArrayToHexString(byte[] b) {
        StringBuilder resultSb = new StringBuilder();
        for (byte aB : b) {
            resultSb.append(byteToHexString(aB));
        }
        return resultSb.toString();
    }

    private static String byteToHexString(byte b) {
        int n = b;
        if (n < 0) {
            n += 256;
        }
        int d1 = n / 16;
        int d2 = n % 16;
        return HEX_DIG_ITS[d1] + HEX_DIG_ITS[d2];
    }
}

ApiTrade.java

import java.io.Serializable;

/**
 * @author ldk
 */
public class ApiTrade implements Serializable {
    private String a;
    private String b;
    private String c;
    private String d;
    private String e;
    private String f;
    private Integer g;

    public String getA() { return a; }
    public void setA(String a) { this.a = a; }
    public String getB() { return b; }
    public void setB(String b) { this.b = b; }
    public String getC() { return c; }
    public void setC(String c) { this.c = c; }
    public String getD() { return d; }
    public void setD(String d) { this.d = d; }
    public String getE() { return e; }
    public void setE(String e) { this.e = e; }
    public String getF() { return f; }
    public void setF(String f) { this.f = f; }
    public Integer getG() { return g; }
    public void setG(Integer g) { this.g = g; }
}

ZookeeperFactory.java

import com.wangdian.spark.tasks.system.GlobalConfig;
import com.wangdian.spark.tasks.utils.serializer.CustomSerializer;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;

/**
 * @author : ldk
 * @date : 2019/1/24.
 */
public class ZookeeperFactory {

    private static final ZkConnection ZK_CONNECTION = new ZkConnection(GlobalConfig.ZOOKEEPER_QUORUM);
    private static final ZkClient ZK_CLIENT = new ZkClient(getZkConnection(),
            GlobalConfig.ZOOKEEPER_CONNECTION_TIMEOUT, new CustomSerializer());
    private static final ZkUtils ZK_UTILS = new ZkUtils(getZkClient(), getZkConnection(), false);

    public static ZkConnection getZkConnection() {
        return ZK_CONNECTION;
    }

    public static ZkClient getZkClient() {
        return ZK_CLIENT;
    }

    public static ZkUtils getZkUtils() {
        return ZK_UTILS;
    }

    private ZookeeperFactory() {
    }
}

HBaseFactory.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

/**
 * @author : ldk
 * @date : 2019/1/22.
 */
public class HBaseFactory {

    private static Connection conn = null;
    private static Configuration conf = null;

    public static Configuration getHBaseConf() {
        if (conf == null) {
            conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", GlobalConfig.ZOOKEEPER_QUORUM);
            conf.set("zookeeper.znode.parent", GlobalConfig.HBASE_ZNODE_PARENT);
        }
        return conf;
    }

    public static Connection createHBaseConn() {
        if (conn == null || conn.isClosed()) {
            try {
                conn = ConnectionFactory.createConnection(getHBaseConf());
            } catch (IOException e) {
                SystemUtils.error("Failed to create HBase connection: ", e);
            }
        }
        return conn;
    }

    public static synchronized void createTables(String tableName, Integer version) {
        try (Admin admin = HBaseFactory.createHBaseConn().getAdmin()) {
            if (admin.tableExists(TableName.valueOf(tableName))) {
                return;
            }
            TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName));
            builder.setColumnFamily(
                    ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(COLUMN_FAMILY))
                            // Enable compression.
                            .setCompressionType(Compression.Algorithm.SNAPPY)
                            // Maximum number of stored versions.
                            .setMaxVersions(version)
                            .build()
            );
            if (!admin.tableExists(TableName.valueOf(tableName))) {
                admin.createTable(builder.build());
            }
        } catch (Exception e) {
            SystemUtils.error("Failed to create HBase table: " + tableName, e);
        }
    }

    public static boolean isTableExist(String tableName) {
        try (Admin admin = HBaseFactory.createHBaseConn().getAdmin()) {
            return admin.tableExists(TableName.valueOf(tableName));
        } catch (Exception e) {
            SystemUtils.error("Failed to check HBase table status: " + tableName, e);
        }
        return false;
    }

    public static void writeToHBase(String tableName, Put put) {
        Table table = null;
        try {
            table = HBaseFactory.createHBaseConn().getTable(TableName.valueOf(tableName));
            table.put(put);
        } catch (Exception e) {
            SystemUtils.error("HBase write failed, TABLE NAME : [ " + tableName + " ]", e);
        } finally {
            try {
                if (table != null) {
                    table.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static void writeToHBase(String tableName, List<Put> puts) {
        Table table = null;
        try {
            table = HBaseFactory.createHBaseConn().getTable(TableName.valueOf(tableName));
            table.put(puts);
        } catch (Exception e) {
            SystemUtils.error("HBase write failed, TABLE NAME : [ " + tableName + " ]", e);
        } finally {
            try {
                if (table != null) {
                    table.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

Screenshots

20190911182414.png

PS: Ignore the run time of the last job; this screenshot was taken from an abnormal streaming run, and I did not manage to capture a normal one.
The in-memory persisted RDDs are consolidated once every ten batches. A normal batch runs two jobs; every tenth batch runs three, and the extra job is the one that consolidates the in-memory persisted RDDs.

If you are curious, it is worth looking into why some of these jobs contain skipped stages.
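The ten-batch cadence matches mapWithState's default behavior: the internal state DStream is checkpointed every 10 batch intervals (10 × 60 s with the interval above), and stages whose output is already persisted or checkpointed show up as skipped in the UI. If that consolidation job is too heavy or too frequent, the interval can be set explicitly. Below is a hedged sketch of a helper that could sit inside RealStatStreaming; as I read the API, an interval set on the returned stream is forwarded to the internal state stream, and the 20-minute value is only a placeholder.

    /**
     * Sketch only: builds the same deduplicated snapshot stream as work(), but with an explicit
     * checkpoint interval on the state stream (the default is 10 batch intervals, i.e. 10 x 60 s here).
     */
    private static JavaPairDStream<String, ApiTrade> dedupWithTunedCheckpoint(JavaPairDStream<String, ApiTrade> formatted) {
        org.apache.spark.streaming.api.java.JavaMapWithStateDStream<String, ApiTrade, ApiTrade, Tuple2<String, ApiTrade>> stated =
                formatted.mapWithState(StateSpec.function(RealStatStreaming::mappingFunc));
        // Checkpoint (and thus consolidate) the persisted state RDDs every 20 batches instead of every 10.
        stated.checkpoint(Durations.minutes(20));
        return stated.stateSnapshots();
    }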

Summary

This job places certain demands on the cluster: the records being counted are kept in memory, which makes the computation fast but requires a lot of memory. Keeping only the aggregated results in memory would take considerably less; here, because of a special business requirement, mapWithState is used as a deduplication function over the full records.
The following example keeps only the aggregated results in memory: Spark Streaming real-time statistics of a merchant's cumulative daily PV traffic.
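For contrast, here is a minimal sketch of that lighter pattern, assuming a keyed stream of (merchantId, 1L) pairs: the state holds a single running count per key instead of the full records, so memory grows with the number of keys rather than the number of records. The class, method, and variable names are placeholders, not code from the linked post.

import org.apache.spark.api.java.Optional;
import org.apache.spark.streaming.State;
import org.apache.spark.streaming.StateSpec;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import scala.Tuple2;

/** Hypothetical variant: the state keeps only a running PV count per merchant. */
public final class PvStateSketch {

    /** Fold this batch's count for a merchant into its running daily total. */
    private static Tuple2<String, Long> countingFunc(String merchantId, Optional<Long> batchCount, State<Long> total) {
        long updated = (total.exists() ? total.get() : 0L) + batchCount.orElse(0L);
        total.update(updated);
        return new Tuple2<>(merchantId, updated);
    }

    /**
     * events is assumed to be a JavaPairDStream of (merchantId, 1L) pairs built from the raw stream.
     * Each batch is reduced to per-key counts first, so the state only ever holds one Long per key.
     */
    public static JavaPairDStream<String, Long> dailyPv(JavaPairDStream<String, Long> events) {
        return events
                .reduceByKey((a, b) -> a + b)
                .mapWithState(StateSpec.function(PvStateSketch::countingFunc))
                .stateSnapshots();
    }
}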
