前言
如果应用有一个长时间运行的任务,如处理上传数据或者发送电子邮件,而你不想在 请求中等待任务结束,那么可以使用任务队列发送必须的数据给另一个进程。 这样就 可以在后台运行任务,立即返回请求。
Celery 环境
Celery 是一个独立的 Python 包。flask 结合 celery 使用不需要安装额外的包,使用 pip 安装:
代码语言:javascript复制> pip install celery
Celery是一个简单,灵活,可靠的分布式系统,用于处理大量消息,同时为操作提供维护此类系统所需的工具。它是一个任务队列,专注于实时处理,同时还支持任务调度。 可以使用的场景如:
- 异步发邮件,这个时候 只需要提交任务给celery 就可以了.之后 由worker 进行发邮件的操作 .
- 跑批接口的任务,需要耗时比较长,这个时候 也可以做成异步任务 .
- 定时调度任务等
Celery 简介
Celery 扮演生产者和消费者的角色,先了解一下什么是生产者消费者模式。 该模式还需要有一个缓冲区处于生产者和消费者之间,作为一个中介。生产者把数据放入缓冲区,而消费者从缓冲区取出数据,如下图所示:
接下来需要弄清楚几个问题,谁生产数据(Task),谁是中间件(Broker),谁来消费数据(Worker),消费完之后运行结果(backend)在哪?
看下图就很清楚了
celery 的5个角色
- Task 就是任务,有异步任务(Async Task)和定时任务(Celery Beat)
- Broker 中间人,接收生产者发来的消息即Task,将任务存入队列。任务的消费者是Worker。Celery 本身不提供队列服务,推荐用Redis或RabbitMQ实现队列服务。
- Worker 执行任务的单元,它实时监控消息队列,如果有任务就获取任务并执行它。
- Beat 定时任务调度器,根据配置定时将任务发送给Broker。
- Backend 用于存储任务的执行结果。
celery 详细文档参考前面这篇https://www.cnblogs.com/yoyoketang/p/15423517.html
Redis 安装
Celery 本身不提供队列服务,推荐用Redis或RabbitMQ实现队列服务。那么需要先安装Redis之类的中间件
代码语言:javascript复制docker pull redis:latest
docker run -itd --name redis-test -p 6379:6379 redis
上面是没有设置密码的,设置密码用下面这句
代码语言:javascript复制docker run -itd --name myredis -p 6379:6379 redis --requirepass "123456" --restart=always --appendonly yes
pip 安装相关依赖包
代码语言:javascript复制pip install celery==5.2.7
pip install redis==2.10.6
pip install eventlet==0.33.1
flask celery 基本配置
你首先需要有一个 Celery 实例,这个实例称为 celery 应用。其地位就相当于 Flask 中 Flask 一样。这个实例被用作所有 Celery 相关事务的 入口,如创建任务和管理工人,因此它必须可以被其他模块导入。
例如,你可以把它放在一个 tasks 模块中。这样不需要重新配置,你就可以使用 tasks 的子类,增加 Flask 应用情境的支持,并钩接 Flask 的配置。
只要如下这样就可以在 Falsk 中使用 Celery 了:
代码语言:javascript复制from celery import Celery
def make_celery(app):
celery = Celery(
app.import_name,
backend=app.config['CELERY_RESULT_BACKEND'],
broker=app.config['CELERY_BROKER_URL']
)
celery.conf.update(app.config)
class ContextTask(celery.Task):
def __call__(self, *args, **kwargs):
with app.app_context():
return self.run(*args, **kwargs)
celery.Task = ContextTask
return celery
这个函数创建了一个新的 Celery 对象,使用了应用配置中的 broker ,并从 Flask 配置中更新了 Celery 的其余配置。然后创建了一个任务子类,在一个应用情境中包 装了任务执行。
一个示例任务
让我们来写一个任务,该任务把两个数字相加并返回结果。我们配置 Celery 的 broker ,后端使用 Redis 。使用上文的工厂创建一个 celery 应用,并用它定 义任务。
代码语言:javascript复制from flask import Flask
flask_app = Flask(__name__)
flask_app.config.update(
CELERY_BROKER_URL='redis://localhost:6379',
CELERY_RESULT_BACKEND='redis://localhost:6379'
)
celery = make_celery(flask_app)
@celery.task()
def add_together(a, b):
return a b
这个任务现在可以在后台调用了:
代码语言:javascript复制result = add_together.delay(23, 42)
result.wait() # 65
运行 Celery worker
至此,如果你已经按上文一步一步执行,你会失望地发现你的 .wait() 不会真正 返回。这是因为还需要运行一个 Celery worker来接收和执行任务。:
代码语言:javascript复制$ celery -A your_application.celery worker
把 your_application 字符串替换为你创建 celery 对像的应用包或模块。 现在worker 已经在运行中,一旦任务结束, wait 就会返回结果。
完整示例
代码语言:javascript复制from flask import Flask
from celery import Celery
# 基本配置
broker_url = 'redis://localhost:6379'
result_backend = 'redis://localhost:6379'
app = Flask(__name__)
celery_app = Celery(app.import_name,
broker=broker_url,
backend=result_backend)
@celery_app.task(name='demo/add')
def add(x, y):
return x y
@app.route('/add')
def index():
results = add.delay(10, 20)
print('---------10 20 ------------')
return {"msg": "success", "result": results.wait()}
if __name__ == '__main__':
app.run(debug=True)
先启动flask服务
代码语言:javascript复制flask run
启动celery worker服务
代码语言:javascript复制>celery -A app.celery_app worker -l info
需注意的是,celery 5.x的版本在windows上运行,还需要安装一个eventlet
代码语言:javascript复制pip install eventlet
最后这样启动celery worker 服务
代码语言:javascript复制celery -A app.celery_app worker -P eventlet -l info
启动后看到的日志
代码语言:javascript复制>celery -A app.celery_app worker -P eventlet -l info
-- ******* ---- Windows-10-10.0.17134-SP0 2022-09-08 10:36:58
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: app:0x259edced0d0
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. demo/add
看到[tesks]
有任务名称emo/add
说明添加成功
访问接口就可以看到运行结果
2022年第 12期《python接口web自动化 测试开发》课程,9月17号开学!
本期上课时间:2022年9月17号 - 2022年12月17号,周六周日上午9:00-11:00