背景
在去年编写自动化测试平台的时候,因为存在发送邮件、异步执行自动化任务、执行定时任务、模块解耦等需求。需要使用MQ,我选择的是RabbitMQ。
但是另一个问题就是缺少一个管理任务的系统,因为开发语言是Python,我选择了Celery。
Celery名词解释
Task | 任务,如定时任务和异步任务 |
---|---|
Broker | 消息队列,可以选择Redis或者mq 更推荐mq |
Worker | 消费者,监听mq收到消息后执行 |
Backend | 存储任务执行结果,一般可选redis |
Beat | 定时任务配置以及执行模块 |
架构如下图
安装
1、linux下使用Docker安装Redis和RabbitMQ
代码语言:javascript复制docker run -d --name myredis -p 6379:6379 redis --requirepass "123456aBaB"docker run -d --name Myrabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 rabbitmq:management
2、Python安装Celery
代码语言:javascript复制pip install celery==4.4.7
3、编写tasks.py
代码语言:javascript复制from celery import Celery #broker是mq的地址,backend是redis的celery = Celery('tasks', broker="amqp://admin:admin@192.168.3.53:5672/", backend="redis://:redis123456aB@192.168.3.53:6379/0") #name为指定任务的名字@celery.task(name='run_job_delay')def run_job_delay(a, b): print('执行异步任务') #使用delay发送异步任务run_job_delay.delay(1, 2)
4、编写worker代码
代码语言:javascript复制from celery import Celery #broker是mq的地址,backend是redis的celery = Celery('tasks', broker="amqp://admin:admin@192.168.3.53:5672/", backend="redis://:redis123456aB@192.168.3.53:6379/0") @celery.task(name='run_job_delay')def run_job_delay(a,b): print(a b)
5、启动worker,在worker目录所在的cmd命令行下执行命令,我没写错是的在命令行下
代码语言:javascript复制celery worker -A worker -l info -P eventlet
worker是worker.py的模块名字,-l是日志级别 -P eventlet是windows下启动报错所以加这个参数,需要自己手动安装一下eventlet。
6、运行task.py,发送任务到MQ,此时可以打开mq的控制台看到一条消息插入了队列。
7、worker可以看到日志输出
收到了任务ID为"1bbf4e58-70ec-457c-9762-4ff0157863fd"
任务名称为"run_job_delay"的任务,worker执行任务输出了结果3。
代码语言:javascript复制[2022-01-02 17:12:01,435: INFO/MainProcess] celery@WIN-M3P18OTBSS1 ready.[2022-01-02 17:12:01,443: INFO/MainProcess] pidbox: Connected to amqp://admin:**@192.168.3.53:5672//.[2022-01-02 17:14:52,613: INFO/MainProcess] Received task: run_job_delay[1bbf4e58-70ec-457c-9762-4ff0157863fd][2022-01-02 17:14:52,614: WARNING/MainProcess] 3[2022-01-02 17:14:52,616: INFO/MainProcess] Task run_job_delay[1bbf4e58-70ec-457c-9762-4ff0157863fd] succeeded in 0.0s: None
8、实际项目中使用时我会在场景执行的方法中传递场景ID等参数到worker,然后由worker进行异步执行。
最后
这篇完全属于hello world级别,有兴趣但是无任何经验的同学可以跟着试试,在接下来的几篇分享中会介绍:如何结合flask在项目中使用、如何指定队列、使用beat动态配置定时任务等实际案例。