Celery入门与实战

2023-12-05 18:16:27 浏览数 (3)

在开发过程中,处理异步任务是一项重要而常见的任务。为了更好地管理和处理这些任务,目前比较强大与实用的有 Celery。Celery 是一个基于 Python 的分布式任务队列,旨在帮助开发者处理异步任务,从而提高应用程序的可伸缩性和性能。

Celery 简介

Celery is a simple, flexible, and reliable distributed system to process vast amounts of messages, while providing operations with the tools required to maintain such a system. It’s a task queue with focus on real-time processing, while also supporting task scheduling.

Celery 是一个开源 Python 库,用于异步运行任务。它是一个任务队列,保存任务并以适当的方式将它们分发给工作人员。它主要侧重于实时操作,但也支持调度(运行定期间隔任务)。为我们提供了高效的异步任务处理解决方案。Celery 引入了各种消息代理,例如RabbitMQRedis。

Celery 结合了各种 Web 框架,包括 Flask、Pylons、web2py、Tryton 和 Tornado。

Celery的优点

  1. 异步任务处理:Celery允许将耗时的任务异步执行,避免阻塞主应用程序。这对于需要长时间处理的任务,如发送电子邮件、处理大量数据,特别有用。
  2. 分布式计算:Celery支持将任务分发到多台计算机或节点上,从而实现分布式计算。这使得可以轻松地将任务分散到多个服务器上,以提高任务处理能力。
  3. 定时任务调度:Celery支持定时任务的调度,可以在预定的时间点或周期性地执行任务。这对于自动化重复性任务非常有用,如定时数据备份或数据清理。
  4. 可扩展性:Celery的架构支持水平扩展,可以根据需要增加更多的任务队列和工作进程,以适应不断增长的任务负载。
  5. 容错性:Celery提供了一些机制来处理失败的任务,例如重试机制和错误处理。它还支持将任务结果存储在持久化存储中,以防止任务结果丢失。

Celery的架构

Celery的架构由多个组件组成,包括任务发布者、任务队列和工作进程。以下是它们的主要角色:

  1. 任务发布者(Producer):任务发布者负责将需要执行的任务发布到任务队列中。这可以是Web应用、命令行工具或其他应用程序。
  2. 任务队列(Broker):任务队列是用于存储和传递待执行任务的中间件。Celery支持多种消息中间件,如RabbitMQ、Redis、Amazon SQS等。
  3. 工作进程(Worker):工作进程从任务队列中获取任务,执行任务,并将执行结果返回。您可以配置多个工作进程来处理任务,从而实现并行处理和高吞吐量。

消息代理

Celery 支持多种消息代理,其中两个常用的选择是 RabbitMQ 和 Redis。选择合适的消息代理取决于你的项目需求。

RabbitMQ

RabbitMQ 是一个高度可靠的消息代理,适用于大规模和复杂的应用程序。你可以使用 RabbitMQ 来实现任务的分发和处理,同时它支持高级的消息队列特性,如消息确认和持久化。

代码语言:javascript复制
# 使用 RabbitMQ 作为消息代理
app = Celery('myapp', broker='pyamqp://guest@localhost//')

Redis

Redis 是一个快速的消息代理,适用于小型和中小型应用程序。它的速度和简单性使其成为一个不错的选择。你可以使用 Redis 来加速任务的分发和处理。

代码语言:javascript复制
# 使用 Redis 作为消息代理
app = Celery('myapp', broker='redis://localhost:6379/0')

celery的安装与使用

创建python虚拟环境

代码语言:javascript复制
python3.9 -m venv py39

进入虚拟环境

代码语言:javascript复制
source py39/bin/activate 
pip install celery==5.1.2 pip install redis==3.5.3

异步任务示例

写一个task.py 文件,用于异步任务调度的demo

代码语言:javascript复制
from celery import Celery
#定义消息代理(broker)的地址
BROKER_URL = 'redis://localhost:6379/0'
#定义结果后端(backend)的地址
BACKEND_URL = 'redis://localhost:6379/1'
#创建一个 Celery 应用实例
app = Celery('tasks', broker=BROKER_URL, backend=BACKEND_URL, )

#使用装饰器定义一个 Celery 异步任务,任务名为 'celery demo run'
@app.task(name='celery demo run')
def add(x, y):
    return x   y

代码语言:javascript复制
写一个run_task.py 文件,用于调度异步任务入口
代码语言:javascript复制
from tasks import add

if __name__ == '__main__':
    print('task start....')
    result = add.delay(2,3)
    print('task end....')
    print(result)

在不启动 Celery 工作者(worker)的时候直接执行run_task.py,可以看到直接返回了celery异步任务的task id。

代码语言:javascript复制
python run_task.py
task start....
task end....
cf191a9d-ef91-46ee-b0c4-5153e853079d

我们启动 Celery 工作者(worker),可以看到下面的输出,celery 从redis拿到任务,并且执行输出

启动 Celery 工作者参数:

代码语言:javascript复制

-A, --app: 指定 Celery 应用模块的名称。这是必要参数,用于加载应用程序的配置。例如:-A tasks 表示加载名为 tasks 的 Celery 应用。
--loglevel: 指定日志级别,控制日志的输出详细程度。常用的级别包括 info、warning、error 等。
--concurrency: 设置工作者的并发数,即同时处理任务的数量。默认值是 CPU 核心数的 2 倍。
--queues: 指定工作者处理的队列。可以使用逗号分隔的队列名列表,例如 queue1,queue2。
--hostname: 设置工作者的主机名,用于识别不同的工作者实例。
--prefetch-multiplier: 设置工作者从队列中预取的任务数量。默认值为 4。
--max-tasks-per-child: 设置工作者在重新启动之前可以处理的最大任务数。用于防止内存泄漏。
--time-limit: 限制单个任务的最大执行时间(秒)。
代码语言:javascript复制
celery  -A tasks worker -l INFO

 
 -------------- celery@C02G20FCMD6M v5.1.2 (sun-harmonics)
--- ***** ----- 
-- ******* ---- macOS-11.7.4-x86_64-i386-64bit 2023-08-30 02:35:35
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x10edd0550
- ** ---------- .> transport:   redis://localhost:6379/0
- ** ---------- .> results:     redis://localhost:6379/1
- *** --- * --- .> concurrency: 12 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery
                

[tasks]
  . celery demo run

[2023-08-30 02:35:35,451: INFO/MainProcess] Connected to redis://localhost:6379/0
[2023-08-30 02:35:35,455: INFO/MainProcess] mingle: searching for neighbors
[2023-08-30 02:35:36,469: INFO/MainProcess] mingle: all alone
[2023-08-30 02:35:36,484: INFO/MainProcess] celery@C02G20FCMD6M ready.
[2023-08-30 02:35:36,488: INFO/MainProcess] Task celery demo run[cf191a9d-ef91-46ee-b0c4-5153e853079d] received
[2023-08-30 02:35:36,510: INFO/ForkPoolWorker-1] Task celery demo run[39c2e810-4337-4133-b5ff-ea64dfc0f49f] succeeded in 0.016289540000000047s: 5
代码语言:javascript复制
    再执行一次run_task 脚本,可以看到work 正常调度了
代码语言:javascript复制
python run_task.py
task start....
task end....
f6137e38-35a2-4df7-be8a-cccd4035c293

[2023-08-30 02:37:33,978: INFO/MainProcess] Task celery demo run[f6137e38-35a2-4df7-be8a-cccd4035c293] received
[2023-08-30 02:37:33,980: INFO/ForkPoolWorker-8] Task celery demo run[f6137e38-35a2-4df7-be8a-cccd4035c293] succeeded in 0.00101882100000239s: 5

定时任务示例

在Celery中,定时任务通常被称为"periodic tasks",它允许你在指定的时间间隔内自动执行任务。

代码语言:javascript复制
from celery import Celery
#定义消息代理(broker)的地址
BROKER_URL = 'redis://localhost:6379/0'
#定义结果后端(backend)的地址
BACKEND_URL = 'redis://localhost:6379/1'
#创建一个 Celery 应用实例
app = Celery('tasks', broker=BROKER_URL, backend=BACKEND_URL, )
# 在 Celery 应用配置中设置消息序列化方式
app.conf.update(
    task_serializer='json',  # 任务消息序列化方式
    result_serializer='json',  # 任务结果序列化方式
)
# 在 Celery 应用配置中设置并发参数
app.conf.update(
    worker_concurrency=4,  # 同时执行的工作进程数量
    task_max_retries=3,  # 单个任务的最大执行次数(重试次数)
)


app.conf.beat_schedule = {
    'my-periodic-task': {
        'task': 'tasks.my_periodic_task',
        'schedule': 10.0,  # 每10秒执行一次
    },
}

@app.task
def my_periodic_task():
    print("test1111111")

启动beat 进程

代码语言:javascript复制
celery -A tasks beat --loglevel=info

可以看到beat调度的任务日志

代码语言:javascript复制
celery beat v5.1.2 (sun-harmonics) is starting.
__    -    ... __   -        _
LocalTime -> 2023-09-22 10:58:09
Configuration ->
    . broker -> redis://localhost:6379/0
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> celerybeat-schedule
    . logfile -> [stderr]@%INFO
    . maxinterval -> 5.00 minutes (300s)
[2023-09-22 10:58:09,863: INFO/MainProcess] beat: Starting...
[2023-09-22 10:58:09,883: INFO/MainProcess] Scheduler: Sending due task my-periodic-task (tasks.my_periodic_task)
[2023-09-22 10:58:19,882: INFO/MainProcess] Scheduler: Sending due task my-periodic-task (tasks.my_periodic_task)
代码语言:javascript复制
可以在worker 进程看到对应的打印的日志
代码语言:javascript复制
[2023-09-22 11:00:19,884: INFO/MainProcess] Task tasks.my_periodic_task[9c8e8734-b738-4fb4-9dfb-67a97031b007] received
[2023-09-22 11:00:19,886: WARNING/ForkPoolWorker-2] into beat demo: 
[2023-09-22 11:00:19,886: WARNING/ForkPoolWorker-2] 

[2023-09-22 11:00:19,888: WARNING/ForkPoolWorker-2] test1111111
[2023-09-22 11:00:19,889: WARNING/ForkPoolWorker-2] 

[2023-09-22 11:00:19,893: INFO/ForkPoolWorker-2] Task tasks.my_periodic_task[9c8e8734-b738-4fb4-9dfb-67a97031b007] succeeded in 0.006949007999992318s: None

这篇文章我们先介绍下celery的基础入门,在后续的文章我们将继续学习下celery与django的结合使用与具体的案例。

1 人点赞