在分布式大数据计算引擎这个领域,现在最常用的 Apache Spark 早已支持 python 语言编写,而且对 ML(机器学习)和 DM(数据挖掘)也都有 api 的支持,而作为第三代计算引擎的 flink, 从 1.9.0 版本开始增加了对 Python 的支持(PyFlink),在 Flink 1.10 中,PyFlink 添加了对 Python UDFs(自定义函数) 的支持,可以在 Table API/SQL 中注册并使用自定义函数,而从 Flink 1.11 开始还支持在 Windows 上本地运行 PyFlink 作业,所以可以在 Windows 上开发和调试 PyFlink 作业
Pyflink 的安装
Pyflink 的安装十分之简单
- 首先先看看自己系统的 python 版本(PyFlink 需要 Python 版本(3.6、3.7 或 3.8))
$ python3 --version
# the version printed here must be 3.6, 3.7 or 3.8
- 环境配置
由于系统可能包含多个 Python 版本,因此也包含多个 Python 二进制可执行文件。运行以下ls
命令来找出系统中可用的 Python 二进制可执行文件:
$ ls /usr/bin/python*
- 选择软链接
python
指向您的python3
解释器
ln -s /usr/bin/python3 python
- 安装 Pyflink
由于 pyflink 还在持续火热的更新之中每个版本的变化较大,所以无脑安装最新版就行(更新此文章时最新版为 apache-flink1.13.2)
$ python3 -m pip install apache-flink
windows 开发环境配置
这里我们选择 Pycharm IDE 来进行 windows Pyflink 开发
- 首先配置 Pytthon 虚拟环境
配置路径PyCharm -> Preferences -> Project Interpreter
记住选择的 python 版本一定要是 3.6、3.7 或 3.8
- 新建一个项目,环境选择我们刚刚配置的 python 虚拟环境
-
安装 Pyflink
进入到终端界面
先看看 python 版本
然后安装 Pyflink
最终完成之后你可以在site-packages
下面找的pyflink
目录,如下
-
Hello World 示例
新建一个.py 文件输入以下代码
#!/usr/bin/env python38
#-*- coding:utf-8 -*-
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings, StreamTableEnvironment
def hello_world():
"""
从随机Source读取数据,然后直接利用PrintSink输出。
"""
settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(stream_execution_environment=env, environment_settings=settings)
source_ddl = """
CREATE TABLE random_source (
f_sequence INT,
f_random INT,
f_random_str STRING
) WITH (
'connector' = 'datagen',
'rows-per-second'='5',
'fields.f_sequence.kind'='sequence',
'fields.f_sequence.start'='1',
'fields.f_sequence.end'='1000',
'fields.f_random.min'='1',
'fields.f_random.max'='1000',
'fields.f_random_str.length'='10'
)
"""
sink_ddl = """
CREATE TABLE print_sink (
f_sequence INT,
f_random INT,
f_random_str STRING
) WITH (
'connector' = 'print'
)
"""
# 注册source和sink
t_env.execute_sql(source_ddl)
t_env.execute_sql(sink_ddl)
# 数据提取
tab = t_env.from_path("random_source")
# 这里我们暂时先使用 标注了 deprecated 的API, 因为新的异步提交测试有待改进...
tab.execute_insert("print_sink").wait()
# 执行作业
t_env.execute_sql("Flink Hello World")
if __name__ == '__main__':
hello_world()
结果如下
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于