Celery 用来处理工作流和多个队列

2023-10-14 16:14:25 浏览数 (3)

Celery 是一个与django很好地集成的异步任务队列。在这篇文章中,我不会写一篇关于如何设置和使用 celery 的教程,已经有很多文章了。我将讨论我在我从事的一些项目中使用的 celery 的一些高级功能。

任务的分组和链接

考虑一个场景,你正在做一个电子商务项目,你想编写一个任务来更新产品详细信息,并且只在所有更新时调用 API 来更新状态。一种方法是在没有 celery 的情况下编写 cron 作业,但这将是同步的。每个产品都会阻塞线程,直到它完成。但是,对于 celery group primitives,它将是异步的,即将为每个产品创建一个新任务,并且它们异步运行而不会相互阻塞。

以下是我们更新产品详细信息的功能

代码语言:javascript复制
@app.task(name="update_product_details" )
def update_product_details(product_id):
    try:
        product_info = make_http_call(product_id)
        obj = Product.objects.get(id=product_id)
        for key, val in product_info.items():
            setattr(obj, key, val)
            obj.save()
    except Exception:
        return {"status": False, "message" : "error in updating"}
    return {"status": True, "message": "successfully updated"}

下面是每天运行的用于更新产品详细信息的 cron。

代码语言:javascript复制
from celery import group, chain

def cron_to_update_product(products):
    group_tasks = []
    for product in products:
        group_tasks.append(update_product_details.s(product.product_id))
    async_result = chain(
        group(group_tasks), update_status_through_callback.s()
    ).apply_async()
    print(
        "a task with id %s is created to update product details" % async_result.task_id
    )
代码分解

.s- 添加到任务称为signature

group(group_tasks)- 芹菜创建n产品数量,其中n产品数量为。所有这些任务将并发执行而不会相互阻塞。

chain(group(group_tasks), update_status_through_callback.s())- 顾名思义,任务是按顺序执行的。一旦组中的所有任务都完成,然后update_status_through_callback运行

apply_async- 运行任务

这里有一个关键点需要注意,函数update_status_through_callback应该grouped_result作为第一个参数。grouped_result将是所有分组任务的返回值列表。 例如,有 5 个组任务运​​行,其中 3 个失败。然后grouped_result将看起来像这样

代码语言:javascript复制
[
    {"status": False, "message": "error in updating"},
    {"status": True, "message": "successfully updated"},
    {"status": False, "message": "error in updating"},
    {"status": True, "message": "successfully updated"},
    {"status": False, "message": "error in updating"},
]

最后,我们的update_status_through_callback样子是这样的

代码语言:javascript复制
def update_status_through_callback(grouped_result):
    if not all([result["status"] for result in grouped_result]):
        return {"status": False,
                "message": "not all products are updated"
        }
    response = make_http_call()
    if not is_valid_response(rseponse):
        return {"status": False,
                "message": "error in making callback"
               }
    return {"status": True, "message": "updated status"}

在函数的第一行,我们检查是否所有组任务都已成功执行,因为我们应该只更新所有产品的状态。

任务路由

我们都使用像这样的简单命令来运行 celery celery worker -A proj_name。当项目的任务数量较少时,只运行一个工人规模。但是,考虑一下您正在从事电子商务项目的相同场景,您想要运行不同类型的报告。假设您只运行一个队列,很少有报告会花费很多时间(说出它们long_running_tasks),而很少有报告会花费更少的时间(说出它们short_running_tasks)。假设当你得到很多long_running_tasks使得队列填满并且short_running_tasks必须等到他们完成。这可能无法很好地扩展。因此,可扩展的解决方案是为每种报告类型创建单独的队列。但是这种方法也有一个问题。如果没有针对特定报告类型的任务,运行这些队列是一种资源浪费。因此,根据业务用例使用第一种方法还是第二种方法是一种权衡。

要根据报告类型运行多个队列,您需要使用此芹菜配置

代码语言:javascript复制
CELERY_BROKER_URL = "redis://localhost:6379" # if your broker
# is different change this
CELERY_RESULT_BACKEND = "redis://localhost:6379" # change this
# if this is different for you
CELERY_ACCEPT_CONTENT = ["application/json"]
CELERY_TASK_SERIALIZER = "json"
CELERY_RESULT_SERIALIZER = "json"
CELERY_TASK_DEFAULT_QUEUE = "default"
CELERY_TASK_DEFAULT_EXCHANGE = "default"
CELERY_TASK_DEFAULT_EXCHANGE_TYPE = "topic"
CELERY_TASK_DEFAULT_ROUTING_KEY = "task.default"

CELERY_TASK_ROUTES = {  
  "foo.tasks.report_type1_aggregator": {
      "queue": "report_type1_aggregator_queue"
  },
  "foo.tasks.report_type2_aggregator": {
      "queue": "report_type2_aggregator_queue"
  },
  "foo.tasks.report_type3_aggregator": {
      "queue": "report_type3_aggregator_queue"
  },
  "foo.tasks.report_type1_report_queue": {
      "queue": "report_type1_report_queue"
  },
  "foo.tasks.report_type2_report_queue": {
      "queue": "report_type2_report_queue"
  },
  "foo.tasks.report_type3_report_queue": {
      "queue": "report_type3_report_queue"
  }
}

我们为每个任务定义了路线并为其分配了一个队列。但是,我们还没有在 celery 中创建工人。我们可以使用以下命令创建工人

代码语言:javascript复制
celery worker -A proj_name -O fair -Q {queue_name}
 -P gevent --autoscale=32,16 --loglevel=INFO 
 --logfile={queue_name}_celery.log

对我们定义的所有队列重复上述命令。

提示 :

不要运行许多命令,而是使用该celery multi实用程序。此处给出示例

现在,让我们创建一个默认工作人员

代码语言:javascript复制
celery worker -A proj_name -O fair -Q default 
-P gevent --loglevel=INFO --logfile=celery.log

就是这样!当您运行任务时,它们将被路由到相应的队列。

1 人点赞