概要
本文从上一篇中的本地输入输出出发,先实现从 Kafka 读取输入,再实现将结果输出到 MySQL。
本地输入输出
代码
package com.abeffect.blink;

import java.util.Locale;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * Batch word count over a few hard-coded lines of text; prints the (word, count) pairs.
 */
public class WordCount {

    public static void main(String[] args) throws Exception {
        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // get input data
        DataSet<String> text = env.fromElements(
                "To be, or not to be,--that is the question:--",
                "Whether 'tis nobler in the mind to suffer",
                "The slings and arrows of outrageous fortune",
                "Or to take arms against a sea of troubles,");

        DataSet<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word, 1)
                text.flatMap(new LineSplitter())
                        // group by the tuple field "0" and sum up tuple field "1"
                        .groupBy(0)
                        .sum(1);

        // execute and print result
        counts.print();
    }

    /**
     * Splits a line into lower-cased words (separated by runs of non-word characters) and emits
     * a (word, 1) pair for each word.
     */
    public static final class LineSplitter
            implements FlatMapFunction<String, Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // Locale.ROOT: the default-locale toLowerCase() maps 'I' to dotless 'ı' under a
            // Turkish locale, and the ASCII-only "\\W+" would then split words at that character.
            String[] tokens = value.toLowerCase(Locale.ROOT).split("\\W+");
            for (String token : tokens) {
                // split yields a leading "" when the line starts with a non-word character
                if (!token.isEmpty()) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}
kafka 输入, stdout 输出
代码
输出类 StdoutSink.java
package com.abeffect.blink;

import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/**
 * Flink sink that prints the single field of each {@link Tuple1} via {@code System.out}, i.e. to
 * the standard output of the process that runs the sink task.
 */
public class StdoutSink extends RichSinkFunction<Tuple1<String>> {
    // Sink functions are serialized when the job is submitted; pin the serial form explicitly.
    private static final long serialVersionUID = 1L;

    @Override
    public void invoke(Tuple1<String> value) throws Exception {
        System.out.println(value.f0);
    }
}
执行类 KafkaCount.java
package com.abeffect.blink;

import org.apache.commons.lang.StringUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.util.Properties;

/**
 * Streaming job: reads lines from the Kafka topic {@code fw-blink-test} on localhost:9092, drops
 * blank lines, keeps the text before the first ':' of each line, and prints it with
 * {@link StdoutSink}.
 */
public class KafkaCount {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");

        DataStream<String> sourceStream = env.addSource(
                new FlinkKafkaConsumer010<>("fw-blink-test", new SimpleStringSchema(), properties));

        DataStream<Tuple1<String>> sourceStreamTra = sourceStream
                .filter(new FilterFunction<String>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public boolean filter(String value) throws Exception {
                        return StringUtils.isNotBlank(value);
                    }
                })
                .map(new MapFunction<String, Tuple1<String>>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple1<String> map(String value) throws Exception {
                        return new Tuple1<String>(extractKey(value));
                    }
                });

        sourceStreamTra.addSink(new StdoutSink());
        env.execute("data to stdout start");
    }

    /**
     * Returns the part of {@code value} before the first ':', or the whole string if it has none.
     *
     * <p>{@code value.split(":")[0]} is deliberately not used: split drops trailing empty strings,
     * so a non-blank input made only of colons (e.g. ":" or "::") yields an empty array and
     * indexing it throws ArrayIndexOutOfBoundsException, failing the job. Such inputs map to "".
     */
    private static String extractKey(String value) {
        int colon = value.indexOf(':');
        return colon < 0 ? value : value.substring(0, colon);
    }
}
kafka 输入测试
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic fw-blink-test
结果查看
$ tailf flink-abeffect-jobmanager-0-fox.local.out
3
1
2
3
11
12
13
kafka 输入, mysql 输出
代码
输出类 MySQLSink.java
package com.abeffect.blink;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/**
 * Flink sink that inserts the single field of each {@link Tuple1} into the {@code key} column of
 * the MySQL table {@code sink0}.
 *
 * <p>One JDBC connection and one prepared statement are created per sink instance in
 * {@link #open(Configuration)}, reused for every record, and released in {@link #close()}.
 * Previously a new connection was opened for every record, and it leaked if the insert threw.
 * NOTE(review): a long-idle stream may hit the server's connection idle timeout; consider a
 * pooled/reconnecting DataSource if that matters.
 */
public class MySQLSink extends RichSinkFunction<Tuple1<String>> {
    private static final long serialVersionUID = 1L;

    private static final String INSERT_SQL = "insert into sink0 (`key`) values (?)";

    // JDBC handles are not serializable; they are created on the worker in open().
    private transient Connection connection;
    private transient PreparedStatement preparedStatement;

    // NOTE(review): demo credentials are hard-coded; load them from configuration in real use.
    String username = "root";
    String password = "toor";
    String drivername = "com.mysql.jdbc.Driver";
    String dburl = "jdbc:mysql://localhost:3306/blink_test";

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        Class.forName(drivername);
        connection = DriverManager.getConnection(dburl, username, password);
        preparedStatement = connection.prepareStatement(INSERT_SQL);
    }

    @Override
    public void invoke(Tuple1<String> value) throws Exception {
        preparedStatement.setString(1, value.f0);
        preparedStatement.executeUpdate();
    }

    @Override
    public void close() throws Exception {
        // Nested finally blocks so each resource is released even if closing the previous one fails.
        try {
            if (preparedStatement != null) {
                preparedStatement.close();
            }
        } finally {
            try {
                if (connection != null) {
                    connection.close();
                }
            } finally {
                preparedStatement = null;
                connection = null;
                super.close();
            }
        }
    }
}
执行类 KafkaCount.java(与上一节的同名类相同,仅将 StdoutSink 替换为 MySQLSink)
package com.abeffect.blink;

import org.apache.commons.lang.StringUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.util.Properties;

/**
 * Streaming job: reads lines from the Kafka topic {@code fw-blink-test} on localhost:9092, drops
 * blank lines, keeps the text before the first ':' of each line, and writes it with
 * {@link MySQLSink}.
 */
public class KafkaCount {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");

        DataStream<String> sourceStream = env.addSource(
                new FlinkKafkaConsumer010<>("fw-blink-test", new SimpleStringSchema(), properties));

        DataStream<Tuple1<String>> sourceStreamTra = sourceStream
                .filter(new FilterFunction<String>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public boolean filter(String value) throws Exception {
                        return StringUtils.isNotBlank(value);
                    }
                })
                .map(new MapFunction<String, Tuple1<String>>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple1<String> map(String value) throws Exception {
                        return new Tuple1<String>(extractKey(value));
                    }
                });

        sourceStreamTra.addSink(new MySQLSink());
        env.execute("data to mysql start");
    }

    /**
     * Returns the part of {@code value} before the first ':', or the whole string if it has none.
     *
     * <p>{@code value.split(":")[0]} is deliberately not used: split drops trailing empty strings,
     * so a non-blank input made only of colons (e.g. ":" or "::") yields an empty array and
     * indexing it throws ArrayIndexOutOfBoundsException, failing the job. Such inputs map to "".
     */
    private static String extractKey(String value) {
        int colon = value.indexOf(':');
        return colon < 0 ? value : value.substring(0, colon);
    }
}
kafka 输入测试
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic fw-blink-test
结果查看
mysql> select * from sink0;
+----+------+
| id | key  |
+----+------+
|  1 | 000  |
|  2 | a2   |
|  3 | a3   |
|  4 | b1   |
|  5 | b2   |
|  6 | b3   |
+----+------+
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于