flink 自定义 TableSink 输出数据到控制台

本贴最后更新于 1324 天前,其中的信息可能已经渤澥桑田

官方文档 中关于 TableSink 仅给出了接口名称和少量描述,关于接口方法的解释所提甚少。

为了能更好地理解 TableSink 的功能和实现方式,这里写了一个简单的场景:
读取一个英文文本文件,统计各单词出现的次数,并将统计结果转为 Table 对象,再把 Table 内容输出到控制台。

public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();

    EnvironmentSettings settings = EnvironmentSettings
                    .newInstance()
                    .useBlinkPlanner()
                    .inStreamingMode().build();

    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);


    // 从 hdfs 读取文件
    DataStreamSource<String> text = streamEnv.readTextFile("hdfs://xxxx/file");

    // 统计
    SingleOutputStreamOperator<Tuple2<String, Integer>> counts = text
            .flatMap(new Tokenizer())
            .keyBy(0)
            .sum(1);

    // 将统计结果转为表对象
    Table countsTable = tableEnv.fromDataStream(counts, "word,count");

    // 创建 TableSink
    PrintTableSink sink = PrintTableSink.ofConsole();

    // 注册 TableSink
    tableEnv.registerTableSink(
            "ConsoleSinkTable",
            new String[]{"word", "count"},
            new TypeInformation[]{Types.STRING, Types.INT},
            sink
    );

    // 将表内容输出
    countsTable.insertInto("ConsoleSinkTable");

    // 启动任务
    streamEnv.execute("this is job name");

}

/**
 * 分词器
 */
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        
        String[] words = value.toLowerCase().split("\\W+");

        for (String word : words) {
            if (word.length() > 0) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}


自定义 TableSink 的实现:

public class PrintTableSink implements AppendStreamTableSink<Row>, BatchTableSink<Row> {

    private String target;
    private PrintTableSinkFunction sinkFunction;

    public PrintTableSink(String target) {
        this.target = target;

        /**
         * 重点!!!
         *
         * PrintTableSinkFunction 是一个自定义的 SinkFunction
         * 描述了当接收到一条数据时,该如何 sink 的具体逻辑
         */
        this.sinkFunction = new PrintTableSinkFunction(target);
    }

    /**
     * 添加当流被消费时的 sink 逻辑
     */
    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
        return dataStream.addSink(this.sinkFunction);
    }

    /**
     * 对 "流" 添加 sink 逻辑(单条数据)
     */
    @Override
    public void emitDataStream(DataStream<Row> dataStream) {
        dataStream.addSink(this.sinkFunction);
    }

    /**
     * 对 "批" 添加 sink 逻辑(多条数据)
     */
    @Override
    public void emitDataSet(DataSet<Row> dataSet) {

        try {

            List<Row> elements = dataSet.collect();
            for (Iterator<Row> it = elements.iterator(); it.hasNext(); ) {
                Row row = it.next();
                this.sinkFunction.invoke(row);
            }

            dataSet.print();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    private String[] fieldNames;
    private TypeInformation<?>[] fieldTypes;

    /**
     * 当 StreamTableEnvironment.registerTableSink() 时,会通过此方法完成 TableSink 对象的创建。
     *
     * @param strings          字段名列表
     * @param typeInformations 字段类型列表
     * @return
     */
    @Override
    public TableSink<Row> configure(String[] strings, TypeInformation<?>[] typeInformations) {
        PrintTableSink sink = new PrintTableSink(target);
        sink.fieldNames = strings;
        sink.fieldTypes = typeInformations;

        return sink;
    }

    /**
     * 表的字段列表
     */
    @Override
    public String[] getFieldNames() {
        return fieldNames;
    }

    /**
     * 表字段的数据类型
     */
    @Override
    public TypeInformation<?>[] getFieldTypes() {
        return fieldTypes;
    }

    /**
     * 表字段类型的描述信息
     */
    @Override
    public TypeInformation<Row> getOutputType() {
        return Types.ROW_NAMED(fieldNames, fieldTypes);
    }


    /**
     * 这里定义了当接收到一条数据时,该如何 sink 的具体逻辑
     */
    public static class PrintTableSinkFunction implements SinkFunction<Row> {
        private static Logger LOG = LoggerFactory.getLogger(PrintTableSink.class);
        private String target;

        public PrintTableSinkFunction(String target) {
            this.target = target;
        }

        @Override
        public void invoke(Row row, Context context) throws Exception {
            switch (target) {
                case "Console":
                    System.out.println(row);
                    break;
                case "Logger":
                    LOG.info(row.toString());
                    break;
                default:
            }
        }

        @Override
        public void invoke(Row value) throws Exception {
            invoke(value, null);
        }
    }

}

如果需要将数据保存到 数据库、Redis、Hadoop 平台 等地方,只需在上面的 PrintTableSinkFunction 内修改输出逻辑即可。

这里只演示了写入的两种情况(AppendStreamTableSink、BatchTableSink),还有支持删除和修改的 RetractStreamTableSink、UpsertStreamTableSink 接口没有演示,我尽量这几天补上 :)

注:以上代码基于 flink 1.9

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...
LyZane
我的眼神里,是自由光。 昆明

推荐标签 标签

  • 机器学习

    机器学习(Machine Learning)是一门多领域交叉学科,涉及概率论、统计学、逼近论、凸分析、算法复杂度理论等多门学科。专门研究计算机怎样模拟或实现人类的学习行为,以获取新的知识或技能,重新组织已有的知识结构使之不断改善自身的性能。

    75 引用 • 37 回帖 • 1 关注
  • 自由行
  • Caddy

    Caddy 是一款默认自动启用 HTTPS 的 HTTP/2 Web 服务器。

    10 引用 • 54 回帖 • 108 关注
  • BAE

    百度应用引擎(Baidu App Engine)提供了 PHP、Java、Python 的执行环境,以及云存储、消息服务、云数据库等全面的云服务。它可以让开发者实现自动地部署和管理应用,并且提供动态扩容和负载均衡的运行环境,让开发者不用考虑高成本的运维工作,只需专注于业务逻辑,大大降低了开发者学习和迁移的成本。

    19 引用 • 75 回帖 • 553 关注
  • CAP

    CAP 指的是在一个分布式系统中, Consistency(一致性)、 Availability(可用性)、Partition tolerance(分区容错性),三者不可兼得。

    11 引用 • 5 回帖 • 533 关注
  • Swift

    Swift 是苹果于 2014 年 WWDC(苹果开发者大会)发布的开发语言,可与 Objective-C 共同运行于 Mac OS 和 iOS 平台,用于搭建基于苹果平台的应用程序。

    34 引用 • 37 回帖 • 441 关注
  • Google

    Google(Google Inc.,NASDAQ:GOOG)是一家美国上市公司(公有股份公司),于 1998 年 9 月 7 日以私有股份公司的形式创立,设计并管理一个互联网搜索引擎。Google 公司的总部称作“Googleplex”,它位于加利福尼亚山景城。Google 目前被公认为是全球规模最大的搜索引擎,它提供了简单易用的免费服务。不作恶(Don't be evil)是谷歌公司的一项非正式的公司口号。

    49 引用 • 192 回帖 • 1 关注
  • Quicker

    Quicker 您的指尖工具箱!操作更少,收获更多!

    17 引用 • 81 回帖
  • abitmean

    有点意思就行了

    2 关注
  • RESTful

    一种软件架构设计风格而不是标准,提供了一组设计原则和约束条件,主要用于客户端和服务器交互类的软件。基于这个风格设计的软件可以更简洁,更有层次,更易于实现缓存等机制。

    30 引用 • 114 回帖
  • 智能合约

    智能合约(Smart contract)是一种旨在以信息化方式传播、验证或执行合同的计算机协议。智能合约允许在没有第三方的情况下进行可信交易,这些交易可追踪且不可逆转。智能合约概念于 1994 年由 Nick Szabo 首次提出。

    1 引用 • 11 回帖 • 10 关注
  • 反馈

    Communication channel for makers and users.

    123 引用 • 893 回帖 • 144 关注
  • 持续集成

    持续集成(Continuous Integration)是一种软件开发实践,即团队开发成员经常集成他们的工作,通过每个成员每天至少集成一次,也就意味着每天可能会发生多次集成。每次集成都通过自动化的构建(包括编译,发布,自动化测试)来验证,从而尽早地发现集成错误。

    14 引用 • 7 回帖 • 1 关注
  • Tomcat

    Tomcat 最早是由 Sun Microsystems 开发的一个 Servlet 容器,在 1999 年被捐献给 ASF(Apache Software Foundation),隶属于 Jakarta 项目,现在已经独立为一个顶级项目。Tomcat 主要实现了 JavaEE 中的 Servlet、JSP 规范,同时也提供 HTTP 服务,是市场上非常流行的 Java Web 容器。

    155 引用 • 529 回帖 • 3 关注
  • 七牛云

    七牛云是国内领先的企业级公有云服务商,致力于打造以数据为核心的场景化 PaaS 服务。围绕富媒体场景,七牛先后推出了对象存储,融合 CDN 加速,数据通用处理,内容反垃圾服务,以及直播云服务等。

    23 引用 • 206 回帖 • 167 关注
  • SQLite

    SQLite 是一个进程内的库,实现了自给自足的、无服务器的、零配置的、事务性的 SQL 数据库引擎。SQLite 是全世界使用最为广泛的数据库引擎。

    4 引用 • 7 回帖 • 1 关注
  • VirtualBox

    VirtualBox 是一款开源虚拟机软件,最早由德国 Innotek 公司开发,由 Sun Microsystems 公司出品的软件,使用 Qt 编写,在 Sun 被 Oracle 收购后正式更名成 Oracle VM VirtualBox。

    10 引用 • 2 回帖 • 3 关注
  • 大疆创新

    深圳市大疆创新科技有限公司(DJI-Innovations,简称 DJI),成立于 2006 年,是全球领先的无人飞行器控制系统及无人机解决方案的研发和生产商,客户遍布全球 100 多个国家。通过持续的创新,大疆致力于为无人机工业、行业用户以及专业航拍应用提供性能最强、体验最佳的革命性智能飞控产品和解决方案。

    2 引用 • 14 回帖 • 1 关注
  • CloudFoundry

    Cloud Foundry 是 VMware 推出的业界第一个开源 PaaS 云平台,它支持多种框架、语言、运行时环境、云平台及应用服务,使开发人员能够在几秒钟内进行应用程序的部署和扩展,无需担心任何基础架构的问题。

    5 引用 • 18 回帖 • 99 关注
  • 工具

    子曰:“工欲善其事,必先利其器。”

    257 引用 • 648 回帖
  • flomo

    flomo 是新一代 「卡片笔记」 ,专注在碎片化时代,促进你的记录,帮你积累更多知识资产。

    5 引用 • 35 回帖 • 2 关注
  • 星云链

    星云链是一个开源公链,业内简单的将其称为区块链上的谷歌。其实它不仅仅是区块链搜索引擎,一个公链的所有功能,它基本都有,比如你可以用它来开发部署你的去中心化的 APP,你可以在上面编写智能合约,发送交易等等。3 分钟快速接入星云链 (NAS) 测试网

    3 引用 • 16 回帖 • 28 关注
  • FFmpeg

    FFmpeg 是一套可以用来记录、转换数字音频、视频,并能将其转化为流的开源计算机程序。

    22 引用 • 31 回帖
  • V2EX

    V2EX 是创意工作者们的社区。这里目前汇聚了超过 400,000 名主要来自互联网行业、游戏行业和媒体行业的创意工作者。V2EX 希望能够成为创意工作者们的生活和事业的一部分。

    17 引用 • 236 回帖 • 572 关注
  • 电影

    这是一个不能说的秘密。

    119 引用 • 594 回帖
  • Ant-Design

    Ant Design 是服务于企业级产品的设计体系,基于确定和自然的设计价值观上的模块化解决方案,让设计者和开发者专注于更好的用户体验。

    17 引用 • 23 回帖 • 1 关注
  • Unity

    Unity 是由 Unity Technologies 开发的一个让开发者可以轻松创建诸如 2D、3D 多平台的综合型游戏开发工具,是一个全面整合的专业游戏引擎。

    22 引用 • 7 回帖 • 315 关注