StreamSets 介绍
StreamSets 是最近兴起的 ETL 平台,它的特点是具有多样性的组件支持,可集成于 CDH 平台。最为吸引人的就是可视化的数据流通流程设置,多个 pipelines 的编写,RestApi 形式的自动化支持,等等等等,当然选择使用它的最终理由还是因为支持的组件多。这一篇就简单来讲讲 StreamSets 的简单使用。
StreamSets 部署
StreamSets 的部署有很多形式,这里列举两个最方便的。
Docker-compose
这里我提供一份写好的 docker-compose 文件,如下
version: '3.1'
services:
streamsets:
image: streamsets/datacollector
restart: always
ports:
- "18630:18630"
environment:
HOST_IP: 0.0.0.0
volumes:
- streamsets:/opt/steramsets
- /data/mdt/data_hbase:/data/hbase
volumes:
streamsets:
这里把端口映射到了 18630 方便等下的登录使用,HOST_IP 改成万能 IP,不设置访问限制方便访问,相关数据挂到本地磁盘。
启动命令
sudo docker-compose up -d
直接复制 yaml 配置创建 docker-compose.yaml 就可以了
K8S
因为 StreamSets 内部的 data collector 重启机制问题,会导致组件无法成功添加的问题,所以这里还没有解决,暂不推荐 K8S,不过还是把配置文件给出来(组件无法附加版本)
apiVersion: apps/v1
kind: Deployment
metadata:
name: streamsets-datacollector
namespace: default
labels:
app: streamsets-datacollector
spec:
replicas: 1
selector:
matchLabels:
app: streamsets-datacollector
template:
metadata:
labels:
app: streamsets-datacollector
spec:
containers:
- name: streamsets
image: streamsets/datacollector
env:
- name: HOST_IP
value: 0.0.0.0
resources:
limits:
cpu: '1'
memory: 2Gi
requests:
cpu: '1'
memory: 2Gi
volumeMounts: #容器内挂载点
- mountPath: /opt/steramsets
name: stp
- mountPath: /data/hbase
name: sthb
volumes:
- name: stp
hostPath:
path: streamsets
- name: sthb
hostPath:
path: /data/mdt/data_hbase
---
apiVersion: v1
kind: Service
metadata:
labels:
app: streamsets-datacollector
name: streamsets-datacollector
namespace: default
spec:
type: NodePort
ports:
- port: 18630
protocol: TCP
targetPort: 18630
nodePort: 31630
selector:
app: streamsets-datacollector
也是把端口暴露在外部 18630
复制下来创建 StreamSets.yaml
启动命令
kubectl apply -f StreamSets.yaml -n data-service
这里是使用 data-service 命名空间,K8S 的概念这里不多说了
CDH
这个安装方式我就不多说了吧,因为直接在 cloudera-manager 里面添加角色就行了,具体在那个机器上的角色看自己的需求了。
StreamSets 应用
Pipelines 创建
这里我们简单的举一个 HTTP-client 数据获取的例子。
为了方便起见,我们先创建一个 test.txt 文件,内容可以如下:
name,age
symoon,18
willson,50
这里用,分割,方便那我们取数据。
当然也可以自己设置测试数据,这里随便写了一个。
接下来我们简单开启一个 python 服务,用来启动一个 api
python3 -m http.server 8001
这个不多说了,大家应该都知道,这条命令在 txt 所在文件夹启动,这时候访问
http://127.0.0.1:8001/test.txt
这个 API 就是我们需要的数据。接下来我们到 StreamSets 里面操作。
账号密码都是默认的 admin
输入正确后,我们进入到主页面
点击 CREATE NEW PIPELINE 创建一个新的任务,Title 什么的随便填,我直接填了 test
创建成功后我们进入到任务管理界面:
我们选择拖拽一个 HTTP Client 作为数据源。相关配置如下填写,这里我们用 docker 启动的 StreamSets 的化就无法用 127.0.0.1 了,我们使用本机的 IP
这里填写的信息都有说明,相信大家都能看懂就不多说了,接下来测试一下,我们点击一下右上角类似眼睛的按钮。
默认信息就好,批量可以从 10 改成 1。结果如图
Mysql 增量采集案例
这里我们举个简单的增量采集案例,采集 Mysql 指定表中的增量数据到 Kafka 中。
接下来来实战一下,我们先创建一个 pipeline,上面已经说明了,就不多说了。
因为 mysql 的 connector 需要安装 jdbc 组件,所以我们需要进行组件安装。进入 Pipeline 中的界面,以如下方式选择。
点击加号 Add/Remove Stages,进入到组件安装界面。
这里我们找到 JDBC,点击右侧的配置按钮,点击 install。
因为我们需要用到 Kafka 服务,这里我们继续进入 Apache Kafka 中,安装 Kafka-V2.0 版本。安装成功后会提示重启,根据提示点击 Restart data collector 按钮即可。等待一分钟左右服务重启即可。
重新进入到我们刚才创建的 Pipeline,配置 JDBC 拖动的组件及配置如下:
这里我们的数据库名字和表名字分别是 test 和 Test,偏移值由 id 字段来计算。初始值设置 0。
因为 jdbc 需要相应的驱动,所以我们需要在 External Libraries 处上传 mysql 的 jdbc 驱动。这里直接附上一份。
mysqlconnectorjava8.0.18.jar
下载好后上传,依旧需要重启服务生效。
后面还需要提供账号密码,这里就不截图多说了。
这时候依旧是点那个眼睛按钮测试一下,首先我们还是需要在数据库里面创建这张表的,具体格式如下
id|name|age
1|symoon|18
2|willson|19
3|lake|20
大概这种表格式,测试后结果如下:
这时候我们已经拿到了增量数据,接下来配置 Kafka 的数据接收端。
这里我们在上面的接收端选择直接选择 Kafka producer 就可以了,我使用的是本机 docker 启动的 Kafka 具体的配置我就不发了,在之前的一键启动 docker 程序里面有,topic 随便输入一个 test,接下来还是点击眼睛测试一下。
kafka 的 input 成功了,接下来我们直接运行起来。
数据流通的性能还有一些相关信息都在这里显示了,这个任务会一直运行着,增量数据都可以流通到 Kafka,其他的数据库使用其他连接器也可以支持,这里就不多说了,大家可以都一一去试试。
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于