在开发过程中,处理异步任务是一项重要而常见的任务。为了更好地管理和处理这些任务,目前比较强大与实用的有 Celery。Celery 是一个基于 Python 的分布式任务队列,旨在帮助开发者处理异步任务,从而提高应用程序的可伸缩性和性能。
Celery 简介
Celery is a simple, flexible, and reliable distributed system to process vast amounts of messages, while providing operations with the tools required to maintain such a system. It’s a task queue with focus on real-time processing, while also supporting task scheduling.
Celery 是一个开源 Python 库,用于异步运行任务。它是一个任务队列,保存任务并以适当的方式将它们分发给工作人员。它主要侧重于实时操作,但也支持调度(运行定期间隔任务)。为我们提供了高效的异步任务处理解决方案。Celery 引入了各种消息代理,例如RabbitMQ和Redis。
Celery 结合了各种 Web 框架,包括 Flask、Pylons、web2py、Tryton 和 Tornado。
Celery的优点
- 异步任务处理:Celery允许将耗时的任务异步执行,避免阻塞主应用程序。这对于需要长时间处理的任务,如发送电子邮件、处理大量数据,特别有用。
- 分布式计算:Celery支持将任务分发到多台计算机或节点上,从而实现分布式计算。这使得可以轻松地将任务分散到多个服务器上,以提高任务处理能力。
- 定时任务调度:Celery支持定时任务的调度,可以在预定的时间点或周期性地执行任务。这对于自动化重复性任务非常有用,如定时数据备份或数据清理。
- 可扩展性:Celery的架构支持水平扩展,可以根据需要增加更多的任务队列和工作进程,以适应不断增长的任务负载。
- 容错性:Celery提供了一些机制来处理失败的任务,例如重试机制和错误处理。它还支持将任务结果存储在持久化存储中,以防止任务结果丢失。
Celery的架构
Celery的架构由多个组件组成,包括任务发布者、任务队列和工作进程。以下是它们的主要角色:
- 任务发布者(Producer):任务发布者负责将需要执行的任务发布到任务队列中。这可以是Web应用、命令行工具或其他应用程序。
- 任务队列(Broker):任务队列是用于存储和传递待执行任务的中间件。Celery支持多种消息中间件,如RabbitMQ、Redis、Amazon SQS等。
- 工作进程(Worker):工作进程从任务队列中获取任务,执行任务,并将执行结果返回。您可以配置多个工作进程来处理任务,从而实现并行处理和高吞吐量。
消息代理
Celery 支持多种消息代理,其中两个常用的选择是 RabbitMQ 和 Redis。选择合适的消息代理取决于你的项目需求。
RabbitMQ
RabbitMQ 是一个高度可靠的消息代理,适用于大规模和复杂的应用程序。你可以使用 RabbitMQ 来实现任务的分发和处理,同时它支持高级的消息队列特性,如消息确认和持久化。
代码语言:javascript复制# 使用 RabbitMQ 作为消息代理
app = Celery('myapp', broker='pyamqp://guest@localhost//')
Redis
Redis 是一个快速的消息代理,适用于小型和中小型应用程序。它的速度和简单性使其成为一个不错的选择。你可以使用 Redis 来加速任务的分发和处理。
代码语言:javascript复制# 使用 Redis 作为消息代理
app = Celery('myapp', broker='redis://localhost:6379/0')
celery的安装与使用
创建python虚拟环境
代码语言:javascript复制python3.9 -m venv py39
进入虚拟环境
代码语言:javascript复制source py39/bin/activate
pip install celery==5.1.2 pip install redis==3.5.3
异步任务示例
写一个task.py 文件,用于异步任务调度的demo
代码语言:javascript复制from celery import Celery
#定义消息代理(broker)的地址
BROKER_URL = 'redis://localhost:6379/0'
#定义结果后端(backend)的地址
BACKEND_URL = 'redis://localhost:6379/1'
#创建一个 Celery 应用实例
app = Celery('tasks', broker=BROKER_URL, backend=BACKEND_URL, )
#使用装饰器定义一个 Celery 异步任务,任务名为 'celery demo run'
@app.task(name='celery demo run')
def add(x, y):
return x y
代码语言:javascript复制写一个run_task.py 文件,用于调度异步任务入口
代码语言:javascript复制from tasks import add
if __name__ == '__main__':
print('task start....')
result = add.delay(2,3)
print('task end....')
print(result)
在不启动 Celery 工作者(worker)的时候直接执行run_task.py,可以看到直接返回了celery异步任务的task id。
代码语言:javascript复制python run_task.py
task start....
task end....
cf191a9d-ef91-46ee-b0c4-5153e853079d
我们启动 Celery 工作者(worker),可以看到下面的输出,celery 从redis拿到任务,并且执行输出
启动 Celery 工作者参数:
代码语言:javascript复制
-A, --app: 指定 Celery 应用模块的名称。这是必要参数,用于加载应用程序的配置。例如:-A tasks 表示加载名为 tasks 的 Celery 应用。
--loglevel: 指定日志级别,控制日志的输出详细程度。常用的级别包括 info、warning、error 等。
--concurrency: 设置工作者的并发数,即同时处理任务的数量。默认值是 CPU 核心数的 2 倍。
--queues: 指定工作者处理的队列。可以使用逗号分隔的队列名列表,例如 queue1,queue2。
--hostname: 设置工作者的主机名,用于识别不同的工作者实例。
--prefetch-multiplier: 设置工作者从队列中预取的任务数量。默认值为 4。
--max-tasks-per-child: 设置工作者在重新启动之前可以处理的最大任务数。用于防止内存泄漏。
--time-limit: 限制单个任务的最大执行时间(秒)。
代码语言:javascript复制celery -A tasks worker -l INFO
-------------- celery@C02G20FCMD6M v5.1.2 (sun-harmonics)
--- ***** -----
-- ******* ---- macOS-11.7.4-x86_64-i386-64bit 2023-08-30 02:35:35
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: tasks:0x10edd0550
- ** ---------- .> transport: redis://localhost:6379/0
- ** ---------- .> results: redis://localhost:6379/1
- *** --- * --- .> concurrency: 12 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. celery demo run
[2023-08-30 02:35:35,451: INFO/MainProcess] Connected to redis://localhost:6379/0
[2023-08-30 02:35:35,455: INFO/MainProcess] mingle: searching for neighbors
[2023-08-30 02:35:36,469: INFO/MainProcess] mingle: all alone
[2023-08-30 02:35:36,484: INFO/MainProcess] celery@C02G20FCMD6M ready.
[2023-08-30 02:35:36,488: INFO/MainProcess] Task celery demo run[cf191a9d-ef91-46ee-b0c4-5153e853079d] received
[2023-08-30 02:35:36,510: INFO/ForkPoolWorker-1] Task celery demo run[39c2e810-4337-4133-b5ff-ea64dfc0f49f] succeeded in 0.016289540000000047s: 5
代码语言:javascript复制 再执行一次run_task 脚本,可以看到work 正常调度了
代码语言:javascript复制python run_task.py
task start....
task end....
f6137e38-35a2-4df7-be8a-cccd4035c293
[2023-08-30 02:37:33,978: INFO/MainProcess] Task celery demo run[f6137e38-35a2-4df7-be8a-cccd4035c293] received
[2023-08-30 02:37:33,980: INFO/ForkPoolWorker-8] Task celery demo run[f6137e38-35a2-4df7-be8a-cccd4035c293] succeeded in 0.00101882100000239s: 5
定时任务示例
在Celery中,定时任务通常被称为"periodic tasks",它允许你在指定的时间间隔内自动执行任务。
代码语言:javascript复制from celery import Celery
#定义消息代理(broker)的地址
BROKER_URL = 'redis://localhost:6379/0'
#定义结果后端(backend)的地址
BACKEND_URL = 'redis://localhost:6379/1'
#创建一个 Celery 应用实例
app = Celery('tasks', broker=BROKER_URL, backend=BACKEND_URL, )
# 在 Celery 应用配置中设置消息序列化方式
app.conf.update(
task_serializer='json', # 任务消息序列化方式
result_serializer='json', # 任务结果序列化方式
)
# 在 Celery 应用配置中设置并发参数
app.conf.update(
worker_concurrency=4, # 同时执行的工作进程数量
task_max_retries=3, # 单个任务的最大执行次数(重试次数)
)
app.conf.beat_schedule = {
'my-periodic-task': {
'task': 'tasks.my_periodic_task',
'schedule': 10.0, # 每10秒执行一次
},
}
@app.task
def my_periodic_task():
print("test1111111")
启动beat 进程
代码语言:javascript复制celery -A tasks beat --loglevel=info
可以看到beat调度的任务日志
代码语言:javascript复制celery beat v5.1.2 (sun-harmonics) is starting.
__ - ... __ - _
LocalTime -> 2023-09-22 10:58:09
Configuration ->
. broker -> redis://localhost:6379/0
. loader -> celery.loaders.app.AppLoader
. scheduler -> celery.beat.PersistentScheduler
. db -> celerybeat-schedule
. logfile -> [stderr]@%INFO
. maxinterval -> 5.00 minutes (300s)
[2023-09-22 10:58:09,863: INFO/MainProcess] beat: Starting...
[2023-09-22 10:58:09,883: INFO/MainProcess] Scheduler: Sending due task my-periodic-task (tasks.my_periodic_task)
[2023-09-22 10:58:19,882: INFO/MainProcess] Scheduler: Sending due task my-periodic-task (tasks.my_periodic_task)
代码语言:javascript复制可以在worker 进程看到对应的打印的日志
代码语言:javascript复制[2023-09-22 11:00:19,884: INFO/MainProcess] Task tasks.my_periodic_task[9c8e8734-b738-4fb4-9dfb-67a97031b007] received
[2023-09-22 11:00:19,886: WARNING/ForkPoolWorker-2] into beat demo:
[2023-09-22 11:00:19,886: WARNING/ForkPoolWorker-2]
[2023-09-22 11:00:19,888: WARNING/ForkPoolWorker-2] test1111111
[2023-09-22 11:00:19,889: WARNING/ForkPoolWorker-2]
[2023-09-22 11:00:19,893: INFO/ForkPoolWorker-2] Task tasks.my_periodic_task[9c8e8734-b738-4fb4-9dfb-67a97031b007] succeeded in 0.006949007999992318s: None
这篇文章我们先介绍下celery的基础入门,在后续的文章我们将继续学习下celery与django的结合使用与具体的案例。