使用 GitHub Actions 构建有效的 CI/CD 管道以测试您的 Apache Airflow DAG 并将其部署到 Amazon MWAA
介绍
在这篇文章中,我们将学习如何使用 GitHub Actions 为我们的 Apache Airflow DAG 构建有效的 CI/CD 工作流。我们将使用持续集成和持续交付的 DevOps 概念来自动测试和部署 Airflow DAG 到 AWS 上的 Amazon Managed Workflows for Apache Airflow (Amazon MWAA)。
技术
Apache Airflow
根据文档,Apache Airflow 是一个开源平台,用于以编程方式编写、调度和监控工作流。使用 Airflow,您可以将工作流创作为用 Python 编写的任务(Task)的有向无环图 (DAG)。
适用于 Apache Airflow 的 Amazon 托管工作流
据AWS称,Amazon Managed Workflows for Apache Airflow (Amazon MWAA) 是一种高度可用、安全且完全托管的Apache Airflow工作流程编排。MWAA 自动扩展其工作流程执行能力以满足您的需求,并与 AWS 安全服务集成以帮助提供对数据的快速安全访问。
image.png
GitHub Actions
据GitHub称,GitHub Actions 使CI/CD自动化软件工作流变得容易。GitHub Actions 允许您直接从 GitHub 构建、测试和部署代码。GitHub Actions 是由 GitHub 事件触发的工作流,例如推送、问题创建或新版本。您可以利用由社区预先构建和维护的 GitHub Actions。
术语
DataOps
根据Wikipedia的说法,DataOps 是一种自动化的、面向过程的方法,分析和数据团队使用它来提高数据分析的质量并缩短数据分析的周期时间。虽然 DataOps 最初是一套最佳实践,但它现在已经成熟,成为一种新的数据分析方法。 DataOps 适用于从数据准备到报告的整个数据生命周期,并认识到数据分析团队和 IT 运营的相互关联性。DataOps 采用敏捷方法来缩短分析开发的软件开发生命周期 (SDLC)。
DevOps
根据Wikipedia的说法,DevOps 是一套结合了软件开发 (Dev) 和 IT 运营 (Ops) 的实践。它旨在缩短系统开发生命周期并提供具有高质量软件的持续交付。
DevOps 是一组实践,旨在缩短将更改提交到系统和将更改投入正常生产之间的时间,同时确保高质量。-维基百科
快速失败
根据Wikipedia的说法,快速失败系统是一种可以立即报告任何可能表明发生故障的情况的系统。使用 DevOps 快速失败的概念,我们在工作流中构建步骤,以更快地发现 SDLC 中的错误。我们将测试尽可能向左移动(指的是从左到右移动的步骤管道),并在沿途的多个点进行测试。
源代码
此演示的所有源代码,包括GitHub Actions、Pytest 单元测试和Git Hooks,都是开源的,位于GitHub 上。
架构
下图展示了最近一篇博文和视频演示的架构,即Lakehouse Automation on AWS with Apache Airflow。该帖子和视频展示了如何使用 Apache Airflow 以编程方式将数据从 Amazon Redshift 加载和上传到基于 Amazon S3 的数据湖。
在这篇文章中,我们将回顾以前的 DAG 是如何使用各种逐渐更有效的 CI/CD 工作流程开发、测试和部署到 MWAA 的。除了 DAG 之外,演示的工作流还可以轻松应用于其他 Airflow 资源,例如 SQL 脚本、配置和数据文件、Python 需求文件和插件。
工作流程
没有 DevOps
下面我们看到了一个将 DAG 加载到 Amazon MWAA 中的最低限度可行的工作流程,它不使用 CI/CD 的原则。在本地 Airflow 开发人员的环境中进行更改。修改后的 DAG 直接复制到 Amazon S3 存储桶,然后自动与 Amazon MWAA 同步,除非出现任何错误。这些更改也(希望)被推回集中式版本控制或源代码管理 (SCM) 系统,即本文中的 GitHub。
这种容易出错的工作流程至少存在两个重大问题。首先,DAG 在 Amazon S3 存储桶和 GitHub 之间始终不同步。这是两个独立的步骤——将 DAG 复制或同步到 S3 并将 DAG 推送到 GitHub。开发人员可能会继续进行更改并将 DAG 推送到 S3,而无需推送到 GitHub,反之亦然。 其次,缺少_快速失败_的 DevOps 概念。您第一次知道您的 DAG 包含错误可能是在它同步到 MWAA 并引发导入错误时。到那时,DAG 已经被复制到 S3,同步到 MWAA,并可能推送到 GitHub,然后其他开发人员可以拉取。
image.png
GitHub Actions
与之前的工作流程相比,一个重要的进步是在将代码推送到 GitHub 后使用GitHub Actions来测试和部署代码。尽管在此工作流程中,代码仍被“直接推送到 Trunk ”(GitHub 中的_主_分支)并冒着协作环境中的其他开发人员提取潜在错误代码的风险,但 DAG 错误进入 MWAA 的可能性要小得多。
使用 GitHub Actions,您还可以消除可能导致 DAG 更改未同步到 Amazon S3 的人为错误。最后,使用此工作流程无需向 Airflow 开发人员提供对 Airflow Amazon S3 存储桶的直接访问权限,从而提高了安全性。
测试类型
第一个 GitHub Actiontest_dags.yml是在推送到存储库分支中的dags目录时触发的。每当对分支main发出拉取请求时,也会触发它。main第一个 GitHub Action 运行一系列测试,包括检查 Python 依赖项、代码样式、代码质量、DAG 导入错误和单元测试。这些测试在通过第二个 GitHub Action 同步到 S3 之前发现了 DAG 的问题。
代码语言:javascript复制name: Test DAGs
on:
push:
paths:
- 'dags/**'
pull_request:
branches:
- main
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Set up Python
uses: actions/setup-python@v2
with:
python-version: '3.7'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements/requirements.txt
pip check
- name: Lint with Flake8
run: |
pip install flake8
flake8 --ignore E501 dags --benchmark -v
- name: Confirm Black code compliance (psf/black)
run: |
pip install pytest-black
pytest dags --black -v
- name: Test with Pytest
run: |
pip install pytest
cd tests || exit
pytest tests.py -v
Python 依赖项
第一个测试安装在requirements.txt本地用于开发应用程序的文件中列出的模块。此测试旨在发现任何缺失或冲突的模块。
代码语言:javascript复制- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements/requirements.txt
pip check
必须针对相同版本的 Python 和 Airflow 环境中使用的相同版本的 Python 模块开发 DAG。您可以使用BashOperator运行 shell 命令来获取安装在 Airflow 环境中的 Python 和模块的版本:
python3 --version; python3 -m pip list
DAG 的日志输出片段显示了 MWAA 2.0.2 中可用的 Python 版本和 Python 模块:
Airflow 的最新稳定版本目前是2.2.2版本,于 2021 年 11 月 15 日发布。但是,截至 2021 年 12 月,亚马逊最新的 MWAA 2.x 版本是2.0.2版本,发布于 2021-04-19。MWAA 2.0.2 当前运行 Python3 版本 3.7.10。
Flake8
Flake8被称为“您的样式指南执行工具”,被描述为模块化源代码检查器。它是一个命令行实用程序,用于在 Python 项目中强制样式一致性。Flake8 是PyFlakes、pycodestyle和 Ned Batchelder 的McCabe 脚本的包装器。模块是一个工具,可以根据PEP 8pycodestyle中的一些样式约定检查您的 Python 代码。 Flake8 是高度可配置的,如果您的开发团队不需要,可以选择忽略特定规则。例如,在这个演示中,我故意忽略了规则 E501,其中规定 '行长度应限制为 72 个字符。'
代码语言:javascript复制- name: Lint with Flake8
run: |
pip install flake8
flake8 --ignore E501 dags --benchmark -v
Black
被称为“不妥协的代码格式化程序”,无论您正在阅读什么项目,使用Black格式化的 Python 代码(称为 Blackened 代码)看起来都是一样的。格式变得透明,让团队可以专注于内容。Black 通过产生尽可能小的差异来加快代码审查速度,假设所有开发人员都在使用black它们来格式化他们的代码。 此 GitHub 存储库中的 Airflow DAG 在提交并推送到 GitHub 之前black使用pre-commit Git Hooks自动格式化。测试确认black代码合规性。
代码语言:javascript复制- name: Confirm Black code compliance (psf/black)
run: |
pip install pytest-black
pytest dags --black -v
pytest
pytest框架将自己描述为一个成熟的、功能齐全的 Python 测试工具,可以帮助您编写更好的程序。Pytest 框架使编写小型测试变得容易,但可以扩展以支持应用程序和库的复杂功能测试。 GitHub 项目中的 GitHub Actiontest_dags.yml调用tests.py文件,该文件也包含在项目中。
代码语言:javascript复制- name: Test with Pytest
run: |
pip install pytest
cd tests || exit
pytest tests.py -v
该tests.py文件包含几个pytest单元测试。测试基于我的项目要求;你的测试会有所不同。这些测试确认所有 DAG:
- 不包含 DAG 导入错误(_测试捕获了我 75% 的错误_);
- 遵循特定的文件命名约定;
- 包括“气流”以外的描述和所有者;
- 包含所需的项目标签;
- 不要发送电子邮件(我的项目使用 SNS 或 Slack 发送通知);
- 重试次数不要超过 3 次;
import os
import sys
import pytest
from airflow.models import DagBag
sys.path.append(os.path.join(os.path.dirname(__file__), "../dags"))
sys.path.append(os.path.join(os.path.dirname(__file__), "../dags/utilities"))
# Airflow variables called from DAGs under test are stubbed out
os.environ["AIRFLOW_VAR_DATA_LAKE_BUCKET"] = "test_bucket"
os.environ["AIRFLOW_VAR_ATHENA_QUERY_RESULTS"] = "SELECT 1;"
os.environ["AIRFLOW_VAR_SNS_TOPIC"] = "test_topic"
os.environ["AIRFLOW_VAR_REDSHIFT_UNLOAD_IAM_ROLE"] = "test_role_1"
os.environ["AIRFLOW_VAR_GLUE_CRAWLER_IAM_ROLE"] = "test_role_2"
@pytest.fixture(params=["../dags/"])
def dag_bag(request):
return DagBag(dag_folder=request.param, include_examples=False)
def test_no_import_errors(dag_bag):
assert not dag_bag.import_errors
def test_requires_tags(dag_bag):
for dag_id, dag in dag_bag.dags.items():
assert dag.tags
def test_requires_specific_tag(dag_bag):
for dag_id, dag in dag_bag.dags.items():
try:
assert dag.tags.index("data lake demo") >= 0
except ValueError:
assert dag.tags.index("redshift demo") >= 0
def test_desc_len_greater_than_fifteen(dag_bag):
for dag_id, dag in dag_bag.dags.items():
assert len(dag.description) > 15
def test_owner_len_greater_than_five(dag_bag):
for dag_id, dag in dag_bag.dags.items():
assert len(dag.owner) > 5
def test_owner_not_airflow(dag_bag):
for dag_id, dag in dag_bag.dags.items():
assert str.lower(dag.owner) != "airflow"
def test_no_emails_on_retry(dag_bag):
for dag_id, dag in dag_bag.dags.items():
assert not dag.default_args["email_on_retry"]
def test_no_emails_on_failure(dag_bag):
for dag_id, dag in dag_bag.dags.items():
assert not dag.default_args["email_on_failure"]
def test_three_or_less_retries(dag_bag):
for dag_id, dag in dag_bag.dags.items():
assert dag.default_args["retries"] <= 3
def test_dag_id_contains_prefix(dag_bag):
for dag_id, dag in dag_bag.dags.items():
assert str.lower(dag_id).find("__") != -1
def test_dag_id_requires_specific_prefix(dag_bag):
for dag_id, dag in dag_bag.dags.items():
assert str.lower(dag_id).startswith("data_lake__")
or str.lower(dag_id).startswith("redshift_demo__")
Fork&Pull
我们可以通过实施 GitHub 推荐的两种协作开发模型之一来改进直接推送到 Trunk 的做法:
- 共享存储库模型:使用“主题”分支,这些分支经过审查、批准并合并到主分支中。
- 分叉和拉取模型:分叉一个仓库,进行更改,创建一个拉取请求,审查请求,如果获得批准,则合并到主分支。
在 fork and pull 模型中,我们创建了 DAG 存储库的一个分支,我们在其中进行更改。然后,我们提交并将这些更改推送回分叉的存储库。准备好后,我们创建一个拉取请求。如果拉取请求被批准并通过所有测试,它会被手动或自动合并到主分支中。然后将 DAG 同步到 S3,并最终同步到 MWAA。我通常更喜欢在所有测试都通过后手动触发合并。 fork and pull 模型极大地减少了在通过所有测试之前将不良代码合并到主分支的机会。
将 DAG 同步到 S3
GitHub 项目中的第二个 GitHub Action, sync_dags.yml, 是在前一个 Action, , 成功完成时触发的test_dags.yml,或者在 follow 和 pull 方法的情况下,合并到main分支成功。
代码语言:javascript复制name: Sync DAGs
on:
workflow_run:
workflows:
- 'Test DAGs'
types:
- completed
pull_request:
types:
- closed
jobs:
deploy:
runs-on: ubuntu-latest
if: ${{ github.event.workflow_run.conclusion == 'success' }}
steps:
- uses: actions/checkout@master
- uses: jakejarvis/s3-sync-action@master
env:
AWS_S3_BUCKET: ${{ secrets.AWS_S3_BUCKET }}
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
AWS_REGION: 'us-east-1'
SOURCE_DIR: 'dags'
DEST_DIR: 'dags'
GitHub Actionsync_dags.yml需要三个GitHub 加密机密,它们是预先创建并与 GitHub 存储库相关联的。根据GitHub,机密是您在组织、存储库或存储库环境中创建的加密环境变量。加密的机密允许您在存储库中存储敏感信息,例如访问令牌。您创建的密钥可用于 GitHub Actions 工作流程。
DAG 同步到 Amazon S3,并最终自动同步到 MWAA.
本地测试和 Git Hooks
要进一步改进您的 CI/CD 工作流程,您应该考虑使用Git Hooks。使用 Git Hooks,我们可以确保在提交和推送更改到 GitHub 之前对代码进行本地测试。本地测试使我们能够更快地失败,在开发过程中发现错误,而不是在将代码推送到 GitHub 之后。
根据文档,当某些重要操作发生时,Git 有办法触发自定义脚本。有两种类型的钩子:客户端和服务器端。客户端钩子由提交和合并等操作触发,而服务器端钩子在网络操作上运行,例如接收推送的提交。 您可以出于各种原因使用这些挂钩。我经常使用客户端pre-commit挂钩来格式化使用black. 使用客户端pre-pushGit Hook,我们将确保在将 DAG 推送到 GitHub 之前运行测试。根据 Git,当远程 refs 更新之后但在任何对象传输之前执行命令pre-push时,钩子就会运行。git push您可以在推送发生之前使用它来验证一组 ref 更新。非零退出代码将中止推送。pre-commit如果测试不太耗时,则可以将测试作为钩子的一部分运行。 要使用该pre-push钩子,请在本地存储库中创建以下文件 .git/hooks/pre-push:
代码语言:javascript复制#!/bin/sh
# do nothing if there are no commits to push
if [ -z "$(git log @{u}..)" ]; then
exit 0
fi
sh ./run_tests_locally.sh
然后,运行以下chmod命令使钩子可执行:chmod 755 .git/hooks/pre-push
pre-push钩子运行 shell 脚本,run_tests_locally.sh. 该脚本在本地执行几乎相同的测试,就像在 GitHubtest_dags.yml上远程执行的 GitHub Action 一样:
#!/bin/sh
echo "Starting Flake8 test..."
flake8 --ignore E501 dags --benchmark || exit 1
echo "Starting Black test..."
python3 -m pytest --cache-clear
python3 -m pytest dags/ --black -v || exit 1
echo "Starting Pytest tests..."
cd tests || exit
python3 -m pytest tests.py -v || exit 1
echo "All tests completed successfully!