数据 ETL 平台 Streamsets 简单应用

本贴最后更新于 1790 天前,其中的信息可能已经时移世易

StreamSets 介绍

Screenshotfrom20191220162320.png

StreamSets 是最近兴起的 ETL 平台,它的特点是具有多样性的组件支持,可集成于 CDH 平台。最为吸引人的就是可视化的数据流通流程设置,多个 pipelines 的编写,RestApi 形式的自动化支持,等等等等,当然选择使用它的最终理由还是因为支持的组件多。这一篇就简单来讲讲 StreamSets 的简单使用。

StreamSets 部署

StreamSets 的部署有很多形式,这里列举两个最方便的。

Docker-compose

这里我提供一份写好的 docker-compose 文件,如下

version: '3.1'
services: 
  streamsets: 
    image: streamsets/datacollector
    restart: always
    ports:
      - "18630:18630"
    environment:
      HOST_IP: 0.0.0.0
    volumes:
      - streamsets:/opt/steramsets
      - /data/mdt/data_hbase:/data/hbase
volumes: 
  streamsets:

这里把端口映射到了 18630 方便等下的登录使用,HOST_IP 改成万能 IP,不设置访问限制方便访问,相关数据挂到本地磁盘。
启动命令

sudo docker-compose up -d

直接复制 yaml 配置创建 docker-compose.yaml 就可以了

K8S

因为 StreamSets 内部的 data collector 重启机制问题,会导致组件无法成功添加的问题,所以这里还没有解决,暂不推荐 K8S,不过还是把配置文件给出来(组件无法附加版本)

apiVersion: apps/v1
kind: Deployment
metadata:
  name: streamsets-datacollector
  namespace: default
  labels:
    app: streamsets-datacollector
spec:
  replicas: 1
  selector:
    matchLabels:
      app: streamsets-datacollector
  template:
    metadata:
      labels:
        app: streamsets-datacollector
    spec:
      containers:
      - name: streamsets
        image: streamsets/datacollector
        env:
        - name: HOST_IP
          value: 0.0.0.0
        resources:
          limits:
            cpu: '1'
            memory: 2Gi
          requests:
            cpu: '1'
            memory: 2Gi
        volumeMounts:        #容器内挂载点
          - mountPath: /opt/steramsets
            name: stp
          - mountPath: /data/hbase
            name: sthb
      volumes:
      - name: stp
        hostPath:
          path: streamsets
      - name: sthb
        hostPath:
          path: /data/mdt/data_hbase
          
---

apiVersion: v1
kind: Service
metadata:
  labels:
    app: streamsets-datacollector
  name: streamsets-datacollector
  namespace: default
spec:
  type: NodePort
  ports:
  - port: 18630
    protocol: TCP
    targetPort: 18630
    nodePort: 31630
  selector:
    app: streamsets-datacollector

也是把端口暴露在外部 18630
复制下来创建 StreamSets.yaml
启动命令

kubectl apply -f StreamSets.yaml -n data-service

这里是使用 data-service 命名空间,K8S 的概念这里不多说了

CDH

这个安装方式我就不多说了吧,因为直接在 cloudera-manager 里面添加角色就行了,具体在那个机器上的角色看自己的需求了。

StreamSets 应用

Pipelines 创建

这里我们简单的举一个 HTTP-client 数据获取的例子。
为了方便起见,我们先创建一个 test.txt 文件,内容可以如下:

name,age
symoon,18
willson,50

这里用,分割,方便那我们取数据。
当然也可以自己设置测试数据,这里随便写了一个。
接下来我们简单开启一个 python 服务,用来启动一个 api

python3 -m http.server 8001

这个不多说了,大家应该都知道,这条命令在 txt 所在文件夹启动,这时候访问

http://127.0.0.1:8001/test.txt

这个 API 就是我们需要的数据。接下来我们到 StreamSets 里面操作。
Screenshotfrom20191127102305.png
账号密码都是默认的 admin
输入正确后,我们进入到主页面
Screenshotfrom20191220181025.png
点击 CREATE NEW PIPELINE 创建一个新的任务,Title 什么的随便填,我直接填了 test
创建成功后我们进入到任务管理界面:
Screenshotfrom20191220181127.png

我们选择拖拽一个 HTTP Client 作为数据源。相关配置如下填写,这里我们用 docker 启动的 StreamSets 的化就无法用 127.0.0.1 了,我们使用本机的 IP

Screenshotfrom20191220182037.png

Screenshotfrom20191220181509.png

Screenshotfrom20191220182059.png

这里填写的信息都有说明,相信大家都能看懂就不多说了,接下来测试一下,我们点击一下右上角类似眼睛的按钮。
默认信息就好,批量可以从 10 改成 1。结果如图
Screenshotfrom20191220182144.png

Mysql 增量采集案例

这里我们举个简单的增量采集案例,采集 Mysql 指定表中的增量数据到 Kafka 中。
接下来来实战一下,我们先创建一个 pipeline,上面已经说明了,就不多说了。
因为 mysql 的 connector 需要安装 jdbc 组件,所以我们需要进行组件安装。进入 Pipeline 中的界面,以如下方式选择。
Screenshotfrom20191221091758.png

点击加号 Add/Remove Stages,进入到组件安装界面。
Screenshotfrom20191221092001.png

这里我们找到 JDBC,点击右侧的配置按钮,点击 install。
因为我们需要用到 Kafka 服务,这里我们继续进入 Apache Kafka 中,安装 Kafka-V2.0 版本。安装成功后会提示重启,根据提示点击 Restart data collector 按钮即可。等待一分钟左右服务重启即可。

重新进入到我们刚才创建的 Pipeline,配置 JDBC 拖动的组件及配置如下:
Screenshotfrom20191221094547.png
这里我们的数据库名字和表名字分别是 test 和 Test,偏移值由 id 字段来计算。初始值设置 0。
因为 jdbc 需要相应的驱动,所以我们需要在 External Libraries 处上传 mysql 的 jdbc 驱动。这里直接附上一份。
mysqlconnectorjava8.0.18.jar
下载好后上传,依旧需要重启服务生效。
后面还需要提供账号密码,这里就不截图多说了。
这时候依旧是点那个眼睛按钮测试一下,首先我们还是需要在数据库里面创建这张表的,具体格式如下

id|name|age
1|symoon|18
2|willson|19
3|lake|20

大概这种表格式,测试后结果如下:
Screenshotfrom20191221095157.png
这时候我们已经拿到了增量数据,接下来配置 Kafka 的数据接收端。
Screenshotfrom20191221095630.png

这里我们在上面的接收端选择直接选择 Kafka producer 就可以了,我使用的是本机 docker 启动的 Kafka 具体的配置我就不发了,在之前的一键启动 docker 程序里面有,topic 随便输入一个 test,接下来还是点击眼睛测试一下。

Screenshotfrom20191221095836.png

kafka 的 input 成功了,接下来我们直接运行起来。
Screenshotfrom20191221095918.png
数据流通的性能还有一些相关信息都在这里显示了,这个任务会一直运行着,增量数据都可以流通到 Kafka,其他的数据库使用其他连接器也可以支持,这里就不多说了,大家可以都一一去试试。

  • Docker

    Docker 是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的操作系统上。容器完全使用沙箱机制,几乎没有性能开销,可以很容易地在机器和数据中心中运行。

    491 引用 • 916 回帖 • 1 关注
  • Kubernetes

    Kubernetes 是 Google 开源的一个容器编排引擎,它支持自动化部署、大规模可伸缩、应用容器化管理。

    110 引用 • 54 回帖
  • StreamSets
    1 引用
  • 大数据

    大数据(big data)是指无法在一定时间范围内用常规软件工具进行捕捉、管理和处理的数据集合,是需要新处理模式才能具有更强的决策力、洞察发现力和流程优化能力的海量、高增长率和多样化的信息资产。

    93 引用 • 113 回帖 • 2 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...