官方文档 中关于 TableSink 仅给出了接口名称和少量描述,关于接口方法的解释所提甚少。
为了能更好地理解 TableSink 的功能和实现方式,这里写了一个简单的场景:
读取一个英文文本文件,统计各单词出现的次数,并将统计结果转为 Table 对象,再把 Table 内容输出到控制台。
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.useBlinkPlanner()
.inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
// 从 hdfs 读取文件
DataStreamSource<String> text = streamEnv.readTextFile("hdfs://xxxx/file");
// 统计
SingleOutputStreamOperator<Tuple2<String, Integer>> counts = text
.flatMap(new Tokenizer())
.keyBy(0)
.sum(1);
// 将统计结果转为表对象
Table countsTable = tableEnv.fromDataStream(counts, "word,count");
// 创建 TableSink
PrintTableSink sink = PrintTableSink.ofConsole();
// 注册 TableSink
tableEnv.registerTableSink(
"ConsoleSinkTable",
new String[]{"word", "count"},
new TypeInformation[]{Types.STRING, Types.INT},
sink
);
// 将表内容输出
countsTable.insertInto("ConsoleSinkTable");
// 启动任务
streamEnv.execute("this is job name");
}
/**
* 分词器
*/
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
String[] words = value.toLowerCase().split("\\W+");
for (String word : words) {
if (word.length() > 0) {
out.collect(new Tuple2<>(word, 1));
}
}
}
}
自定义 TableSink 的实现:
public class PrintTableSink implements AppendStreamTableSink<Row>, BatchTableSink<Row> {
private String target;
private PrintTableSinkFunction sinkFunction;
public PrintTableSink(String target) {
this.target = target;
/**
* 重点!!!
*
* PrintTableSinkFunction 是一个自定义的 SinkFunction
* 描述了当接收到一条数据时,该如何 sink 的具体逻辑
*/
this.sinkFunction = new PrintTableSinkFunction(target);
}
/**
* 添加当流被消费时的 sink 逻辑
*/
@Override
public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
return dataStream.addSink(this.sinkFunction);
}
/**
* 对 "流" 添加 sink 逻辑(单条数据)
*/
@Override
public void emitDataStream(DataStream<Row> dataStream) {
dataStream.addSink(this.sinkFunction);
}
/**
* 对 "批" 添加 sink 逻辑(多条数据)
*/
@Override
public void emitDataSet(DataSet<Row> dataSet) {
try {
List<Row> elements = dataSet.collect();
for (Iterator<Row> it = elements.iterator(); it.hasNext(); ) {
Row row = it.next();
this.sinkFunction.invoke(row);
}
dataSet.print();
} catch (Exception e) {
e.printStackTrace();
}
}
private String[] fieldNames;
private TypeInformation<?>[] fieldTypes;
/**
* 当 StreamTableEnvironment.registerTableSink() 时,会通过此方法完成 TableSink 对象的创建。
*
* @param strings 字段名列表
* @param typeInformations 字段类型列表
* @return
*/
@Override
public TableSink<Row> configure(String[] strings, TypeInformation<?>[] typeInformations) {
PrintTableSink sink = new PrintTableSink(target);
sink.fieldNames = strings;
sink.fieldTypes = typeInformations;
return sink;
}
/**
* 表的字段列表
*/
@Override
public String[] getFieldNames() {
return fieldNames;
}
/**
* 表字段的数据类型
*/
@Override
public TypeInformation<?>[] getFieldTypes() {
return fieldTypes;
}
/**
* 表字段类型的描述信息
*/
@Override
public TypeInformation<Row> getOutputType() {
return Types.ROW_NAMED(fieldNames, fieldTypes);
}
/**
* 这里定义了当接收到一条数据时,该如何 sink 的具体逻辑
*/
public static class PrintTableSinkFunction implements SinkFunction<Row> {
private static Logger LOG = LoggerFactory.getLogger(PrintTableSink.class);
private String target;
public PrintTableSinkFunction(String target) {
this.target = target;
}
@Override
public void invoke(Row row, Context context) throws Exception {
switch (target) {
case "Console":
System.out.println(row);
break;
case "Logger":
LOG.info(row.toString());
break;
default:
}
}
@Override
public void invoke(Row value) throws Exception {
invoke(value, null);
}
}
}
如果需要将数据保存到 数据库、Redis、Hadoop 平台 等地方,只需在上面的 PrintTableSinkFunction 内修改输出逻辑即可。
这里只演示了写入的两种情况(AppendStreamTableSink、BatchTableSink),还有支持删除和修改的 RetractStreamTableSink、UpsertStreamTableSink 接口没有演示,我尽量这几天补上 :)
注:以上代码基于 flink 1.9
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于