【译】Celery文档3:在Django中使用Celery

2024-05-10 19:02:05 浏览数 (3)

https://docs.celeryq.dev/en/latest/django/first-steps-with-django.html#django-first-steps

First steps with Django

Django3.1后默认支持Celery,不再需要安装额外的库。

Django项目布局大概是这样的:

代码语言:javascript复制
- proj/
  - manage.py
  - proj/
    - __init__.py
    - settings.py
    - urls.py

想要添加celery,推荐在proj/proj目录下创建一个celery.py 模块,并定义Celery实例:

代码语言:javascript复制
import os

from celery import Celery

# Set the default Django settings module for the 'celery' program.
# 设置环境变量,使得不必将设置模块传入celery。
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY') # 使用Django的设置模块作为celery的配置源

# Load task modules from all registered Django apps.
app.autodiscover_tasks()


@app.task(bind=True, ignore_result=True)
def debug_task(self):
    print(f'Request: {self.request!r}')

然后,您需要在 proj/proj/__init__.py模块中导入此应用程序。这确保了在 Django 启动时加载应用程序,以便 @shared_task 装饰器(稍后提到)将使用它: proj/proj/__init__.py:

代码语言:javascript复制
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)

Django的配置文件可能包括: settings.py

代码语言:javascript复制
...

# Celery Configuration Options
CELERY_TIMEZONE = "Australia/Tasmania"
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60

接下来,通常在单独的 tasks.py 模块中定义所有任务,而 Celery实际上有一种方法可以自动发现这些模块: app.autodiscover_tasks() 通过上面的一行,Celery 将按照 tasks.py 约定自动发现所有已安装应用程序中的任务:

代码语言:javascript复制
- app1/
    - tasks.py
    - models.py
- app2/
    - tasks.py
    - models.py

这样,您就不必手动将各个模块添加到设置中 CELERY_IMPORTS 。

proj/proj/celery.py文件的最后定义了debug_task任务,该任务是一个转储自己的请求信息的任务。使用 Celery 3.1 中引入的bind=True选项来轻松使用当前任务实例。

使用@shared_task 装饰器

@shared_task装饰器允许您创建任务,而无需任何具体的app实例:demoapp/tasks.py:

代码语言:javascript复制
 Create your tasks here

from demoapp.models import Widget

from celery import shared_task


@shared_task
def add(x, y):
    return x   y


@shared_task
def mul(x, y):
    return x * y


@shared_task
def xsum(numbers):
    return sum(numbers)


@shared_task
def count_widgets():
    return Widget.objects.count()


@shared_task
def rename_widget(widget_id, name):
    w = Widget.objects.get(id=widget_id)
    w.name = name
    w.save()

Django示例的完整代码:https://github.com/celery/celery/tree/main/examples/django/

在数据库事务结束时触发任务

Django 的一个常见陷阱是立即触发任务,而不是等到数据库事务结束,这意味着 Celery 任务可能会在所有更改都持久化到数据库之前运行。例如:

代码语言:javascript复制
# views.py
def create_user(request):
    # Note: simplified example, use a form to validate input
    user = User.objects.create(username=request.POST['username'])
    send_email.delay(user.pk)
    return HttpResponse('User created')

# task.py
@shared_task
def send_email(user_pk):
    user = User.objects.get(pk=user_pk)
    # send email ...

在这种情况下, send_email 任务可能在视图将事务提交到数据库之前启动,因此任务可能无法找到用户。一个常见的解决方案是在事务提交后使用 Django 的 on_commit 钩子来触发任务:

代码语言:javascript复制
- send_email.delay(user.pk)
  transaction.on_commit(lambda: send_email.delay(user.pk))

由于这是一种常见的模式,Celery 5.4 为此引入了一个方便的快捷方式,使用 DjangoTask。你可以使用delay_on_commit() 替代delay():

代码语言:javascript复制
- send_email.delay(user.pk)
  send_email.delay_on_commit(user.pk)

Extensions

django-celery-results 使用 Django ORM/Cache 作为结果后端

https://pypi.org/project/django-celery-results/

django-celery-beat - 具有管理界面的数据库支持的定期任务

启动工作进程

在生产环境中,你会希望在后台运行 worker 作为守护进程 - 参见 Daemonization[1] - 但对于测试和开发来说,能够使用 celery worker manage 命令启动工作线程实例很有用. celery -A proj worker -l INFO

引用链接

[1] Daemonization: https://docs.celeryq.dev/en/latest/userguide/daemonizing.html#daemonizing

1 人点赞