Celery
官网
Celery 官网:http://www.celeryproject.org/
Celery 官方文档英文版:http://docs.celeryproject.org/en/latest/index.html
Celery 官方文档中文版:http://docs.jinkan.org/docs/celery/
Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度
代码语言:javascript复制# 官网解释
"""
Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.
"""
Celery异步任务框架
代码语言:javascript复制"""
1)可以不依赖任何服务器,通过自身命令,启动服务(内部支持socket)
2)celery服务为为其他项目服务提供异步解决任务需求的
注:会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求
人是一个独立运行的服务 | 医院也是一个独立运行的服务
正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时,就会被医院接收,解决人生病问题
人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求
"""
Celery架构
Celery的架构由三部分组成,消息中间件(message broker)、任务执行单元(worker)和 任务执行结果存储(task result store)组成。
消息中间件
Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等
任务执行单元
Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。
任务结果存储
Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等
使用场景
异步执行:解决耗时任务,将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等
延迟执行:解决延迟任务
定时执行:解决周期(周期)任务,比如每天数据统计
Celery的安装配置
安装:pip install celery
消息中间件:RabbitMQ/Redis
app=Celery(‘任务名’, broker=’xxx’, backend=’xxx’)
注意如果是windows平台还需要安装:pip install eventlet
两种celery任务结构:提倡用包管理,结构更清晰
方式一:简单使用
代码语言:javascript复制# 第一步:定义一个py文件(名字随意,celery_task)
"""celery_task.py"""
from celery import Celery
backend = 'redis://127.0.0.1:6379/1' # 结果存储
broker = 'redis://127.0.0.1:6379/2' # 消息中间件
app = Celery(__name__,broker=broker,backend=backend) # __name__区分__main__
# 被它修饰,就变成了celery的任务
@app.task
def add(a,b):
return a b
# 第二步:提交任务(新建一个py文件:submit_task)
"""submit_task.py"""
from celery_task import add
# 异步调用
# 只是把任务提交到了redis中,但是没有执行,返回一个唯一标识,后期使用唯一标识去看任务执行结果
res=add.delay(33,41)
print(res) # 2ddb35df-25f2-4f7c-8405-0bd7b1fa5645
# 第三步:任务执行单元执行,使用命令启动worker
格式:celery -A 文件名 worker -l 日志输出级别 (win平台 -P eventlet)
celery -A celery_task worker -l info -P eventlet
'''
celery_task:py文件的名字
-l info:日志输出级别是info
-P eventlet 在win平台需要下载,pip install eventlet
'''
#如果队列里有任务,就会执行,如果没有任务,worker就等在这
# 第四步:查询结果是否执行完成 get_result.py
"""get_result.py"""
from celery_task import app
from celery.result import AsyncResult
id = '2ddb35df-25f2-4f7c-8405-0bd7b1fa5645'
if __name__ == '__main__':
asy = AsyncResult(id=id, app=app)
if asy.successful():
result = asy.get()
print(result)
elif asy.failed():
print('任务失败')
elif asy.status == 'PENDING':
print('任务等待中被执行')
elif asy.status == 'RETRY':
print('任务异常后正在重试')
elif asy.status == 'STARTED':
print('任务已经开始被执行')
方法二:包管理结构(推荐)
随便定义包名,但是包内必须要有celery.py
1 包结构
celery_task
__init__.py
celery.py
course_task.py
home_task.py
user_task.py
步骤:
- 创建包,包下写celery.py文件,文件内写celery任务
from celery import Celery
backend = 'redis://127.0.0.1:6379/1'
broker = 'redis://127.0.0.1:6379/2'
app = Celery(__name__, broker=broker, backend=backend,include=['celery_task.add_task'])
# include内写app管理的任务
- 任意位置提交任务
from celery_task.add_task import add
from celery_task.celery import app
res=add.delay(100,200)
print(res)
# 提交任务delay在任意位置提交就可以,只需将celery任务导过来即可
- 启动worker :包管理只需去包所在的根路径启动就可以了,不需要切换路径到包内去启动worker,因为包下有celery.py了
scripts> celery -A celery_task worker -l info -P eventlet
- 查看结果
from celery_task import app
from celery.result import AsyncResult
id = '2ddb35df-25f2-4f7c-8405-0bd7b1fa5645'
if __name__ == '__main__':
asy = AsyncResult(id=id, app=app)
if asy.successful():
result = asy.get()
print(result)
elif asy.failed():
print('任务失败')
elif asy.status == 'PENDING':
print('任务等待中被执行')
elif asy.status == 'RETRY':
print('任务异常后正在重试')
elif asy.status == 'STARTED':
print('任务已经开始被执行')
Celery主要处理三种任务
异步任务,延迟任务,定时任务
delay提交异步任务
上面的示例就是
apply_async提交延迟任务
代码语言:javascript复制# 其他不变,提交任务的时候,如下:
from celery_task.user_task import add
from datetime import datetime, timedelta
eta = datetime.utcnow() timedelta(seconds=10)
# 参数传递需要使用args,传时间要使用时间对象eta,使用的是utc时间
mul.apply_async(args=(20, 50), eta=eta)
beat_schedule提交定时任务
定时任务需要启动beat
和worker
- beat负责提交定时任务
- worker负责提交celery任务
from celery import Celery
backend = 'redis://127.0.0.1:6379/1'
broker = 'redis://127.0.0.1:6379/2'
app = Celery(__name__, broker=broker, backend=backend,include=['celery_task.add_task'])
# include内写app管理的任务
# 时区
app.conf.timezone='Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc=False
#第一步:在celery.py中配置
# celery任务的定时配置
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
'''
任务名:{
task:任务
schedule:时间
args:参数(函数参数)
}
'''
'task-mul': {
'task': 'celery_task.user_task.mul',
'schedule': timedelta(seconds=3), # 3s后
# 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点
'args': (3, 15),
},
'task-add': {
'task': 'celery_task.home_task.add',
'schedule': timedelta(seconds=10), # 10s后
# 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点
'args': (3, 5),
},
}
#第二步:启动beat(beat负责定时提交任务)
celery -A celery_task beat -l info
# 第三步:启动worker,任务就会被worker执行了
celery -A celery_task worker -l info -P eventlet