使用 spark 从 kafka 消费数据写入 hive 动态分区表(二)
上次咱们说到数据从 kafka 到 hive,也从 hive 非分区表到分区表的迁移。经过测试发现曲线救国的方法虽然 kafka 到 hive 快了,但是 hive 非分区表到分区表贼慢,再一次难受,不着急咱们慢慢来分析原因。
分析日志
拿到日志文件看看什么日志最多,什么操作最耗时间。日志大体分为一下几类:
- 初始化日志:这个耗时忽略不计(包括分配 executor,创建临时文件夹啥的等等)。
- task 日志:这个贼重要,可以知道那个 task 执行失败,task 位于那个几点,总共有多少任务,每个任务的执行时间,每个任务的分区(具体是从哪里看,下面两行日志一目了然呀)。
Starting task 53.0 in stage 0.0 (TID 47, IP, executor 5, partition 53, PROCESS_LOCAL, 4920 bytes)
Finished task 53.0 in stage 0.0 (TID 40) in 76 ms on IP(executor 5) (24/3135)
- Rename 日志:就是讲数据写入的临时文件重命名为对应的 hive 表数据文件。
metadata.Hive: Renaming src: hdfs://分区表文件存储路径/.hive-staging_hive_2019-05-20_10-24-43_342_2230325038880850463-1/-ext-10000/tradedate=2017-12-02/part-00086-e563b05e-3202-4510-8951-3d05d246c279.c000, dest: hdfs://分区表文件存储路径/tradedate=2017-12-02/part-00086-e563b05e-3202-4510-8951-3d05d246c279.c000, Status:true
经过对比时间发现:task 执行时间也就几十秒,Ranme 执行时间几分钟,甚至十几分钟。
对症下药
问题找到就是 Rename 阶段时间长拖慢了整个进度,想办法解决呗。
- 原版 sql 执行:
insert into partition(分区字段) select * from
,可以理解为把非分区表的文件生成备份,然后把备份 rename 成对应表的分区的数据文件(当然其实内部不是这样的,这样只是方便理解,如果是这样的分区字段怎么办呢?有兴趣可以去了解一下,这个 sql 是怎么执行的)。 - 新版 sql 执行:
insert into partition(分区字段) select * from table DISTRIBUTE BY 分区字段
。DISTRIBUTE BY 就可以理解为 mysql 中的 group by,就是分组,但是这个分组可不一样,这个会大大减少 reduce 的数量(相比之前的 sql,每个文件中的每个分区字段对应一个 reduce 结果集)。现在的 reduce 数量 ≈ 之前的 reduce 数量/文件数。
我为什么这么着重的说 reduce,因为 reduce 的数量一一对应 hive 表数据文件的数量(仅仅针对本次写入涉及的分区)。reduce 的数量减少,那么临时文件数量减少,那么 rename 的对象就少,大大减少了 rename 所消耗的时间。
重点
hive 非分区表到分区表的数据迁移 sql:
insert into partition(分区字段) select * from table DISTRIBUTE BY 分区字段
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于