Celery多个定时任务使用RabbitMQ,Queue冲突解决

2021-02-26 14:14:28 浏览数 (1)

一. 场景描述

1.使用celery实现定时任务后,任务会被定时添加到后端指定的队列里,队列可以是RabbitMQ,也可以是redis.

2.在创建Celery对象app的时候,指定了使用rabbitmq作为后端代理broker, celery会将定时任务异步添加到mq队列中,worker从队列中获取任务.

3.如果已经运行了一个celery定时任务A,定时任务A使用mq,此时要新增另一个celery定时任务B,定时任务B也直接使用mq,那么两个不同的定时任务在使用同一个队列,会出现任务混乱.

因为worker执行完任务后会自动去队列中取任务,也就是说,任务A的worker可能会从队列中获取到任务B的任务,任务B的worker也可能会从队列中获取到任务A的任务.

4.当worker获取到的任务不是本项目的任务时,程序就会报错.

二. 解决queue冲突的方法和原理

1.Celery会自动识别任务,自动将定时任务添加到队列.

2.Queue(队列)是RabbitMQ的内部对象,用于存储任务.

3.但celery不是直接将任务放到Queue(队列)中,而是先通过Exchange, Exchange控制任务存放到队列的路由Route,不同的Route指向不同的Queue.

4.使用者可以自定义不同的Queue和Route,并指定Queue和Route的对应关系,用来指定不同定时任务存放到不同队列.

5.在定时任务的配置文件中指定Queue和Route,Exchange就会将定时任务添加到对应的队列,worker也会到这个队列中取任务,避免冲突.

三. 编写代码解决Queue冲突

1.celery定时任务目录结构

代码语言:javascript复制
# 目录结构
- celery_crontab
    - config.py
    - main.py
    - tasks.py

2.在tasks.py中编写任务函数代码

代码语言:javascript复制
from config import app


@app.task
def crontab_func1():
    print('在此编写任务要实现的代码')
  
  
@app.task
def crontab_func2():
    print('在此调用实现了定时任务功能的函数或方法')
  

3.在配置文件config.py中自定义Queue,Route,并将Queue和Route配置到Celery对象中

代码语言:javascript复制
from celery import Celery
from kombu import Exchange, Queue


# celery
app = Celery('demo', broker='amqp://guest@localhost:5672//')
# Queue
queue = (
    # 定义专用的queue,定义Exchange,以及与route对应的key
    Queue('queue_demo', Exchange('exchange_demo', type='direct'),
    routing_key='queue_demo_key'),
)
# Route
route = {
    # 定义任务crontab_func1的queue,routing_key
    'tasks.crontab_func1': {'queue': 'queue_demo', 'routing_key': 'queue_demo_key'},
    'tasks.crontab_func2': {'queue': 'queue_demo', 'routing_key': 'queue_demo_key'},
}


# 指定queue和route的配置应用到celery定时任务的配置中
app.conf.update(CELERY_QUEUES=queue, CELERY_ROUTES=route)

4.在main.py中需要将app.conf.beat_schedule改为app.conf.update,具体如下

代码语言:javascript复制
from celery.schedules import crontab

from tasks import *


app.conf.update(
    CELERYBEAT_SCHEDULE={
        "crontab_func1": {
            'task': 'tasks.crontab_func1',
            'schedule': crontab(minute='*/1'),
            'args': ()
        },
        "crontab_func2": {
            'task': 'tasks.crontab_func2',
            'schedule': crontab(minute='*/1'),
            'args': ()
        },
    },
)

四. 定时任务的启动 在任务的启动命令中要加上-Q参数,指定任务的队列名,也就是在config.py中自定义的Queue名

代码语言:javascript复制
# -Q指定当前定时任务的队列,与config.py中定义的queue名保持一致
celery multi start demo_work -A main -Q queue_demo -l info -B --logfile=crontablog.log
# 停止将start改成stop
celery multi stop demo_work -A main -Q queue_demo -l info -B --logfile=crontablog.log
# 重启用restart
celery multi restart demo_work -A main -Q queue_demo -l info -B --logfile=crontablog.log

现在每个定时任务都有指定的队列,所以不管有多少定时任务,都不会出现冲突.

后续如果还有更多的Celery定时任务,均可使用这个方法,定义不重复(通过队列名区分)的Queue和Route,按照上面的步骤实现,避免不同项目之间存取任务的混乱.

0 人点赞