flink 自定义 TableSink 输出数据到控制台

本贴最后更新于 2108 天前,其中的信息可能已经渤澥桑田

官方文档 中关于 TableSink 仅给出了接口名称和少量描述,关于接口方法的解释所提甚少。

为了能更好地理解 TableSink 的功能和实现方式,这里写了一个简单的场景:
读取一个英文文本文件,统计各单词出现的次数,并将统计结果转为 Table 对象,再把 Table 内容输出到控制台。

public static void main(String[] args) throws Exception { StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings settings = EnvironmentSettings .newInstance() .useBlinkPlanner() .inStreamingMode().build(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings); // 从 hdfs 读取文件 DataStreamSource<String> text = streamEnv.readTextFile("hdfs://xxxx/file"); // 统计 SingleOutputStreamOperator<Tuple2<String, Integer>> counts = text .flatMap(new Tokenizer()) .keyBy(0) .sum(1); // 将统计结果转为表对象 Table countsTable = tableEnv.fromDataStream(counts, "word,count"); // 创建 TableSink PrintTableSink sink = PrintTableSink.ofConsole(); // 注册 TableSink tableEnv.registerTableSink( "ConsoleSinkTable", new String[]{"word", "count"}, new TypeInformation[]{Types.STRING, Types.INT}, sink ); // 将表内容输出 countsTable.insertInto("ConsoleSinkTable"); // 启动任务 streamEnv.execute("this is job name"); } /** * 分词器 */ public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { String[] words = value.toLowerCase().split("\\W+"); for (String word : words) { if (word.length() > 0) { out.collect(new Tuple2<>(word, 1)); } } } }


自定义 TableSink 的实现:

public class PrintTableSink implements AppendStreamTableSink<Row>, BatchTableSink<Row> { private String target; private PrintTableSinkFunction sinkFunction; public PrintTableSink(String target) { this.target = target; /** * 重点!!! * * PrintTableSinkFunction 是一个自定义的 SinkFunction * 描述了当接收到一条数据时,该如何 sink 的具体逻辑 */ this.sinkFunction = new PrintTableSinkFunction(target); } /** * 添加当流被消费时的 sink 逻辑 */ @Override public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) { return dataStream.addSink(this.sinkFunction); } /** * 对 "流" 添加 sink 逻辑(单条数据) */ @Override public void emitDataStream(DataStream<Row> dataStream) { dataStream.addSink(this.sinkFunction); } /** * 对 "批" 添加 sink 逻辑(多条数据) */ @Override public void emitDataSet(DataSet<Row> dataSet) { try { List<Row> elements = dataSet.collect(); for (Iterator<Row> it = elements.iterator(); it.hasNext(); ) { Row row = it.next(); this.sinkFunction.invoke(row); } dataSet.print(); } catch (Exception e) { e.printStackTrace(); } } private String[] fieldNames; private TypeInformation<?>[] fieldTypes; /** * 当 StreamTableEnvironment.registerTableSink() 时,会通过此方法完成 TableSink 对象的创建。 * * @param strings 字段名列表 * @param typeInformations 字段类型列表 * @return */ @Override public TableSink<Row> configure(String[] strings, TypeInformation<?>[] typeInformations) { PrintTableSink sink = new PrintTableSink(target); sink.fieldNames = strings; sink.fieldTypes = typeInformations; return sink; } /** * 表的字段列表 */ @Override public String[] getFieldNames() { return fieldNames; } /** * 表字段的数据类型 */ @Override public TypeInformation<?>[] getFieldTypes() { return fieldTypes; } /** * 表字段类型的描述信息 */ @Override public TypeInformation<Row> getOutputType() { return Types.ROW_NAMED(fieldNames, fieldTypes); } /** * 这里定义了当接收到一条数据时,该如何 sink 的具体逻辑 */ public static class PrintTableSinkFunction implements SinkFunction<Row> { private static Logger LOG = LoggerFactory.getLogger(PrintTableSink.class); private String target; public PrintTableSinkFunction(String target) { this.target = target; } @Override public void invoke(Row row, Context context) throws Exception { switch (target) { case "Console": System.out.println(row); break; case "Logger": LOG.info(row.toString()); break; default: } } @Override public void invoke(Row value) throws Exception { invoke(value, null); } } }

如果需要将数据保存到 数据库、Redis、Hadoop 平台 等地方,只需在上面的 PrintTableSinkFunction 内修改输出逻辑即可。

这里只演示了写入的两种情况(AppendStreamTableSink、BatchTableSink),还有支持删除和修改的 RetractStreamTableSink、UpsertStreamTableSink 接口没有演示,我尽量这几天补上 :)

注:以上代码基于 flink 1.9

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...
LyZane
我的眼神里,是自由光。 昆明

推荐标签 标签

  • 爬虫

    网络爬虫(Spider、Crawler),是一种按照一定的规则,自动地抓取万维网信息的程序。

    106 引用 • 275 回帖 • 1 关注
  • Outlook
    1 引用 • 5 回帖 • 2 关注
  • Love2D

    Love2D 是一个开源的, 跨平台的 2D 游戏引擎。使用纯 Lua 脚本来进行游戏开发。目前支持的平台有 Windows, Mac OS X, Linux, Android 和 iOS。

    14 引用 • 53 回帖 • 564 关注
  • Caddy

    Caddy 是一款默认自动启用 HTTPS 的 HTTP/2 Web 服务器。

    10 引用 • 54 回帖 • 179 关注
  • RabbitMQ

    RabbitMQ 是一个开源的 AMQP 实现,服务器端用 Erlang 语言编写,支持多种语言客户端,如:Python、Ruby、.NET、Java、C、PHP、ActionScript 等。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

    49 引用 • 60 回帖 • 352 关注
  • Webswing

    Webswing 是一个能将任何 Swing 应用通过纯 HTML5 运行在浏览器中的 Web 服务器,详细介绍请看 将 Java Swing 应用变成 Web 应用

    1 引用 • 15 回帖 • 643 关注
  • 又拍云

    又拍云是国内领先的 CDN 服务提供商,国家工信部认证通过的“可信云”,乌云众测平台认证的“安全云”,为移动时代的创业者提供新一代的 CDN 加速服务。

    20 引用 • 37 回帖 • 572 关注
  • 禅道

    禅道是一款国产的开源项目管理软件,她的核心管理思想基于敏捷方法 scrum,内置了产品管理和项目管理,同时又根据国内研发现状补充了测试管理、计划管理、发布管理、文档管理、事务管理等功能,在一个软件中就可以将软件研发中的需求、任务、bug、用例、计划、发布等要素有序的跟踪管理起来,完整地覆盖了项目管理的核心流程。

    10 引用 • 15 回帖 • 1 关注
  • jQuery

    jQuery 是一套跨浏览器的 JavaScript 库,强化 HTML 与 JavaScript 之间的操作。由 John Resig 在 2006 年 1 月的 BarCamp NYC 上释出第一个版本。全球约有 28% 的网站使用 jQuery,是非常受欢迎的 JavaScript 库。

    63 引用 • 134 回帖 • 736 关注
  • RemNote
    2 引用 • 16 回帖 • 26 关注
  • wolai

    我来 wolai:不仅仅是未来的云端笔记!

    2 引用 • 14 回帖
  • Excel
    31 引用 • 28 回帖
  • Markdown

    Markdown 是一种轻量级标记语言,用户可使用纯文本编辑器来排版文档,最终通过 Markdown 引擎将文档转换为所需格式(比如 HTML、PDF 等)。

    172 引用 • 1543 回帖
  • Kubernetes

    Kubernetes 是 Google 开源的一个容器编排引擎,它支持自动化部署、大规模可伸缩、应用容器化管理。

    118 引用 • 54 回帖 • 5 关注
  • Vditor

    Vditor 是一款浏览器端的 Markdown 编辑器,支持所见即所得、即时渲染(类似 Typora)和分屏预览模式。它使用 TypeScript 实现,支持原生 JavaScript、Vue、React 和 Angular。

    376 引用 • 1862 回帖
  • 微服务

    微服务架构是一种架构模式,它提倡将单一应用划分成一组小的服务。服务之间互相协调,互相配合,为用户提供最终价值。每个服务运行在独立的进程中。服务于服务之间才用轻量级的通信机制互相沟通。每个服务都围绕着具体业务构建,能够被独立的部署。

    96 引用 • 155 回帖
  • C++

    C++ 是在 C 语言的基础上开发的一种通用编程语言,应用广泛。C++ 支持多种编程范式,面向对象编程、泛型编程和过程化编程。

    108 引用 • 153 回帖
  • Ant-Design

    Ant Design 是服务于企业级产品的设计体系,基于确定和自然的设计价值观上的模块化解决方案,让设计者和开发者专注于更好的用户体验。

    17 引用 • 23 回帖 • 4 关注
  • Latke

    Latke 是一款以 JSON 为主的 Java Web 框架。

    71 引用 • 535 回帖 • 831 关注
  • 酷鸟浏览器

    安全 · 稳定 · 快速
    为跨境从业人员提供专业的跨境浏览器

    3 引用 • 59 回帖 • 52 关注
  • 分享

    有什么新发现就分享给大家吧!

    248 引用 • 1794 回帖 • 1 关注
  • 深度学习

    深度学习(Deep Learning)是机器学习的分支,是一种试图使用包含复杂结构或由多重非线性变换构成的多个处理层对数据进行高层抽象的算法。

    43 引用 • 44 回帖
  • WiFiDog

    WiFiDog 是一套开源的无线热点认证管理工具,主要功能包括:位置相关的内容递送;用户认证和授权;集中式网络监控。

    1 引用 • 7 回帖 • 612 关注
  • 七牛云

    七牛云是国内领先的企业级公有云服务商,致力于打造以数据为核心的场景化 PaaS 服务。围绕富媒体场景,七牛先后推出了对象存储,融合 CDN 加速,数据通用处理,内容反垃圾服务,以及直播云服务等。

    29 引用 • 230 回帖 • 125 关注
  • Log4j

    Log4j 是 Apache 开源的一款使用广泛的 Java 日志组件。

    20 引用 • 18 回帖 • 38 关注
  • 大疆创新

    深圳市大疆创新科技有限公司(DJI-Innovations,简称 DJI),成立于 2006 年,是全球领先的无人飞行器控制系统及无人机解决方案的研发和生产商,客户遍布全球 100 多个国家。通过持续的创新,大疆致力于为无人机工业、行业用户以及专业航拍应用提供性能最强、体验最佳的革命性智能飞控产品和解决方案。

    2 引用 • 14 回帖
  • LaTeX

    LaTeX(音译“拉泰赫”)是一种基于 ΤΕΧ 的排版系统,由美国计算机学家莱斯利·兰伯特(Leslie Lamport)在 20 世纪 80 年代初期开发,利用这种格式,即使使用者没有排版和程序设计的知识也可以充分发挥由 TeX 所提供的强大功能,能在几天,甚至几小时内生成很多具有书籍质量的印刷品。对于生成复杂表格和数学公式,这一点表现得尤为突出。因此它非常适用于生成高印刷质量的科技和数学类文档。

    12 引用 • 59 回帖