Flink(1)——初体验

本贴最后更新于 2544 天前,其中的信息可能已经时过境迁

背景

Flink 是一个实时计算平台,同时支持 stream 计算和 batch 计算。

其中的 stream 计算支持 exact-once,低延迟和高吞吐。

安装

docker 安装

参考 flink docker 官方页面

docker pull flink

也可以走国内镜像

docker pull registry.docker-cn.com/library/flink

单机版启动

$docker run --name flink_local -p 8081:8081 -t flink local

打开浏览器可访问, http://localhost:8081

默认端口:

  • 8081: Web Client port
  • 6123: Job Manager RPC port
  • 6122: Task Managers RPC port
  • 6121: Task Managers Data port

集群版本请继续看 flink docker 官方页面

提交任务

通过 Web UI 提交任务

或者命令行提交任务

$ docker run --rm -t flink flink run -m <jobmanager:port> -c <your_class> <your_jar> <your_params>

小试牛刀

maven 构建第一个任务

使用指定的 maven archetype 来构造工程 blink-test,如下:

mvn archetype:generate \
DarchetypeGroupId=org.apache.flink \
DarchetypeArtifactId=flink-quickstart-java \
DarchetypeVersion=1.3.2

目录内容如下:

.
├── blink-test.iml
├── pom.xml
└── src
    └── main
        ├── java
        │   └── com
        │       └── abeffect
        │           └── blink
        │               ├── BatchJob.java
        │               ├── SocketTextStreamWordCount.java
        │               ├── StreamingJob.java
        │               └── WordCount.java
        └── resources
            └── log4j.properties

打包

mvn package

见打包的 jar 文件

target/blinktest-1.0-SNAPSHOT.jar

运行

提交作业

通过 web ui 提交作业

结果查看

非 docker 版本

安装

启动

bin/start-local.sh

查看任务

$ flink list
Retrieving JobManager.
Using address localhost/127.0.0.1:6123 to connect to JobManager.
No running jobs.
No scheduled jobs.

提交任务

flink run -c com.abeffect.blink.WordCount ~/IdeaProjects/blink-test/target/blinktest-1.0-SNAPSHOT.jar
Cluster configuration: Standalone cluster with JobManager at localhost/127.0.0.1:6123
Using address localhost:6123 to connect to JobManager.
JobManager web interface address http://localhost:8081
Starting execution of program
Submitting job with JobID: e841d6ca8fb8081f595e0f01cc9935f5. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@localhost:6123/user/jobmanager#-1554195054] with leader session id 00000000-0000-0000-0000-000000000000.
12/02/2017 11:18:04	Job execution switched to status RUNNING.
12/02/2017 11:18:04	CHAIN DataSource (at main(WordCount.java:52) (org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (FlatMap at main(WordCount.java:61)) -> Combine(SUM(1), at main(WordCount.java:64)(1/1) switched to SCHEDULED
12/02/2017 11:18:04	CHAIN DataSource (at main(WordCount.java:52) (org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (FlatMap at main(WordCount.java:61)) -> Combine(SUM(1), at main(WordCount.java:64)(1/1) switched to DEPLOYING
12/02/2017 11:18:04	CHAIN DataSource (at main(WordCount.java:52) (org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (FlatMap at main(WordCount.java:61)) -> Combine(SUM(1), at main(WordCount.java:64)(1/1) switched to RUNNING
12/02/2017 11:18:05	Reduce (SUM(1), at main(WordCount.java:64)(1/1) switched to SCHEDULED
12/02/2017 11:18:05	Reduce (SUM(1), at main(WordCount.java:64)(1/1) switched to DEPLOYING
12/02/2017 11:18:05	Reduce (SUM(1), at main(WordCount.java:64)(1/1) switched to RUNNING
12/02/2017 11:18:06	CHAIN DataSource (at main(WordCount.java:52) (org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (FlatMap at main(WordCount.java:61)) -> Combine(SUM(1), at main(WordCount.java:64)(1/1) switched to FINISHED
12/02/2017 11:18:06	DataSink (collect())(1/1) switched to SCHEDULED
12/02/2017 11:18:06	DataSink (collect())(1/1) switched to DEPLOYING
12/02/2017 11:18:06	Reduce (SUM(1), at main(WordCount.java:64)(1/1) switched to FINISHED
12/02/2017 11:18:06	DataSink (collect())(1/1) switched to RUNNING
12/02/2017 11:18:06	DataSink (collect())(1/1) switched to FINISHED
12/02/2017 11:18:06	Job execution switched to status FINISHED.
(a,1)
(against,1)
(and,1)
(arms,1)
(arrows,1)
(be,2)
(fortune,1)
(in,1)
(is,1)
(mind,1)
(nobler,1)
(not,1)
(of,2)
(or,2)
(outrageous,1)
(question,1)
(sea,1)
(slings,1)
(suffer,1)
(take,1)
(that,1)
(the,3)
(tis,1)
(to,4)
(troubles,1)
(whether,1)
Program execution finished
Job with JobID e841d6ca8fb8081f595e0f01cc9935f5 has finished.
Job Runtime: 1642 ms
Accumulator Results:
- fe75b1f5e0c1c59aac0e22f24b1ae083 (java.util.ArrayList) [26 elements]

取消任务

停止运行任务

flink cancel jobId

停止任务

停止输入数据流,source 需要实现 StoppableFunction 接口

flink stop jobId
  • Java

    Java 是一种可以撰写跨平台应用软件的面向对象的程序设计语言,是由 Sun Microsystems 公司于 1995 年 5 月推出的。Java 技术具有卓越的通用性、高效性、平台移植性和安全性。

    3187 引用 • 8213 回帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...