英语水一直不平,勉强翻译翻译(不完全照着译,夹带私货)。
原文:https://airflow.apache.org/docs/apache-airflow/stable/best-practices.html
创建DAG有两个步骤:
- 用Python实现一个DAG对象;
- 测试代码是否符合我们的预期。
1. 创建DAG
创建一个新的DAG是非常简单的,但是还是有一些需要注意点,以确保DAG能正确的运行。
1.1 实现自定义算子(Operator)或者钩子(Hook)
具体看这里:https://airflow.apache.org/docs/apache-airflow/stable/howto/custom-operator.html#custom-operator
1.2 创建任务Task
当任务失败的时候,Airflow可以自动重启,所以我们的任务应该要保证幂等性(无论执行多少次都应该得到一样的结果)。下面是一些可以避免产生不同结果的方式:
- 在操作数据库时,使用UPSERT替换INSERT,因为INSERT语句可能会导致重复插入数据。MySQL中可以使用:INSERT INTO ... VALUES ... ON DUPLICATE KEY UPDATE ...。
- 不要直接读取最近一段时间的数据,而是应该要按时间段来读取。
- now函数会得到一个当前时间对象,直接用在任务中会得到不同的结果。
类似connection_id或者S3存储路径之类重复的变量,应该定义在default_args中,而不是重复定义在每个任务里。定义在default_args中有助于避免一些类型错误之类的问题。
1.3 删除任务
不要从DAG中删除任务,因为一旦删除,任务的历史信息就无法再Airflow中找到了。如果确实需要,则建议创建一个新的DAG。
1.4 通讯
在不同服务器上执行DAG中的任务,应该使用k8s executor或者celery executor。于是,我们不应该在本地文件系统中保存文件或者配置。
如果可能,我们应该XCom来在不同的任务之间共享小数据,而如果如果数据量比较大,则应该使用分布式文件系统,如S3或者HDFS等,这时可以使用XCom来共享其在S3或者HDFS中的文件地址。
任何权限参数(例如密码或者Token之类的)也不应该存储在任务中,这些数据应该尽可能地使用Connection来存储,这样比较安全,而使用的时候,只要使用其唯一的connection id即可。
关于Connection:https://airflow.apache.org/docs/apache-airflow/stable/concepts/connections.html
1.5 变量Variables
如果可能,我们应该避免在算子的execute()方法或者Jinja模板外部使用Variables。在Airflow中,使用变量去连接到元数据DB,获取数据,这会减慢解释的速度,并给数据库增加额外的负担。
Airflow在后台解释所有DAG的期间,使用processor_poll_interval进行配置,其默认值为1秒。在解释过程中,Airflow会为每一个DAG连接数据库创建新的connection。这产生的一个后果是产生大量的open connection。
使用变量最好的方式就是通过Jinja模板,它能够延迟读取其值直到任务的执行(这句话的意思应该是延期加载,即实际用到的时候才去读取相应的值)。模板的语法如下:
代码语言:javascript复制{{ var.value.<variable_name> }}
或者如果你需要从变量中解释json对象,可以这样:
代码语言:javascript复制{{ var.json.<variable_name> }}
(变量Variable使用不多,还得斟酌)
1.6 Top level Python code
一般来说,我们不应该在Airflow结构(如算子等)之外写任何代码。每次Airflow解析符合条件的python文件时,任务外的代码都会被运行,它运行的最小间隔是使用min_file_process_interval来定义的。
2. 测试DAG
我们将Airflow用在生产环境中,应该让DAG接受充分的测试,以保证结果的是可以预期的。
2.1 DAG加载器测试
首先我们要保证的是,DAG在加载的过程中不会产生错误。我们无需编写其他代码即可进行此测试。
代码语言:javascript复制python your-dag-file.py
如此运行DAG脚本文件,如果没有产生异常,即保证了没有依赖或者语法等方面的问题。
2.2 单元测试
加载DAG的单元测试:
代码语言:javascript复制from airflow.models import DagBag
import unittest
class TestHelloWorldDAG(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.dagbag = DagBag()
def test_dag_loaded(self):
dag = self.dagbag.get_dag(dag_id='hello_world')
assert self.dagbag.import_errors == {}
assert dag is not None
assert len(dag.tasks) == 1
DAG结构的单元测试:
代码语言:javascript复制import unittest
class testClass(unittest.TestCase):
def assertDagDictEqual(self,source,dag):
assert dag.task_dict.keys() == source.keys()
for task_id, downstream_list in source.items():
assert dag.has_task(task_id)
task = dag.get_task(task_id)
assert task.downstream_task_ids == set(downstream_list)
def test_dag(self):
self.assertDagDictEqual({
"DummyInstruction_0": ["DummyInstruction_1"],
"DummyInstruction_1": ["DummyInstruction_2"],
"DummyInstruction_2": ["DummyInstruction_3"],
"DummyInstruction_3": []
}, dag)
自定义算子的单元测试
代码语言:javascript复制import unittest
from airflow.utils.state import State
DEFAULT_DATE = '2019-10-03'
TEST_DAG_ID = 'test_my_custom_operator'
class MyCustomOperatorTest(unittest.TestCase):
def setUp(self):
self.dag = DAG(TEST_DAG_ID, schedule_interval='@daily', default_args={'start_date' : DEFAULT_DATE})
self.op = MyCustomOperator(
dag=self.dag,
task_id='test',
prefix='s3://bucket/some/prefix',
)
self.ti = TaskInstance(task=self.op, execution_date=DEFAULT_DATE)
def test_execute_no_trigger(self):
self.ti.run(ignore_ti_state=True)
assert self.ti.state == State.SUCCESS
# Assert something related to tasks results
2.3 自查
我们还可以在DAG内部检查,以确保任务的执行结果符合预期。例如,如果我们有一个推送数据到S3的任务,于是我们能够在下一个任务中完成检查。
代码语言:javascript复制task = PushToS3(...)
check = S3KeySensor(
task_id='check_parquet_exists',
bucket_key="s3://bucket/key/foo.parquet",
poke_interval=0,
timeout=0
)
task >> check
其实就是使用一个独立的任务来校验前一个任务是否操作成功。
2.4 暂存(staging)环境变量
如果可能,在部署到生产环境运行起来之前,我们应该保持一个暂存环境去测试完整的DAG。需要确保我们的DAG是已经参数化了的,而不是在DAG中硬编码。
我们可以使用环境变量来参数化DAG:
代码语言:javascript复制import os
dest = os.environ.get(
"MY_DAG_DEST_PATH",
"s3://default-target/path/"
)
3. 模拟变量及连接
当我们写代码测试变量或者连接时,必须保证当运行测试时它们是存在的。一个可行的解决方案是把这些对象保存到数据库中,这样当代码执行的时候,它们就能被读取到。然而不管是从数据库读取数据还是写数据到数据库,都会产生额外的时间消耗。因此,为了加速测试的执行,不要将它们保存到数据库是有效的实践。为此,我们可以使用unittest.mock.patch.dict()创建环境变量来模拟os.environ
。
对于变量,使用AIRFLOW_VAR_{KEY}:
代码语言:javascript复制with mock.patch.dict('os.environ', AIRFLOW_VAR_KEY="env-value"):
assert "env-value" == Variable.get("key")
对于连接,使用AIRFLOW_CONN_{CONN_ID}:
代码语言:javascript复制conn = Connection(
conn_type="gcpssh",
login="cat",
host="conn-host",
)
conn_uri = conn.get_uri()
with mock.patch.dict("os.environ", AIRFLOW_CONN_MY_CONN=conn_uri):
assert "cat" == Connection.get("my_conn").login
使用Airflow的场景很多,官方有最佳实践,只可惜是英文版的,又找不到对应的中文版,也只能班门弄斧,献丑了。
于20210529凌晨