- Celery框架介绍
- Celery使用场景
- Celery安装配置以及基础用法
- Celery添加任务
- Celery获取任务
- Celery高级使用
- Celery实战-更新接口缓存
-曾老湿, 江湖人称曾老大。
-多年互联网运维工作经验,曾负责过大规模集群架构自动化运维管理工作。 -擅长Web集群架构与自动化运维,曾负责国内某大型金融公司运维工作。 -devops项目经理兼DBA。 -开发过一套自动化运维平台(功能如下): 1)整合了各个公有云API,自主创建云主机。 2)ELK自动化收集日志功能。 3)Saltstack自动化运维统一配置管理工具。 4)Git、Jenkins自动化代码上线及自动化测试平台。 5)堡垒机,连接Linux、Windows平台及日志审计。 6)SQL执行及审批流程。 7)慢查询日志分析web界面。
Celery框架介绍
什么是Celery? |
---|
"""
1、celery框架自带socket,所以自身是一个独立运行的服务
2、启动celery服务,是来执行服务中的任务的,服务中带一个执行任务的对象,会执行准备就绪的任务,将执行任务的结果保存起来
3、celery框架由三部分组成:存放要执行的任务broker,执行任务的对象worker,存放任务结果的backend
4、安装的celery主体模块,默认只提供worker,要结合其他技术提供broker和backend(两个存储的单位)
"""
官方
Celery 官网:http://www.celeryproject.org/
Celery 官方文档英文版:http://docs.celeryproject.org/en/latest/index.html
Celery 官方文档中文版:http://docs.jinkan.org/docs/celery/
Celery架构 |
---|
Celery的架构由三部分组成,消息中间件(message broker)、任务执行单元(worker)和 任务执行结果存储(backend - task result store)组成。
消息中间件
Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等。
任务执行单元
Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。
任务结果存储
Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等。

Celery使用场景
Celery架构 |
---|
异步任务:将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等
定时任务:定时执行某件事情,比如每天数据统计
Celery安装配置以及基础用法
安装 |
---|
(luffy) bash-3.2$ pip install celery
目录结构 |
---|
project
├── celery_task # celery包
│ ├── __init__.py # 包文件
│ ├── celery.py # celery连接和配置相关文件,且名字必须交celery.py
│ └── tasks.py # 所有任务函数
├── add_task.py # 添加任务
└── get_result.py # 获取结果
创建目录 |
---|
luffyapi/scripts/celery框架/简单使用/celery_task/celery.py
celery框架是目录 简单使用时目录 celery_task 是包,包中必须创建有个celery.py文件
然后创建任务,任务文件可以随意.
celery_task/task1.py
代码语言:javascript复制from .celery import app
# 一个任务就是一个函数,任务的执行结果就是函数的返回值
@app.task
def add(n1, n2):
print('运算数', n1, n2)
print('运算结果:%s' % (n1 n2))
return n1 n2
celery_task/task2.py
代码语言:javascript复制from .celery import app
@app.task
def low(n1, n2):
print('减法:%s' % (n1 - n2))
return n1 - n2
celery_task/celery.py
代码语言:javascript复制from celery import Celery
# 通过Celery功能产生一个Celery应用
broker = 'redis://10.0.0.51:6379/6' #任务仓库
backend = 'redis://10.0.0.51:6379/8' #结果仓库
include = ['celery_task.task1', 'celery_task.task2'] #任务,完成需求的函数所在文件
app = Celery(broker=broker, backend=backend, include=include)
Celery启动 |
---|
# 非windows
# 命令:celery worker -A celery_task -l info
# windows:
# pip3 install eventlet
# celery worker -A celery_task -l info -P eventlet
代码语言:javascript复制cd /Users/driverzeng/Desktop/luffy/luffyapi/scripts/celery框架/简单使用
(luffy) bash-3.2$ celery worker -A celery_task -l info

Celery添加任务
手动添加立即任务 |
---|
celery框架/简单使用/添加celery任务的脚本.py
代码语言:javascript复制from celery_task import task1, task2
## 使用模块中的函数,和celery没有任何关系
res = task1.add(10, 15)
print(res)
res2 = task2.low(10, 15)
print(res2)

上面的操作和celery一点关系都没有,redis一点数据都不会写入
如何与celery建立关系呢?
代码语言:javascript复制from celery_task import task1, task2
## 使用模块中的函数,和celery没有任何关系
# res = task1.add(10, 15)
# print(res)
# res2 = task2.low(10, 15)
# print(res2)
# 调用celery框架的方法,完成任务的添加
## 手动添加立即任务,调用delay就相当于将add交给celery进行调用,delay的参数与add的保持一致
res = task1.add.delay(100, 150)
print(res)


手动添加延迟任务 |
---|
celery框架/简单使用/添加celery任务的脚本.py
代码语言:javascript复制from celery_task import task1, task2
## 使用模块中的函数,和celery没有任何关系
# res = task1.add(10, 15)
# print(res)
# res2 = task2.low(10, 15)
# print(res2)
# 调用celery框架的方法,完成任务的添加
## 手动添加立即任务,调用delay就相当于将add交给celery进行调用,delay的参数与add的保持一致
# res = task1.add.delay(100, 150)
# print(res)
## 手动添加延迟任务
from datetime import datetime, timedelta
def eta_second(second):
ctime = datetime.now()
utc_time = datetime.utcfromtimestamp(ctime.timestamp())
time_delay = timedelta(seconds=second)
return utc_time time_delay
## args就是执行add函数所需参数,eta就是延迟执行的时间
res = task1.add.apply_async(args=(200, 50), eta=eta_second(10))
print(res)
Celery获取任务
获取任务脚本 |
---|
celery框架/简单使用/获取任务结果的脚本.py
代码语言:javascript复制from celery_task.celery import app
from celery.result import AsyncResult
id = 'e4439b36-d200-4551-8307-899018ebaffb'
if __name__ == '__main__':
async = AsyncResult(id=id, app=app)
if async.successful():
result = async.get()
print(result)
elif async.failed():
print('任务失败')
elif async.status == 'PENDING':
print('任务等待中被执行')
elif async.status == 'RETRY':
print('任务异常后正在重试')
elif async.status == 'STARTED':
print('任务已经开始被执行')
Celery高级使用
自动添加任务 |
---|
celery框架/高级使用/celery_task/celery.py
代码语言:javascript复制from celery import Celery
# 通过Celery功能产生一个Celery应用
broker = 'redis://10.0.0.51:6379/7'
backend = 'redis://10.0.0.51:6379/9'
include = ['celery_task.tasks']
app = Celery(broker=broker, backend=backend, include=include)
## 启动worker:celery worker -A celery_task -l info
## 启动beat:celery beat -A celery_task -l info
## beat也是一个socket,启动后会根据配置文件,自动添加任务(定时任务)
## app的配置文件
from datetime import timedelta
app.conf.beat_schedule = {
# 任务名可以随意写
'jump_task': {
## 指定任务源
'task': 'celery_task.tasks.jump',
## 延迟时间:每三秒一次
'schedule': timedelta(seconds=3),
## 任务函数传递的参数
'args': (300, 150),
}
}
celery框架/高级使用/celery_task/tasks.py
代码语言:javascript复制from .celery import app
# 一个任务就是一个函数,任务的执行结果就是函数的返回值
@app.task
def jump(n1, n2):
print('积:%s' % (n1 * n2))
return n1 * n2
@app.task
def full(n1, n2):
print('商:%s' % (n1 // n2))
return n1 // n2
先启动worker,再启动beat
代码语言:javascript复制## 进入task的上级目录
cd /Users/driverzeng/Desktop/luffy/luffyapi/scripts/celery框架/高级使用
## 启动worker
(luffy) bash-3.2$ celery worker -A celery_task -l info
## 启动beat
(luffy) bash-3.2$ celery beat -A celery_task -l info


可以看到3秒会执行一次任务



数据库相关需要配置时区 |
---|
tasks.py
代码语言:javascript复制from .celery import app
# 一个任务就是一个函数,任务的执行结果就是函数的返回值
@app.task
def jump(n1, n2):
print('积:%s' % (n1 * n2))
return n1 * n2
@app.task
def full(n1, n2):
print('商:%s' % (n1 // n2))
return n1 // n2
celery.py
代码语言:javascript复制from celery import Celery
# 通过Celery功能产生一个Celery应用
broker = 'redis://10.0.0.51:6379/7'
backend = 'redis://10.0.0.51:6379/9'
include = ['celery_task.tasks']
app = Celery(broker=broker, backend=backend, include=include)
## 启动worker:celery worker -A celery_task -l info
## 启动beat:celery beat -A celery_task -l info
## beat也是一个socket,启动后会根据配置文件,自动添加任务(定时任务)
## app的配置文件
# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False
from datetime import timedelta
app.conf.beat_schedule = {
# 任务名可以随意写
'jump_task': {
## 指定任务源
'task': 'celery_task.tasks.jump',
## 延迟时间
'schedule': timedelta(seconds=3),
## 任务函数传递的参数
'args': (300, 150),
}
}
运维最熟悉的crontab定时任务 |
---|
celery.py
代码语言:javascript复制from celery import Celery
# 通过Celery功能产生一个Celery应用
broker = 'redis://10.0.0.51:6379/7'
backend = 'redis://10.0.0.51:6379/9'
include = ['celery_task.tasks']
app = Celery(broker=broker, backend=backend, include=include)
## 启动worker:celery worker -A celery_task -l info
## 启动beat:celery beat -A celery_task -l info
## beat也是一个socket,启动后会根据配置文件,自动添加任务(定时任务)
## app的配置文件
# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
# 任务名可以随意写
'jump_task': {
## 指定任务源
'task': 'celery_task.tasks.jump',
## 延迟时间: 每3秒一次
'schedule': timedelta(seconds=3),
## 任务函数传递的参数
'args': (300, 150),
},
'full_task': {
## 指定任务源
'task': 'celery_task.tasks.full',
## 每周一早上8点
'schedule': crontab(hour=14,minute=34, day_of_week=0),
## 任务函数传递的参数
'args': (30, 15),
}
}
tasks.py
代码语言:javascript复制from .celery import app
# 一个任务就是一个函数,任务的执行结果就是函数的返回值
@app.task
def jump(n1, n2):
print('积:%s' % (n1 * n2))
return n1 * n2
@app.task
def full(n1, n2):
print('商:%s' % (n1 // n2))
return n1 // n2
Celery实战-更新接口缓存
修改celery_task |
---|
首先将celery_task目录移动到项目根目录下
luffyapi/celery_task/tasks.py
代码语言:javascript复制from .celery import app
from home.models import Banner
from home.serializers import BannerModelSerializer
from django.core.cache import cache
# 一个任务就是一个函数,任务的执行结果就是函数的返回值
@app.task
def update_banner_list():
# 获取最新内容
banner_query = Banner.objects.filter(is_delete=False, is_show=True).order_by('-orders')
# 序列化
banner_data = BannerModelSerializer(banner_query, many=True).data
for banner in banner_data:
banner['image'] = 'http://127.0.0.1:8000' banner['image']
# 更新缓存
cache.set('banner_list', banner_data)
return True
luffyapi/celery_task/celery.py
代码语言:javascript复制import os,django
os.environ.setdefault("DJANGO_SETTINGS_MODULE",'luffyapi.settings.dev')
django.setup()
from celery import Celery
# 通过Celery功能产生一个Celery应用
broker = 'redis://10.0.0.51:6379/7'
backend = 'redis://10.0.0.51:6379/9'
include = ['celery_task.tasks']
app = Celery(broker=broker, backend=backend, include=include)
## 启动worker:celery worker -A celery_task -l info
## 启动beat:celery beat -A celery_task -l info
## beat也是一个socket,启动后会根据配置文件,自动添加任务(定时任务)
## app的配置文件
# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
# 任务名可以随意写
'update_banner_list_task': {
## 指定任务源
'task': 'celery_task.tasks.update_banner_list',
## 延迟时间: 每3秒一次
'schedule': timedelta(seconds=10),
## 任务函数传递的参数
# 'args': (300, 150),
},
}