Celery 是一个与django很好地集成的异步任务队列。在这篇文章中,我不会写一篇关于如何设置和使用 celery 的教程,已经有很多文章了。我将讨论我在我从事的一些项目中使用的 celery 的一些高级功能。
任务的分组和链接
考虑一个场景,你正在做一个电子商务项目,你想编写一个任务来更新产品详细信息,并且只在所有更新时调用 API 来更新状态。一种方法是在没有 celery 的情况下编写 cron 作业,但这将是同步的。每个产品都会阻塞线程,直到它完成。但是,对于 celery group primitives,它将是异步的,即将为每个产品创建一个新任务,并且它们异步运行而不会相互阻塞。
以下是我们更新产品详细信息的功能
代码语言:javascript复制@app.task(name="update_product_details" )
def update_product_details(product_id):
try:
product_info = make_http_call(product_id)
obj = Product.objects.get(id=product_id)
for key, val in product_info.items():
setattr(obj, key, val)
obj.save()
except Exception:
return {"status": False, "message" : "error in updating"}
return {"status": True, "message": "successfully updated"}
下面是每天运行的用于更新产品详细信息的 cron。
代码语言:javascript复制from celery import group, chain
def cron_to_update_product(products):
group_tasks = []
for product in products:
group_tasks.append(update_product_details.s(product.product_id))
async_result = chain(
group(group_tasks), update_status_through_callback.s()
).apply_async()
print(
"a task with id %s is created to update product details" % async_result.task_id
)
代码分解
.s
- 添加到任务称为signature
。
group(group_tasks)
- 芹菜创建n
产品数量,其中n
产品数量为。所有这些任务将并发执行而不会相互阻塞。
chain(group(group_tasks), update_status_through_callback.s())
- 顾名思义,任务是按顺序执行的。一旦组中的所有任务都完成,然后update_status_through_callback
运行
apply_async
- 运行任务
这里有一个关键点需要注意,函数update_status_through_callback
应该grouped_result
作为第一个参数。grouped_result
将是所有分组任务的返回值列表。
例如,有 5 个组任务运行,其中 3 个失败。然后grouped_result
将看起来像这样
[
{"status": False, "message": "error in updating"},
{"status": True, "message": "successfully updated"},
{"status": False, "message": "error in updating"},
{"status": True, "message": "successfully updated"},
{"status": False, "message": "error in updating"},
]
最后,我们的update_status_through_callback
样子是这样的
def update_status_through_callback(grouped_result):
if not all([result["status"] for result in grouped_result]):
return {"status": False,
"message": "not all products are updated"
}
response = make_http_call()
if not is_valid_response(rseponse):
return {"status": False,
"message": "error in making callback"
}
return {"status": True, "message": "updated status"}
在函数的第一行,我们检查是否所有组任务都已成功执行,因为我们应该只更新所有产品的状态。
任务路由
我们都使用像这样的简单命令来运行 celery celery worker -A proj_name
。当项目的任务数量较少时,只运行一个工人规模。但是,考虑一下您正在从事电子商务项目的相同场景,您想要运行不同类型的报告。假设您只运行一个队列,很少有报告会花费很多时间(说出它们long_running_tasks
),而很少有报告会花费更少的时间(说出它们short_running_tasks
)。假设当你得到很多long_running_tasks
使得队列填满并且short_running_tasks
必须等到他们完成。这可能无法很好地扩展。因此,可扩展的解决方案是为每种报告类型创建单独的队列。但是这种方法也有一个问题。如果没有针对特定报告类型的任务,运行这些队列是一种资源浪费。因此,根据业务用例使用第一种方法还是第二种方法是一种权衡。
要根据报告类型运行多个队列,您需要使用此芹菜配置
代码语言:javascript复制CELERY_BROKER_URL = "redis://localhost:6379" # if your broker
# is different change this
CELERY_RESULT_BACKEND = "redis://localhost:6379" # change this
# if this is different for you
CELERY_ACCEPT_CONTENT = ["application/json"]
CELERY_TASK_SERIALIZER = "json"
CELERY_RESULT_SERIALIZER = "json"
CELERY_TASK_DEFAULT_QUEUE = "default"
CELERY_TASK_DEFAULT_EXCHANGE = "default"
CELERY_TASK_DEFAULT_EXCHANGE_TYPE = "topic"
CELERY_TASK_DEFAULT_ROUTING_KEY = "task.default"
CELERY_TASK_ROUTES = {
"foo.tasks.report_type1_aggregator": {
"queue": "report_type1_aggregator_queue"
},
"foo.tasks.report_type2_aggregator": {
"queue": "report_type2_aggregator_queue"
},
"foo.tasks.report_type3_aggregator": {
"queue": "report_type3_aggregator_queue"
},
"foo.tasks.report_type1_report_queue": {
"queue": "report_type1_report_queue"
},
"foo.tasks.report_type2_report_queue": {
"queue": "report_type2_report_queue"
},
"foo.tasks.report_type3_report_queue": {
"queue": "report_type3_report_queue"
}
}
我们为每个任务定义了路线并为其分配了一个队列。但是,我们还没有在 celery 中创建工人。我们可以使用以下命令创建工人
代码语言:javascript复制celery worker -A proj_name -O fair -Q {queue_name}
-P gevent --autoscale=32,16 --loglevel=INFO
--logfile={queue_name}_celery.log
对我们定义的所有队列重复上述命令。
提示 :
不要运行许多命令,而是使用该celery multi
实用程序。此处给出示例
现在,让我们创建一个默认工作人员
代码语言:javascript复制celery worker -A proj_name -O fair -Q default
-P gevent --loglevel=INFO --logfile=celery.log
就是这样!当您运行任务时,它们将被路由到相应的队列。