【译】Celery文档1:First Steps with Celery——安装和配置Celery

2024-04-30 16:17:36 浏览数 (3)

https://docs.celeryq.dev/en/stable/getting-started/first-steps-with-celery.html#first-steps

Celery的第一步

Celery时一个自带电池的任务队列。本教程内容:

  • 安装消息传输代理(broker)
  • • 安装Celery并创建第一个任务(task)
  • 启动Celery工作进程(worker)并执行任务
  • 追踪任务的状态

选择Broker

Celery需要一个方法来发送和接受消息,这个方法被称为消息代理(message broker)。Celery支持多种消息代理,如RabbitMQ、Redis等。

安装RabbitMQ:(推荐)在Dockers上运行RabbitMQ:

代码语言:javascript复制
docker run -d -p 5672:5672 rabbitmq

或者在Ubuntu上安装RabbitMQ:

代码语言:javascript复制
sudo apt-get install rabbitmq-server

运行Celery worker server

代码语言:javascript复制
celery -A tasks worker --loglevel=INFO

Windows下有个坑:celery正常启动和接收任务但不能执行,报错:ValueError: not enough values to unpack (expected 3, got 0)。需要借助eventlet:1.安装eventlet: pip install eventlet 2.借助eventlet启动celery: celery -A tasks worker --loglevel=INFO -P eventlet 参考1:https://www.cnblogs.com/qumogu/p/13284173.html 参考2:https://stackoverflow.com/questions/37255548/how-to-run-celery-on-windows 但这只是一个临时解决方案, celery对windows的支持很差,最好还是在Linux下运行。windows系统可以用WSL。

调用task

使用delay()方法调用task:

在Python shell中:

代码语言:javascript复制
from tasks import add
add.delay(4, 4)

注:delay()方法是apply_async()方法的快捷方式。

然后,之前启动的worker进程会执行这个任务。可以在worker进程的日志中看到任务的执行情况:

代码语言:javascript复制
[2024-04-10 21:58:25,217: INFO/MainProcess] Task tasks.add[987d2e18-0090-4b5b-bcb5-bd038b9690a3] received
[2024-04-10 21:58:25,221: INFO/MainProcess] Task tasks.add[987d2e18-0090-4b5b-bcb5-bd038b9690a3] succeeded in 0.0s: 8   

保留结果

如果要跟踪任务的状态, Celery需要将状态存储或发送到某个地方,如SQLAlchemy/Django ORM、MongoDB、Memcached、Redis、RPC(RabbitMQ/AMQP),并且可以自定义。

在此示例中,我们使用 rpc作为结果后端(result backend),它将状态作为暂时性消息发送回。Celery通过 backend参数 指定后端(如果选择使用配置模块,则通过result_backend设置指定)。因此,您可以在 tasks.py 文件中修改此行以启用 rpc:// 后端:

代码语言:javascript复制
app = Celery('tasks', backend='rpc://', broker='pyamqp://')

或者,如果您想使用 Redis 作为结果后端,但仍然使用 RabbitMQ 作为消息代理(一种流行的组合):

代码语言:javascript复制
app = Celery('tasks', backend='redis://localhost', broker='pyamqp://')

现在配置了结果后端,关闭当前 python 会话并再次导入 tasks 模块以使更改生效。这一次,您将保留调用任务时返回的 AsyncResult 实例:

代码语言:javascript复制
from tasks import add   
result = add.delay(4, 4)

然后可以用ready()方法检查任务是否完成:

代码语言:javascript复制
result.ready()

您可以等待结果完成,但很少使用,因为这会将异步调用转换为同步调用:

代码语言:javascript复制
result.get(timeout=1)
8

Configuration

Celery就像家用电器一样,不需要太多配置。只需要配置输入(连接到代理 broker)和输出(连接到结果后端)即可使用。但是,如果你仔细观察,你会发现有很多按钮。这就是配置选项。默认的配置通常是足够的,但是也可以通过修改配置让Celery更适合你的需求。

可以直接在app上修改配置:

代码语言:javascript复制
app.conf.task_serializer = 'json'

如果一次性修改多个配置,可以使用update方法:

代码语言:javascript复制
app.conf.update(
    task_serializer='json',
    accept_content=['json'],  # Ignore other content
    result_serializer='json',
    timezone='Europe/Oslo',
    enable_utc=True,
)

对于较大的项目,建议使用专用的配置模块。可以用app.config_from_object()告诉 Celery 使用配置模块:

代码语言:javascript复制
app.config_from_object('celeryconfig')

配置模块名称通常是celeryconfig

该模块必须在当前目录可以访问, celeryconfig.py:

代码语言:javascript复制
broker_url = 'pyamqp://'
result_backend = 'rpc://'

task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True

要验证配置文件是否正常工作且不包含任何语法错误,可以尝试导入它: python -m celeryconfig

下面是两个配置示例:将行为异常的任务路由到专用队列的方式

代码语言:javascript复制
task_routes = {
    'tasks.add': 'low-priority',
}

对任务进行速率限制

代码语言:javascript复制
task_annotations = {
    'tasks.add': {'rate_limit': '10/m'}
}

1 人点赞