Celery+Rabbitmq实现异步执行任务

2021-02-26 14:14:06 浏览数 (1)

Celery是Python的一个第三方库,中文为"芹菜"的意思,是一个生产者消费者模式的框架,我们使用Celery时主要用来异步执行任务或执行定时任务,这篇文章介绍实现异步执行任务的方法.

一. 安装celery,再安装rabbitmq或redis

代码语言:javascript复制
# 安装celery
pip install celery

rabbitmq和redis安装其中一个就可以了,他们的作用是作为celery的后端代理,任务的队列.celery官方文档里说了,用两者其一就可以,但优先推荐rabbitmq,具体怎么安装可以自己找一下教程(安装会依赖Erlang,教程很容易找到,如果不装,也可以装redis).

二. 搭建celery任务架构

在项目中适合的位置创建一个celery_tasks目录,在这个目录下写celery的代码,将celery代码与项目业务逻辑代码独立开.(当然也可以不分开,具体根据项目的代码量和实际需要来使用.)

注意:目录名不要直接叫celery,不要与python关键字,第三方模块的名字冲突,否则导致导包出错

在新建的目录下创建config.py, tasks.py, main.py三个python文件分别用于编写celery的配置代码,任务函数代码和任务启动代码

代码语言:javascript复制
# 目录结构
- celery_tasks
    - config.py
    - main.py
    - tasks.py

三. 编写代码实现异步调用任务

--config.py

代码语言:javascript复制
from celery import Celery


# 创建celery对象app,demo是对celery对象的命名,自定义,见名知义即可
# broker指定后端代理,可以使用mq或redis,主要起到任务队列的作用
app = Celery('demo', broker='amqp://guest@localhost:5672//')
# app = Celery('demo', broker='redis://127.0.0.1:6379/15')

--tasks.py

代码语言:javascript复制
from config import app


# 定义任务,使用celery对象.task装饰任务,celery即可自动识别任务
@app.task(name='celery_task1_name')
def celery_task1_name(arg):
    print('编写需要执行的任务代码', arg)


@app.task(name='celery_task2_name')
def celery_task2_name():
    print('将需要执行的代码导入tasks.py文件,然后在这里调用即可')

--main.py

代码语言:javascript复制
from tasks import *


# 设置celery对象自动识别任务,
# 'celery_tasks'指定tasks.py的目录,保证程序能找到tasks.py
app.autodiscover_tasks(['celery_tasks'])

四. 启动celery任务 找到main.py所在目录下,执行如下命令,如果不在此目录,则main前要写相对路径,如:celery_tasks.main

代码语言:javascript复制
celery -A main worker -l info

参数说明:-A 指定celery的启动入口main, worker为celery执行任务的后端工人,-l指定日志级别为info 执行成功后,celery就会启动worker,从代理队列中获取任务并执行,如果任务队列为空,则一直等待到有任务

Windows Bug:如果Celery4.0以上的版本在Windows上使用,通过上面的启动命令启动,在执行task.delay()时会报错:ValueError: not enough values to unpack (expected 3, got 0)

Linux不会出现此问题,Windows才有,与“绿色线程”有关,具体阅读eventlet相关资料

解决办法:

安装eventlet

代码语言:javascript复制
pip install eventlet

启动worker时增加-P eventlet参数

代码语言:javascript复制
celery -A main worker -l info -P eventlet

五. 调用celery异步执行任务 在需要执行异步任务的地方导入任务,使用task.delay(参数)调用任务 如:与celery_tasks目录同级的demo目录下有一个demo.py文件,我在demo.py中异步执行任务

代码语言:javascript复制
from celery_tasks.main import celery_task1_name, celery_task2_name


def demo_func(a):
    # 调用格式:任务名.delay(参数)
    celery_task1_name.delay(a)
    print('celery_task1_name执行完成:{}!'.format(a))
    celery_task2_name.delay()
    print('celery_task1_name执行完成!')


demo_func('hello celery!')

现在已经实现了celery异步调用任务了,复制以上步骤中的代码即可实现异步任务的demo.

0 人点赞