Spark Streaming Real-Time Statistics (an Application of Accumulated State)
If any missing code keeps this from running, please leave a comment and I'll fill it in ❤️
Scenario
Consume real-time data from Kafka, clean and filter it, merge it with the day's historical data and deduplicate, then aggregate the merged data set. Write the aggregated results to HBase; when the clock rolls over to the next day, clear the previous day's history and start counting afresh.
Implementation logic
- Use Spark Streaming to read the real-time data stream from Kafka, producing a DStream
- Filter the records that meet the requirements, producing a DStream[k,v] (note: k is the record's unique key, v is the detailed record)
- Call mapWithState on the DStream[k,v] to build the deduplicated data set
- Call awaitTerminationOrTimeout(time) on the StreamingContext to set a termination time, so that all of the DStream computation above stops at midnight
- Call stop on the StreamingContext to terminate it. stop terminates the SparkContext by default; passing stop(stopSparkContext: Boolean = false, stopGracefully: Boolean = true) keeps the SparkContext alive and lets the batches the StreamingContext has already received finish processing before it shuts down. A condensed sketch of this pattern follows the list; the full implementation is in the Java code section below.
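The sketch below condenses the two mechanisms the list relies on: mapWithState-based deduplication with stateSnapshots, and the timed graceful shutdown. The class and method names (DedupSketch, keepLatest, runUntilMidnight) are illustrative, and it assumes the ApiTrade bean shown later in the post.
import org.apache.spark.api.java.Optional;
import org.apache.spark.streaming.State;
import org.apache.spark.streaming.StateSpec;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

public final class DedupSketch {

    /** Merge each batch into the per-key state and return a snapshot of ALL keys, not just this batch's. */
    static JavaPairDStream<String, ApiTrade> dedup(JavaPairDStream<String, ApiTrade> keyed) {
        return keyed
                .mapWithState(StateSpec.function(DedupSketch::keepLatest))
                .stateSnapshots();
    }

    /** Keep, per key, the record with the newest modification time (field f). */
    static Tuple2<String, ApiTrade> keepLatest(String key, Optional<ApiTrade> current, State<ApiTrade> state) {
        if (current.isPresent()
                && (!state.exists() || current.get().getF().compareTo(state.get().getF()) > 0)) {
            state.update(current.get());
        }
        return new Tuple2<>(key, state.get());
    }

    /**
     * Run until the timeout (milliseconds until midnight), then stop the streaming context
     * gracefully while leaving the SparkContext alive.
     */
    static void runUntilMidnight(JavaStreamingContext jssc, long millisUntilMidnight) throws Exception {
        jssc.start();
        jssc.awaitTerminationOrTimeout(millisUntilMidnight);
        jssc.stop(false, true);
    }
}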
Java code
RealStatStreaming.java
import kafka.utils.ZKGroupTopicDirs;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function0;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.State;
import org.apache.spark.streaming.StateSpec;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.*;
import scala.Tuple2;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
// Static import of the shared constants (KAFKA_ROOT_PATH, REALSTAT_GROUP_ID, ...) defined in GlobalConfig below.
import static com.wangdian.spark.tasks.system.GlobalConfig.*;
/**
 * Real-time data statistics job.
 * @date 2019-09-03 17:57:38
* @author Ludengke
*/
public final class RealStatStreaming {
private static final String OFFSET_DIR = KAFKA_ROOT_PATH.concat(new ZKGroupTopicDirs(REALSTAT_GROUP_ID, CONSUMER_TOPIC_NAME).consumerOffsetDir());
private static SparkSession sparkSession = null;
private static JavaStreamingContext sc = null;
public static void main(String[] args) throws Exception {
SparkConf sparkConf = SparkFactory.getDefaultSparkConf()
.set("spark.sql.shuffle.partitions","24")
.setAppName("RealStatStreaming");
sparkSession = SparkSession.builder()
.config(sparkConf)
.getOrCreate();
        // Build the streaming context from the Spark configuration
        /**
         * There are two ways it can be created: if the checkpoint directory has content,
         * the context is restored from the last checkpoint; otherwise a new one is built.
         * After the code is recompiled, the checkpoint directory has to be deleted.
         */
sc = JavaStreamingContext.getOrCreate(CHECK_POINT_DIR, (Function0<JavaStreamingContext>) () -> {
sc = new JavaStreamingContext(JavaSparkContext.fromSparkContext(sparkSession.sparkContext()), Durations.seconds(REALSTAT_DURATIONS_SECOND));
sc.checkpoint(CHECK_POINT_DIR);
return sc;
});
        // Kafka consumer configuration
Map<String, Object> kafkaParams = new HashMap<>(16);
kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_QUORUM);
kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, REALSTAT_GROUP_ID);
kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET);
kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
RealStatStreaming.work(kafkaParams);
sc.start();
sc.awaitTerminationOrTimeout(getNeedRunTime());
        sc.stop(false, true);  // graceful stop that keeps the SparkContext, matching the logic described above
}
    /**
     * Compute how long the real-time statistics job needs to run:
     * tomorrow 00:00 minus the current time.
     * @return run time in milliseconds
     * @throws ParseException
     */
private static long getNeedRunTime() throws ParseException {
SimpleDateFormat sdfDate = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date now = new Date();
String tomorrowMidnight = SystemUtils.getDateAddDays(sdfDate.format(now).substring(0,10),1)+ " 00:00:00";
Date tomorrow = sdfDate.parse(tomorrowMidnight);
return tomorrow.getTime()-now.getTime();
}
private static void work(Map<String, Object> kafkaParams) {
        // Build the Kafka input stream from the Kafka configuration and the streaming context
JavaInputDStream<ConsumerRecord<String, String>> stream = RealStatStreaming.getStreaming(sc,kafkaParams);
        // Extract the value of each Kafka record
JavaDStream<String> lines = stream.map(ConsumerRecord::value);
        /**
         * FormatData turns the records into <key, bean> pairs and filters them.
         * mapWithState merges the current batch with the historical data and deduplicates it.
         * stateSnapshots returns the full state; without it only this batch's keys would be included.
         * statAndSave aggregates the raw orders and writes the result to HBase.
         *
         * PS: if the resident in-memory state needs an initial value, use
         * StateSpec.function(updateFunction).initialState(initialRDD).
         */
SimpleDateFormat sdfDate = new SimpleDateFormat("yyyy-MM-dd");
String date = sdfDate.format(new Date());
        RealStatStreaming.statAndSave(
                RealStatStreaming.FormatData(lines, date)
                        .mapWithState(StateSpec.function(RealStatStreaming::mappingFunc))
                        .stateSnapshots(),
                date);
        // Update the offsets stored in ZooKeeper
stream.foreachRDD(rdd -> {
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
for (OffsetRange o : offsetRanges) {
ZookeeperFactory.getZkUtils().updatePersistentPath(
String.join(ZK_SPLIT, OFFSET_DIR, String.valueOf(o.partition())),
                        // Note: this stores the batch's starting offset, so without a checkpoint the last batch
                        // is re-read after a restart; store o.untilOffset() to resume strictly after the processed range.
                        String.valueOf(o.fromOffset()),
ZookeeperFactory.getZkUtils().DefaultAcls()
);
SystemUtils.info("UPDATE OFFSET WITH [ topic :" + o.topic() + " partition :" + o.partition() + " offset :" + o.fromOffset() + " ~ " + o.untilOffset() + " ]");
}
});
}
    /**
     * Create the Kafka input DStream from the StreamingContext and the Kafka configuration.
     */
private static JavaInputDStream<ConsumerRecord<String, String>> getStreaming(JavaStreamingContext context, Map<String, Object> kafkaParams) {
        // Look up the offset nodes stored under the offset path
if (!ZookeeperFactory.getZkClient().exists(OFFSET_DIR)) {
ZookeeperFactory.getZkClient().createPersistent(OFFSET_DIR, true);
}
List<String> children = ZookeeperFactory.getZkClient().getChildren(OFFSET_DIR);
if (children != null && !children.isEmpty()) {
Map<TopicPartition, Long> fromOffsets = new HashMap<>(children.size());
            // Offsets were found in ZooKeeper: create the stream starting from the stored offsets
for (String child : children) {
long offset = Long.valueOf(ZookeeperFactory.getZkClient().readData(String.join(ZK_SPLIT, OFFSET_DIR, child)));
fromOffsets.put(new TopicPartition(CONSUMER_TOPIC_NAME, Integer.valueOf(child)), offset);
SystemUtils.info("FOUND OFFSET IN ZOOKEEPER, USE [ partition :" + child + " offset :" + offset + " ]");
}
SystemUtils.info("CREATE DIRECT STREAMING WITH CUSTOMIZED OFFSET..");
return KafkaUtils.createDirectStream(
context,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Assign(new HashSet<>(fromOffsets.keySet()), kafkaParams, fromOffsets)
);
        } else {
            // No offsets stored in ZooKeeper: create the stream with the default offset policy
            SystemUtils.info("NO OFFSET FOUND, CREATE DIRECT STREAMING WITH DEFAULT OFFSET..");
            return KafkaUtils.createDirectStream(
                    context,
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.<String, String>Subscribe(Collections.singleton(CONSUMER_TOPIC_NAME), kafkaParams)
            );
}
}
    /**
     * Filter the records to be counted according to the given date.
     * @param lines the stream to process
     * @param date the date used as the filter condition
     * @return filtered <key, bean> pairs
     */
private static JavaPairDStream<String,ApiTrade> FormatData (JavaDStream<String> lines,String date) {
return lines.mapPartitionsToPair(RealStatStreaming::JsonToPairTradeBean).filter((Function<Tuple2<String, ApiTrade>, Boolean>) line->{
            if (line._2 == null || line._2.getD() == null || line._2.getE() == null) {
                return false;
            }
            // keep records whose d or e timestamp falls on the given day
            return date.equals(line._2.getD().substring(0, 10)) || date.equals(line._2.getE().substring(0, 10));
});
}
    /**
     * Aggregate the resident in-memory state snapshot and write the result to HBase.
     * @param lines snapshot of the state kept in memory
     * @param date row key for the HBase table
     */
private static void statAndSave(JavaPairDStream<String,ApiTrade> lines,String date) {
lines.foreachRDD(tmp->{
            JavaRDD<ApiTrade> apiTrade = tmp.map((Function<Tuple2<String, ApiTrade>, ApiTrade>) v1 -> v1._2);
Dataset<ApiTrade> tradeData = sparkSession.createDataset(apiTrade.rdd(), Encoders.bean(ApiTrade.class));
tradeData.createOrReplaceTempView("data");
            String selectSql = ", count(1) as count, sum(g) as money";
String groupSql = " group by b";
Dataset<Row> allStatData = sparkSession.sql(String.join(" ", "select ", "b", selectSql, " from data", groupSql));
            /**
             * Create the HBase table (the call itself checks whether the table already exists).
             */
            HBaseFactory.createTables("daily_total_stat",500);
            /**
             * Write the overall statistics to HBase.
             */
            allStatData.toJavaRDD().foreach(line->{
Put put = new Put(Bytes.toBytes(date));
put.addColumn(
Bytes.toBytes(COLUMN_FAMILY),
Bytes.toBytes("count"),
Bytes.toBytes(line.getAs("count").toString()));
put.addColumn(
Bytes.toBytes(COLUMN_FAMILY),
Bytes.toBytes("money"),
Bytes.toBytes(line.getAs("money").toString()));
HBaseFactory.writeToHBase("daily_total_stat",put);
});
});
}
    /**
     * Convert one partition's JSON records into beans.
     * @param s iterator over the partition's records
     * @return iterator over the resulting beans
     */
    private static Iterator<ApiTrade> JsonToTradeBean(Iterator<String> s){
ArrayList<ApiTrade> tmp = new ArrayList<>();
while (s.hasNext()) {
ApiTrade apiTrade = SystemUtils.LOWER_CASE_WITH_UNDERSCORES_GSON.fromJson(s.next(), ApiTrade.class);
tmp.add(apiTrade);
}
return tmp.iterator();
}
    /**
     * Convert one partition's JSON records into <key, bean> pairs.
     * @param s iterator over the partition's records
     * @return iterator over the resulting <key, bean> pairs
     */
private static Iterator<Tuple2<String,ApiTrade>> JsonToPairTradeBean(Iterator<String> s){
ArrayList<Tuple2<String,ApiTrade>> tmp = new ArrayList<>();
while (s.hasNext()) {
ApiTrade apiTrade = SystemUtils.LOWER_CASE_WITH_UNDERSCORES_GSON.fromJson(s.next(), ApiTrade.class);
tmp.add(new Tuple2<String,ApiTrade>(MD5Utils.encode(apiTrade.getA() + apiTrade.getB() + apiTrade.getC()), apiTrade));
}
return tmp.iterator();
}
    /**
     * Merge the key's current record with its historical state into the new value for that key.
     * @param key the record key
     * @param one the current batch's record for this key
     * @param curState the historical state kept for this key
     * @return the key together with its merged value
     */
private static Tuple2<String,ApiTrade> mappingFunc (String key, Optional<ApiTrade> one, State<ApiTrade> curState){
        // Does the current batch contain a value for this key?
if (one.isPresent()) {
            // Take the value from the current batch
ApiTrade oneTrade = one.get();
            // If no historical value exists, store the new record; otherwise decide whether to replace it
if (curState.exists()) {
                // If the stored value is empty, or the current record's modification time (f) is newer, keep the current record
ApiTrade curStateTrade = curState.getOption().isEmpty()?null:curState.getOption().get();
if(curStateTrade==null || oneTrade.getF().compareTo(curStateTrade.getF())>0){
curState.update(oneTrade);
}
} else {
curState.update(oneTrade);
}
}
return new Tuple2<>(key,curState.get());
}
}
SystemUtils.java
import com.google.gson.FieldNamingPolicy;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
/**
* @author ludengke
* @date 2019/9/11
**/
public class SystemUtils {
    /**
     * Shift a date string by the given number of days.
     */
public static String getDateAddDays(String date ,Integer count) throws ParseException {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd" );
Calendar cal = Calendar.getInstance();
cal.setTime(format.parse(date));
cal.add(Calendar.DATE, count);
return format.format(cal.getTime());
}
public static final Gson DEFAULT_GSON = new GsonBuilder().create();
public static final Gson LOWER_CASE_WITH_UNDERSCORES_GSON = new GsonBuilder().setFieldNamingPolicy(FieldNamingPolicy.LOWER_CASE_WITH_UNDERSCORES).create();
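    /**
     * Minimal stand-ins for the logging helpers used elsewhere in the post; the original
     * implementations were not included, so these simply print to stdout/stderr.
     * Swap in your logging framework of choice.
     */
    public static void info(String message) {
        System.out.println(message);
    }

    public static void error(String message, Throwable t) {
        System.err.println(message);
        t.printStackTrace();
    }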
}
SparkFactory.java
import org.apache.spark.SparkConf;
import org.apache.spark.serializer.KryoSerializer;
import org.apache.spark.sql.Row;
/**
* @author ludengke
* @date 2019/9/11
**/
public class SparkFactory {
    /**
     * Configuration shared by all jobs.
     *
     * @see https://spark.apache.org/docs/latest/configuration.html
     */
public static SparkConf getDefaultSparkConf() {
return new SparkConf()
.set("spark.shuffle.file.buffer", "1024k")
.set("spark.reducer.maxSizeInFlight", "128m")
.set("spark.shuffle.memoryFraction", "0.3")
.set("spark.streaming.stopGracefullyOnShutdown", "true")
.set("spark.streaming.kafka.maxRatePerPartition", "300")
.set("spark.serializer", KryoSerializer.class.getCanonicalName())
.registerKryoClasses(new Class[]{Row.class,Object.class,ApiTrade.class});
}
}
GlobalConfig.java
/**
* @author ludengke
* @date 2019/9/11
**/
public class GlobalConfig {
    /**
     * Root path of the Kafka metadata in ZooKeeper
     */
    public static final String KAFKA_ROOT_PATH = "/kafka";
    /**
     * Checkpoint directory, on HDFS.
     */
    public static final String CHECK_POINT_DIR = "/user/hdfs/RealStatStreamingCheckpoint";
    /**
     * Consumer group id of the real-time statistics job
     */
    public static final String REALSTAT_GROUP_ID = "realstat";
    /**
     * Streaming batch interval (seconds) of the real-time statistics job
     */
    public static final long REALSTAT_DURATIONS_SECOND = 60L;
    /**
     * Kafka bootstrap servers
     */
    public static final String KAFKA_QUORUM = "kafka1:9092,kafka2:9092,kafka3:9092";
    /**
     * Kafka offset reset policy
     */
    public static final String AUTO_OFFSET_RESET = "earliest";
    /**
     * ZooKeeper path separator
     */
    public static final String ZK_SPLIT = "/";
    /**
     * ZooKeeper quorum
     */
    public static final String ZOOKEEPER_QUORUM = "kafka1:2181,kafka2:2181,kafka3:2181";
    /**
     * HBase column family name
     */
    public static final String COLUMN_FAMILY = "default";
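    /**
     * The three constants below are referenced by the other classes but were not part of the
     * original post; the values here are illustrative placeholders, adjust them to your environment.
     */
    /** Kafka topic consumed by the real-time statistics job (assumed name). */
    public static final String CONSUMER_TOPIC_NAME = "api_trade";
    /** ZooKeeper client connection timeout in milliseconds (assumed value). */
    public static final int ZOOKEEPER_CONNECTION_TIMEOUT = 30000;
    /** Parent znode used by the HBase cluster (assumed default). */
    public static final String HBASE_ZNODE_PARENT = "/hbase";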
}
MD5Utils.java
import java.security.MessageDigest;
/**
* @author : LiuWeidong
* @date : 2018/12/29.
*/
public class MD5Utils {
private static final String[] HEX_DIG_ITS = {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f"};
public static String encode(String origin) {
String resultString = null;
try {
resultString = origin;
MessageDigest md = MessageDigest.getInstance("MD5");
resultString = byteArrayToHexString(md.digest(resultString.getBytes()));
} catch (Exception ignored) {
}
return resultString;
}
private static String byteArrayToHexString(byte[] b) {
StringBuilder resultSb = new StringBuilder();
for (byte aB : b) {
resultSb.append(byteToHexString(aB));
}
return resultSb.toString();
}
private static String byteToHexString(byte b) {
int n = b;
if (n < 0) {
n += 256;
}
int d1 = n / 16;
int d2 = n % 16;
return HEX_DIG_ITS[d1] + HEX_DIG_ITS[d2];
}
}
ApiTrade.java
import java.io.Serializable;
/**
 * Trade record bean. The field names are abbreviated in the original post; as used in the code,
 * d and e are datetime strings checked by the date filter, f is the modification time compared
 * when merging state, and g is the amount summed into "money".
 * @author ldk
 */
public class ApiTrade implements Serializable {
private String a;
private String b;
private String c;
private String d;
private String e;
private String f;
private Integer g;
public String getA() {
return a;
}
public void setA(String a) {
this.a = a;
}
public String getB() {
return b;
}
public void setB(String b) {
this.b = b;
}
public String getC() {
return c;
}
public void setC(String c) {
this.c = c;
}
public String getD() {
return d;
}
public void setD(String d) {
this.d = d;
}
public String getE() {
return e;
}
public void setE(String e) {
this.e = e;
}
public String getF() {
return f;
}
public void setF(String f) {
this.f = f;
}
public Integer getG() {
return g;
}
public void setG(Integer g) {
this.g = g;
}
}
ZookeeperFactory.java
import com.wangdian.spark.tasks.system.GlobalConfig;
import com.wangdian.spark.tasks.utils.serializer.CustomSerializer;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
/**
* @author : ldk
* @date : 2019/1/24.
*/
public class ZookeeperFactory {
private static final ZkConnection ZK_CONNECTION = new ZkConnection(GlobalConfig.ZOOKEEPER_QUORUM);
private static final ZkClient ZK_CLIENT = new ZkClient(getZkConnection(), GlobalConfig.ZOOKEEPER_CONNECTION_TIMEOUT, new CustomSerializer());
private static final ZkUtils ZK_UTILS = new ZkUtils(getZkClient(), getZkConnection(), false);
public static ZkConnection getZkConnection() {
return ZK_CONNECTION;
}
public static ZkClient getZkClient() {
return ZK_CLIENT;
}
public static ZkUtils getZkUtils() {
return ZK_UTILS;
}
private ZookeeperFactory() {
}
}
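CustomSerializer is imported above but not included in the post. Since the offset logic only reads and writes plain strings, a minimal stand-in (an assumption, not the original implementation) can simply round-trip UTF-8 strings:
import java.nio.charset.StandardCharsets;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;

/**
 * Minimal stand-in for the CustomSerializer referenced above (not shown in the post):
 * stores every znode value as a plain UTF-8 string, which is all the offset logic needs.
 */
public class CustomSerializer implements ZkSerializer {
    @Override
    public byte[] serialize(Object data) throws ZkMarshallingError {
        return data == null ? null : data.toString().getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public Object deserialize(byte[] bytes) throws ZkMarshallingError {
        return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
    }
}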
HBaseFactory.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;
/**
* @author : ldk
* @date : 2019/1/22.
*/
public class HBaseFactory {
private static Connection conn = null;
private static Configuration conf = null;
public static Configuration getHBaseConf() {
if (conf == null) {
conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", GlobalConfig.ZOOKEEPER_QUORUM);
conf.set("zookeeper.znode.parent", GlobalConfig.HBASE_ZNODE_PARENT);
}
return conf;
}
public static Connection createHBaseConn() {
if (conn == null || conn.isClosed()) {
try {
conn = ConnectionFactory.createConnection(getHBaseConf());
} catch (IOException e) {
SystemUtils.error("创建HBase连接异常 : ", e);
}
}
return conn;
}
public static synchronized void createTables(String tableName,Integer version) {
try (Admin admin = HBaseFactory.createHBaseConn().getAdmin()) {
if (admin.tableExists(TableName.valueOf(tableName))) {
return;
}
TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName));
            builder.setColumnFamily(
                    ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(GlobalConfig.COLUMN_FAMILY))
                            /**
                             * Enable compression
                             */
                            .setCompressionType(Compression.Algorithm.SNAPPY)
                            /**
                             * Maximum number of stored versions
                             */
                            .setMaxVersions(version)
                            .build()
            );
if (!admin.tableExists(TableName.valueOf(tableName))) {
admin.createTable(builder.build());
}
} catch (Exception e) {
SystemUtils.error("创建HBase表结构异常: " + tableName, e);
}
}
public static boolean isTableExist(String tableName) {
try (Admin admin = HBaseFactory.createHBaseConn().getAdmin()) {
return admin.tableExists(TableName.valueOf(tableName));
} catch (Exception e) {
SystemUtils.error("判断HBase表状态异常: " + tableName, e);
}
return false;
}
    public static void writeToHBase(String tableName, Put put) {
        // try-with-resources closes the Table handle automatically
        try (Table table = HBaseFactory.createHBaseConn().getTable(TableName.valueOf(tableName))) {
            table.put(put);
        } catch (Exception e) {
            SystemUtils.error("Failed to write to HBase, TABLE NAME: [ " + tableName + " ]", e);
        }
    }
    public static void writeToHBase(String tableName, List<Put> puts) {
        // try-with-resources closes the Table handle automatically
        try (Table table = HBaseFactory.createHBaseConn().getTable(TableName.valueOf(tableName))) {
            table.put(puts);
        } catch (Exception e) {
            SystemUtils.error("Failed to write to HBase, TABLE NAME: [ " + tableName + " ]", e);
        }
    }
}
Run screenshots
PS: Ignore the run time of the last job; this screenshot came from an abnormal streaming run, and I didn't manage to capture a normal one.
The in-memory persisted RDD is compacted once every ten batches. A normal batch runs two jobs; every tenth batch runs three, the extra job being the one that compacts the persisted state RDD.
If you are curious, it is worth digging into why some of these jobs contain skipped stages.
Summary
This job puts some demands on the cluster: the data being counted is kept in memory, which makes the computation fast but memory-hungry. Keeping only the aggregated results in memory would need far less; here, because of a specific business requirement, mapWithState is used as a deduplication function.
The example below keeps only the aggregated result in memory: Spark Streaming real-time per-merchant daily cumulative PV statistics.
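For contrast, here is a minimal sketch of that lighter-weight approach: keeping only a running total per key in state instead of every deduplicated record. The class name PvCountSketch and the merchantId key are illustrative assumptions, not code from the linked post.
import org.apache.spark.api.java.Optional;
import org.apache.spark.streaming.State;
import org.apache.spark.streaming.StateSpec;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import scala.Tuple2;

public final class PvCountSketch {

    /** pvEvents: one (merchantId, 1L) pair per page view in the batch. */
    static JavaPairDStream<String, Long> dailyTotals(JavaPairDStream<String, Long> pvEvents) {
        return pvEvents
                .reduceByKey((a, b) -> a + b)                               // per-batch count per merchant
                .mapWithState(StateSpec.function(PvCountSketch::addToTotal))
                .stateSnapshots();                                          // running totals for all merchants
    }

    /** Add this batch's count to the merchant's running total kept in state. */
    static Tuple2<String, Long> addToTotal(String merchantId, Optional<Long> batchCount, State<Long> total) {
        long updated = (total.exists() ? total.get() : 0L)
                + (batchCount.isPresent() ? batchCount.get() : 0L);
        total.update(updated);
        return new Tuple2<>(merchantId, updated);
    }
}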