Python Celery 库详解

2024-02-06 14:45:42 浏览数 (1)

Celery 是一个基于分布式消息传递的任务队列,用于异步处理任务。它可以与各种消息代理(如RabbitMQ、Redis等)配合使用,支持任务调度、消息传递等功能。本教程将介绍如何使用 Celery 库来创建和管理异步任务。

安装 Celery

首先,我们需要安装 Celery。你可以通过 pip 来安装 Celery:

代码语言:javascript复制
bashCopy codepip install celery

创建 Celery 应用

在使用 Celery 之前,我们需要创建一个 Celery 应用。创建一个名为 celery_app.py 的文件,并在其中定义 Celery 应用:

代码语言:javascript复制
pythonCopy codefrom celery import Celery

# 创建 Celery 应用实例
app = Celery('tasks', broker='redis://localhost:6379/0')

# 定义任务
@app.task
def add(x, y):
    return x   y

在这个示例中,我们使用 Redis 作为消息代理,你也可以选择其他的消息代理,如 RabbitMQ。

定义任务

在 Celery 应用中,任务是通过装饰器 @app.task 来定义的。下面是一个简单的任务示例:

代码语言:javascript复制
pythonCopy code@app.task
def add(x, y):
    return x   y

这个任务接受两个参数 xy,并返回它们的和。

启动 Worker

要执行任务,我们需要启动 Celery worker。在命令行中执行以下命令:

代码语言:javascript复制
bashCopy codecelery -A celery_app worker --loglevel=info

这将启动一个 Celery worker 来处理任务。

调用任务

在其他地方调用 Celery 任务非常简单。只需导入任务函数并调用它即可:

代码语言:javascript复制
pythonCopy codefrom celery_app import add

result = add.delay(4, 5)
print(result.get())

在这个示例中,我们调用了之前定义的 add 任务,并传递了参数 4 和 5。delay() 方法用于将任务放入队列中,然后我们使用 get() 方法来获取任务的结果。

异步执行任务

Celery 的主要优势之一是它可以异步执行任务。这意味着任务可以在后台执行,而不会阻塞主程序的执行。下面是一个异步执行任务的示例:

代码语言:javascript复制
pythonCopy codefrom celery_app import add

result = add.delay(4, 5)

# 执行其他操作

print("其他操作执行中...")

# 等待任务完成并获取结果
print(result.get())

在这个示例中,任务被放入队列后,程序可以继续执行其他操作,而不必等待任务完成。

监控任务状态

有时候,我们需要监控任务的状态,以便知道任务是成功完成、失败还是正在执行中。Celery 提供了状态监控的功能。下面是一个示例:

代码语言:javascript复制
pythonCopy codefrom celery_app import add

result = add.delay(4, 5)

# 监控任务状态
while not result.ready():
    print("任务正在执行中...")
    # 可选:获取任务状态
    print("任务状态:", result.status)

print("任务完成")
print("任务结果:", result.get())

在这个示例中,我们使用 result.ready() 方法来检查任务是否完成。如果任务完成,我们可以使用 result.get() 方法来获取任务的结果。

错误处理

当任务执行出错时,我们可以捕获异常并处理。下面是一个示例:

代码语言:javascript复制
pythonCopy codefrom celery.exceptions import SoftTimeLimitExceeded

try:
    result = add.delay(4, "invalid")
    print(result.get())
except SoftTimeLimitExceeded as e:
    print("任务执行超时:", e)
except Exception as e:
    print("任务执行出错:", e)

在这个示例中,我们捕获了 SoftTimeLimitExceeded 异常和其他异常,并打印出错误消息。

结束 Worker

当不再需要 Celery worker 时,我们可以通过发送中断信号来结束它。在命令行中按下 Ctrl C 即可结束 Celery worker。

监控任务状态

有时候,我们需要监控任务的状态,以便知道任务是成功完成、失败还是正在执行中。Celery 提供了状态监控的功能。下面是一个示例:

代码语言:javascript复制
pythonCopy codefrom celery_app import add

result = add.delay(4, 5)

# 监控任务状态
while not result.ready():
    print("任务正在执行中...")
    # 可选:获取任务状态
    print("任务状态:", result.status)

print("任务完成")
print("任务结果:", result.get())

在这个示例中,我们使用 result.ready() 方法来检查任务是否完成。如果任务完成,我们可以使用 result.get() 方法来获取任务的结果。

错误处理

当任务执行出错时,我们可以捕获异常并处理。下面是一个示例:

代码语言:javascript复制
pythonCopy codefrom celery.exceptions import SoftTimeLimitExceeded

try:
    result = add.delay(4, "invalid")
    print(result.get())
except SoftTimeLimitExceeded as e:
    print("任务执行超时:", e)
except Exception as e:
    print("任务执行出错:", e)

在这个示例中,我们捕获了 SoftTimeLimitExceeded 异常和其他异常,并打印出错误消息。

结束 Worker

当不再需要 Celery worker 时,我们可以通过发送中断信号来结束它。在命令行中按下 Ctrl C 即可结束 Celery worker。

任务结果处理

Celery 支持异步执行任务,并在任务执行完成后返回结果。你可以对任务结果进行处理,比如存储到数据库、发送通知等。下面是一个示例:

代码语言:javascript复制
pythonCopy codefrom celery_app import add

result = add.delay(4, 5)

# 可选:等待任务完成
result.wait()

# 处理任务结果
if result.successful():
    print("任务成功完成")
    print("任务结果:", result.result)
    # 在这里可以将结果存储到数据库或发送通知等
else:
    print("任务执行失败")
    print("任务异常:", result.result)

在这个示例中,我们使用 result.successful() 方法来检查任务是否成功完成,并根据结果进行相应的处理。

设置任务超时

有时候,任务可能会因为某些原因长时间执行而导致超时。为了避免任务执行时间过长,可以设置任务的超时时间。下面是一个示例:

代码语言:javascript复制
pythonCopy codefrom celery.exceptions import SoftTimeLimitExceeded

try:
    result = add.apply_async((4, 5), soft_time_limit=10)
    print(result.get())
except SoftTimeLimitExceeded as e:
    print("任务执行超时:", e)
except Exception as e:
    print("任务执行出错:", e)

在这个示例中,我们使用 apply_async() 方法来启动任务,并通过 soft_time_limit 参数设置任务的软超时时间为 10 秒。如果任务在指定的时间内未完成,则会抛出 SoftTimeLimitExceeded 异常。

高级特性

除了上述介绍的基本功能外,Celery 还提供了许多高级特性,如定时任务、任务重试、任务链、分布式任务等。你可以根据实际需求选择使用这些特性。以下是一些高级特性的简单介绍:

  • 定时任务:Celery 支持定时执行任务,可以使用 @app.task 装饰器的 eta 参数或 apply_async() 方法的 eta 参数来设置任务的执行时间。
  • 任务重试:Celery 允许你在任务执行失败时自动重试任务。你可以使用 @app.task 装饰器的 retry 参数来配置任务的重试策略。
  • 任务链:Celery 允许你将多个任务组合成一个任务链,其中一个任务的输出作为下一个任务的输入。你可以使用 chaingroup 组合任务。
  • 分布式任务:Celery 支持分布式任务,可以将任务分发到多台计算机上执行,从而提高任务执行的效率和并发性。

0 人点赞