Airflow 和 DataX 的结合

2021-09-08 11:53:35 浏览数 (1)

这是一年前做的东西了,稳定运行至今,感觉现在可以写写总结。

我们团队用的调度系统是 Apache Airflow(https://github.com/apache/airflow),数据传输工具是 DataX(https://github.com/alibaba/DataX),这两个工具的介绍读者可以自行查看对应的链接,不多叙述。

两个工具的应用都很广泛,但是依然有一些不足。

Apache Airflow 自身也带了一些数据传输的 Operator ,比如这里的https://github.com/apache/airflow/blob/main/airflow/operators/generic_transfer.py,自带的 Operator 代码写的很优雅,但是我要将 Hive 的数据传输到 MySQL,就要写一个HiveToMySqlTransfer;Presto 数据传输到 MySQL,就要写一个PrestoToMySqlTransfer,这就是 DataX 提到的

复杂的网状的同步链路

而 DataX 将复杂的网状的同步链路变成了星型数据链路,DataX 作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到 DataX,便能跟已有的数据源做到无缝数据同步。

DataX 作为一款传输工具是优秀的,但是开源版本的 DataX 不支持分布式运行,需要手工写复杂的配置文件(JSON),针对某些特殊的 writer 而言,比如 hdfswriter 还会有脏数据的问题(DataX 的 hdfswriter 是使用临时文件夹去临时存放数据,遇到一些意外情况导致 DataX 挂掉时,这个临时文件夹和临时数据就无法删除了,从而导致集群里有一堆脏数据)。而这些问题都可以由 Apache Airflow 去弥补,写一个 Operator ,去自动完成复杂的配置文件以及分布式运行和弥补一些 reader 和 writer 的 bug。

网上也有一些文章讲如何将 Airflow 和 DataX 结合起来,比如有:

  1. https://www.cnblogs.com/woshimrf/p/airflow-plugin.html
  2. https://tech.youzan.com/data_platform/

对于文章 1,虽然结合了 Airflow 和 DataX,但是它并没有解决 Airflow 的网状链路问题,只是用 Airflow 调用 DataX 运行命令而已。

对于文章 2,只说了定制化,没有具体的细节。

在 Airflow 原始的任务类型基础上,DP 定制了多种任务(实现 Operator ),包括基于 Datax 的导入导出任务、基于 Binlog 的 Datay 任务、Hive 导出 Email 任务、 Hive 导出 ElasticSearch 任务等等。

这里提供一下我们的思路。

可以把 DataX 的 reader 和 writer 作为一个个的 hook,每一个 hook 对应着一个 reader 或者是一个 writer,在 hook 里完成每一个 reader 和 writer 的 json 形成(在 Python 里是字典)。

Operator 作为 DataX 的实现。负责执行 DataX 命令,渲染 Hook 传过来的字典,将字典 dump 到本地文件系统变成 json 文件等等,顺便解决 reader 和 writer 遗留下的一些问题,当然还可以支持我们团队的数据血缘追踪。

甚至到最后,比如要将 Oracle 数据传输到 Hive,开发就变成了写一句 Oracle 里的 Select SQL 语句和Oracle对应的 Airflow 的 connection id,再写一下 Hive 里对应的的表名和 Airflow 的 connection id,最后再补充下定时调度的相关配置信息,就完成了一次数据传输的开发。

相比于之前要先去找 Oracle 和 Hive 元数据信息,再写一个json文件,然后在 Airflow 里写一个bash命令,效率不知道提到多少倍。

0 人点赞