大数据调度平台Airflow(六):Airflow Operators及案例

2022-09-21 09:41:05 浏览数 (1)

Airflow Operators及案例

Airflow中最重要的还是各种Operator,其允许生成特定类型的任务,这个任务在实例化时称为DAG中的任务节点,所有的Operator均派生自BaseOparator,并且继承了许多属性和方法。关于BaseOperator的参数可以参照:

http://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/baseoperator/index.html#module-airflow.models.baseoperator

BaseOperator中常用参数如下:

task_id(str) : 唯一task_id标记

owner(str):任务的所有者,建议使用linux用户名

email(str or list[str]):出问题时,发送报警Email的地址,可以填写多个,用逗号隔开。

email_on_retry(bool):当任务重试时是否发送电子邮件

email_on_failure(bool):当任务执行失败时是否发送电子邮件

retries(int):在任务失败之前应该重试的次数

retry_delay(datetime.timedelta):重试间隔,必须是timedelta对象

start_date(datetime.datetime):DAG开始执行时间,这个参数必须是datetime对象,不可以使用字符串。

end_date(datetime.datetime):DAG运行结束时间,任务启动后一般都会一直执行下去,一般不设置此参数。

depends_on_past(bool,默认False):是否依赖于过去,如果为True,那么必须之前的DAG调度成功了,现在的DAG调度才能执行。

dag(airflow.models.DAG):指定的dag。

execution_timeout(datetime.timedelta):执行此任务实例允许的最长时间,超过最长时间则任务失败。

trigger_rule(str):定义依赖的触发规则,包括选项如下:{ all_success | all_failed | all_done | one_success | one_failed | none_failed | none_failed_or_skipped | none_skipped | dummy(无条件执行)} default is all_success。

一、​​​​​​​BashOperator及调度Shell命令及脚本

BashOperator主要执行bash脚本或命令,BashOperator参数如下:

代码语言:javascript复制
bash_command(str):要执行的命令或脚本(脚本必须是.sh结尾)
  • BashOperator 调度Shell命令案例
代码语言:javascript复制
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    'owner':'zhangsan',
    'start_date':datetime(2021, 9, 23),
    'email':'kettle_test1@163.com', #pwd:kettle123456
    'retries': 1,  # 失败重试次数
    'retry_delay': timedelta(minutes=5) # 失败重试间隔
}

dag = DAG(
    dag_id = 'execute_shell_cmd',
    default_args=default_args,
    schedule_interval=timedelta(minutes=1)
)

t1=BashOperator(
    task_id='print_date',
    bash_command='date',
    dag = dag
)

t2=BashOperator(
    task_id='print_helloworld',
    bash_command='echo "hello world!"',
    dag=dag
)

t3=BashOperator(
    task_id='tempplated',
    bash_command="""
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ params.name}}"
        echo "{{ params.age}}"
    {% endfor %}
    """,
    params={'name':'wangwu','age':10},
    dag=dag
)

t1 >> t2 >> t3

注意在t3中使用了Jinja模板,“{% %}”内部是for标签,用于循环操作,但是必须以{% endfor %}结束。“{{}}”内部是变量,其中ds是执行日期,是airflow的宏变量,params.name和params.age是自定义变量。

在default_args中的email是指当DAG执行失败时,发送邮件到指定邮箱,想要使用airflow发送邮件,需要在$AIRFLOW_HOME/airflow.cfg中配置如下内容:

代码语言:javascript复制
[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = smtp.163.com
smtp_starttls = True
smtp_ssl = False
# Example: smtp_user = airflow
smtp_user =kettle_test2
# Example: smtp_password = airflow
smtp_password =VIOFSYMFDIKKIUEA
smtp_port = 25
smtp_mail_from =kettle_test2@163.com
smtp_timeout = 30
smtp_retry_limit = 5

此外,关于邮箱的信息如下:

邮箱1:kettle_test1@163.com password:kettle123456

邮箱2:kettle_test2@163.com password:kettle123456

163邮箱SMTP服务器地址: smtp.163.com 端口:25

配置163邮箱时需要开启“POP3/SMTP/IMAP服务”服务,设置如下:

kettle_test1@163.com FECJJVEPGPTZJYMQ

kettle_test2@163.com VIOFSYMFDIKKIUEA

  1. BashOperator 调度Shell脚本案例

准备如下两个shell脚本,将以下两个脚本放在$AIRFLOW_HOME/dags目录下,

BashOperator默认执行脚本时,默认从/tmp/airflow**临时目录查找对应脚本,由于临时目录名称不定,这里建议执行脚本时,在“bash_command”中写上绝对路径。如果要写相对路径,可以将脚本放在/tmp目录下,在“bash_command”中执行命令写上“sh ../xxx.sh”也可以。

first_shell.sh

代码语言:javascript复制
#!/bin/bash

dt=$1

echo "==== execute first shell ===="

echo "---- first : time is ${dt}"

second_shell.sh

代码语言:javascript复制
#!/bin/bash
dt=$1
echo "==== execute second shell ===="
echo "---- second : time is ${dt}"

编写airflow python 配置:

代码语言:javascript复制
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    'owner':'zhangsan',
    'start_date':datetime(2021, 9, 23),
    'retries': 1,  # 失败重试次数
    'retry_delay': timedelta(minutes=5) # 失败重试间隔
}

dag = DAG(
    dag_id = 'execute_shell_sh',
    default_args=default_args,
    schedule_interval=timedelta(minutes=1)
)

first=BashOperator(
    task_id='first',
    #脚本路径建议写绝对路径
    bash_command='sh /root/airflow/dags/first_shell.sh %s'�tetime.now().strftime("%Y-%m-%d"),
    dag = dag
)

second=BashOperator(
    task_id='second',
    #脚本路径建议写绝对路径
    bash_command='sh /root/airflow/dags/second_shell.sh %s'�tetime.now().strftime("%Y-%m-%d"),
    dag=dag
)

first >> second

执行结果:

特别注意:在“bash_command”中写执行脚本时,一定要在脚本后跟上空格,有没有参数都要跟上空格,否则会找不到对应的脚本。如下:

二、​​​​​​​SSHOperator及调度远程Shell脚本

在实际的调度任务中,任务脚本大多分布在不同的机器上,我们可以使用SSHOperator来调用远程机器上的脚本任务。SSHOperator使用ssh协议与远程主机通信,需要注意的是SSHOperator调用脚本时并不会读取用户的配置文件,最好在脚本中加入以下代码以便脚本被调用时会自动读取当前用户的配置信息:

代码语言:javascript复制
#Ubunto系统
. ~/.profile

#CentoOS或者RedHat系统
. ~/.bashrc

关于SSHOperator参数详解可以参照:

airflow.providers.ssh.operators.ssh — apache-airflow-providers-ssh Documentation

 SSHOperator的常用参数如下:

代码语言:javascript复制
ssh_conn_id(str):ssh连接id,名称自取,需要在airflow webserver界面配置,具体配置参照案例。
remote_host(str):远程连接节点host,如果配置,可替换ssh_conn_id中配置的远程host,可选。
command(str):在远程主机上执行的命令或脚本。
  • SSHOperator调度远程节点脚本案例

按照如下步骤来使用SSHOperator调度远程节点脚本:

1、安装“apache-airflow-providers-ssh ”provider package

首先停止airflow webserver与scheduler,在node4节点切换到python37环境,安装ssh Connection包。另外,关于Providers package安装方式可以参照如下官网地址:

 https://airflow.apache.org/docs/apache-airflow-providers/packages-ref.html#apache-airflow-providers-ssh

代码语言:javascript复制
#切换Python37环境
[root@node4 ~]# conda activate python37

#安装ssh provider package
(python37) [root@node4 ~]# pip install apache-airflow-providers-ssh==2.1.1

#启动airflow
(python37) [root@node4 ~]# airflow webserver --port 8080
(python37) [root@node4 ~]# airflow scheduler

2、配置SSH Connection连接

登录airflow webui ,选择“Admin”->“Connections”:

点击“ ”添加连接,这里host连接的是node5节点:

3、准备远程执行脚本

在node5节点/root路径下创建first_shell.sh,内容如下:

代码语言:javascript复制
#!/bin/bash
echo "==== execute first shell ===="

在node3节点/root路径下创建second_shell.sh,内容如下:

代码语言:javascript复制
#!/bin/bash
echo "==== execute second shell ===="

4、编写DAG python配置文件

注意在本地开发工具编写python配置时,需要用到SSHOperator,需要在本地对应的python环境中安装对应的provider package。

代码语言:javascript复制
C:Userswubai>d:
D:>cd d:ProgramDataAnaconda3envspython37Scripts
d:ProgramDataAnaconda3envspython37Scripts>pip install apache-airflow-providers-ssh==2.1.1

python配置文件:

代码语言:javascript复制
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.ssh.operators.ssh import SSHOperator

default_args = {
    'owner':'lisi',
    'start_date':datetime(2021, 9, 23),
    'retries': 1,  # 失败重试次数
    'retry_delay': timedelta(minutes=5) # 失败重试间隔
}

dag = DAG(
    dag_id = 'execute_remote_shell',
    default_args=default_args,
    schedule_interval=timedelta(minutes=1)
)

first=SSHOperator(
    task_id='first',
    ssh_conn_id='ssh-node5',# 配置在Airflow webui Connection中配置的SSH Conn id
    command='sh /root/first_shell.sh ',
    dag = dag
)

second=SSHOperator(
    task_id='second',
    ssh_conn_id='ssh-node5',# 配置在Airflow webui Connection中配置的SSH Conn id
    command='sh /root/second_shell.sh ',
    remote_host="192.168.179.6",#如果配置remote_host ,将会替换Connection中的SSH 配置的host
    dag=dag
)

first >> second

5、调度python配置脚本

将以上配置好的python文件上传至node4节点$AIRFLOW_HOME/dags下,重启Airflow websever与scheduler,登录webui,开启调度:

调度结果如下:

三、​​​​​​​HiveOperator及调度HQL

 可以通过HiveOperator直接操作Hive SQL ,HiveOperator的参数如下:

代码语言:javascript复制
hql(str):需要执行的Hive SQL。
hive_cli_conn_id(str):连接Hive的conn_id,在airflow webui connection中配置的。

想要在airflow中使用HiveOperator调用Hive任务,首先需要安装以下依赖并配置Hive Metastore: 

代码语言:javascript复制
#切换Python37环境
[root@node4 ~]# conda activate python37

#安装hive provider package
(python37) [root@node4 ~]# pip install apache-airflow-providers-apache-hive==2.0.2

#启动airflow
(python37) [root@node4 ~]# airflow webserver --port 8080
(python37) [root@node4 ~]# airflow scheduler

登录Airflow webui并设置Hive Metastore,登录后找到”Admin”->”Connections”,点击“ ”新增配置:

  • HiveOperator调度HQL案例

1、启动Hive,准备表

启动HDFS、Hive Metastore,在Hive中创建以下三张表:

代码语言:javascript复制
create table person_info(id int,name string,age int) row format delimited fields terminated by 't';

create table score_info(id int,name string,score int) row format delimited fields terminated by 't';

向表 person_info加载如下数据:

1 zs 18

2 ls 19

3 ww 20

向表score_info加载如下数据:

1 zs 100

2 ls 200

3 ww 300

2、在node4节点配置Hive 客户端

由于Airflow 使用HiveOperator时需要在Airflow安装节点上有Hive客户端,所以需要在node4节点上配置Hive客户端。

将Hive安装包上传至node4 “/software”下解压,并配置Hive环境变量

代码语言:javascript复制
#在/etc/profile文件最后配置Hive环境变量
export HIVE_HOME=/software/hive-1.2.1
export PATH=$PATH:$HIVE_HOME/bin

#使环境变量生效
source /etc/profile

修改HIVE_HOME/conf/hive-site.xml ,写入如下内容:

代码语言:javascript复制
<configuration>
 <property>
  <name>hive.metastore.warehouse.dir</name>
  <value>/user/hive/warehouse</value>
 </property>
 <property>
  <name>hive.metastore.local</name>
  <value>false</value>
 </property>
 <property>
  <name>hive.metastore.uris</name>
  <value>thrift://node1:9083</value>
 </property>
</configuration>

3、编写DAG python配置文件

注意在本地开发工具编写python配置时,需要用到HiveOperator,需要在本地对应的python环境中安装对应的provider package。

代码语言:javascript复制
C:Userswubai>d:
D:>cd d:ProgramDataAnaconda3envspython37Scripts
d:ProgramDataAnaconda3envspython37Scripts>pip install apache-airflow-providers-apache-hive==2.0.2
注意:这里本地安装也有可能缺少对应的C  环境,我们也可以不安装,直接跳过也可以。

Python配置文件:

代码语言:javascript复制
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.apache.hive.operators.hive import HiveOperator

default_args = {
    'owner':'wangwu',
    'start_date':datetime(2021, 9, 23),
    'retries': 1,  # 失败重试次数
    'retry_delay': timedelta(minutes=5) # 失败重试间隔
}

dag = DAG(
    dag_id = 'execute_hive_sql',
    default_args=default_args,
    schedule_interval=timedelta(minutes=1)
)

first=HiveOperator(
    task_id='person_info',
    hive_cli_conn_id="node1-hive-metastore",
    hql='select id,name,age from person_info',
    dag = dag
)

second=HiveOperator(
    task_id='score_info',
    hive_cli_conn_id="node1-hive-metastore",
    hql='select id,name,score from score_info',
    dag=dag
)

third=HiveOperator(
    task_id='join_info',
    hive_cli_conn_id="node1-hive-metastore",
    hql='select a.id,a.name,a.age,b.score from person_info a join score_info b on a.id = b.id',
    dag=dag
)

first >> second >>third

4、调度python配置脚本

将以上配置好的python文件上传至node4节点$AIRFLOW_HOME/dags下,重启Airflow websever与scheduler,登录webui,开启调度:

调度结果如下:  

四、​​​​​​​PythonOperator

PythonOperator可以调用Python函数,由于Python基本可以调用任何类型的任务,如果实在找不到合适的Operator,将任务转为Python函数,使用PythonOperator即可。

关于PythonOperator常用参数如下,更多参数可以查看官网:airflow.operators.python — Airflow Documentation

代码语言:javascript复制
python_callable(python callable):调用的python函数
op_kwargs(dict):调用python函数对应的 **args 参数,dict格式,使用参照案例。
op_args(list):调用python函数对应的 *args 参数,多个封装到一个tuple中,list格式,使用参照案例。
  • PythonOperator调度案例
代码语言:javascript复制
import random
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator

# python中 *  关键字参数允许你传入0个或任意个参数,这些可变参数在函数调用时自动组装为一个tuple。
# python中 ** 关键字参数允许你传入0个或任意个含参数名的参数,这些关键字参数在函数内部自动组装为一个dict。
def print__hello1(*a,**b):
    print(a)
    print(b)
    print("hello airflow1")
# 返回的值只会打印到日志中
    return{"sss1":"xxx1"}

def print__hello2(random_base):
    print(random_base)
    print("hello airflow2")
# 返回的值只会打印到日志中
    return{"sss2":"xxx2"}

default_args = {
    'owner':'maliu',
    'start_date':datetime(2021, 10, 1),
    'retries': 1,  # 失败重试次数
    'retry_delay': timedelta(minutes=5) # 失败重试间隔
}

dag = DAG(
    dag_id = 'execute_pythoncode',
    default_args=default_args,
    schedule_interval=timedelta(minutes=1)
)

first=PythonOperator(
    task_id='first',
    #填写  print__hello1 方法时,不要加上“()”
    python_callable=print__hello1,
    # op_args 对应 print_hello1 方法中的a参数
    op_args=[1,2,3,"hello","world"],
    # op_kwargs 对应 print__hello1 方法中的b参数
    op_kwargs={"id":"1","name":"zs","age":18},
    dag = dag
)

second=PythonOperator(
    task_id='second',
    #填写  print__hello2 方法时,不要加上“()”
    python_callable=print__hello2,
    # random_base 参数对应 print_hello2 方法中参数“random_base”
    op_kwargs={"random_base":random.randint(0,9)},
    dag=dag
)

first >> second

0 人点赞