为了保证airflow任务调度的可用性,需要从DAG生命周期的各个方面进行监控。
DAG加载时
因为DAG文件会在调度器和worker执行时加载,如果在DAG中引用了第三方的库或进行了DB操作,则这些操作会在DAG文件加载时被频繁调用。举个例子,如果升级了第三方库,导致了加载时的不兼容问题,相关的DAG文件就会加载失败,导致整个调度失效。在这种场景下,我们需要对调度日志和worker日志进行监控。如果发现相关的异常日志,就需要告警。
Operator执行时
因为DAG的执行单元是BaseOperator,所以只需要判断Operator在执行时是否抛出异常就可以了,这里有3个相关参数
代码语言:javascript复制email: 设置为收件人,就可以开启邮件告警,多个收件人使用数组格式
email_on_retry: 如果设置了retries重试参数,则重试失败时会发送邮件告警
email_on_faillure: operator执行失败时告警
只需要在DAG的参数中设置email收件人参数,则operator执行失败时就会发送告警邮件
代码语言:javascript复制args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2),
'email': 'luciferliu',
#'retries': 1,
#'retry_delay': timedelta(seconds=5),
#'retry_exponential_backoff': True,
'depends_on_past': True
}
也可以在每个任务中设置email参数,这样每个任务的告警可以发给不同的人处理。
Operator长时间未调度
Operator在超过2个调度周期,仍然没有执行,可能是调度的任务超出了集群的处理能力,也有可能是DAG中的bug导致的。在这种情况下,需要开启SLA。如果任务实例的下一次调度超时task.sla时间后没有执行,则记录到表sla_miss中,并发送告警。
代码语言:javascript复制i = 1
task = PythonOperator(
task_id='sleep_for_' str(i),
python_callable=my_sleeping_function,
op_kwargs={'random_base': float(i) / 10},
sla=timedelta(minutes=1),
dag=dag)
Operator僵死
如果operator的任务实例一直处于运行态,或者长时间没有心跳,就会处于僵死的状态。这种情况在当前的airflow版本中会经常发生,应该是调度bug导致的。如果设置了"email"参数,则会发送邮件告警。