flink 自定义 TableSink 输出数据到控制台

本贴最后更新于 2169 天前,其中的信息可能已经渤澥桑田

官方文档 中关于 TableSink 仅给出了接口名称和少量描述,关于接口方法的解释所提甚少。

为了能更好地理解 TableSink 的功能和实现方式,这里写了一个简单的场景:
读取一个英文文本文件,统计各单词出现的次数,并将统计结果转为 Table 对象,再把 Table 内容输出到控制台。

public static void main(String[] args) throws Exception { StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings settings = EnvironmentSettings .newInstance() .useBlinkPlanner() .inStreamingMode().build(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings); // 从 hdfs 读取文件 DataStreamSource<String> text = streamEnv.readTextFile("hdfs://xxxx/file"); // 统计 SingleOutputStreamOperator<Tuple2<String, Integer>> counts = text .flatMap(new Tokenizer()) .keyBy(0) .sum(1); // 将统计结果转为表对象 Table countsTable = tableEnv.fromDataStream(counts, "word,count"); // 创建 TableSink PrintTableSink sink = PrintTableSink.ofConsole(); // 注册 TableSink tableEnv.registerTableSink( "ConsoleSinkTable", new String[]{"word", "count"}, new TypeInformation[]{Types.STRING, Types.INT}, sink ); // 将表内容输出 countsTable.insertInto("ConsoleSinkTable"); // 启动任务 streamEnv.execute("this is job name"); } /** * 分词器 */ public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { String[] words = value.toLowerCase().split("\\W+"); for (String word : words) { if (word.length() > 0) { out.collect(new Tuple2<>(word, 1)); } } } }


自定义 TableSink 的实现:

public class PrintTableSink implements AppendStreamTableSink<Row>, BatchTableSink<Row> { private String target; private PrintTableSinkFunction sinkFunction; public PrintTableSink(String target) { this.target = target; /** * 重点!!! * * PrintTableSinkFunction 是一个自定义的 SinkFunction * 描述了当接收到一条数据时,该如何 sink 的具体逻辑 */ this.sinkFunction = new PrintTableSinkFunction(target); } /** * 添加当流被消费时的 sink 逻辑 */ @Override public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) { return dataStream.addSink(this.sinkFunction); } /** * 对 "流" 添加 sink 逻辑(单条数据) */ @Override public void emitDataStream(DataStream<Row> dataStream) { dataStream.addSink(this.sinkFunction); } /** * 对 "批" 添加 sink 逻辑(多条数据) */ @Override public void emitDataSet(DataSet<Row> dataSet) { try { List<Row> elements = dataSet.collect(); for (Iterator<Row> it = elements.iterator(); it.hasNext(); ) { Row row = it.next(); this.sinkFunction.invoke(row); } dataSet.print(); } catch (Exception e) { e.printStackTrace(); } } private String[] fieldNames; private TypeInformation<?>[] fieldTypes; /** * 当 StreamTableEnvironment.registerTableSink() 时,会通过此方法完成 TableSink 对象的创建。 * * @param strings 字段名列表 * @param typeInformations 字段类型列表 * @return */ @Override public TableSink<Row> configure(String[] strings, TypeInformation<?>[] typeInformations) { PrintTableSink sink = new PrintTableSink(target); sink.fieldNames = strings; sink.fieldTypes = typeInformations; return sink; } /** * 表的字段列表 */ @Override public String[] getFieldNames() { return fieldNames; } /** * 表字段的数据类型 */ @Override public TypeInformation<?>[] getFieldTypes() { return fieldTypes; } /** * 表字段类型的描述信息 */ @Override public TypeInformation<Row> getOutputType() { return Types.ROW_NAMED(fieldNames, fieldTypes); } /** * 这里定义了当接收到一条数据时,该如何 sink 的具体逻辑 */ public static class PrintTableSinkFunction implements SinkFunction<Row> { private static Logger LOG = LoggerFactory.getLogger(PrintTableSink.class); private String target; public PrintTableSinkFunction(String target) { this.target = target; } @Override public void invoke(Row row, Context context) throws Exception { switch (target) { case "Console": System.out.println(row); break; case "Logger": LOG.info(row.toString()); break; default: } } @Override public void invoke(Row value) throws Exception { invoke(value, null); } } }

如果需要将数据保存到 数据库、Redis、Hadoop 平台 等地方,只需在上面的 PrintTableSinkFunction 内修改输出逻辑即可。

这里只演示了写入的两种情况(AppendStreamTableSink、BatchTableSink),还有支持删除和修改的 RetractStreamTableSink、UpsertStreamTableSink 接口没有演示,我尽量这几天补上 :)

注:以上代码基于 flink 1.9

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...
LyZane
我的眼神里,是自由光。 昆明

推荐标签 标签

  • API

    应用程序编程接口(Application Programming Interface)是一些预先定义的函数,目的是提供应用程序与开发人员基于某软件或硬件得以访问一组例程的能力,而又无需访问源码,或理解内部工作机制的细节。

    79 引用 • 431 回帖
  • 大疆创新

    深圳市大疆创新科技有限公司(DJI-Innovations,简称 DJI),成立于 2006 年,是全球领先的无人飞行器控制系统及无人机解决方案的研发和生产商,客户遍布全球 100 多个国家。通过持续的创新,大疆致力于为无人机工业、行业用户以及专业航拍应用提供性能最强、体验最佳的革命性智能飞控产品和解决方案。

    2 引用 • 14 回帖
  • Flume

    Flume 是一套分布式的、可靠的,可用于有效地收集、聚合和搬运大量日志数据的服务架构。

    9 引用 • 6 回帖 • 668 关注
  • Google

    Google(Google Inc.,NASDAQ:GOOG)是一家美国上市公司(公有股份公司),于 1998 年 9 月 7 日以私有股份公司的形式创立,设计并管理一个互联网搜索引擎。Google 公司的总部称作“Googleplex”,它位于加利福尼亚山景城。Google 目前被公认为是全球规模最大的搜索引擎,它提供了简单易用的免费服务。不作恶(Don't be evil)是谷歌公司的一项非正式的公司口号。

    50 引用 • 192 回帖
  • 设计模式

    设计模式(Design pattern)代表了最佳的实践,通常被有经验的面向对象的软件开发人员所采用。设计模式是软件开发人员在软件开发过程中面临的一般问题的解决方案。这些解决方案是众多软件开发人员经过相当长的一段时间的试验和错误总结出来的。

    201 引用 • 120 回帖
  • wolai

    我来 wolai:不仅仅是未来的云端笔记!

    2 引用 • 14 回帖
  • 酷鸟浏览器

    安全 · 稳定 · 快速
    为跨境从业人员提供专业的跨境浏览器

    3 引用 • 59 回帖 • 68 关注
  • H2

    H2 是一个开源的嵌入式数据库引擎,采用 Java 语言编写,不受平台的限制,同时 H2 提供了一个十分方便的 web 控制台用于操作和管理数据库内容。H2 还提供兼容模式,可以兼容一些主流的数据库,因此采用 H2 作为开发期的数据库非常方便。

    11 引用 • 54 回帖 • 677 关注
  • ZooKeeper

    ZooKeeper 是一个分布式的,开放源码的分布式应用程序协调服务,是 Google 的 Chubby 一个开源的实现,是 Hadoop 和 HBase 的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。

    61 引用 • 29 回帖 • 1 关注
  • uTools

    uTools 是一个极简、插件化、跨平台的现代桌面软件。通过自由选配丰富的插件,打造你得心应手的工具集合。

    8 引用 • 37 回帖
  • 互联网

    互联网(Internet),又称网际网络,或音译因特网、英特网。互联网始于 1969 年美国的阿帕网,是网络与网络之间所串连成的庞大网络,这些网络以一组通用的协议相连,形成逻辑上的单一巨大国际网络。

    98 引用 • 367 回帖 • 1 关注
  • iOS

    iOS 是由苹果公司开发的移动操作系统,最早于 2007 年 1 月 9 日的 Macworld 大会上公布这个系统,最初是设计给 iPhone 使用的,后来陆续套用到 iPod touch、iPad 以及 Apple TV 等产品上。iOS 与苹果的 Mac OS X 操作系统一样,属于类 Unix 的商业操作系统。

    89 引用 • 150 回帖
  • DevOps

    DevOps(Development 和 Operations 的组合词)是一组过程、方法与系统的统称,用于促进开发(应用程序/软件工程)、技术运营和质量保障(QA)部门之间的沟通、协作与整合。

    59 引用 • 25 回帖
  • Access
    1 引用 • 3 回帖 • 1 关注
  • 倾城之链
    23 引用 • 66 回帖 • 181 关注
  • CSDN

    CSDN (Chinese Software Developer Network) 创立于 1999 年,是中国的 IT 社区和服务平台,为中国的软件开发者和 IT 从业者提供知识传播、职业发展、软件开发等全生命周期服务,满足他们在职业发展中学习及共享知识和信息、建立职业发展社交圈、通过软件开发实现技术商业化等刚性需求。

    14 引用 • 155 回帖
  • WebClipper

    Web Clipper 是一款浏览器剪藏扩展,它可以帮助你把网页内容剪藏到本地。

    3 引用 • 9 回帖
  • BookxNote

    BookxNote 是一款全新的电子书学习工具,助力您的学习与思考,让您的大脑更高效的记忆。

    笔记整理交给我,一心只读圣贤书。

    1 引用 • 1 回帖 • 1 关注
  • TextBundle

    TextBundle 文件格式旨在应用程序之间交换 Markdown 或 Fountain 之类的纯文本文件时,提供更无缝的用户体验。

    1 引用 • 2 回帖 • 82 关注
  • 创业

    你比 99% 的人都优秀么?

    81 引用 • 1395 回帖
  • AWS
    11 引用 • 28 回帖 • 6 关注
  • 星云链

    星云链是一个开源公链,业内简单的将其称为区块链上的谷歌。其实它不仅仅是区块链搜索引擎,一个公链的所有功能,它基本都有,比如你可以用它来开发部署你的去中心化的 APP,你可以在上面编写智能合约,发送交易等等。3 分钟快速接入星云链 (NAS) 测试网

    3 引用 • 16 回帖 • 5 关注
  • Telegram

    Telegram 是一个非盈利性、基于云端的即时消息服务。它提供了支持各大操作系统平台的开源的客户端,也提供了很多强大的 APIs 给开发者创建自己的客户端和机器人。

    5 引用 • 35 回帖
  • Vue.js

    Vue.js(读音 /vju ː/,类似于 view)是一个构建数据驱动的 Web 界面库。Vue.js 的目标是通过尽可能简单的 API 实现响应的数据绑定和组合的视图组件。

    268 引用 • 666 回帖
  • danl
    191 关注
  • 自由行
    6 关注
  • 区块链

    区块链是分布式数据存储、点对点传输、共识机制、加密算法等计算机技术的新型应用模式。所谓共识机制是区块链系统中实现不同节点之间建立信任、获取权益的数学算法 。

    92 引用 • 752 回帖