使用 spark 从 kafka 消费数据写入 hive 动态分区表(一)

本贴最后更新于 2012 天前,其中的信息可能已经东海扬尘

使用 spark 从 kafka 消费数据写入 hive 动态分区表

最近因为业务需求,需要把 kafka 内的数据写入 hive 动态分区表,进入 kafka 的数据是保证不会重复的,同一条业务数据仅会进入 kafka 一次。这就保证数据到了 hive 基本不会发生 update 操作,可以对 hive 进行统计生成静态表的形式将统计数据写入 mysql。咱也不说那么多废话了,开整。

直接写入

从 kafka 取出数据转化成 bean 对象,根据业务要求将数据过滤,清洗,拿到最终的 RDD,直接写入 hive 分区表。(PS:我就不贴全部代码,仅仅贴出主要代码,作为一个有职业道德的程序员,职业道德不允许我这么做。哈哈哈,其实是我怂,我这么大条,万一泄露公司机密就 OVER 了)

RDD 写入 HIVE 动态分区表,因为我需要向两个动态分区表写数据,所以加了 type 字段

private static void writeHive(SparkSession sparkSession,JavaRDD rdd,String type){

        Dataset<Row> writerTradeData = sparkSession.createDataFrame(rdd,type.equals("trade")?ApiTradePlus.class:ApiTradeOrderPlus.class);
        writerTradeData.createOrReplaceTempView("tmp"+type);
        String sql = "";
        if(type.equals("trade")){
            sql = "insert into tmp_trade `partition(tradedate)` select * from "+"tmp"+type;
        }
        else if(type.equals("order")){
            sql = "insert into tmp_trade_order `partition(tradedate)` select * from "+"tmp"+type;
        }
        sparkSession.sql(sql);
    }

别忘记最重要的 hive 操作,默认是不支持动态分区表需要开启配置。以 hive.exec.开头的几个参数,前两个开启动态分区表的配置,后三个是设置分区数上限,我是按照日期分的,一不留神就容易超出分区表数上限,所以设置的稍微大了一点点。

SparkConf sparkConf = SparkFactory.getDefaultSparkConf()
                .set("spark.executor.cores", "4")
                .set("spark.ui.port", "30004")
                .set("hive.exec.dynamic.partition.mode","nonstrict")
                .set("hive.exec.dynamic.partition","true")
                .set("hive.exec.max.dynamic.partitions.pernode","100000")
                .set("hive.exec.max.dynamic.partitions","100000")
                .set("hive.exec.max.created.files","100000")
                .setAppName("KafkaToHiveStreaming");

哦,对了还有这个我写的 sparkConf 工厂类,生成默认配置,速率设置了 100,因为有 15 个分区,10s 执行一次,所以执行一次就会消费 1.5W 的消息,自我觉得应该能够 10s 应该够了。

public static SparkConf getDefaultSparkConf() {

        return new SparkConf()
                .set("spark.executor.memory", "6g")
                .set("spark.shuffle.file.buffer", "1024k")
                .set("spark.reducer.maxSizeInFlight", "128m")
                .set("spark.shuffle.memoryFraction", "0.3")
                .set("spark.streaming.stopGracefullyOnShutdown", "true")
                .set("spark.streaming.kafka.maxRatePerPartition", "100")
                .set("spark.serializer", KryoSerializer.class.getCanonicalName())
                .registerKryoClasses(SERIALIZER_CLASS);
    }

接下来激动人心的时刻到了。把任务提交到 spark 集群,当我满心欢喜提上去之后,打开监控页面,心瞬间就凉了一半,提交的第一个任务,执行了整整 10 分钟,我已经不忍心截图了,悲伤辣么大。😰
10 分钟处理了 1.5Wkafka 消息,写入数据库 2W 左右,因为业务需求,需要特殊记录写两次,位于的分区不一样。

曲线救国

先写入 hive 非分区表,然后再通过 hive 将非分区表的数据迁移到分区表上。
直接写分区表的速度真的是慢的可以,如果放到线上,我估计就要被离职了。为了我 Money 还是要加油呀。所以想出了这个曲线救国的路线,其实刚开始是想写 hadoop 文件,然后使用外部表,外部表数据源选择 hadoop 文件,然后通过 insert into select * from 导入。转念一想不如直接写非分区表,这样能直接生成格式化号的 hadoop 文件,还不用外部表。
所以原来的一个 spark 任务分成了两个 spark 任务:

  1. SparkStreaming 任务:负责消费 kafka 数据写入非分区表。
  2. SparkSql 任务:负责将 hive 的非分区表的数据迁移到动态分区表。

SparkStreaming 关键代码如下:

private static void writeHive(SparkSession sparkSession,JavaRDD rdd,String time,String type){

        Dataset<Row> writerTradeData = sparkSession.createDataFrame(rdd,type.equals(TABLE_API_TRADE)?ApiTradeWithoutOrder.class:ApiTradeOrderWithTime.class);
        writerTradeData.createOrReplaceTempView("tmp"+time);
        String sql = "";
        String selectSql = null;
        /**
         * select 的顺序要求与hive中desc表的字段顺序一致
         */
        if(type.equals(TABLE_API_TRADE)){
            selectSql = " select 全部字段(最好不要用*) from tmp"+time;
            sql = String.join(" ","insert into ",TABLE_API_TRADE) ;
        }
        else if(type.equals(TABLE_API_TRADE_ORDER)) {
            selectSql=" select 全部字段(最好不要用*) from tmp" + time;
            sql = String.join(" ","insert into ",TABLE_API_TRADE_ORDER) ;
        }
        sparkSession.sql(String.join(" ",sql,selectSql));
    }

简单说明一下,这个地方 time 是干啥的,其实这个对应分区表的分区字段,还对代码进行了简单的优化,毕竟公司最近在实行阿里编码规范。这个写非分区表是真的快,嗖嗖嗖的。1.5W/7s,这个 7s 还包含了清洗过滤数据的过程,基本没变。

SparkSql 的关键代码:

private static void deal() {
        String insertSql = " insert into ";
        String partitionSql = " partition(tradedate) ";
        String tradeSelectSql = " select 全部字段(最好不要用*) from " + TABLE_API_TRADE;
        String orderSelectSql = " select 全部字段(最好不要用*) from " + TABLE_API_TRADE_ORDER;
        /**
         * 1. 迁移数据 insert into select from
         * 2. 重建非分区表 truncate table
         * point: select 的顺序要求与hive中desc表的字段顺序一致tradeSelectSql,orderSelectSql
         */
        session.sql("USE ".concat(HIVE_DB));
        session.sql(String.join(" ",insertSql, API_TRADE_PARTITIONED_TABLE_NAME,partitionSql,tradeSelectSql));
        session.sql(String.join(" ",insertSql, API_TRADE_ORDER_PARTITIONED_TABLE_NAME,partitionSql,orderSelectSql));
        session.sql("truncate table" + TABLE_API_TRADE);
        session.sql("truncate table" + TABLE_API_TRADE_ORDER);
    }

说白了,这个就是使用 insert into partition(分区字段) select * from

hadoop 崩了

当我满心欢喜的执行 SparkSql 任务的时候,又有噩耗发生了,hadoop 竟然扛不住 hive 表数据迁移,从非分区表写入到分区表,数据总量大约 2000W 左右吧,分了 1.8W 个 task,在执行到 0.8W 的时候,hadoop 第一个 NameNode 崩了,1.4W 的时候第二个 NameNode 崩了,我感觉我也崩了,哎,花了 10 分钟重启全部集群,还是规划好每次写入数据的条数吧,最后调整到了 600W 进行一次 hive 数据迁移。

hive 就会搞事情

你以为迁移完成就完了嘛?NO!NO!NO!我不得统计 hive 数据的条数呀,很不幸的是 hive 有个配置,默认不不开启,所以你的 select count(1) from table 和上帝一样给你开了个玩笑,返回 0,我忙活了这么久你告诉我一条都没写进去。感觉自己和蔡徐坤一样,会唱,会跳,会 rap,会打篮球,就是不会写程序。乖乖的在 hive-env.sh 中加了

# hive shell sql 爆内存溢出可以增大这个值
export HADOOP_HEAPSIZE=4096

重点来了(敲黑板,记到小本本上)

重点来了,因为数据中可能存在\t\r\n 等鬼东西,所以做了替换,并且在 hive 中的字段分割符使用\001,我就不信了用户数据的东西还有\001。最恶心的是\r\n 会影响 hive 的数据行数,是不是听上去有点懵逼?来我告诉你,hive 默认的行分割符是\n,所以如果你的字段里面包含\n,那么恭喜你中奖了,数据会出现裂变(不知道在这里合适不合适)一条变两条,贼开心,厉害点的会变成 10 条,我就出现了 1 变 10 的操作。😰 ,是谁录入的,让我出来打一顿,缺少社会主义的毒打。
其实吧,这个事要解决很简单,我使用\011 或者其他的作为行分割符不就行了嘛,但是很不幸的告诉你,hive 暂时仅支持\n 作为行分割符,难受的一批,乖乖的清洗数据去吧。

标题:使用 spark 从 kafka 消费数据写入 hive 动态分区表(二)
作者:ludengke95
地址:http://www.ludengke95.info/articles/2019/05/20/1558346695894.html

  • Hive
    22 引用 • 7 回帖 • 1 关注
  • Spark

    Spark 是 UC Berkeley AMP lab 所开源的类 Hadoop MapReduce 的通用并行框架。Spark 拥有 Hadoop MapReduce 所具有的优点;但不同于 MapReduce 的是 Job 中间输出结果可以保存在内存中,从而不再需要读写 HDFS,因此 Spark 能更好地适用于数据挖掘与机器学习等需要迭代的 MapReduce 的算法。

    74 引用 • 46 回帖 • 552 关注
  • Kafka

    Kafka 是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是现代系统中许多功能的基础。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。

    36 引用 • 35 回帖
  • 动态分区表
    1 引用 • 2 回帖
1 操作
ludengke95 在 2019-05-20 18:24:13 更新了该帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...
  • someone
    作者

    心疼自己

  • 其他回帖
  • someone
    作者

    我写着两篇博客主要是看到网上都是啥也不说了,直接贴代码,代码全是一样的,一点原创精神都没有。我是真的讨厌,百度一下全是 cv 大师。我为什么不贴全部代码,如果我贴全部代码还要你干嘛,我只是提供一个思路,我不敢说你看完这两篇博客之后你一定能够会了这块,起码这块遇到的问题有一种思路可以帮你,如果你有更好的办法可以联系我,大家一起讨论。为了能够通俗易懂博客里面用了简化的 hive,spark 处理逻辑,实际上 sql 和 streaming 的执行比这个复杂的多,如果感兴趣,可以自己去看内部是如何处理的。