airflow—服务失效监控(5)

2018-08-29 18:12:50 浏览数 (1)

为了保证airflow任务调度的可用性,需要从DAG生命周期的各个方面进行监控。

DAG加载时

因为DAG文件会在调度器和worker执行时加载,如果在DAG中引用了第三方的库或进行了DB操作,则这些操作会在DAG文件加载时被频繁调用。举个例子,如果升级了第三方库,导致了加载时的不兼容问题,相关的DAG文件就会加载失败,导致整个调度失效。在这种场景下,我们需要对调度日志和worker日志进行监控。如果发现相关的异常日志,就需要告警。

Operator执行时

因为DAG的执行单元是BaseOperator,所以只需要判断Operator在执行时是否抛出异常就可以了,这里有3个相关参数

代码语言:javascript复制
email: 设置为收件人,就可以开启邮件告警,多个收件人使用数组格式
email_on_retry: 如果设置了retries重试参数,则重试失败时会发送邮件告警
email_on_faillure: operator执行失败时告警

只需要在DAG的参数中设置email收件人参数,则operator执行失败时就会发送告警邮件

代码语言:javascript复制
args = {
 'owner': 'airflow',
 'start_date': airflow.utils.dates.days_ago(2),
 'email': 'luciferliu',
 #'retries': 1,
 #'retry_delay': timedelta(seconds=5),
 #'retry_exponential_backoff': True,
 'depends_on_past': True
}

也可以在每个任务中设置email参数,这样每个任务的告警可以发给不同的人处理。

Operator长时间未调度

Operator在超过2个调度周期,仍然没有执行,可能是调度的任务超出了集群的处理能力,也有可能是DAG中的bug导致的。在这种情况下,需要开启SLA。如果任务实例的下一次调度超时task.sla时间后没有执行,则记录到表sla_miss中,并发送告警。

代码语言:javascript复制
i = 1
task = PythonOperator(
 task_id='sleep_for_'   str(i),
 python_callable=my_sleeping_function,
 op_kwargs={'random_base': float(i) / 10},
 sla=timedelta(minutes=1),
 dag=dag)

Operator僵死

如果operator的任务实例一直处于运行态,或者长时间没有心跳,就会处于僵死的状态。这种情况在当前的airflow版本中会经常发生,应该是调度bug导致的。如果设置了"email"参数,则会发送邮件告警。

seo

0 人点赞