Argo Workflows v3.3 发布 支持插件、多租户、调试模式

2023-03-19 16:01:04 浏览数 (2)

Argo Workflows v3.3 发布,支持插件、调试模式、多租户,修改默认执行器,引入新 Python SDK

  • 新特性:插件模板允许任何开发人员使用任何语言编写扩展到他们的工作流
  • 新特性:使用工作流钩子基于条件执行模板
  • 新的 SDK: Hera 是一个新的用于指定 Argo 工作流的 Python SDK
  • 新特性:使用ARGO_DEBUG_PAUSE将任务置于调试模式
  • 增强:Pod 名称包括模板名称
  • 增强:多租户支持SSO RBAC
  • 增强:将默认执行器更改为Emissary
  • 增强:Java 和 Python 客户端库加入了核心的 Argo 工作流代码库

插件模板

目前,工作流中的每个任务要么运行一个 pod(例如“容器”或“脚本”),要么发出一个 HTTP 请求。插件模板允许您编写自己的 HTTP 服务器,插入您的任何工作流来完成任务。

插件的一大好处是,你不需要学习 Golang,也不需要等待 Argo 团队添加功能。你可以自己用 Python 来做,然后立即部署它,这样你就可以在工作流程中使用它了。

插件有很多用例:

  • 发送 Slack 或电子邮件信息
  • 更新 Tello board
  • 启动 Spark EMR 或 Tekton job
  • 与 Airflow 或任何类似的系统集成
  • 向报表系统发送数据

插件被实现为 HTTP 服务器。下面是一个用 Python 编写的发送 Slack 消息的程序:

代码语言:javascript复制
import json
import os
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.request import urlopen, Request


class Plugin(BaseHTTPRequestHandler):

    def args(self):
        return json.loads(self.rfile.read(int(self.headers.get('Content-Length'))))

    def reply(self, reply):
        self.send_response(200)
        self.end_headers()
        self.wfile.write(json.dumps(reply).encode("UTF-8"))

    def unsupported(self):
        self.send_response(404)
        self.end_headers()

    def do_POST(self):
        if self.path == '/api/v1/template.execute':
            args = self.args()
            if 'slack' in args['template'].get('plugin', {}):
                x = urlopen(
                    Request(os.getenv('URL'),
                            data=json.dumps({'text': args['template']['plugin']['slack']['text']}).encode()))
                if x.status != 200:
                    raise Exception("not 200")
                self.reply({'node': {'phase': 'Succeeded', 'message': 'Slack message sent'}})
            else:
                self.reply({})
        else:
            self.unsupported()


if __name__ == '__main__':
    httpd = HTTPServer(('', 7522), Plugin)
    httpd.serve_forever()

一旦你写了一个插件,就可以把插件打包成一个 configmap。用kubectl apply安装它,会自动加载插件:

代码语言:javascript复制
argo executor-plugin build ./slack-plugin
kubectl apply ./slack-plugin/slack-executor-plugin-configmap.yaml

最后,你可以使用新插件运行工作流:

代码语言:javascript复制
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
   generateName: slack-example-
spec:
   entrypoint: main
   templates:
     - name: main
       plugin:
         slack:
           text: "{{workflow.name}} finished!"

插件将改变用户使用 Argo 工作流构建平台的方式。了解更多关于插件模板在文档[1]

Workflow Hooks

工作流钩子在满足配置的表达式时执行模板。工作流钩子就像带有条件的退出处理程序。钩子可以在工作流级和模板级配置。

钩子可以用来根据工作流状态的改变或步骤/任务状态的改变来配置通知,就像下面的例子:

代码语言:javascript复制
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
 generateName: lifecycle-hook-
spec:
 entrypoint: main
 hooks:
   exit:
     template: http
   running:
     expression: workflow.status == "Running"
     template: http
 templates:
   - name: main
     steps:
       - - name: step1
           template: heads

   - name: heads
     container:
       image: alpine:3.6
       command: [sh, -c]
       args: ["echo \"it was heads\""]

   - name: http
     http:
       url: [http://dummy.restapiexample.com/api/v1/employees](http://dummy.restapiexample.com/api/v1/employees "http://dummy.restapiexample.com/api/v1/employees")

Hera: Argo 工作流的一个新的 Python SDK

Hera (Hera -workflow[2])是一个新的高效的 SDK,用于在 Python 中指定 Argo 工作流。Hera 的目标是为 Python 开发人员提供一种更简单的方式来构建和提交实验工作流,特别是机器学习。

Hera 是围绕 Argo 工作流的两个核心概念构建的:

  • Task:保存用于远程执行的 Python 函数的对象
  • 工作流:任务的集合

下面是一个使用 Hera 的 DAG 工作流示例:

代码语言:javascript复制
from hera.task import Task
from hera.workflow import Workflow
from hera.workflow_service import WorkflowService
def say(message: str):
    """
    This can be anything as long as the Docker image satisfies the dependencies. You can import anything Python
    that is in your container e.g torch, tensorflow, scipy, biopython, etc - just provide an image to the task!
    """
    print(message)
ws = WorkflowService('my-argo-domain.com', 'my-argo-server-token')
w = Workflow('diamond', ws)
a = Task('A', say, [{'message': 'This is task A!'}])
b = Task('B', say, [{'message': 'This is task B!'}])
c = Task('C', say, [{'message': 'This is task C!'}])
d = Task('D', say, [{'message': 'This is task D!'}])
a.next(b).next(d)  # a >> b >> d
a.next(c).next(d)  # a >> c >> dw.add_tasks(a, b, c, d)
w.submit()

调试暂停

许多用户都要求改进调试功能。到目前为止,还不能将任务设置为调试模式。现在有了ARGO_DEBUG_PAUSE, Argo 将暂停你的任务执行器,这样你就可以调试它。指定一些环境变量,选择是否在任务之前或之后暂停,然后将kubctl exec放入容器以调试它。

Pod 名称包括模板名称

在 v3.2 中,pod 名称是通过接受工作流名称并根据任务 ID 添加散列后缀来生成的。在 v3.3 中,pod 名称还包含模板的名称。这使得在使用kubectl get pod时,更容易看到哪个 pod 是哪个任务:

Before (v1):

代码语言:javascript复制
NAME                     READY STATUS AGE
coinflip-jjzd8–1241984900 0/2 Completed 0
coinflip-jjzd8–2544588297 0/2 Completed 0

After (v2):

代码语言:javascript复制
NAME                               READY STATUS RESTARTS
coinflip-lg6w4-flip-coin-1886328558 0/2 Completed 0
coinflip-lg6w4-heads-661049787      0/2 Completed 0

这个特性是可选择的。增加POD_NAMES=v2标志启动控制器.

SSO RBAC Namespace Delegation

在 v3.2 中,必须在argo Namespace 中设置 SSO RBAC 特性。这对于小团队来说很有效。但是,在每个团队都有自己的 Namespace 的多租户系统中,这可能会变得笨拙。

在 v3.3 中,我们支持在user Namespace 中设置 RBAC。这个更改允许每个团队设置自己的 RBAC,当有许多团队时,可以更容易地管理 RBAC。

将默认执行器更改为 Emissary

Kubernetes 对 Docker 的支持正在消失见之前的帖子[3]。我们将用Argo Emissary 执行器[4]取代它。

Emissary 执行器提供几个优势:

  • 比现有的执行器更安全
  • 比现有的执行器更快,甚至比 PNS(Process Namespace Sharing)执行器[5]更快
  • 支持 ContainerSet 模板(允许您运行更快的步骤和降低成本)

支持新的“调试暂停”功能(帮助调试工作流程中的容器)

支持 Java 和 Python 客户端库

您可能已经使用了社区维护的客户端库之一,将 Argo 工作流集成到您的应用程序中。然而,保持这些最新的总是一个挑战。

现在,我们在核心代码库中包含了 Java 和 Python 客户端库,这样它们就可以与 Argo 工作流同步进行维护和发布。我们的目标是确保它们始终是最新的和功能齐全的。

在 Github 这里找到这些Argo sdk[6]

升级到 v3.3

查看GitHub 上最新的 Argo 工作流版本[7]

在升级到 3.3 版本之前,请确保在这里查看所有更改[8]

0 人点赞