流批一体在京东的探索与实践

本贴最后更新于 832 天前,其中的信息可能已经时移世异

01 整体思考

提到流批一体,不得不提传统的大数据平台 —— Lambda 架构。它能够有效地支撑离线和实时的数据开发需求,但它流和批两条数据链路割裂所导致的高开发维护成本以及数据口径不一致是无法忽视的缺陷。

通过一套数据链路来同时满足流和批的数据处理需求是最理想的情况,即流批一体。此外我们认为流批一体还存在一些中间阶段,比如只实现计算的统一或者只实现存储的统一也是有重大意义的。

以只实现计算统一为例,有一些数据应用的实时性要求比较高,比如希望端到端的数据处理延时不超过一秒钟,这对目前开源的、适合作为流批统一的存储来说是一个很大的挑战。以数据湖为例,它的数据可见性与 commit 的间隔相关,进而与 Flink 做 checkpoint 的时间间隔相关,此特性结合数据处理链路的长度,可见做到端到端一秒钟的处理并不容易。因此对于这类需求,只实现计算统一也是可行的。通过计算统一去降低用户的开发及维护成本,解决数据口径不一致的问题。

在流批一体技术落地的过程中,面临的挑战可以总结为以下 4 个方面:

  • 首先是数据实时性。如何把端到端的数据时延降低到秒级别是一个很大的挑战,因为它同时涉及到计算引擎及存储技术。它本质上属于性能问题,也是一个长期目标。
  • 第二个挑战是如何兼容好在数据处理领域已经广泛应用的离线批处理能力。此处涉及开发和调度两个层面的问题,开发层面主要是复用的问题,比如如何复用已经存在的离线表的数据模型,如何复用用户已经在使用的自定义开发的 Hive UDF 等。调度层面的问题主要是如何合理地与调度系统进行集成。
  • 第三个挑战是资源及部署问题。比如通过不同类型的流、批应用的混合部署来提高资源利用率,以及如何基于 metrics 来构建弹性伸缩能力,进一步提高资源利用率。
  • 最后一个挑战也是最困难的一个:用户观念。大多数用户对于比较新的技术理念通常仅限于技术交流或者验证,即使验证之后觉得可以解决实际问题,也需要等待合适的业务来试水。这个问题也催生了一些思考,平台侧一定要多站在用户的视角看待问题,合理地评估对用户的现有技术架构的改动成本以及用户收益、业务迁移的潜在风险等。

上图是京东实时计算平台的全景图,也是我们实现流批一体能力的载体。中间的 Flink 基于开源社区版本深度定制。基于该版本构建的集群,外部依赖包含三个部分,JDOS、HDFS/CFS 和 Zookeeper。

  • JDOS 是京东的 Kubernetes 平台,目前我们所有 Flink 计算任务容器化的,都运行在这套平台之上;
  • Flink 的状态后端有 HDFS 和 CFS 两种选择,其中 CFS 是京东自研的对象存储;
  • Flink 集群的高可用是基于 Zookeeper 构建的。

在应用开发方式方面,平台提供 SQL 和 Jar 包两种方式,其中 Jar 的方式支持用户直接上传 Flink 应用 Jar 包或者提供 Git 地址由平台来负责打包。除此之外我们平台化的功能也相对比较完善,比如基础的元数据服务、SQL 调试功能,产品端支持所有的参数配置,以及基于 metrics 的监控、任务日志查询等。

连接数据源方面,平台通过 connector 支持了丰富的数据源类型,其中 JDQ 基于开源 Kafka 定制,主要应用于大数据场景的消息队列;JMQ 是京东自研,主要应用于在线系统的消息队列;JimDB 是京东自研的分布式 KV 存储。

在当前 Lambda 架构中,假设实时链路的数据存储在 JDQ,离线链路的数据存在 Hive 表中,即便计算的是同一业务模型,元数据的定义也常常是存在差异的,因此我们引入统一的逻辑模型来兼容实时离线两边的元数据。

在计算环节,通过 FlinkSQL 结合 UDF 的方式来实现业务逻辑的流批统一计算,此外平台会提供大量的公用 UDF,同时也支持用户上传自定义 UDF。针对计算结果的输出,我们同样引入统一的逻辑模型来屏蔽流批两端的差异。对于只实现计算统一的场景,可以将计算结果分别写入流批各自对应的存储,以保证数据的实时性与先前保持一致。

对于同时实现计算统一和存储统一的场景,我们可以将计算的结果直接写入到流批统一的存储。我们选择了 Iceberg 作为流批统一的存储,因为它拥有良好的架构设计,比如不会绑定到某一个特定的引擎等。

在兼容批处理能力方面,我们主要进行了以下三个方面的工作:

第一,复用离线数仓中的 Hive 表。

以数据源端为例,为了屏蔽上图左侧图中流、批两端元数据的差异,我们定义了逻辑模型 gdm_order_m 表,并且需要用户显示地指定 Hive 表和 Topic 中的字段与这张逻辑表中字段的映射关系。这里映射关系的定义非常重要,因为基于 FlinkSQL 的计算只需面向这张逻辑表,而无需关心实际的 Hive 表与 Topic 中的字段信息。在运行时通过 connector 创建流表和批表的时候,逻辑表中的字段会通过映射关系被替换成实际的字段。

在产品端,我们可以给逻辑表分别绑定流表和批表,通过拖拽的方式来指定字段之间的映射关系。这种模式使得我们的开发方式与之前有所差异,之前的方式是先新建一个任务并指定是流任务还是批任务,然后进行 SQL 开发,再去指定任务相关的配置,最后发布任务。而在流批一体模式下,开发模式变为了首先完成 SQL 的开发,其中包括逻辑的、物理的 DDL 的定义,以及它们之间的字段映射关系的指定,DML 的编写等,然后分别指定流批任务相关的配置,最后发布成流批两个任务。

第二,与调度系统打通。

离线数仓的数据加工基本是以 Hive/Spark 结合调度的模式,以上图中居中的图为例,数据的加工被分为 4 个阶段,分别对应数仓的 BDM、FDM、GDM 和 ADM 层。随着 Flink 能力的增强,用户希望把 GDM 层的数据加工任务替换为 FlinkSQL 的批任务,这就需要把 FlinkSQL 批任务嵌入到当前的数据加工过程中,作为中间的一个环节。

为了解决这个问题,除了任务本身支持配置调度规则,我们还打通了调度系统,从中继承了父任务的依赖关系,并将任务自身的信息同步到调度系统中,支持作为下游任务的父任务,从而实现了将 FlinkSQL 的批任务作为原数据加工的其中一个环节。

第三,对用户自定义的 Hive UDF、UDAF 及 UDTF 的复用。

对于现存的基于 Hive 的离线加工任务,如果用户已经开发了 UDF 函数,那么最理想的方式是在迁移 Flink 时对这些 UDF 进行直接复用,而不是按照 Flink UDF 定义重新实现。

在 UDF 的兼容问题上,针对使用 Hive 内置函数的场景,社区提供了 load hive modules 方案。如果用户希望使用自己开发的 Hive UDF,可以通过使用 create catalog、use catalog、create function,最后在 DML 中调用的方式来实现, 这个过程会将 Function 的信息注册到 Hive 的 Metastore 中。从平台管理的角度,我们希望用户的 UDF 具备一定的隔离性,限制用户 Job 的粒度,减少与 Hive Metastore 交互以及产生脏函数元数据的风险。

此外,当元信息已经被注册过,希望下次能在 Flink 平台端正常使用,如果不使用 if not exist 语法,通常需要先 drop function,再进行 create 操作。但是这种方式不够优雅,同时也对用户的使用方式有限制。另一种解决方法是用户可以注册临时的 Hive UDF,在 Flink1.12 中注册临时 UDF 的方式是 create temporary function,但是该 Function 需要实现 UserDefinedFunction 接口后才能通过后面的校验,否则会注册失败。

所以我们并没有使用 create temporary function,而是对 create function 做了一些调整,扩展了 ExtFunctionModule,将解析出来的 FunctionDefinition 注册到 ExtFunctionModule 中,做了一次 Job 级别的临时注册。这样的好处就是不会污染 Hive Metastore,提供了良好的隔离性,同时也没有对用户的使用习惯产生限制,提供了良好的体验。

不过这个问题在社区 1.13 的版本已经得到了综合的解决。通过引入 Hive 解析器等扩展,已经可以把实现 UDF、GenericUDF 接口的自定义 Hive 函数通过 create temporary function 语法进行注册和使用。

资源占用方面,流处理和批处理是天然错峰的。对于批处理,离线数仓每天 0 点开始计算过去一整天的数据,所有的离线报表的数据加工会在第二天上班前全部完成,所以通常 00:00 到 8:00 是批计算任务大量占用资源的时间段,而这个时间段通常在线的流量都比较低。流处理的负载与在线的流量是正相关的,所以这个时间段流处理的资源需求是比较低的。上午 8 点到晚上 0 点,在线的流量比较高,而这个时间段批处理的任务大部分都不会被触发执行。

基于这种天然的错峰,我们可以通过在专属的 JDOS Zone 中进行不同类型的流批应用的混部来提升资源的使用率,并且如果统一使用 Flink 引擎来处理流批应用,资源的使用率会更高。

同时为了使应用可以基于流量进行动态调整,我们还开发了自动弹性伸缩的服务 (Auto-Scaling Service)。它的工作原理如下:运行在平台上的 Flink 任务上报 metrics 信息到 metrics 系统,Auto-Scaling Service 会基于 metrics 系统中的一些关键指标,比如 TaskManager 的 CPU 使用率、任务的背压情况等来判定任务是否需要增减计算资源,并把调整的结果反馈给 JRC 平台,JRC 平台通过内嵌的 fabric 客户端将调整的结果同步到 JDOS 平台,从而完成对 TaskManager Pod 个数的调整。此外,用户可以在 JRC 平台上通过配置来决定是否为任务开启此功能。

上图右侧图表是我们在 JDOS Zone 中进行流批混部并结合弹性伸缩服务试点测试时的 CPU 使用情况。可以看到 0 点流任务进行了缩容,将资源释放给批任务。我们设置的新任务在 2 点开始执行,所以从 2 点开始直到早上批任务结束这段时间,CPU 的使用率都比较高,最高到 80% 以上。批任务运行结束后,在线流量开始增长时,流任务进行了扩容,CPU 的使用率也随之上升。

更多内容请看:

https://blog.stackanswer.com/articles/2022/07/01/1656663988707.html

  • FlinkSQL
    2 引用
  • 流批一体
    1 引用
  • Lambda
    23 引用 • 19 回帖
  • 大数据

    大数据(big data)是指无法在一定时间范围内用常规软件工具进行捕捉、管理和处理的数据集合,是需要新处理模式才能具有更强的决策力、洞察发现力和流程优化能力的海量、高增长率和多样化的信息资产。

    93 引用 • 113 回帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...

推荐标签 标签

  • 运维

    互联网运维工作,以服务为中心,以稳定、安全、高效为三个基本点,确保公司的互联网业务能够 7×24 小时为用户提供高质量的服务。

    148 引用 • 257 回帖
  • 游戏

    沉迷游戏伤身,强撸灰飞烟灭。

    176 引用 • 815 回帖
  • Facebook

    Facebook 是一个联系朋友的社交工具。大家可以通过它和朋友、同事、同学以及周围的人保持互动交流,分享无限上传的图片,发布链接和视频,更可以增进对朋友的了解。

    4 引用 • 15 回帖 • 466 关注
  • 倾城之链
    23 引用 • 66 回帖 • 138 关注
  • 博客

    记录并分享人生的经历。

    272 引用 • 2386 回帖
  • PWA

    PWA(Progressive Web App)是 Google 在 2015 年提出、2016 年 6 月开始推广的项目。它结合了一系列现代 Web 技术,在网页应用中实现和原生应用相近的用户体验。

    14 引用 • 69 回帖 • 140 关注
  • Typecho

    Typecho 是一款博客程序,它在 GPLv2 许可证下发行,基于 PHP 构建,可以运行在各种平台上,支持多种数据库(MySQL、PostgreSQL、SQLite)。

    12 引用 • 65 回帖 • 457 关注
  • SQLite

    SQLite 是一个进程内的库,实现了自给自足的、无服务器的、零配置的、事务性的 SQL 数据库引擎。SQLite 是全世界使用最为广泛的数据库引擎。

    5 引用 • 7 回帖 • 2 关注
  • 阿里巴巴

    阿里巴巴网络技术有限公司(简称:阿里巴巴集团)是以曾担任英语教师的马云为首的 18 人,于 1999 年在中国杭州创立,他们相信互联网能够创造公平的竞争环境,让小企业通过创新与科技扩展业务,并在参与国内或全球市场竞争时处于更有利的位置。

    43 引用 • 221 回帖 • 141 关注
  • QQ

    1999 年 2 月腾讯正式推出“腾讯 QQ”,在线用户由 1999 年的 2 人(马化腾和张志东)到现在已经发展到上亿用户了,在线人数超过一亿,是目前使用最广泛的聊天软件之一。

    45 引用 • 557 回帖 • 114 关注
  • 国际化

    i18n(其来源是英文单词 internationalization 的首末字符 i 和 n,18 为中间的字符数)是“国际化”的简称。对程序来说,国际化是指在不修改代码的情况下,能根据不同语言及地区显示相应的界面。

    8 引用 • 26 回帖
  • Logseq

    Logseq 是一个隐私优先、开源的知识库工具。

    Logseq is a joyful, open-source outliner that works on top of local plain-text Markdown and Org-mode files. Use it to write, organize and share your thoughts, keep your to-do list, and build your own digital garden.

    5 引用 • 62 回帖
  • Eclipse

    Eclipse 是一个开放源代码的、基于 Java 的可扩展开发平台。就其本身而言,它只是一个框架和一组服务,用于通过插件组件构建开发环境。

    75 引用 • 258 回帖 • 624 关注
  • 爬虫

    网络爬虫(Spider、Crawler),是一种按照一定的规则,自动地抓取万维网信息的程序。

    106 引用 • 275 回帖
  • MyBatis

    MyBatis 本是 Apache 软件基金会 的一个开源项目 iBatis,2010 年这个项目由 Apache 软件基金会迁移到了 google code,并且改名为 MyBatis ,2013 年 11 月再次迁移到了 GitHub。

    170 引用 • 414 回帖 • 387 关注
  • 书籍

    宋真宗赵恒曾经说过:“书中自有黄金屋,书中自有颜如玉。”

    77 引用 • 390 回帖
  • C

    C 语言是一门通用计算机编程语言,应用广泛。C 语言的设计目标是提供一种能以简易的方式编译、处理低级存储器、产生少量的机器码以及不需要任何运行环境支持便能运行的编程语言。

    85 引用 • 165 回帖 • 4 关注
  • SOHO

    为成为自由职业者在家办公而努力吧!

    7 引用 • 55 回帖 • 29 关注
  • Vditor

    Vditor 是一款浏览器端的 Markdown 编辑器,支持所见即所得、即时渲染(类似 Typora)和分屏预览模式。它使用 TypeScript 实现,支持原生 JavaScript、Vue、React 和 Angular。

    345 引用 • 1783 回帖 • 3 关注
  • ZeroNet

    ZeroNet 是一个基于比特币加密技术和 BT 网络技术的去中心化的、开放开源的网络和交流系统。

    1 引用 • 21 回帖 • 627 关注
  • Angular

    AngularAngularJS 的新版本。

    26 引用 • 66 回帖 • 534 关注
  • Hibernate

    Hibernate 是一个开放源代码的对象关系映射框架,它对 JDBC 进行了非常轻量级的对象封装,使得 Java 程序员可以随心所欲的使用对象编程思维来操纵数据库。

    39 引用 • 103 回帖 • 707 关注
  • 旅游

    希望你我能在旅途中找到人生的下一站。

    90 引用 • 898 回帖 • 2 关注
  • 996
    13 引用 • 200 回帖
  • PHP

    PHP(Hypertext Preprocessor)是一种开源脚本语言。语法吸收了 C 语言、 Java 和 Perl 的特点,主要适用于 Web 开发领域,据说是世界上最好的编程语言。

    179 引用 • 407 回帖 • 494 关注
  • Hprose

    Hprose 是一款先进的轻量级、跨语言、跨平台、无侵入式、高性能动态远程对象调用引擎库。它不仅简单易用,而且功能强大。你无需专门学习,只需看上几眼,就能用它轻松构建分布式应用系统。

    9 引用 • 17 回帖 • 617 关注
  • SEO

    发布对别人有帮助的原创内容是最好的 SEO 方式。

    35 引用 • 200 回帖 • 15 关注