流批一体在京东的探索与实践

本贴最后更新于 913 天前,其中的信息可能已经时移世异

01 整体思考

提到流批一体,不得不提传统的大数据平台 —— Lambda 架构。它能够有效地支撑离线和实时的数据开发需求,但它流和批两条数据链路割裂所导致的高开发维护成本以及数据口径不一致是无法忽视的缺陷。

通过一套数据链路来同时满足流和批的数据处理需求是最理想的情况,即流批一体。此外我们认为流批一体还存在一些中间阶段,比如只实现计算的统一或者只实现存储的统一也是有重大意义的。

以只实现计算统一为例,有一些数据应用的实时性要求比较高,比如希望端到端的数据处理延时不超过一秒钟,这对目前开源的、适合作为流批统一的存储来说是一个很大的挑战。以数据湖为例,它的数据可见性与 commit 的间隔相关,进而与 Flink 做 checkpoint 的时间间隔相关,此特性结合数据处理链路的长度,可见做到端到端一秒钟的处理并不容易。因此对于这类需求,只实现计算统一也是可行的。通过计算统一去降低用户的开发及维护成本,解决数据口径不一致的问题。

在流批一体技术落地的过程中,面临的挑战可以总结为以下 4 个方面:

  • 首先是数据实时性。如何把端到端的数据时延降低到秒级别是一个很大的挑战,因为它同时涉及到计算引擎及存储技术。它本质上属于性能问题,也是一个长期目标。
  • 第二个挑战是如何兼容好在数据处理领域已经广泛应用的离线批处理能力。此处涉及开发和调度两个层面的问题,开发层面主要是复用的问题,比如如何复用已经存在的离线表的数据模型,如何复用用户已经在使用的自定义开发的 Hive UDF 等。调度层面的问题主要是如何合理地与调度系统进行集成。
  • 第三个挑战是资源及部署问题。比如通过不同类型的流、批应用的混合部署来提高资源利用率,以及如何基于 metrics 来构建弹性伸缩能力,进一步提高资源利用率。
  • 最后一个挑战也是最困难的一个:用户观念。大多数用户对于比较新的技术理念通常仅限于技术交流或者验证,即使验证之后觉得可以解决实际问题,也需要等待合适的业务来试水。这个问题也催生了一些思考,平台侧一定要多站在用户的视角看待问题,合理地评估对用户的现有技术架构的改动成本以及用户收益、业务迁移的潜在风险等。

上图是京东实时计算平台的全景图,也是我们实现流批一体能力的载体。中间的 Flink 基于开源社区版本深度定制。基于该版本构建的集群,外部依赖包含三个部分,JDOS、HDFS/CFS 和 Zookeeper。

  • JDOS 是京东的 Kubernetes 平台,目前我们所有 Flink 计算任务容器化的,都运行在这套平台之上;
  • Flink 的状态后端有 HDFS 和 CFS 两种选择,其中 CFS 是京东自研的对象存储;
  • Flink 集群的高可用是基于 Zookeeper 构建的。

在应用开发方式方面,平台提供 SQL 和 Jar 包两种方式,其中 Jar 的方式支持用户直接上传 Flink 应用 Jar 包或者提供 Git 地址由平台来负责打包。除此之外我们平台化的功能也相对比较完善,比如基础的元数据服务、SQL 调试功能,产品端支持所有的参数配置,以及基于 metrics 的监控、任务日志查询等。

连接数据源方面,平台通过 connector 支持了丰富的数据源类型,其中 JDQ 基于开源 Kafka 定制,主要应用于大数据场景的消息队列;JMQ 是京东自研,主要应用于在线系统的消息队列;JimDB 是京东自研的分布式 KV 存储。

在当前 Lambda 架构中,假设实时链路的数据存储在 JDQ,离线链路的数据存在 Hive 表中,即便计算的是同一业务模型,元数据的定义也常常是存在差异的,因此我们引入统一的逻辑模型来兼容实时离线两边的元数据。

在计算环节,通过 FlinkSQL 结合 UDF 的方式来实现业务逻辑的流批统一计算,此外平台会提供大量的公用 UDF,同时也支持用户上传自定义 UDF。针对计算结果的输出,我们同样引入统一的逻辑模型来屏蔽流批两端的差异。对于只实现计算统一的场景,可以将计算结果分别写入流批各自对应的存储,以保证数据的实时性与先前保持一致。

对于同时实现计算统一和存储统一的场景,我们可以将计算的结果直接写入到流批统一的存储。我们选择了 Iceberg 作为流批统一的存储,因为它拥有良好的架构设计,比如不会绑定到某一个特定的引擎等。

在兼容批处理能力方面,我们主要进行了以下三个方面的工作:

第一,复用离线数仓中的 Hive 表。

以数据源端为例,为了屏蔽上图左侧图中流、批两端元数据的差异,我们定义了逻辑模型 gdm_order_m 表,并且需要用户显示地指定 Hive 表和 Topic 中的字段与这张逻辑表中字段的映射关系。这里映射关系的定义非常重要,因为基于 FlinkSQL 的计算只需面向这张逻辑表,而无需关心实际的 Hive 表与 Topic 中的字段信息。在运行时通过 connector 创建流表和批表的时候,逻辑表中的字段会通过映射关系被替换成实际的字段。

在产品端,我们可以给逻辑表分别绑定流表和批表,通过拖拽的方式来指定字段之间的映射关系。这种模式使得我们的开发方式与之前有所差异,之前的方式是先新建一个任务并指定是流任务还是批任务,然后进行 SQL 开发,再去指定任务相关的配置,最后发布任务。而在流批一体模式下,开发模式变为了首先完成 SQL 的开发,其中包括逻辑的、物理的 DDL 的定义,以及它们之间的字段映射关系的指定,DML 的编写等,然后分别指定流批任务相关的配置,最后发布成流批两个任务。

第二,与调度系统打通。

离线数仓的数据加工基本是以 Hive/Spark 结合调度的模式,以上图中居中的图为例,数据的加工被分为 4 个阶段,分别对应数仓的 BDM、FDM、GDM 和 ADM 层。随着 Flink 能力的增强,用户希望把 GDM 层的数据加工任务替换为 FlinkSQL 的批任务,这就需要把 FlinkSQL 批任务嵌入到当前的数据加工过程中,作为中间的一个环节。

为了解决这个问题,除了任务本身支持配置调度规则,我们还打通了调度系统,从中继承了父任务的依赖关系,并将任务自身的信息同步到调度系统中,支持作为下游任务的父任务,从而实现了将 FlinkSQL 的批任务作为原数据加工的其中一个环节。

第三,对用户自定义的 Hive UDF、UDAF 及 UDTF 的复用。

对于现存的基于 Hive 的离线加工任务,如果用户已经开发了 UDF 函数,那么最理想的方式是在迁移 Flink 时对这些 UDF 进行直接复用,而不是按照 Flink UDF 定义重新实现。

在 UDF 的兼容问题上,针对使用 Hive 内置函数的场景,社区提供了 load hive modules 方案。如果用户希望使用自己开发的 Hive UDF,可以通过使用 create catalog、use catalog、create function,最后在 DML 中调用的方式来实现, 这个过程会将 Function 的信息注册到 Hive 的 Metastore 中。从平台管理的角度,我们希望用户的 UDF 具备一定的隔离性,限制用户 Job 的粒度,减少与 Hive Metastore 交互以及产生脏函数元数据的风险。

此外,当元信息已经被注册过,希望下次能在 Flink 平台端正常使用,如果不使用 if not exist 语法,通常需要先 drop function,再进行 create 操作。但是这种方式不够优雅,同时也对用户的使用方式有限制。另一种解决方法是用户可以注册临时的 Hive UDF,在 Flink1.12 中注册临时 UDF 的方式是 create temporary function,但是该 Function 需要实现 UserDefinedFunction 接口后才能通过后面的校验,否则会注册失败。

所以我们并没有使用 create temporary function,而是对 create function 做了一些调整,扩展了 ExtFunctionModule,将解析出来的 FunctionDefinition 注册到 ExtFunctionModule 中,做了一次 Job 级别的临时注册。这样的好处就是不会污染 Hive Metastore,提供了良好的隔离性,同时也没有对用户的使用习惯产生限制,提供了良好的体验。

不过这个问题在社区 1.13 的版本已经得到了综合的解决。通过引入 Hive 解析器等扩展,已经可以把实现 UDF、GenericUDF 接口的自定义 Hive 函数通过 create temporary function 语法进行注册和使用。

资源占用方面,流处理和批处理是天然错峰的。对于批处理,离线数仓每天 0 点开始计算过去一整天的数据,所有的离线报表的数据加工会在第二天上班前全部完成,所以通常 00:00 到 8:00 是批计算任务大量占用资源的时间段,而这个时间段通常在线的流量都比较低。流处理的负载与在线的流量是正相关的,所以这个时间段流处理的资源需求是比较低的。上午 8 点到晚上 0 点,在线的流量比较高,而这个时间段批处理的任务大部分都不会被触发执行。

基于这种天然的错峰,我们可以通过在专属的 JDOS Zone 中进行不同类型的流批应用的混部来提升资源的使用率,并且如果统一使用 Flink 引擎来处理流批应用,资源的使用率会更高。

同时为了使应用可以基于流量进行动态调整,我们还开发了自动弹性伸缩的服务 (Auto-Scaling Service)。它的工作原理如下:运行在平台上的 Flink 任务上报 metrics 信息到 metrics 系统,Auto-Scaling Service 会基于 metrics 系统中的一些关键指标,比如 TaskManager 的 CPU 使用率、任务的背压情况等来判定任务是否需要增减计算资源,并把调整的结果反馈给 JRC 平台,JRC 平台通过内嵌的 fabric 客户端将调整的结果同步到 JDOS 平台,从而完成对 TaskManager Pod 个数的调整。此外,用户可以在 JRC 平台上通过配置来决定是否为任务开启此功能。

上图右侧图表是我们在 JDOS Zone 中进行流批混部并结合弹性伸缩服务试点测试时的 CPU 使用情况。可以看到 0 点流任务进行了缩容,将资源释放给批任务。我们设置的新任务在 2 点开始执行,所以从 2 点开始直到早上批任务结束这段时间,CPU 的使用率都比较高,最高到 80% 以上。批任务运行结束后,在线流量开始增长时,流任务进行了扩容,CPU 的使用率也随之上升。

更多内容请看:

https://blog.stackanswer.com/articles/2022/07/01/1656663988707.html

  • FlinkSQL
    2 引用
  • 流批一体
    1 引用
  • Lambda
    24 引用 • 19 回帖
  • 大数据

    大数据(big data)是指无法在一定时间范围内用常规软件工具进行捕捉、管理和处理的数据集合,是需要新处理模式才能具有更强的决策力、洞察发现力和流程优化能力的海量、高增长率和多样化的信息资产。

    93 引用 • 113 回帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...

推荐标签 标签

  • 学习

    “梦想从学习开始,事业从实践起步” —— 习近平

    170 引用 • 513 回帖
  • Thymeleaf

    Thymeleaf 是一款用于渲染 XML/XHTML/HTML5 内容的模板引擎。类似 Velocity、 FreeMarker 等,它也可以轻易的与 Spring 等 Web 框架进行集成作为 Web 应用的模板引擎。与其它模板引擎相比,Thymeleaf 最大的特点是能够直接在浏览器中打开并正确显示模板页面,而不需要启动整个 Web 应用。

    11 引用 • 19 回帖 • 369 关注
  • Facebook

    Facebook 是一个联系朋友的社交工具。大家可以通过它和朋友、同事、同学以及周围的人保持互动交流,分享无限上传的图片,发布链接和视频,更可以增进对朋友的了解。

    4 引用 • 15 回帖 • 439 关注
  • Markdown

    Markdown 是一种轻量级标记语言,用户可使用纯文本编辑器来排版文档,最终通过 Markdown 引擎将文档转换为所需格式(比如 HTML、PDF 等)。

    167 引用 • 1521 回帖 • 1 关注
  • 小说

    小说是以刻画人物形象为中心,通过完整的故事情节和环境描写来反映社会生活的文学体裁。

    28 引用 • 108 回帖
  • Netty

    Netty 是一个基于 NIO 的客户端-服务器编程框架,使用 Netty 可以让你快速、简单地开发出一个可维护、高性能的网络应用,例如实现了某种协议的客户、服务端应用。

    49 引用 • 33 回帖 • 24 关注
  • HBase

    HBase 是一个分布式的、面向列的开源数据库,该技术来源于 Fay Chang 所撰写的 Google 论文 “Bigtable:一个结构化数据的分布式存储系统”。就像 Bigtable 利用了 Google 文件系统所提供的分布式数据存储一样,HBase 在 Hadoop 之上提供了类似于 Bigtable 的能力。

    17 引用 • 6 回帖 • 75 关注
  • JSON

    JSON (JavaScript Object Notation)是一种轻量级的数据交换格式。易于人类阅读和编写。同时也易于机器解析和生成。

    52 引用 • 190 回帖 • 1 关注
  • 职场

    找到自己的位置,萌新烦恼少。

    127 引用 • 1706 回帖
  • GitBook

    GitBook 使您的团队可以轻松编写和维护高质量的文档。 分享知识,提高团队的工作效率,让用户满意。

    3 引用 • 8 回帖
  • 钉钉

    钉钉,专为中国企业打造的免费沟通协同多端平台, 阿里巴巴出品。

    15 引用 • 67 回帖 • 335 关注
  • Gitea

    Gitea 是一个开源社区驱动的轻量级代码托管解决方案,后端采用 Go 编写,采用 MIT 许可证。

    4 引用 • 16 回帖 • 2 关注
  • IBM

    IBM(国际商业机器公司)或万国商业机器公司,简称 IBM(International Business Machines Corporation),总公司在纽约州阿蒙克市。1911 年托马斯·沃森创立于美国,是全球最大的信息技术和业务解决方案公司,拥有全球雇员 30 多万人,业务遍及 160 多个国家和地区。

    17 引用 • 53 回帖 • 138 关注
  • 书籍

    宋真宗赵恒曾经说过:“书中自有黄金屋,书中自有颜如玉。”

    78 引用 • 391 回帖
  • jsoup

    jsoup 是一款 Java 的 HTML 解析器,可直接解析某个 URL 地址、HTML 文本内容。它提供了一套非常省力的 API,可通过 DOM,CSS 以及类似于 jQuery 的操作方法来取出和操作数据。

    6 引用 • 1 回帖 • 487 关注
  • Mac

    Mac 是苹果公司自 1984 年起以“Macintosh”开始开发的个人消费型计算机,如:iMac、Mac mini、Macbook Air、Macbook Pro、Macbook、Mac Pro 等计算机。

    166 引用 • 595 回帖 • 1 关注
  • 单点登录

    单点登录(Single Sign On)是目前比较流行的企业业务整合的解决方案之一。SSO 的定义是在多个应用系统中,用户只需要登录一次就可以访问所有相互信任的应用系统。

    9 引用 • 25 回帖 • 2 关注
  • Dubbo

    Dubbo 是一个分布式服务框架,致力于提供高性能和透明化的 RPC 远程服务调用方案,是 [阿里巴巴] SOA 服务化治理方案的核心框架,每天为 2,000+ 个服务提供 3,000,000,000+ 次访问量支持,并被广泛应用于阿里巴巴集团的各成员站点。

    60 引用 • 82 回帖 • 605 关注
  • 智能合约

    智能合约(Smart contract)是一种旨在以信息化方式传播、验证或执行合同的计算机协议。智能合约允许在没有第三方的情况下进行可信交易,这些交易可追踪且不可逆转。智能合约概念于 1994 年由 Nick Szabo 首次提出。

    1 引用 • 11 回帖 • 2 关注
  • Ant-Design

    Ant Design 是服务于企业级产品的设计体系,基于确定和自然的设计价值观上的模块化解决方案,让设计者和开发者专注于更好的用户体验。

    17 引用 • 23 回帖 • 5 关注
  • RIP

    愿逝者安息!

    8 引用 • 92 回帖 • 366 关注
  • SMTP

    SMTP(Simple Mail Transfer Protocol)即简单邮件传输协议,它是一组用于由源地址到目的地址传送邮件的规则,由它来控制信件的中转方式。SMTP 协议属于 TCP/IP 协议簇,它帮助每台计算机在发送或中转信件时找到下一个目的地。

    4 引用 • 18 回帖 • 624 关注
  • Redis

    Redis 是一个开源的使用 ANSI C 语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value 数据库,并提供多种语言的 API。从 2010 年 3 月 15 日起,Redis 的开发工作由 VMware 主持。从 2013 年 5 月开始,Redis 的开发由 Pivotal 赞助。

    286 引用 • 248 回帖 • 45 关注
  • Docker

    Docker 是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的操作系统上。容器完全使用沙箱机制,几乎没有性能开销,可以很容易地在机器和数据中心中运行。

    492 引用 • 927 回帖
  • 数据库

    据说 99% 的性能瓶颈都在数据库。

    343 引用 • 723 回帖
  • Caddy

    Caddy 是一款默认自动启用 HTTPS 的 HTTP/2 Web 服务器。

    12 引用 • 54 回帖 • 161 关注
  • 倾城之链
    23 引用 • 66 回帖 • 141 关注