Centos7下Airflow(1.10) celery redis 安装
ps:Airflow 2.0 点击这里
安装环境及版本
centos7
Airflow 1.10.6
Python 3.6.8
Mysql 5.6
redis 3.3
安装
数据库安装
略(自行百度)
- 注意开启远程连接(关闭防火墙)
- 字符集统一修改为UTF8(utf8mb4也可以)防止乱码
- 高版本的mysql 或者Maria DB 会出现VARCHAR(5000)的报错 建议低版本
- 原因是高版本的数据库为了效率限制了VARCHER的最大长度
- postgresql还没有试以后补充
- python安装略(自行百度)
- 请将python加入环境变量(方便)
airflow安装
- 参考https://airflow.apache.org/howto/executor/use-celery.html?highlight=celery
- 添加环境变量
vim ~/.bashrc
# 添加一行环境变量
export AIRFLOW_HOME=/opt/airflow
source ~/.bashrc
- 安装airflow及相关组件此环境变量仅需要设置成临时变量即可并不需要配置成永久变量
export SLUGIFY_USES_TEXT_UNIDECODE=yes
安装airflow
代码语言:shell复制# 生成配置文件,可能会报一些错请忽略,保证AIRFLOW_HOME目录下生成了.cfg及相关文件即证明本次执行成功
# 如果配置了pytho的环境变量直接执行
# 没配置在${PYTHON_HOME}/lib/python3.6/sit-packages/airflow/bin目录下执行`./airflow`
pip install apache-airflow
安装airflow 相关依赖
代码语言:shell复制pip install 'apache-airflow[mysql]'
pip install 'apache-airflow[celery]'
pip install 'apache-airflow[redis]'
pip install pymysql
配置
修改配置文件
- 修改${AIRFLOW_HOME}/airflow.cfg
# sqlalchemy链接
sql_alchemy_conn = mysql pymysql://root:root@10.1.49.71:3306/airflow?charset=utf8
# 配置执行器
executor=CeleryExecutor
# 配置celery的broker_url
broker_url = redis://lochost:5379/0
# 配置元数据信息管理
result_backend = db mysql://username:password@localhost:3306/airflow
创建用户(worker 不允许在root用户下执行)
代码语言:shell复制# 创建用户组和用户
groupadd airflow
useradd airflow -g airflow
# 将 {AIRFLOW_HOME}目录修用户组
cd /opt/
chgrp -R airflow airflow
初始化数据库
初始化前请先创建airflow
数据库以免报错
airflow db init
启动
代码语言:shell复制# 前台启动web服务
airflow webserver
# 后台启动web服务
airflow webserver -D
# 前台启动scheduler
airflow schedule
# 后台启动scheduler
airflow scheduler -D
启动worker
- 方法一
# worker主机只需用普通用户打开airflow worker
# 创建用户airflow
useradd airflow
# 对用户test设置密码
passwd airflow
# 在root用户下,改变airflow文件夹的权限,设为全开放
chmod -R 777 /opt/airflow
# 切换为普通用户,执行airflow worker命令就行
# 启动时发现普通用户读取的~/.bashrc文件 不一致 重新加入AIRFLOW_HOME 就可以了
# 如果在新建普通用户前配置好环境变量可能没有这个问题了 本人是在创建用户后修改了环境变量
airflow worker
- 启动成功显示如下
- 方法二
# 执行worker之前运行临时变量(临时的不能永久使用)
export C_FORCE_ROOT="true"
# 不需要切换用户
cd /usr/local/python3/bin/
# 前台启动worker服务
airflow worker
# 后台启动work服务
airflow worker -D
修改时区
- 修改airflow.cfg文件
default_timezone = Asia/Shanghai
- 找到airflow安装路径
参考如下:
cd /usr/local/lib/python3.6/site-packages/airflow
- 修改airflow/utils/timezone.py
# 在 utc = pendulum.timezone(‘UTC’) 这行(第27行)代码下添加
from airflow.configuration import conf
try:
tz = conf.get("core", "default_timezone")
if tz == "system":
utc = pendulum.local_timezone()
else:
utc = pendulum.timezone(tz)
except Exception:
pass
# 修改utcnow()函数 (在第69行)
原代码 d = dt.datetime.utcnow()
修改为 d = dt.datetime.now()
- 修改airflow/utils/sqlalchemy.py
# 在utc = pendulum.timezone(‘UTC’) 这行(第37行)代码下添加
from airflow.configuration import conf
try:
tz = conf.get("core", "default_timezone")
if tz == "system":
utc = pendulum.local_timezone()
else:
utc = pendulum.timezone(tz)
except Exception:
pass
- 注释airflow/utils/sqlalchemy.py中的cursor.execute(“SET time_zone = ‘ 00:00’”) (第65行)
- 修改airflow/www/templates/admin/master.html(第31行)
把代码 var UTCseconds = (x.getTime() x.getTimezoneOffset()*60*1000);
改为 var UTCseconds = x.getTime();
把代码 "timeFormat":"H:i:s %UTC%",
改为 "timeFormat":"H:i:s",
- 参考airflow时区修改
配置email报警在airflow配置文件airflow.cfg中修改
- 参考aiflow官方文档 email_backend = airflow.utils.email.send_email_smtp
- smtp在你要设置的邮箱服务器地址在邮箱设置中查看(此处为163
- smtp_host = smtp.163.com
- 邮箱通讯协议
- smtp_starttls = False
- smtp_ssl = True
- 你的邮箱地址
- smtp_user = demo@163.com
- 你的邮箱授权码在邮箱设置中查看或百度
- smtp_password = 16位授权码
- 邮箱服务端口
- smtp_port = 端口
- 你的邮箱地址smtp_mail_from = demo@163.com
- 在dag中default_args添加参数
default_args = {
# 接受邮箱
'email': ['demo@qq.com''],
# task失败是否发送邮件
'email_on_failure': True,
# task重试是否发送邮件
'email_on_retry': False,
}
——————————————————————————————————————————————
补充
在跑任务时发现部分任务在并行时会出现数据的异常解决方案:
airflow的全局变量中设置
- parallelism :这是用来控制每个airflow worker 可以同时运行多少个task实例。这是airflow集群的全局变量。在airflow.cfg里面配置
- concurrency :每个dag运行过程中最大可同时运行的task实例数。如果你没有设置这个值的话,scheduler 会从airflow.cfg里面读取默认值 dag_concurrency
在DAG中加入参数用于控制整个dag
- max_active_runs : 来控制在同一时间可以运行的最多的dag runs 数量。 假如我们一个DAG同一时间只能被运行一次,那么一定要指明 max_active_runs = 1 如果我们DAG中有10个Task,我们如果希望10个Task可以在触发后可以同时执行,那么我们的concurrency需要不小于10才行,若小于10,那么会有任务需要等待之前的任务执行完成才会开始执行。
dag = DAG(f"dag_name",
default_args=default_args,
schedule_interval="0 12 * * *",
max_active_runs = 1
)
在每个task中的Operator中设置参数
- task_concurrency:来控制在同一时间可以运行的最多的task数量 假如task_concurrency=1一个task同一时间只能被运行一次其他task不受影响
t3 = PythonOperator(
task_id='demo_task',
provide_context=True,
python_callable=demo_task,
task_concurrency=1,
dag=dag)