云原生的工作流引擎 ---argo

本贴最后更新于 1478 天前,其中的信息可能已经事过境迁

argo.png

Argo Workflows 是一个开源的容器本机工作流引擎,用于在 Kubernetes 上协调并行作业。 Argo Workflows 被实现为 Kubernetes CRD(自定义资源定义)。跟其他传统的工作流引擎不同的是,他的每一个步骤都是一个容器。将多步骤工作流建模为一系列任务,或者使用有向无环图(DAG)捕获任务之间的依赖关系。
使用 Kubernetes 上的 Argo Workflow,可以在短时间内轻松运行用于计算机学习或数据处理的计算密集型作业。

安装 argo

安装 argo 十分容易

第一步先创建 namespace

kubectl create ns argo

第二步执行 kubectl -n argo apply -f install.yaml

# This is an auto-generated file. DO NOT EDIT
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
  name: clusterworkflowtemplates.argoproj.io
spec:
  group: argoproj.io
  names:
    kind: ClusterWorkflowTemplate
    listKind: ClusterWorkflowTemplateList
    plural: clusterworkflowtemplates
    shortNames:
    - clusterwftmpl
    - cwft
    singular: clusterworkflowtemplate
  scope: Cluster
  version: v1alpha1
  versions:
  - name: v1alpha1
    served: true
    storage: true
---
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
  name: cronworkflows.argoproj.io
spec:
  group: argoproj.io
  names:
    kind: CronWorkflow
    listKind: CronWorkflowList
    plural: cronworkflows
    shortNames:
    - cwf
    - cronwf
    singular: cronworkflow
  scope: Namespaced
  version: v1alpha1
  versions:
  - name: v1alpha1
    served: true
    storage: true
---
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
  name: workfloweventbindings.argoproj.io
spec:
  group: argoproj.io
  names:
    kind: WorkflowEventBinding
    listKind: WorkflowEventBindingList
    plural: workfloweventbindings
    shortNames:
    - wfeb
    singular: workfloweventbinding
  scope: Namespaced
  version: v1alpha1
  versions:
  - name: v1alpha1
    served: true
    storage: true
---
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
  name: workflows.argoproj.io
spec:
  additionalPrinterColumns:
  - JSONPath: .status.phase
    description: Status of the workflow
    name: Status
    type: string
  - JSONPath: .status.startedAt
    description: When the workflow was started
    format: date-time
    name: Age
    type: date
  group: argoproj.io
  names:
    kind: Workflow
    listKind: WorkflowList
    plural: workflows
    shortNames:
    - wf
    singular: workflow
  scope: Namespaced
  subresources: {}
  version: v1alpha1
  versions:
  - name: v1alpha1
    served: true
    storage: true
---
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
  name: workflowtemplates.argoproj.io
spec:
  group: argoproj.io
  names:
    kind: WorkflowTemplate
    listKind: WorkflowTemplateList
    plural: workflowtemplates
    shortNames:
    - wftmpl
    singular: workflowtemplate
  scope: Namespaced
  version: v1alpha1
  versions:
  - name: v1alpha1
    served: true
    storage: true
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: argo
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: argo-server
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: argo-role
rules:
- apiGroups:
  - ""
  resources:
  - secrets
  verbs:
  - get
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  labels:
    rbac.authorization.k8s.io/aggregate-to-admin: "true"
  name: argo-aggregate-to-admin
rules:
- apiGroups:
  - argoproj.io
  resources:
  - workflows
  - workflows/finalizers
  - workflowtemplates
  - workflowtemplates/finalizers
  - cronworkflows
  - cronworkflows/finalizers
  - clusterworkflowtemplates
  - clusterworkflowtemplates/finalizers
  verbs:
  - create
  - delete
  - deletecollection
  - get
  - list
  - patch
  - update
  - watch
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  labels:
    rbac.authorization.k8s.io/aggregate-to-edit: "true"
  name: argo-aggregate-to-edit
rules:
- apiGroups:
  - argoproj.io
  resources:
  - workflows
  - workflows/finalizers
  - workflowtemplates
  - workflowtemplates/finalizers
  - cronworkflows
  - cronworkflows/finalizers
  - clusterworkflowtemplates
  - clusterworkflowtemplates/finalizers
  verbs:
  - create
  - delete
  - deletecollection
  - get
  - list
  - patch
  - update
  - watch
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  labels:
    rbac.authorization.k8s.io/aggregate-to-view: "true"
  name: argo-aggregate-to-view
rules:
- apiGroups:
  - argoproj.io
  resources:
  - workflows
  - workflows/finalizers
  - workflowtemplates
  - workflowtemplates/finalizers
  - cronworkflows
  - cronworkflows/finalizers
  - clusterworkflowtemplates
  - clusterworkflowtemplates/finalizers
  verbs:
  - get
  - list
  - watch
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: argo-cluster-role
rules:
- apiGroups:
  - ""
  resources:
  - pods
  - pods/exec
  verbs:
  - create
  - get
  - list
  - watch
  - update
  - patch
  - delete
- apiGroups:
  - ""
  resources:
  - configmaps
  verbs:
  - get
  - watch
  - list
- apiGroups:
  - ""
  resources:
  - persistentvolumeclaims
  verbs:
  - create
  - delete
- apiGroups:
  - argoproj.io
  resources:
  - workflows
  - workflows/finalizers
  verbs:
  - get
  - list
  - watch
  - update
  - patch
  - delete
  - create
- apiGroups:
  - argoproj.io
  resources:
  - workflowtemplates
  - workflowtemplates/finalizers
  - clusterworkflowtemplates
  - clusterworkflowtemplates/finalizers
  verbs:
  - get
  - list
  - watch
- apiGroups:
  - ""
  resources:
  - serviceaccounts
  verbs:
  - get
  - list
- apiGroups:
  - argoproj.io
  resources:
  - cronworkflows
  - cronworkflows/finalizers
  verbs:
  - get
  - list
  - watch
  - update
  - patch
  - delete
- apiGroups:
  - ""
  resources:
  - events
  verbs:
  - create
  - patch
- apiGroups:
  - policy
  resources:
  - poddisruptionbudgets
  verbs:
  - create
  - get
  - delete
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: argo-server-cluster-role
rules:
- apiGroups:
  - ""
  resources:
  - configmaps
  verbs:
  - get
  - watch
  - list
- apiGroups:
  - ""
  resources:
  - secrets
  verbs:
  - get
- apiGroups:
  - ""
  resources:
  - pods
  - pods/exec
  - pods/log
  verbs:
  - get
  - list
  - watch
  - delete
- apiGroups:
  - ""
  resources:
  - events
  verbs:
  - watch
  - create
  - patch
- apiGroups:
  - ""
  resources:
  - secrets
  - serviceaccounts
  verbs:
  - get
- apiGroups:
  - argoproj.io
  resources:
  - workflows
  - workfloweventbindings
  - workflowtemplates
  - cronworkflows
  - clusterworkflowtemplates
  verbs:
  - create
  - get
  - list
  - watch
  - update
  - patch
  - delete
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: argo-binding
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: argo-role
subjects:
- kind: ServiceAccount
  name: argo
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: argo-binding
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: argo-cluster-role
subjects:
- kind: ServiceAccount
  name: argo
  namespace: argo
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: argo-server-binding
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: argo-server-cluster-role
subjects:
- kind: ServiceAccount
  name: argo-server
  namespace: argo
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: workflow-controller-configmap
---
apiVersion: v1
kind: Service
metadata:
  name: argo-server
spec:
  externalIPs:               # 暴露Service到外部IP
  - 192.168.1.146                # IP
  ports:
  - name: web
    port: 2746
    targetPort: 2746
  selector:
    app: argo-server
---
apiVersion: v1
kind: Service
metadata:
  name: workflow-controller-metrics
spec:
  ports:
  - name: metrics
    port: 9090
    protocol: TCP
    targetPort: 9090
  selector:
    app: workflow-controller
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: argo-server
spec:
  selector:
    matchLabels:
      app: argo-server
  template:
    metadata:
      labels:
        app: argo-server
    spec:
      containers:
      - args:
        - server
        image: argoproj/argocli:v2.11.7
        name: argo-server
        ports:
        - containerPort: 2746
          name: web
        readinessProbe:
          httpGet:
            path: /
            port: 2746
            scheme: HTTP
          initialDelaySeconds: 10
          periodSeconds: 20
        volumeMounts:
        - mountPath: /tmp
          name: tmp
      nodeSelector:
        kubernetes.io/os: linux
      serviceAccountName: argo-server
      volumes:
      - emptyDir: {}
        name: tmp
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: workflow-controller
spec:
  selector:
    matchLabels:
      app: workflow-controller
  template:
    metadata:
      labels:
        app: workflow-controller
    spec:
      containers:
      - args:
        - --configmap
        - workflow-controller-configmap
        - --executor-image
        - argoproj/argoexec:v2.11.7
        command:
        - workflow-controller
        image: argoproj/workflow-controller:v2.11.7
        name: workflow-controller
      nodeSelector:
        kubernetes.io/os: linux
      serviceAccountName: argo

第三步 安装 argo-cli

# Download the binary
curl -sLO https://github.com/argoproj/argo/releases/download/v2.11.7/argo-darwin-amd64.gz

# Unzip
gunzip argo-darwin-amd64.gz

# Make binary executable
chmod +x argo-darwin-amd64

# Move binary to path
mv ./argo-darwin-amd64 /usr/local/bin/argo

# Test installation
argo version

如出现已下输出则 argo-cli 安装成功 我安装的是老版本 不用担心 是向下兼容的

然后使用 kubectl get all -n argo 查看 argo-server 的启动情况

启动完毕后,我们可以访问暴露出来的 argo-server-ui 界面访问 argo-dashboard 默认端口 2746

遇到的问题

安装完 argo 并且提交工作流后,发现工作流执行不成功,查看日志发现

这是因为默认的工作流 pod 是在默认的 namespace 也就是 default 下执行的,而这个 namespace 下名为 default 的默认的 serviceAccount 不具备操作资源的权限,则我们可以给他绑定权限

kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  namespace: default
  name: pod-reader
rules:
- apiGroups:
  - ""
  resources:
  - configmaps
  verbs:
  - get
  - watch
  - list
- apiGroups:
  - "batch"
  resources:
  - jobs
  verbs:
  - get
  - watch
  - list
  - create
- apiGroups:
  - ""
  resources:
  - secrets
  verbs:
  - get
- apiGroups:
  - ""
  resources:
  - pods
  - pods/exec
  - pods/log
  verbs:
  - get
  - list
  - watch
  - delete
  - patch
- apiGroups:
  - ""
  resources:
  - events
  verbs:
  - watch
  - create
  - patch
- apiGroups:
  - ""
  resources:
  - secrets
  - serviceaccounts
  verbs:
  - get
- apiGroups:
  - argoproj.io
  resources:
  - workflows
  - workfloweventbindings
  - workflowtemplates
  - cronworkflows
  - clusterworkflowtemplates
  verbs:
  - create
  - get
  - list
  - watch
  - update
  - patch
  - delete
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: pod-reader-pod
  namespace: default
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: pod-reader
subjects:
- kind: ServiceAccount
  name: default
  namespace: default

然后执行 kubectl apply -f role.yaml 就可以解决这个问题

我们也可以在创建工作流的 yaml 指定我们创建好的有权限的 serviceAccount

Hello World

我们可以使用官网的例子简单的开始第一个 Workflow,一个 Workflow 基本也是这样的构造

  • generateName: workflow 会在 k8s 环境内产生一个 job 来执行 workflow(job 指的是 k8s 中一定会结束的任务),然后 job 则会产生以 generateName 规定的字符为前缀的 pod(比如上面的例子,它会产生如 whalesay-abcde 字样的 pod)
  • entrypoint:这里规定了一个入口,即我们的 workflow 会以哪一个模板作为第一个模板来启动
  • templates:定义了 templates,templates 是 Argo 中比较重要的一块,我们的 Workflow 运行都是基于各种 template

argo-helloworld

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: whalesay-
spec:
  entrypoint: whalesay
 
  templates:
  - name: whalesay              # name of the template
    container:
      image: docker/whalesay
      command: [cowsay]
      args: ["hello world"]

以上是一个官网给出的 helloworld 样例。它规定了 workflow 会调用一个 docker/whalesay 的容器来打印 helloworld。使用以下命令可以启动这个 Workflow

argo submit helloworld.yaml

就可以看到 workflow 的启动,登录 ui 也可以看到此时有 workflow 的执行,打印 helloworld

查看执行结果

使用 argo watch 持续监控工作流状态

使用 argo logs 查看日志输出

步骤操作

步骤操作包含了比较多的模块类型,它提供了各种常见的对于步骤之间的操作,基本满足了我们对于步骤间操作的需求

container

这也是一个最常见的 templates 类型,它会创建一个容器,然后使用容器来完成我们的任务

- name: whalesay              # name of the template
  container:
    image: docker/whalesay
    command: [cowsay]
    args: ["hello world"]

script

有时我们只希望我们的模板来运行一个脚本,那么 Argo 提供了 Scripts 来让我们运行脚本。

Script 允许我们使用 source 标签来创建一个脚本(临时文件),然后这个临时文件的名称将会作为参数传递给 command 来执行。

使用 script,会将运行脚本的标准输出分配给输出参数 result,让其他的步骤来调用

针对不同的开发着,我们可以用不同的镜像来支持不同的执行方式

# shell脚本
- name: gen-random-int-bash
  script:
    image: debian:9.4
    command: [bash]
    source: |                                         # Contents of the here-script
      cat /dev/urandom | od -N2 -An -i | awk -v f=1 -v r=100 '{printf "%i\n", f + r * $1 / 65536}'
 
# python脚本
- name: gen-random-int-python
  script:
    image: python:alpine3.6
    command: [python]
    source: |
      import random
      i = random.randint(1, 100)
      print(i)
 
# js脚本
- name: gen-random-int-javascript
  script:
    image: node:9.1-alpine
    command: [node]
    source: |
      var rand = Math.floor(Math.random() * 100);
      console.log(rand);

steps

steps 规定了执行的步骤,有并行也有串行模式,它以双横杠(- -)的形式来定义串行,然后以单横杠的形式来定义并行

- name: hello-hello-hello
  steps:
  - - name: hello1            # hello1 is run before the following steps
      template: whalesay
      arguments:
        parameters:
        - name: message
          value: "hello1"
  - - name: hello2a           # double dash => run after previous step
      template: whalesay
      arguments:
        parameters:
        - name: message
          value: "hello2a"
    - name: hello2b           # single dash => run in parallel with previous step
      template: whalesay
      arguments:
        parameters:
        - name: message
          value: "hello2b"

dag

DAG 是一个有向无环图,Argo 使用 DAG 来定义一些比较复杂的 workflow 关系

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: dag-diamond-
spec:
  entrypoint: diamond
  templates:
  - name: echo
    inputs:
      parameters:
      - name: message
    container:
      image: alpine:3.7
      command: [echo, "{{inputs.parameters.message}}"]
  - name: diamond
    dag:
      tasks:
      - name: A
        template: echo
        arguments:
          parameters: [{name: message, value: A}]
      - name: B
        dependencies: [A]
        template: echo
        arguments:
          parameters: [{name: message, value: B}]
      - name: C
        dependencies: [A]
        template: echo
        arguments:
          parameters: [{name: message, value: C}]
      - name: D
        dependencies: [B, C]
        template: echo
        arguments:
          parameters: [{name: message, value: D}]

如上所示,上面的 dag 定义了一个钻石类型的图 A -> (B C) -> D

loop

使用 loop 我们可以定义循环

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: loops-
spec:
  entrypoint: loop-example
  templates:
  - name: loop-example
    steps:
    - - name: print-message
        template: whalesay
        arguments:
          parameters:
          - name: message
            value: "{{item}}"
        withItems:              # invoke whalesay once for each item in parallel
        - hello world           # item 1
        - goodbye world         # item 2

  - name: whalesay
    inputs:
      parameters:
      - name: message
    container:
      image: docker/whalesay:latest
      command: [cowsay]
      args: ["{{inputs.parameters.message}}"]

这个例子中,我们通过 withItems 传入了两个参数,然后 Workflow 就会并行执行这个 templates 两次,依次使用我们给出的参数

当然我们也可以动态的使用循环,循环的参数基于其他模板来控制

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: loops-param-result-
spec:
  entrypoint: loop-param-result-example
  templates:
  - name: loop-param-result-example
    steps:
    - - name: generate
        template: gen-number-list
    # Iterate over the list of numbers generated by the generate step above
    - - name: sleep
        template: sleep-n-sec
        arguments:
          parameters:
          - name: seconds
            value: "{{item}}"
        withParam: "{{steps.generate.outputs.result}}"

  # Generate a list of numbers in JSON format
  - name: gen-number-list
    script:
      image: python:alpine3.6
      command: [python]
      source: |
        import json
        import sys
        json.dump([i for i in range(20, 31)], sys.stdout)

  - name: sleep-n-sec
    inputs:
      parameters:
      - name: seconds
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo sleeping for {{inputs.parameters.seconds}} seconds; sleep {{inputs.parameters.seconds}}; echo done"]

条件控制

条件的控制需要用到 when 的关键字

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: coinflip-
spec:
  entrypoint: coinflip
  templates:
  - name: coinflip
    steps:
    # flip a coin
    - - name: flip-coin
        template: flip-coin
    # evaluate the result in parallel
    - - name: heads
        template: heads                 # call heads template if "heads"
        when: "{{steps.flip-coin.outputs.result}} == heads"
      - name: tails
        template: tails                 # call tails template if "tails"
        when: "{{steps.flip-coin.outputs.result}} == tails"

  # Return heads or tails based on a random number
  - name: flip-coin
    script:
      image: python:alpine3.6
      command: [python]
      source: |
        import random
        result = "heads" if random.randint(0,1) == 0 else "tails"
        print(result)

  - name: heads
    container:
      image: alpine:3.6
      command: [sh, -c]
      args: ["echo \"it was heads\""]

  - name: tails
    container:
      image: alpine:3.6
      command: [sh, -c]
      args: ["echo \"it was tails\""]

在这个例子中,Workflow 通过 when 来判断第一步获取的值是 head 还是 tails,根据获取的值来条件判断下一步会执行的步骤

重试

重试模块会定义如果 Job 执行出现 failures 或 errors 时的情况,

  • limit:指重试的最大次数
  • retryOn:指定重启策略
    • Always: errors 和 failures 时重启
    • OnFailure: failures 时重启,默认采用
    • OnError: error 时重启
  • backoff:定义重启的一些参数
# This example demonstrates the use of retry back offs
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: retry-backoff-
spec:
  entrypoint: retry-backoff
  templates:
  - name: retry-backoff
    retryStrategy:
      limit: 10
      retryPolicy: "Always"
      backoff:
        duration: "1"      # Must be a string. Default unit is seconds. Could also be a Duration, e.g.: "2m", "6h", "1d"
        factor: 2
        maxDuration: "1m"  # Must be a string. Default unit is seconds. Could also be a Duration, e.g.: "2m", "6h", "1d"
    container:
      image: python:alpine3.6
      command: ["python", -c]
      # fail with a 66% probability
      args: ["import random; import sys; exit_code = random.choice([0, 1, 1]); sys.exit(exit_code)"]

以上的例子定义了一个会重启的任务

递归

argo 也是支持递归的,我们只要将步骤的下一步指定为本步骤即可

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: coinflip-recursive-
spec:
  entrypoint: coinflip
  templates:
  - name: coinflip
    steps:
    # flip a coin
    - - name: flip-coin
        template: flip-coin
    # evaluate the result in parallel
    - - name: heads
        template: heads                 # call heads template if "heads"
        when: "{{steps.flip-coin.outputs.result}} == heads"
      - name: tails                     # keep flipping coins if "tails"
        template: coinflip
        when: "{{steps.flip-coin.outputs.result}} == tails"

  - name: flip-coin
    script:
      image: python:alpine3.6
      command: [python]
      source: |
        import random
        result = "heads" if random.randint(0,1) == 0 else "tails"
        print(result)

  - name: heads
    container:
      image: alpine:3.6
      command: [sh, -c]
      args: ["echo \"it was heads\""]

在以上的步骤中,step 执行会做一个判断,如果一直是反面,会递归执行直到为正面

Exit handlers

Argo 可以定义一个出口,它会直接退出工作流,无论成功与否。

常见于:

  • 工作流运行后清理
  • 发送工作流状态的通知(例如,电子邮件/ Slack)
  • 将通过/失败状态发布到 webhook 结果(例如 GitHub 构建结果)
  • 重新提交或提交另一个工作流程
spec:
  entrypoint: intentional-fail
  onExit: exit-handler                  # invoke exit-hander template at end of the workflow
  templates:
  - name: exit-handler
    steps:
    - - name: notify
        template: send-email
      - name: celebrate
        template: celebrate
        when: "{{workflow.status}} == Succeeded"
      - name: cry
        template: cry
        when: "{{workflow.status}} != Succeeded"

在这里使用 Onexit 参数指定了结束模板,则执行 exit-handler 这个模板时,无论是否成功,都会直接结束

timeout

Argo 定义了一个超时的限定,如果容器超时了,则直接结束 Job

- name: sleep
  container:
    image: alpine:latest
    command: [sh, -c]
    args: ["echo sleeping for 1m; sleep 60; echo done"]
  activeDeadlineSeconds: 10

suspending

- name: approve
  suspend: {}
 
- name: delay
  suspend:
    duration: 20

如果我们使用了 duration 关键字,它会等待 20 秒后才唤醒执行

argo resume WORKFLOW    # 这里的Workflow是指workflow的pod容器名

参数操作

参数传递对于工作流也是一个比较关键的问题。对于工作流来说,不同 tempaltes 之间的传递,是通过 jinja 来定义。目前 Argo 只接受以下几种前缀

  • item
  • steps
  • inputs
  • outputs
  • workflow
  • tasks

parameters

通常的参数传递是通过 parameters 关键字来定义的

- name: whalesay
  inputs:
    parameters:
    - name: message       # parameter declaration
  container:
    # run cowsay with that message input parameter as args
    image: docker/whalesay
    command: [cowsay]
    args: ["{{inputs.parameters.message}}"]

arguments

我们可以通过 arguments 关键字来定义一个全局参数

entrypoint: whalesay
arguments:
  parameters:
  - name: message
    value: hello world
  - name: os-list                                     # a list of items
    value: |
      [
        { "image": "debian", "tag": "9.1" },
        { "image": "debian", "tag": "8.9" },
        { "image": "alpine", "tag": "3.6" },
        { "image": "ubuntu", "tag": "17.10" }
      ]

在启动任务时,我们可以通过-p 的参数来做实际参数,如果没有指定,则会使用默认参数(argument 中定义)

argo submit arguments.yaml -p messgae="helloworld" -p oslist=[{ "image": "ubuntu", "tag": "17.10" }]

step 间的传参

对于 step 间的传参,是通过 step 关键字来定义的

- name: test
  steps:
  - - name: A
      template: A
  - - name: B
      template: B
      when: "\"{{steps.A.outputs.result}}\" == \"B\""
    - name: C
      template: C
      when: "\"{{steps.A.outputs.result}}\" == \"C\""

在以上例子中,定义了一个 step,它会首先执行 A 步骤,然后根据启动结果,如果输出是 B,则执行 B 步骤,否则执行 C 步骤

dag 间的传参

对于 Dag,其实它与 step 是十分相似的

dag:
  tasks:
  - name: ip
    template: param
    arguments:
      parameters: [{name: request, value: "ip"}, {name: ip, value: "{{inputs.parameters.ip}}"}]
  - name: port
    template: param
    arguments:
      parameters: [{name: request, value: "port"}, {name: ip, value: "{{inputs.parameters.ip}}"}]
  - name: username
    template: param
    arguments:
      parameters: [{name: request, value: "username"}, {name: ip, value: "{{inputs.parameters.ip}}"}]
  - name: password
    template: param
    arguments:
      parameters: [{name: request, value: "password"}, {name: ip, value: "{{inputs.parameters.ip}}"}]
  - name: server
    template: server
    dependencies: [ip, port, username, password]
    arguments:
      parameters:
      - name: ip
        value: "{{tasks.ip.outputs.result}}"
      - name: password
        value: "{{tasks.password.outputs.result}}"
      - name: username
        value: "{{tasks.username.outputs.result}}"
      - name: port
        value: "{{tasks.port.outputs.result}}"

在上面的例子中,dag 并行执行(ip, port, username, password)四个步骤,然后将执行的结果传递给 server 模块,然后 server 模块会以这四个参数来完成工作。

script result

当我们运行一个 script 时,运行的标准输出会以 result 的方式来传递

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: scripts-bash-
spec:
  entrypoint: bash-script-example
  templates:
  - name: bash-script-example
    steps:
    - - name: generate
        template: gen-random-int-bash
    - - name: print
        template: print-message
        arguments:
          parameters:
          - name: message
            value: "{{steps.generate.outputs.result}}"  # The result of the here-script

  - name: gen-random-int-bash
    script:
      image: debian:9.4
      command: [bash]
      source: |                                         # Contents of the here-script
        cat /dev/urandom | od -N2 -An -i | awk -v f=1 -v r=100 '{printf "%i\n", f + r * $1 / 65536}'

  - name: gen-random-int-python
    script:
      image: python:alpine3.6
      command: [python]
      source: |
        import random
        i = random.randint(1, 100)
        print(i)

  - name: gen-random-int-javascript
    script:
      image: node:9.1-alpine
      command: [node]
      source: |
        var rand = Math.floor(Math.random() * 100);
        console.log(rand);

  - name: print-message
    inputs:
      parameters:
      - name: message
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo result was: {{inputs.parameters.message}}"]

比如上面的例子,generate 这个模块执行完成之后,print 模块会获取 generate 模块的输出结果作为参数来执行

output Parameters

我们可以使用步骤的输出作为参数传递

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: output-parameter-
spec:
  entrypoint: output-parameter
  templates:
  - name: output-parameter
    steps:
    - - name: generate-parameter
        template: whalesay
    - - name: consume-parameter
        template: print-message
        arguments:
          parameters:
          # Pass the hello-param output from the generate-parameter step as the message input to print-message
          - name: message
            value: "{{steps.generate-parameter.outputs.parameters.hello-param}}"

  - name: whalesay
    container:
      image: docker/whalesay:latest
      command: [sh, -c]
      args: ["echo -n hello world > /tmp/hello_world.txt"]  # generate the content of hello_world.txt
    outputs:
      parameters:
      - name: hello-param       # name of output parameter
        valueFrom:
          path: /tmp/hello_world.txt    # set the value of hello-param to the contents of this hello-world.txt

  - name: print-message
    inputs:
      parameters:
      - name: message
    container:
      image: docker/whalesay:latest
      command: [cowsay]
      args: ["{{inputs.parameters.message}}"]

在上面的例子中,whalesay 模块会将执行结果打印到 hello-world.txt 文本,然后将这个文本的内容定为输出结果然后 consume-parameter 模块会去获取 whalesay 模块的输出结果做为输入参数

资源操作

Argo 可以操作的资源也有很多,它不仅仅是能操作容器,kubernetes 的资源、容器资源、计算资源等也均可调配

Secret

volumes:
  - name: my-secret-vol
    secret:
      secretName: my-secret     # name of an existing k8s secret
templates:
  - name: whalesay
    container:
      image: alpine:3.7
      command: [sh, -c]
      args: ['
        echo "secret from env: $MYSECRETPASSWORD";
        echo "secret from file: `cat /secret/mountpath/mypassword`"
      ']
      env:
      - name: MYSECRETPASSWORD  # name of env var
        valueFrom:
          secretKeyRef:
            name: my-secret     # name of an existing k8s secret
            key: mypassword     # 'key' subcomponent of the secret
      volumeMounts:
      - name: my-secret-vol     # mount file containing secret at /secret/mountpath
        mountPath: "/secret/mountpath"

在这里,我们使用了 Secret 作为一个 volume 供模板调用

daemon

使用守护进程可以使得作业在后台运行,他们的存在使得有的工作可以跨工作流运行

- name: influxdb
  daemon: true                        # start influxdb as a daemon
  retryStrategy:
    limit: 10                         # retry container if it fails
  container:
    image: influxdb:1.2
    readinessProbe:                   # wait for readinessProbe to succeed
      httpGet:
        path: /ping
        port: 8086

在这里使用了 daemon: true 来开启 daemon,以保护 influxdb 持续运行

sidecar

边车模式,指著容器在同一容器中同时执行另一个容器来支持主容器的工作。Argo 也支持边车模式,可以启动一个辅助容器来协助作业的进行

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: sidecar-nginx-
spec:
  entrypoint: sidecar-nginx-example
  templates:
  - name: sidecar-nginx-example
    container:
      image: appropriate/curl
      command: [sh, -c]
      # Try to read from nginx web server until it comes up
      args: ["until `curl -G 'http://127.0.0.1/' >& /tmp/out`; do echo sleep && sleep 1; done && cat /tmp/out"]
    # Create a simple nginx web server
    sidecars:
    - name: nginx
      image: nginx:1.13

Artifacts

在 Argo 中也有集成一些比较常用的库,比如 http 和 git

templates:
- name: hardwired-artifact
  inputs:
    artifacts:
    # Check out the master branch of the argo repo and place it at /src
    # revision can be anything that git checkout accepts: branch, commit, tag, etc.
    - name: argo-source
      path: /src
      git:
        repo: https://github.com/argoproj/argo.git
        revision: "master"
    # Download kubectl 1.8.0 and place it at /bin/kubectl
    - name: kubectl
      path: /bin/kubectl
      mode: 0755
      http:
        url: https://storage.googleapis.com/kubernetes-release/release/v1.8.0/bin/linux/amd64/kubectl

Resource

Argo 可以操作 k8s 资源 创建一个 job

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: k8s-jobs-
spec:
  entrypoint: pi-tmpl
  templates:
  - name: pi-tmpl
    resource:                   # indicates that this is a resource template
      action: create            # can be any kubectl action (e.g. create, delete, apply, patch)
      successCondition: status.succeeded > 0
      failureCondition: status.failed > 3
      manifest: |               #put your kubernetes spec here
        apiVersion: batch/v1
        kind: Job
        metadata:
          generateName: pi-job-
        spec:
          template:
            metadata:
              name: pi
            spec:
              containers:
              - name: pi
                image: perl
                command: ["perl",  "-Mbignum=bpi", "-wle", "print bpi(2000)"]
              restartPolicy: Never
          backoffLimit: 4

可以使用以下 Argo Workflow 修改此 Crontab:

apiVersion: "stable.example.com/v1"
kind: CronTab
spec:
  cronSpec: "* * * * */5"
  image: my-awesome-cron-image
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: k8s-patch-
spec:
  entrypoint: cront-tmpl
  templates:
  - name: cront-tmpl
    resource:
      action: patch
      mergeStrategy: merge                 # Must be one of [strategic merge json]
      manifest: |
        apiVersion: "stable.example.com/v1"
        kind: CronTab
        spec:
          cronSpec: "* * * * */10"
          image: my-awesome-cron-image

docker

Argo 实现了 Docker in Docker 的形式

- name: dind-sidecar-example
  container:
    image: docker:17.10
    command: [sh, -c]
    args: ["until docker ps; do sleep 3; done; docker run --rm debian:latest cat /etc/os-release"]
    env:
    - name: DOCKER_HOST               # the docker daemon can be access on the standard port on localhost
      value: 127.0.0.1
  sidecars:
  - name: dind
    image: docker:17.10-dind          # Docker already provides an image for running a Docker daemon
    securityContext:
      privileged: true                # the Docker daemon can only run in a privileged container
    # mirrorVolumeMounts will mount the same volumes specified in the main container
    # to the sidecar (including artifacts), at the same mountPaths. This enables
    # dind daemon to (partially) see the same filesystem as the main container in
    # order to use features such as docker volume binding.
    mirrorVolumeMounts: true

以上工作流实现了主容器运行等待容器创建,边车容器启动一个容器并将这个容器镜像交给主容器资源

volumes

在 Argo 中,我们也可以直接传递容器卷,方便处理大量数据

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: volumes-pvc-
spec:
  entrypoint: volumes-pvc-example
  volumeClaimTemplates:                 # define volume, same syntax as k8s Pod spec
  - metadata:
      name: workdir                     # name of volume claim
    spec:
      accessModes: [ "ReadWriteOnce" ]
      resources:
        requests:
          storage: 1Gi                  # Gi => 1024 * 1024 * 1024

  templates:
  - name: volumes-pvc-example
    steps:
    - - name: generate
        template: whalesay
    - - name: print
        template: print-message

  - name: whalesay
    container:
      image: docker/whalesay:latest
      command: [sh, -c]
      args: ["echo generating message in volume; cowsay hello world | tee /mnt/vol/hello_world.txt"]
      # Mount workdir volume at /mnt/vol before invoking docker/whalesay
      volumeMounts:                     # same syntax as k8s Pod spec
      - name: workdir
        mountPath: /mnt/vol

  - name: print-message
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo getting message from volume; find /mnt/vol; cat /mnt/vol/hello_world.txt"]
      # Mount workdir volume at /mnt/vol before invoking docker/whalesay
      volumeMounts:                     # same syntax as k8s Pod spec
      - name: workdir
        mountPath: /mnt/vol

在上面的例子中,workflow 初始化了一个容器卷,然后下面的 whalesay 和 print-message 模块都调用了这个容器卷

不过大多数情况下,我们会去调用一个已存在的卷

# Define Kubernetes PVC
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: my-existing-volume
spec:
  accessModes: [ "ReadWriteOnce" ]
  resources:
    requests:
      storage: 1Gi
 
---
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: volumes-existing-
spec:
  entrypoint: volumes-existing-example
  volumes:
  # Pass my-existing-volume as an argument to the volumes-existing-example template
  # Same syntax as k8s Pod spec
  - name: workdir
    persistentVolumeClaim:
      claimName: my-existing-volume
 
  templates:
  - name: volumes-existing-example
    steps:
    - - name: generate
        template: whalesay
    - - name: print
        template: print-message
 
  - name: whalesay
    container:
      image: docker/whalesay:latest
      command: [sh, -c]
      args: ["echo generating message in volume; cowsay hello world | tee /mnt/vol/hello_world.txt"]
      volumeMounts:
      - name: workdir
        mountPath: /mnt/vol
 
  - name: print-message
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo getting message from volume; find /mnt/vol; cat /mnt/vol/hello_world.txt"]
      volumeMounts:
      - name: workdir
        mountPath: /mnt/vol

在上面的例子中,我们外部已经定义了一个 pvc,然后在 workflow 中,我们通过声明一个 pvc 为卷来调用它

总结

argo 是一个云原生的基于 k8s 的工作流引擎,如果基础环境是 k8s 的话,不管是 ci/cd 还是其他工作流用途,argo 都是非常好的选择,上手非常简单,使用 yaml 作为模板语法 与 k8s 几乎一模一样。

  • Argo
    1 引用 • 1 关注
  • Kubernetes

    Kubernetes 是 Google 开源的一个容器编排引擎,它支持自动化部署、大规模可伸缩、应用容器化管理。

    110 引用 • 54 回帖
1 操作
Gakkiyomi2019 在 2020-11-05 10:06:17 更新了该帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...