Flink(2)——定制输入输出源

本贴最后更新于 2343 天前,其中的信息可能已经沧海桑田。

概要

本文从上一篇中的本地输入输出示例出发,先实现从 Kafka 读取输入,再实现输出到 MySQL。

本地输入输出

代码

package com.abeffect.blink;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCount {

	public static void main(String[] args) throws Exception {

		// Obtain the batch execution environment.
		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// Hard-coded sample input lines.
		final DataSet<String> lines = env.fromElements(
				"To be, or not to be,--that is the question:--",
				"Whether 'tis nobler in the mind to suffer",
				"The slings and arrows of outrageous fortune",
				"Or to take arms against a sea of troubles,"
				);

		// Tokenize each line into (word, 1) pairs, then aggregate per word.
		final DataSet<Tuple2<String, Integer>> wordCounts = lines
				.flatMap(new LineSplitter())
				.groupBy(0) // key on the word (tuple field 0)
				.sum(1);    // sum the per-word counters (tuple field 1)

		// Trigger execution and print the aggregated counts.
		wordCounts.print();

	}

	/**
	 * Splits a line into lowercase word tokens and emits a (token, 1)
	 * pair for every non-empty token.
	 */
	public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

		@Override
		public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
			// Runs of non-word characters delimit tokens; lowercasing
			// normalizes case so "To" and "to" count together.
			for (String word : value.toLowerCase().split("\\W+")) {
				if (!word.isEmpty()) {
					out.collect(new Tuple2<String, Integer>(word, 1));
				}
			}
		}
	}
}

kafka 输入, stdout 输出

代码

输出类 StdoutSink.java

package com.abeffect.blink;

import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/** Sink that prints the single field of each incoming 1-tuple to stdout. */
public class StdoutSink extends RichSinkFunction<Tuple1<String>> {

    /** Writes the tuple's only field, followed by a newline, to standard out. */
    @Override
    public void invoke(Tuple1<String> value) throws Exception {
        final String payload = value.f0;
        System.out.println(payload);
    }
}

执行类 KafkaCount.java

package com.abeffect.blink;

import org.apache.commons.lang.StringUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.util.Properties;

/**
 * Streaming job: consumes String records from the Kafka topic
 * "fw-blink-test", keeps the text before the first ':' of each non-blank
 * record, and prints it via {@link StdoutSink}.
 */
public class KafkaCount {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment
                .getExecutionEnvironment();

        // Kafka consumer configuration. Flink's Kafka consumer requires a
        // consumer group id in addition to the bootstrap servers; without
        // "group.id" the job fails at runtime.
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "fw-blink-test-consumer");

        // Source: raw String records from the "fw-blink-test" topic.
        DataStream<String> sourceStream = env
                .addSource(new FlinkKafkaConsumer010<>("fw-blink-test", new SimpleStringSchema(), properties));

        // Drop blank records, then wrap the text before the first ':' in a Tuple1.
        DataStream<Tuple1<String>> sourceStreamTra = sourceStream.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                return StringUtils.isNotBlank(value);
            }
        }).map(new MapFunction<String, Tuple1<String>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple1<String> map(String value)
                    throws Exception {
                // Named "parts" (not "args") to avoid shadowing main's parameter.
                String[] parts = value.split(":");
                return new Tuple1<String>(parts[0]);
            }
        });

        sourceStreamTra.addSink(new StdoutSink());
        env.execute("data to stdout start");
    }
}

kafka 输入测试

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic fw-blink-test

结果查看

$ tailf flink-abeffect-jobmanager-0-fox.local.out
3
1
2
3
11
12
13

kafka 输入, mysql 输出

代码

输出类 MySQLSink.java

package com.abeffect.blink;

import java.sql.DriverManager;
import java.sql.Connection;
import java.sql.PreparedStatement;

import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/**
 * Sink that inserts the single field of each incoming 1-tuple into the
 * MySQL table {@code sink0} (column {@code `key`}).
 *
 * NOTE(review): this still opens one connection per record, which is slow
 * for high-throughput streams; moving connection setup into
 * RichSinkFunction's open()/close() lifecycle would amortize the cost.
 */
public class MySQLSink extends
        RichSinkFunction<Tuple1<String>> {

    private static final long serialVersionUID = 1L;
    String username = "root";
    String password = "toor";
    String drivername = "com.mysql.jdbc.Driver";
    String dburl = "jdbc:mysql://localhost:3306/blink_test";

    /**
     * Inserts {@code value.f0} as a new row. try-with-resources fixes the
     * original leak: the statement and connection were never closed when
     * executeUpdate threw.
     */
    @Override
    public void invoke(Tuple1<String> value) throws Exception {
        // Explicit driver load; redundant with JDBC 4+ auto-loading but kept
        // for compatibility with older driver jars.
        Class.forName(drivername);
        // Parameterized statement — never concatenate the value into the SQL.
        String sql = "insert into sink0 (`key`) values (?)";
        try (Connection connection = DriverManager.getConnection(dburl, username, password);
                PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
            preparedStatement.setString(1, value.f0);
            preparedStatement.executeUpdate();
        }
    }
}

执行类 KafkaCount.java

package com.abeffect.blink;

import org.apache.commons.lang.StringUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.util.Properties;

/**
 * Streaming job: consumes String records from the Kafka topic
 * "fw-blink-test", keeps the text before the first ':' of each non-blank
 * record, and writes it to MySQL via {@link MySQLSink}.
 */
public class KafkaCount {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment
                .getExecutionEnvironment();

        // Kafka consumer configuration. Flink's Kafka consumer requires a
        // consumer group id in addition to the bootstrap servers; without
        // "group.id" the job fails at runtime.
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "fw-blink-test-consumer");

        // Source: raw String records from the "fw-blink-test" topic.
        DataStream<String> sourceStream = env
                .addSource(new FlinkKafkaConsumer010<>("fw-blink-test", new SimpleStringSchema(), properties));

        // Drop blank records, then wrap the text before the first ':' in a Tuple1.
        DataStream<Tuple1<String>> sourceStreamTra = sourceStream.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                return StringUtils.isNotBlank(value);
            }
        }).map(new MapFunction<String, Tuple1<String>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple1<String> map(String value)
                    throws Exception {
                // Named "parts" (not "args") to avoid shadowing main's parameter.
                String[] parts = value.split(":");
                return new Tuple1<String>(parts[0]);
            }
        });

        sourceStreamTra.addSink(new MySQLSink());
        env.execute("data to mysql start");
    }
}

kafka 输入测试

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic fw-blink-test

结果查看

mysql> select * from sink0;
+----+------+
| id | key  |
+----+------+
|  1 | 000  |
|  2 | a2   |
|  3 | a3   |
|  4 | b1   |
|  5 | b2   |
|  6 | b3   |
+----+------+
  • Java

    Java 是一种可以撰写跨平台应用软件的面向对象的程序设计语言,是由 Sun Microsystems 公司于 1995 年 5 月推出的。Java 技术具有卓越的通用性、高效性、平台移植性和安全性。

    3169 引用 • 8207 回帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...