Airflow 任务并发使用总结

2024-01-25 18:08:48 浏览数 (1)

之前有简单介绍过 Airflow ,参考Airflow 使用简单总结、Airflow 使用总结(二)、Airflow 使用——Variables, 最近一直在用 Airflow 处理调度任务涉及到了并发问题,任务的 graph 关系如下,图中每个方框是一个任务 task,标 N 的表示一次需要并发执行多个任务实例,比如 run_can、run_rk、run_sync 这些任务。但是我又希望同一时刻只有一个 pcd_2_mod 任务实例在运行,它需要显卡推理。

我的 airflow 配置是这样的

代码语言:javascript复制
with DAG(
        dag_id=f"DataGovernanceFrameSplitRewrite",
        default_args=default_args,
        tags=["DataGovernance"],
        schedule_interval=None,
        description="DataGovernanceFrameSplit",
        dagrun_timeout=timedelta(days=31),
        max_active_tasks=10,
        concurrency=10  # 设置concurrency为1,确保只能运行一个实例
) as dag:

    @task(executor_config=executor_config_20, task_concurrency=1)
    def pcd_2_mot():
        import pandas as pd
        root_dir = "/workspace/ldnet_cw_0.2/"
        BATCH_SIZE = 24

在这个配置中,有几个参数与并发相关:

max_active_tasks:

代码语言:javascript复制
max_active_tasks=10

作用范围:这个参数是应用于单个任务实例(Task Instance)的,而不是整个 DAG。

含义:它指定了一个任务实例能够同时存在于系统中的最大数量。当任务数量超过这个值时,Airflow会等待之前的任务实例完成,以确保不超过设定的最大并发数。这可以帮助避免系统资源被过多任务占用,保持系统的稳定性。

例子:如果 max_active_tasks=10,则同一任务在同一时刻最多有5个实例在运行,超过这个数量的实例会排队等待。

concurrency:

代码语言:javascript复制
concurrency=10

作用范围:这个参数是应用于整个 DAG 的,影响 DAG 中所有任务的并发性。

含义:它指定了在任何给定时刻可以在整个 DAG 中同时执行的任务实例的最大数量。这个参数对于控制整个 DAG 的并发级别非常有用,尤其是当 DAG 中包含多个任务时,可以确保整个 DAG 的运行不会消耗过多的系统资源。

例子:如果 concurrency=10,则在同一时刻整个 DAG 中最多允许10个任务实例同时运行。

总结一下,max_active_tasks 主要用于控制单个任务实例的并发性,而 concurrency 用于控制整个 DAG 中任务实例的总体并发性。

task_concurrency:

代码语言:javascript复制
@task(task_concurrency=1)

这是在定义具体任务(task)时使用的参数。task_concurrency 指定了该任务实例的并发度,即允许同时执行的相同任务的实例数量。在这里,设置为1,表示这个任务每次只能运行一个实例。

总之,max_active_tasks 控制单个Dag 实例的最大并发数量,concurrency 控制所有 DAG 实例中任务实例的总体并发数量,而 task_concurrency 控制特定任务的实例并发数量。

0 人点赞