Spring Cloud Data Flow 入门

本贴最后更新于 1489 天前,其中的信息可能已经水流花落

What

Microservice based Streaming and Batch data processing for Cloud Foundry and Kubernetes

简单解释一下 StreamingBatch 处理的区别

  • Streaming: 应用始终处于启动状态,有数据就处理。通过消息传递中间件消耗或生成无限量的数据。
  • Batch(Task): 需要处理数据时启动应用,处理完成后关闭应用,只处理有限量的数据。

思考一个问题。如何动态启动一个未启动的 Spring 项目?

答案是配置了一个 maven jar 包地址,或者是一个 docker image 地址,在任务触发时,下载对应的程序(镜像)并启动。

SCDF-event-driven-applications

Concept

  • Application: 指向某一个 jardockerStreamBatch 程序
  • Task: 通过 DSL 组装多个 Application,定义处理任务
  • Stream: 通过 DSL 组装多个 Application,按照 source->processor->sink 定义数据处理流
  • Job: 每个 Task 的执行记录,包括入参及结果记录

组件

  • Data Flow Server: 定义、校验并执行 StreamBatch,监听应用状态,记录执行记录。

    • 基于 DSL 定义 StreamBatch
    • 通过 jar 包 或者 docker 镜像 注册应用
    • 执行 StreamBatch 并记录执行记录,监听应用状态
  • Skipper Server: 负责流处理发布

    • 部署 Streams 到一或多个平台
    • 基于蓝绿更新策略升级或回滚 Stream
    • 持久化 Stream 的描述信息,包括历史版本
  • Database:Data Flow ServerSkipper Server 依赖关系数据库。

    • 默认通过内嵌的 H2 数据库启动,支持多种关系型数据库
    • 在服务启动时自动创建表结构
    • Task 可以通过配置共享外部数据库作为持久化
  • Messaging Middleware: 用于消息驱动模型的消息中间件。

    • 支持 KafkaRabbitMQ
    • Stream 必须通过消息中间件驱动
  • Monitor: 监控系统运行指标。

    • 通过 PrometheusInfluxDB 存储
    • 通过 Grafana 展示

使用

上面的概念和组件有点复杂?我们先快速启动一个应用,通过界面看一下如何使用

安装

可以通过一句 docker 命令启动

docker run --rm -it -p 9393:9393 springcloud/spring-cloud-dataflow-server:2.6.3

通过 http://localhost:9393/dashboard/#/about 访问管理界面

注: Schedules 未开启的原因是单机版的安装方式不支持定时任务。

创建 Application

提供了三种方式,但是要注意,不支持直接上传 jar 包或者发现运行中的应用,只能指定 maven 仓库地址(或者 docker 地址)进行拉取

选用第二种(import from an HTTP URI location),并选择官方 Task Apps Maven 示例,本质上是从官方拉取下列配置,在 第三种(import from properties) 中输入下列信息也是同样的效果。

task.timestamp=maven://org.springframework.cloud.task.app:timestamp-task:2.1.1.RELEASE
task.timestamp.metadata=maven://org.springframework.cloud.task.app:timestamp-task:jar:metadata:2.1.1.RELEASE
task.composed-task-runner=maven://org.springframework.cloud.task.app:composedtaskrunner-task:2.1.4.RELEASE
task.composed-task-runner.metadata=maven://org.springframework.cloud.task.app:composedtaskrunner-task:jar:metadata:2.1.4.RELEASE
task.timestamp-batch=maven://org.springframework.cloud.task.app:timestamp-batch-task:2.1.1.RELEASE
task.timestamp-batch.metadata=maven://org.springframework.cloud.task.app:timestamp-batch-task:jar:metadata:2.1.1.RELEASE

每个 application 需要指定应用地址和 metadata 信息地址。metadata 会在使用过程中明确参数输入表单。如果未指定 metadata 地址,会尝试通过应用进行提取。

导入的应用可以在应用列表中查看,可以通过 show details 查看应用的 metadata

-w1439

metadata 通过 KV 形式展示

-w1437

**注意: 这里有一个坑。**如果你按照官方示例 https://github.com/spring-cloud/spring-cloud-task/tree/master/spring-cloud-task-samples/timestamp 新建了一个 Spring Cloud Task, 会发现没办法查看 metadata 信息,原因是因为官方项目中缺少配置文件

classpath*:/META-INF/dataflow-configuration-metadata.properties

内容为 configuration-properties.classes 执行配置类

configuration-properties.classes=wang.yuheng.SyncPrestoProperties

Spring Cloud Data Flowmetadata 解析过程如下:


另一个坑是,如果是 maven 地址,会先从**localRepository(.m2)**获取 jar 文件解析 metadata。而如果你设置的是 docker,每次都会通过网络进行 docker 鉴权并下载,无法直接判断本地是否存在,所以速度会比 maven 慢很多。

Task

dataflow-task-lifecycle

创建 Task

可以通过页面或者 DSL 定义 Task

-w1425

创建好的 Task 会存在于 Tasks 列表中,可以在列表页查看 Task 详情或者执行任务(生成 job)

-w1435

如果通过 Cloud 方式进行部署,可以指定定时任务,相比普通任务,可以通过 cron expression 设置定时


运行 Task

可以在 Task 列表中选择要执行的 Task,并通过 KV List 的方式指定入参及 properties。明确一下这 2 个概念

  • properties: 配置参数,比如数据库配置
  • Arguments: 方法入参,即 String[] args

-w1084

每次执行会生成一个 Job 记录,可以在 Task 或者 Job 列表中查看执行状态、日志等信息

-w1421

审计留痕

Audit Records 页面会看到触发数据及执行记录等信息,包括不同方式(Restful、Shell、WebUI)触发的行为。开启登录验证后可以查看操作人信息。

-w1437

Stream

使用 Stream 需要依赖 skipper-server 和消息中间件,通过 docker-compose 启动

version: '3'

services:
  mysql:
    image: mysql:5.7.25
    container_name: dataflow-mysql
    environment:
      MYSQL_DATABASE: dataflow
      MYSQL_USER: root
      MYSQL_ROOT_PASSWORD: rootpw
    expose:
      - 3306

  kafka-broker:
    image: confluentinc/cp-kafka:5.3.1
    container_name: dataflow-kafka
    expose:
      - "9092"
    environment:
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka-broker:9092
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_HOST_NAME=kafka-broker
      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
    depends_on:
      - zookeeper

  zookeeper:
    image: confluentinc/cp-zookeeper:5.3.1
    container_name: dataflow-kafka-zookeeper
    expose:
      - "2181"
    environment:
      - ZOOKEEPER_CLIENT_PORT=2181

  dataflow-server:
    image: springcloud/spring-cloud-dataflow-server:2.6.3
    container_name: dataflow-server
    ports:
      - "9393:9393"
    environment:
      - spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.kafka.binder.brokers=PLAINTEXT://kafka-broker:9092
      - spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.kafka.streams.binder.brokers=PLAINTEXT://kafka-broker:9092
      - spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.kafka.binder.zkNodes=zookeeper:2181
      - spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.kafka.streams.binder.zkNodes=zookeeper:2181
      - spring.cloud.skipper.client.serverUri=http://skipper-server:7577/api
      - SPRING_DATASOURCE_URL=jdbc:mysql://mysql:3306/dataflow
      - SPRING_DATASOURCE_USERNAME=root
      - SPRING_DATASOURCE_PASSWORD=rootpw
      - SPRING_DATASOURCE_DRIVER_CLASS_NAME=org.mariadb.jdbc.Driver
    depends_on:
      - kafka-broker
    entrypoint: "./wait-for-it.sh mysql:3306 -- java -jar /maven/spring-cloud-dataflow-server.jar"
    volumes:
      - ${HOST_MOUNT_PATH:-.}:${DOCKER_MOUNT_PATH:-/root/scdf}

  app-import:
    image: springcloud/openjdk:2.0.0.RELEASE
    container_name: dataflow-app-import
    depends_on:
      - dataflow-server
    command: >
      /bin/sh -c "
        ./wait-for-it.sh -t 180 dataflow-server:9393;
        wget -qO- 'http://dataflow-server:9393/apps' --post-data='uri=${STREAM_APPS_URI:-https://dataflow.spring.io/kafka-maven-latest&force=true}';
        echo 'Stream apps imported'
        wget -qO- 'http://dataflow-server:9393/apps' --post-data='uri=${TASK_APPS_URI:-https://dataflow.spring.io/task-maven-latest&force=true}';
        echo 'Task apps imported'"

  skipper-server:
    image: springcloud/spring-cloud-skipper-server:2.5.2
    container_name: skipper
    ports:
      - "7577:7577"
      - "20000-20105:20000-20105"
    environment:
      - SPRING_CLOUD_SKIPPER_SERVER_PLATFORM_LOCAL_ACCOUNTS_DEFAULT_PORTRANGE_LOW=20000
      - SPRING_CLOUD_SKIPPER_SERVER_PLATFORM_LOCAL_ACCOUNTS_DEFAULT_PORTRANGE_HIGH=20100
      - SPRING_DATASOURCE_URL=jdbc:mysql://mysql:3306/dataflow
      - SPRING_DATASOURCE_USERNAME=root
      - SPRING_DATASOURCE_PASSWORD=rootpw
      - SPRING_DATASOURCE_DRIVER_CLASS_NAME=org.mariadb.jdbc.Driver
    entrypoint: "./wait-for-it.sh mysql:3306 -- java -jar /maven/spring-cloud-skipper-server.jar"
    volumes:
      - ${HOST_MOUNT_PATH:-.}:${DOCKER_MOUNT_PATH:-/root/scdf}

创建 Stream

同样可以通过页面或者 DSL 按照 source->processor->sink 定义 Stream。每个 source->processor->sink 是一个 Stream,可以同时创建多个独立的 Stream

-w1429

-w1343

发布 Stream

Stream 创建成功后,需要通过 skipper-server 发布至 Runtime,基于消息队列进行驱动并执行数据处理

注意: 如果要使用流,需要找一个支持流的持久化,如 Redis

在列表页可以查看 Stream 详情,或者发布 Stream

-w1430

发布时需要指定每个 Applicationproperties,并且指定资源限制。如果是发布至 k8s 环境,会根据集群配置进行分配。

截屏 2020-10-24 下午 9.24.09

细节: 会按照 labelkafka 中创建对应的 topic

-w1438

发布后可以在 Runtime 查看对应的 Stream 状态及详情

-w1357

通过 docker-compose 启动,所以会在 skipper-server 这台机器上运行相关 Jar 包作为消息 Consumer

-w1427

Shell

上述操作均可通过 命令行 or Restful 进行调用,并且配置都可以导出为对应的配置文件(DSLTaskStream 等)

-w1434

基于此就可配合当前使用的 CICD 完成 devops

想象一个场景,在你提交代码后,自动发布至某个环境等待运行。

1. push code
2. maven package
3. deploy maven repository
4. create application
5. create task
6. run task

观点

现实骨感,未来部分期待,如果你正在 All in Spring Cloud。
整体基于 Spring Cloud,但是又拥抱 docker。如果只是 docker 就可以去掉很多限制,就像是 k8sSpring Cloud 组件本身就存在诸多重复

优点

  1. 基于 Spring 微服务,无切换成本,可独立开发、测试
  2. 完整的闭环,提供了从服务定制、管理、运行、监控全生命周期解决方案
  3. 拖拽式 UI 操作界面,配合 DSL,配置简单,页面看起来很现代(你知道我在讽刺谁)

中立

  1. 未提供特定的计算引擎集群,类似 Flink、Spark 等
  2. 不能覆盖工作流场景
  3. 稳定性,目测现阶段上生产可以很快成为 contributors
  4. 资源占用(看到有吐槽,但是未测试,不发表评论)

缺点

  1. 仅基于 Spring 微服务,比如一行命令 or 一句 sql 必须通过 Spring Cloud Task(or Stream) 编写。通过 Java 编写 job,你需要一个高版本的 JDK
  2. 依赖 maven repo (可能提供了 http、ftp 等其他方式,但是笔者没找到。。)
  3. 如果涉及到大数据处理,还是要依靠 Hadoop 中的模块。那么为什么混用,而不是直接使用全家桶呢?

资料

  • Spring

    Spring 是一个开源框架,是于 2003 年兴起的一个轻量级的 Java 开发框架,由 Rod Johnson 在其著作《Expert One-On-One J2EE Development and Design》中阐述的部分理念和原型衍生而来。它是为了解决企业应用开发的复杂性而创建的。框架的主要优势之一就是其分层架构,分层架构允许使用者选择使用哪一个组件,同时为 JavaEE 应用程序开发提供集成的框架。

    944 引用 • 1459 回帖 • 17 关注
  • 大数据

    大数据(big data)是指无法在一定时间范围内用常规软件工具进行捕捉、管理和处理的数据集合,是需要新处理模式才能具有更强的决策力、洞察发现力和流程优化能力的海量、高增长率和多样化的信息资产。

    93 引用 • 113 回帖
  • ETL
    4 引用 • 5 回帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...