之前有简单介绍过 Airflow ,参考Airflow 使用简单总结、Airflow 使用总结(二)、Airflow 使用——Variables, 最近一直在用 Airflow 处理调度任务涉及到了并发问题,任务的 graph 关系如下,图中每个方框是一个任务 task,标 N 的表示一次需要并发执行多个任务实例,比如 run_can、run_rk、run_sync 这些任务。但是我又希望同一时刻只有一个 pcd_2_mod 任务实例在运行,它需要显卡推理。
我的 airflow 配置是这样的
代码语言:javascript复制with DAG(
dag_id=f"DataGovernanceFrameSplitRewrite",
default_args=default_args,
tags=["DataGovernance"],
schedule_interval=None,
description="DataGovernanceFrameSplit",
dagrun_timeout=timedelta(days=31),
max_active_tasks=10,
concurrency=10 # 设置concurrency为1,确保只能运行一个实例
) as dag:
@task(executor_config=executor_config_20, task_concurrency=1)
def pcd_2_mot():
import pandas as pd
root_dir = "/workspace/ldnet_cw_0.2/"
BATCH_SIZE = 24
在这个配置中,有几个参数与并发相关:
max_active_tasks:
代码语言:javascript复制max_active_tasks=10
作用范围:这个参数是应用于单个任务实例(Task Instance)的,而不是整个 DAG。
含义:它指定了一个任务实例能够同时存在于系统中的最大数量。当任务数量超过这个值时,Airflow会等待之前的任务实例完成,以确保不超过设定的最大并发数。这可以帮助避免系统资源被过多任务占用,保持系统的稳定性。
例子:如果 max_active_tasks=10,则同一任务在同一时刻最多有5个实例在运行,超过这个数量的实例会排队等待。
concurrency:
代码语言:javascript复制concurrency=10
作用范围:这个参数是应用于整个 DAG 的,影响 DAG 中所有任务的并发性。
含义:它指定了在任何给定时刻可以在整个 DAG 中同时执行的任务实例的最大数量。这个参数对于控制整个 DAG 的并发级别非常有用,尤其是当 DAG 中包含多个任务时,可以确保整个 DAG 的运行不会消耗过多的系统资源。
例子:如果 concurrency=10,则在同一时刻整个 DAG 中最多允许10个任务实例同时运行。
总结一下,max_active_tasks 主要用于控制单个任务实例的并发性,而 concurrency 用于控制整个 DAG 中任务实例的总体并发性。
task_concurrency:
代码语言:javascript复制@task(task_concurrency=1)
这是在定义具体任务(task)时使用的参数。task_concurrency 指定了该任务实例的并发度,即允许同时执行的相同任务的实例数量。在这里,设置为1,表示这个任务每次只能运行一个实例。
总之,max_active_tasks 控制单个Dag 实例的最大并发数量,concurrency 控制所有 DAG 实例中任务实例的总体并发数量,而 task_concurrency 控制特定任务的实例并发数量。