Argo Workflows v3.3 发布,支持插件、调试模式、多租户,修改默认执行器,引入新 Python SDK
- 新特性:插件模板允许任何开发人员使用任何语言编写扩展到他们的工作流
- 新特性:使用工作流钩子基于条件执行模板
- 新的 SDK: Hera 是一个新的用于指定 Argo 工作流的 Python SDK
- 新特性:使用
ARGO_DEBUG_PAUSE
将任务置于调试模式 - 增强:Pod 名称包括模板名称
- 增强:多租户支持
SSO RBAC
- 增强:将默认执行器更改为
Emissary
- 增强:Java 和 Python 客户端库加入了核心的 Argo 工作流代码库
插件模板
目前,工作流中的每个任务要么运行一个 pod(例如“容器”或“脚本”),要么发出一个 HTTP 请求。插件模板允许您编写自己的 HTTP 服务器,插入您的任何工作流来完成任务。
插件的一大好处是,你不需要学习 Golang,也不需要等待 Argo 团队添加功能。你可以自己用 Python 来做,然后立即部署它,这样你就可以在工作流程中使用它了。
插件有很多用例:
- 发送 Slack 或电子邮件信息
- 更新 Tello board
- 启动 Spark EMR 或 Tekton job
- 与 Airflow 或任何类似的系统集成
- 向报表系统发送数据
插件被实现为 HTTP 服务器。下面是一个用 Python 编写的发送 Slack 消息的程序:
代码语言:javascript复制import json
import os
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.request import urlopen, Request
class Plugin(BaseHTTPRequestHandler):
def args(self):
return json.loads(self.rfile.read(int(self.headers.get('Content-Length'))))
def reply(self, reply):
self.send_response(200)
self.end_headers()
self.wfile.write(json.dumps(reply).encode("UTF-8"))
def unsupported(self):
self.send_response(404)
self.end_headers()
def do_POST(self):
if self.path == '/api/v1/template.execute':
args = self.args()
if 'slack' in args['template'].get('plugin', {}):
x = urlopen(
Request(os.getenv('URL'),
data=json.dumps({'text': args['template']['plugin']['slack']['text']}).encode()))
if x.status != 200:
raise Exception("not 200")
self.reply({'node': {'phase': 'Succeeded', 'message': 'Slack message sent'}})
else:
self.reply({})
else:
self.unsupported()
if __name__ == '__main__':
httpd = HTTPServer(('', 7522), Plugin)
httpd.serve_forever()
一旦你写了一个插件,就可以把插件打包成一个 configmap。用kubectl apply
安装它,会自动加载插件:
argo executor-plugin build ./slack-plugin
kubectl apply ./slack-plugin/slack-executor-plugin-configmap.yaml
最后,你可以使用新插件运行工作流:
代码语言:javascript复制apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: slack-example-
spec:
entrypoint: main
templates:
- name: main
plugin:
slack:
text: "{{workflow.name}} finished!"
插件将改变用户使用 Argo 工作流构建平台的方式。了解更多关于插件模板在文档[1]。
Workflow Hooks
工作流钩子在满足配置的表达式时执行模板。工作流钩子就像带有条件的退出处理程序。钩子可以在工作流级和模板级配置。
钩子可以用来根据工作流状态的改变或步骤/任务状态的改变来配置通知,就像下面的例子:
代码语言:javascript复制apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: lifecycle-hook-
spec:
entrypoint: main
hooks:
exit:
template: http
running:
expression: workflow.status == "Running"
template: http
templates:
- name: main
steps:
- - name: step1
template: heads
- name: heads
container:
image: alpine:3.6
command: [sh, -c]
args: ["echo \"it was heads\""]
- name: http
http:
url: [http://dummy.restapiexample.com/api/v1/employees](http://dummy.restapiexample.com/api/v1/employees "http://dummy.restapiexample.com/api/v1/employees")
Hera: Argo 工作流的一个新的 Python SDK
Hera (Hera -workflow[2])是一个新的高效的 SDK,用于在 Python 中指定 Argo 工作流。Hera 的目标是为 Python 开发人员提供一种更简单的方式来构建和提交实验工作流,特别是机器学习。
Hera 是围绕 Argo 工作流的两个核心概念构建的:
Task
:保存用于远程执行的 Python 函数的对象工作流
:任务的集合
下面是一个使用 Hera 的 DAG 工作流示例:
代码语言:javascript复制from hera.task import Task
from hera.workflow import Workflow
from hera.workflow_service import WorkflowService
def say(message: str):
"""
This can be anything as long as the Docker image satisfies the dependencies. You can import anything Python
that is in your container e.g torch, tensorflow, scipy, biopython, etc - just provide an image to the task!
"""
print(message)
ws = WorkflowService('my-argo-domain.com', 'my-argo-server-token')
w = Workflow('diamond', ws)
a = Task('A', say, [{'message': 'This is task A!'}])
b = Task('B', say, [{'message': 'This is task B!'}])
c = Task('C', say, [{'message': 'This is task C!'}])
d = Task('D', say, [{'message': 'This is task D!'}])
a.next(b).next(d) # a >> b >> d
a.next(c).next(d) # a >> c >> dw.add_tasks(a, b, c, d)
w.submit()
调试暂停
许多用户都要求改进调试功能。到目前为止,还不能将任务设置为调试模式。现在有了ARGO_DEBUG_PAUSE
, Argo 将暂停你的任务执行器,这样你就可以调试它。指定一些环境变量,选择是否在任务之前或之后暂停,然后将kubctl exec
放入容器以调试它。
Pod 名称包括模板名称
在 v3.2 中,pod 名称是通过接受工作流名称并根据任务 ID 添加散列后缀来生成的。在 v3.3 中,pod 名称还包含模板的名称。这使得在使用kubectl get pod
时,更容易看到哪个 pod 是哪个任务:
Before (v1):
代码语言:javascript复制NAME READY STATUS AGE
coinflip-jjzd8–1241984900 0/2 Completed 0
coinflip-jjzd8–2544588297 0/2 Completed 0
After (v2):
代码语言:javascript复制NAME READY STATUS RESTARTS
coinflip-lg6w4-flip-coin-1886328558 0/2 Completed 0
coinflip-lg6w4-heads-661049787 0/2 Completed 0
这个特性是可选择的。增加POD_NAMES=v2
标志启动控制器.
SSO RBAC Namespace Delegation
在 v3.2 中,必须在argo
Namespace 中设置 SSO RBAC 特性。这对于小团队来说很有效。但是,在每个团队都有自己的 Namespace 的多租户系统中,这可能会变得笨拙。
在 v3.3 中,我们支持在user
Namespace 中设置 RBAC。这个更改允许每个团队设置自己的 RBAC,当有许多团队时,可以更容易地管理 RBAC。
将默认执行器更改为 Emissary
Kubernetes 对 Docker 的支持正在消失见之前的帖子[3]。我们将用Argo Emissary 执行器[4]取代它。
Emissary 执行器提供几个优势:
- 比现有的执行器更安全
- 比现有的执行器更快,甚至比 PNS(Process Namespace Sharing)执行器[5]更快
- 支持 ContainerSet 模板(允许您运行更快的步骤和降低成本)
支持新的“调试暂停”功能(帮助调试工作流程中的容器)
支持 Java 和 Python 客户端库
您可能已经使用了社区维护的客户端库之一,将 Argo 工作流集成到您的应用程序中。然而,保持这些最新的总是一个挑战。
现在,我们在核心代码库中包含了 Java 和 Python 客户端库,这样它们就可以与 Argo 工作流同步进行维护和发布。我们的目标是确保它们始终是最新的和功能齐全的。
在 Github 这里找到这些Argo sdk[6]。
升级到 v3.3
查看GitHub 上最新的 Argo 工作流版本[7]。
在升级到 3.3 版本之前,请确保在这里查看所有更改[8]。