【Celery实践一】安装以及入门

2022-01-11 10:40:04 浏览数 (1)

背景

在去年编写自动化测试平台的时候,因为存在发送邮件、异步执行自动化任务、执行定时任务、模块解耦等需求。需要使用MQ,我选择的是RabbitMQ。

但是另一个问题就是缺少一个管理任务的系统,因为开发语言是Python,我选择了Celery。

Celery名词解释

Task

任务,如定时任务和异步任务

Broker

消息队列,可以选择Redis或者mq 更推荐mq

Worker

消费者,监听mq收到消息后执行

Backend

存储任务执行结果,一般可选redis

Beat

定时任务配置以及执行模块

架构如下图 

安装

1、linux下使用Docker安装Redis和RabbitMQ

代码语言:javascript复制
docker run -d --name myredis -p 6379:6379 redis --requirepass "123456aBaB"docker run -d --name Myrabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 rabbitmq:management

2、Python安装Celery

代码语言:javascript复制
pip install celery==4.4.7

3、编写tasks.py

代码语言:javascript复制
from celery import Celery #broker是mq的地址,backend是redis的celery = Celery('tasks', broker="amqp://admin:admin@192.168.3.53:5672/",                backend="redis://:redis123456aB@192.168.3.53:6379/0") #name为指定任务的名字@celery.task(name='run_job_delay')def run_job_delay(a, b):    print('执行异步任务') #使用delay发送异步任务run_job_delay.delay(1, 2)

4、编写worker代码

代码语言:javascript复制
from celery import Celery #broker是mq的地址,backend是redis的celery = Celery('tasks', broker="amqp://admin:admin@192.168.3.53:5672/",                backend="redis://:redis123456aB@192.168.3.53:6379/0") @celery.task(name='run_job_delay')def run_job_delay(a,b):    print(a b)

5、启动worker,在worker目录所在的cmd命令行下执行命令,我没写错是的在命令行下

代码语言:javascript复制
celery worker -A worker -l info -P eventlet

    worker是worker.py的模块名字,-l是日志级别 -P eventlet是windows下启动报错所以加这个参数,需要自己手动安装一下eventlet。

6、运行task.py,发送任务到MQ,此时可以打开mq的控制台看到一条消息插入了队列。

7、worker可以看到日志输出

收到了任务ID为"1bbf4e58-70ec-457c-9762-4ff0157863fd"

任务名称为"run_job_delay"的任务,worker执行任务输出了结果3。

代码语言:javascript复制
[2022-01-02 17:12:01,435: INFO/MainProcess] celery@WIN-M3P18OTBSS1 ready.[2022-01-02 17:12:01,443: INFO/MainProcess] pidbox: Connected to amqp://admin:**@192.168.3.53:5672//.[2022-01-02 17:14:52,613: INFO/MainProcess] Received task: run_job_delay[1bbf4e58-70ec-457c-9762-4ff0157863fd][2022-01-02 17:14:52,614: WARNING/MainProcess] 3[2022-01-02 17:14:52,616: INFO/MainProcess] Task run_job_delay[1bbf4e58-70ec-457c-9762-4ff0157863fd] succeeded in 0.0s: None

8、实际项目中使用时我会在场景执行的方法中传递场景ID等参数到worker,然后由worker进行异步执行。

最后

这篇完全属于hello world级别,有兴趣但是无任何经验的同学可以跟着试试,在接下来的几篇分享中会介绍:如何结合flask在项目中使用、如何指定队列、使用beat动态配置定时任务等实际案例。

0 人点赞