flink 自定义 TableSink 输出数据到控制台

本贴最后更新于 1713 天前,其中的信息可能已经渤澥桑田

官方文档 中关于 TableSink 仅给出了接口名称和少量描述,关于接口方法的解释所提甚少。

为了能更好地理解 TableSink 的功能和实现方式,这里写了一个简单的场景:
读取一个英文文本文件,统计各单词出现的次数,并将统计结果转为 Table 对象,再把 Table 内容输出到控制台。

public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();

    EnvironmentSettings settings = EnvironmentSettings
                    .newInstance()
                    .useBlinkPlanner()
                    .inStreamingMode().build();

    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);


    // 从 hdfs 读取文件
    DataStreamSource<String> text = streamEnv.readTextFile("hdfs://xxxx/file");

    // 统计
    SingleOutputStreamOperator<Tuple2<String, Integer>> counts = text
            .flatMap(new Tokenizer())
            .keyBy(0)
            .sum(1);

    // 将统计结果转为表对象
    Table countsTable = tableEnv.fromDataStream(counts, "word,count");

    // 创建 TableSink
    PrintTableSink sink = PrintTableSink.ofConsole();

    // 注册 TableSink
    tableEnv.registerTableSink(
            "ConsoleSinkTable",
            new String[]{"word", "count"},
            new TypeInformation[]{Types.STRING, Types.INT},
            sink
    );

    // 将表内容输出
    countsTable.insertInto("ConsoleSinkTable");

    // 启动任务
    streamEnv.execute("this is job name");

}

/**
 * 分词器
 */
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        
        String[] words = value.toLowerCase().split("\\W+");

        for (String word : words) {
            if (word.length() > 0) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}


自定义 TableSink 的实现:

public class PrintTableSink implements AppendStreamTableSink<Row>, BatchTableSink<Row> {

    private String target;
    private PrintTableSinkFunction sinkFunction;

    public PrintTableSink(String target) {
        this.target = target;

        /**
         * 重点!!!
         *
         * PrintTableSinkFunction 是一个自定义的 SinkFunction
         * 描述了当接收到一条数据时,该如何 sink 的具体逻辑
         */
        this.sinkFunction = new PrintTableSinkFunction(target);
    }

    /**
     * 添加当流被消费时的 sink 逻辑
     */
    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
        return dataStream.addSink(this.sinkFunction);
    }

    /**
     * 对 "流" 添加 sink 逻辑(单条数据)
     */
    @Override
    public void emitDataStream(DataStream<Row> dataStream) {
        dataStream.addSink(this.sinkFunction);
    }

    /**
     * 对 "批" 添加 sink 逻辑(多条数据)
     */
    @Override
    public void emitDataSet(DataSet<Row> dataSet) {

        try {

            List<Row> elements = dataSet.collect();
            for (Iterator<Row> it = elements.iterator(); it.hasNext(); ) {
                Row row = it.next();
                this.sinkFunction.invoke(row);
            }

            dataSet.print();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    private String[] fieldNames;
    private TypeInformation<?>[] fieldTypes;

    /**
     * 当 StreamTableEnvironment.registerTableSink() 时,会通过此方法完成 TableSink 对象的创建。
     *
     * @param strings          字段名列表
     * @param typeInformations 字段类型列表
     * @return
     */
    @Override
    public TableSink<Row> configure(String[] strings, TypeInformation<?>[] typeInformations) {
        PrintTableSink sink = new PrintTableSink(target);
        sink.fieldNames = strings;
        sink.fieldTypes = typeInformations;

        return sink;
    }

    /**
     * 表的字段列表
     */
    @Override
    public String[] getFieldNames() {
        return fieldNames;
    }

    /**
     * 表字段的数据类型
     */
    @Override
    public TypeInformation<?>[] getFieldTypes() {
        return fieldTypes;
    }

    /**
     * 表字段类型的描述信息
     */
    @Override
    public TypeInformation<Row> getOutputType() {
        return Types.ROW_NAMED(fieldNames, fieldTypes);
    }


    /**
     * 这里定义了当接收到一条数据时,该如何 sink 的具体逻辑
     */
    public static class PrintTableSinkFunction implements SinkFunction<Row> {
        private static Logger LOG = LoggerFactory.getLogger(PrintTableSink.class);
        private String target;

        public PrintTableSinkFunction(String target) {
            this.target = target;
        }

        @Override
        public void invoke(Row row, Context context) throws Exception {
            switch (target) {
                case "Console":
                    System.out.println(row);
                    break;
                case "Logger":
                    LOG.info(row.toString());
                    break;
                default:
            }
        }

        @Override
        public void invoke(Row value) throws Exception {
            invoke(value, null);
        }
    }

}

如果需要将数据保存到 数据库、Redis、Hadoop 平台 等地方,只需在上面的 PrintTableSinkFunction 内修改输出逻辑即可。

这里只演示了写入的两种情况(AppendStreamTableSink、BatchTableSink),还有支持删除和修改的 RetractStreamTableSink、UpsertStreamTableSink 接口没有演示,我尽量这几天补上 :)

注:以上代码基于 flink 1.9

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...
LyZane
我的眼神里,是自由光。 昆明

推荐标签 标签

  • 笔记

    好记性不如烂笔头。

    306 引用 • 782 回帖
  • V2EX

    V2EX 是创意工作者们的社区。这里目前汇聚了超过 400,000 名主要来自互联网行业、游戏行业和媒体行业的创意工作者。V2EX 希望能够成为创意工作者们的生活和事业的一部分。

    17 引用 • 236 回帖 • 397 关注
  • HBase

    HBase 是一个分布式的、面向列的开源数据库,该技术来源于 Fay Chang 所撰写的 Google 论文 “Bigtable:一个结构化数据的分布式存储系统”。就像 Bigtable 利用了 Google 文件系统所提供的分布式数据存储一样,HBase 在 Hadoop 之上提供了类似于 Bigtable 的能力。

    17 引用 • 6 回帖 • 58 关注
  • Spark

    Spark 是 UC Berkeley AMP lab 所开源的类 Hadoop MapReduce 的通用并行框架。Spark 拥有 Hadoop MapReduce 所具有的优点;但不同于 MapReduce 的是 Job 中间输出结果可以保存在内存中,从而不再需要读写 HDFS,因此 Spark 能更好地适用于数据挖掘与机器学习等需要迭代的 MapReduce 的算法。

    74 引用 • 46 回帖 • 561 关注
  • 微信

    腾讯公司 2011 年 1 月 21 日推出的一款手机通讯软件。用户可以通过摇一摇、搜索号码、扫描二维码等添加好友和关注公众平台,同时可以将自己看到的精彩内容分享到微信朋友圈。

    130 引用 • 793 回帖
  • 房星科技

    房星网,我们不和没有钱的程序员谈理想,我们要让程序员又有理想又有钱。我们有雄厚的房地产行业线下资源,遍布昆明全城的 100 家门店、四千地产经纪人是我们坚实的后盾。

    6 引用 • 141 回帖 • 570 关注
  • CSDN

    CSDN (Chinese Software Developer Network) 创立于 1999 年,是中国的 IT 社区和服务平台,为中国的软件开发者和 IT 从业者提供知识传播、职业发展、软件开发等全生命周期服务,满足他们在职业发展中学习及共享知识和信息、建立职业发展社交圈、通过软件开发实现技术商业化等刚性需求。

    14 引用 • 155 回帖
  • Kafka

    Kafka 是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是现代系统中许多功能的基础。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。

    35 引用 • 35 回帖 • 2 关注
  • Spring

    Spring 是一个开源框架,是于 2003 年兴起的一个轻量级的 Java 开发框架,由 Rod Johnson 在其著作《Expert One-On-One J2EE Development and Design》中阐述的部分理念和原型衍生而来。它是为了解决企业应用开发的复杂性而创建的。框架的主要优势之一就是其分层架构,分层架构允许使用者选择使用哪一个组件,同时为 JavaEE 应用程序开发提供集成的框架。

    941 引用 • 1458 回帖 • 122 关注
  • Netty

    Netty 是一个基于 NIO 的客户端-服务器编程框架,使用 Netty 可以让你快速、简单地开发出一个可维护、高性能的网络应用,例如实现了某种协议的客户、服务端应用。

    49 引用 • 33 回帖 • 24 关注
  • 新人

    让我们欢迎这对新人。哦,不好意思说错了,让我们欢迎这位新人!
    新手上路,请谨慎驾驶!

    51 引用 • 226 回帖
  • IBM

    IBM(国际商业机器公司)或万国商业机器公司,简称 IBM(International Business Machines Corporation),总公司在纽约州阿蒙克市。1911 年托马斯·沃森创立于美国,是全球最大的信息技术和业务解决方案公司,拥有全球雇员 30 多万人,业务遍及 160 多个国家和地区。

    16 引用 • 53 回帖 • 121 关注
  • Dubbo

    Dubbo 是一个分布式服务框架,致力于提供高性能和透明化的 RPC 远程服务调用方案,是 [阿里巴巴] SOA 服务化治理方案的核心框架,每天为 2,000+ 个服务提供 3,000,000,000+ 次访问量支持,并被广泛应用于阿里巴巴集团的各成员站点。

    60 引用 • 82 回帖 • 610 关注
  • WebClipper

    Web Clipper 是一款浏览器剪藏扩展,它可以帮助你把网页内容剪藏到本地。

    3 引用 • 9 回帖
  • V2Ray
    1 引用 • 15 回帖
  • abitmean

    有点意思就行了

    28 关注
  • 招聘

    哪里都缺人,哪里都不缺人。

    189 引用 • 1056 回帖
  • Vue.js

    Vue.js(读音 /vju ː/,类似于 view)是一个构建数据驱动的 Web 界面库。Vue.js 的目标是通过尽可能简单的 API 实现响应的数据绑定和组合的视图组件。

    262 引用 • 664 回帖
  • 七牛云

    七牛云是国内领先的企业级公有云服务商,致力于打造以数据为核心的场景化 PaaS 服务。围绕富媒体场景,七牛先后推出了对象存储,融合 CDN 加速,数据通用处理,内容反垃圾服务,以及直播云服务等。

    26 引用 • 222 回帖 • 165 关注
  • Latke

    Latke 是一款以 JSON 为主的 Java Web 框架。

    70 引用 • 532 回帖 • 725 关注
  • WebSocket

    WebSocket 是 HTML5 中定义的一种新协议,它实现了浏览器与服务器之间的全双工通信(full-duplex)。

    48 引用 • 206 回帖 • 387 关注
  • Sublime

    Sublime Text 是一款可以用来写代码、写文章的文本编辑器。支持代码高亮、自动完成,还支持通过插件进行扩展。

    10 引用 • 5 回帖 • 1 关注
  • 书籍

    宋真宗赵恒曾经说过:“书中自有黄金屋,书中自有颜如玉。”

    76 引用 • 390 回帖
  • LeetCode

    LeetCode(力扣)是一个全球极客挚爱的高质量技术成长平台,想要学习和提升专业能力从这里开始,充足技术干货等你来啃,轻松拿下 Dream Offer!

    209 引用 • 72 回帖
  • C++

    C++ 是在 C 语言的基础上开发的一种通用编程语言,应用广泛。C++ 支持多种编程范式,面向对象编程、泛型编程和过程化编程。

    106 引用 • 152 回帖
  • TGIF

    Thank God It's Friday! 感谢老天,总算到星期五啦!

    285 引用 • 4482 回帖 • 656 关注
  • SendCloud

    SendCloud 由搜狐武汉研发中心孵化的项目,是致力于为开发者提供高质量的触发邮件服务的云端邮件发送平台,为开发者提供便利的 API 接口来调用服务,让邮件准确迅速到达用户收件箱并获得强大的追踪数据。

    2 引用 • 8 回帖 • 448 关注