Python项目50-Celery框架

2022-09-26 13:52:40 浏览数 (1)

  • Celery框架介绍
  • Celery使用场景
  • Celery安装配置以及基础用法
  • Celery添加任务
  • Celery获取任务
  • Celery高级使用
  • Celery实战-更新接口缓存

-曾老湿, 江湖人称曾老大。


-多年互联网运维工作经验,曾负责过大规模集群架构自动化运维管理工作。 -擅长Web集群架构与自动化运维,曾负责国内某大型金融公司运维工作。 -devops项目经理兼DBA。 -开发过一套自动化运维平台(功能如下): 1)整合了各个公有云API,自主创建云主机。 2)ELK自动化收集日志功能。 3)Saltstack自动化运维统一配置管理工具。 4)Git、Jenkins自动化代码上线及自动化测试平台。 5)堡垒机,连接Linux、Windows平台及日志审计。 6)SQL执行及审批流程。 7)慢查询日志分析web界面。


Celery框架介绍


什么是Celery?

代码语言:javascript复制
"""
1、celery框架自带socket,所以自身是一个独立运行的服务
2、启动celery服务,是来执行服务中的任务的,服务中带一个执行任务的对象,会执行准备就绪的任务,将执行任务的结果保存起来
3、celery框架由三部分组成:存放要执行的任务broker,执行任务的对象worker,存放任务结果的backend
4、安装的celery主体模块,默认只提供worker,要结合其他技术提供broker和backend(两个存储的单位)
"""

官方

Celery 官网:http://www.celeryproject.org/

Celery 官方文档英文版:http://docs.celeryproject.org/en/latest/index.html

Celery 官方文档中文版:http://docs.jinkan.org/docs/celery/


Celery架构

Celery的架构由三部分组成,消息中间件(message broker)、任务执行单元(worker)和 任务执行结果存储(backend - task result store)组成。

消息中间件

Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等。

任务执行单元

Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

任务结果存储

Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等。

Celery使用场景


Celery架构

异步任务:将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等

定时任务:定时执行某件事情,比如每天数据统计

Celery安装配置以及基础用法


安装

代码语言:javascript复制
(luffy) bash-3.2$ pip install celery

目录结构

代码语言:javascript复制
project
    ├── celery_task     # celery包
    │   ├── __init__.py # 包文件
    │   ├── celery.py   # celery连接和配置相关文件,且名字必须交celery.py
    │   └── tasks.py    # 所有任务函数
    ├── add_task.py     # 添加任务
    └── get_result.py   # 获取结果

创建目录

luffyapi/scripts/celery框架/简单使用/celery_task/celery.py

celery框架是目录 简单使用时目录 celery_task 是包,包中必须创建有个celery.py文件

然后创建任务,任务文件可以随意.

celery_task/task1.py

代码语言:javascript复制
from .celery import app


# 一个任务就是一个函数,任务的执行结果就是函数的返回值
@app.task
def add(n1, n2):
    print('运算数', n1, n2)
    print('运算结果:%s' % (n1   n2))
    return n1   n2

celery_task/task2.py

代码语言:javascript复制
from .celery import app


@app.task
def low(n1, n2):
    print('减法:%s' % (n1 - n2))
    return n1 - n2

celery_task/celery.py

代码语言:javascript复制
from celery import Celery

# 通过Celery功能产生一个Celery应用
broker = 'redis://10.0.0.51:6379/6'     #任务仓库
backend = 'redis://10.0.0.51:6379/8'    #结果仓库
include = ['celery_task.task1', 'celery_task.task2']    #任务,完成需求的函数所在文件
app = Celery(broker=broker, backend=backend, include=include)

Celery启动

代码语言:javascript复制
# 非windows
# 命令:celery worker -A celery_task -l info
# windows:
# pip3 install eventlet
# celery worker -A celery_task -l info -P eventlet
代码语言:javascript复制
cd /Users/driverzeng/Desktop/luffy/luffyapi/scripts/celery框架/简单使用

(luffy) bash-3.2$ celery worker -A celery_task -l info

Celery添加任务


手动添加立即任务

celery框架/简单使用/添加celery任务的脚本.py

代码语言:javascript复制
from celery_task import task1, task2

## 使用模块中的函数,和celery没有任何关系
res = task1.add(10, 15)
print(res)
res2 = task2.low(10, 15)
print(res2)

上面的操作和celery一点关系都没有,redis一点数据都不会写入

如何与celery建立关系呢?

代码语言:javascript复制
from celery_task import task1, task2

## 使用模块中的函数,和celery没有任何关系
# res = task1.add(10, 15)
# print(res)
# res2 = task2.low(10, 15)
# print(res2)

# 调用celery框架的方法,完成任务的添加
## 手动添加立即任务,调用delay就相当于将add交给celery进行调用,delay的参数与add的保持一致
res = task1.add.delay(100, 150)
print(res)


手动添加延迟任务

celery框架/简单使用/添加celery任务的脚本.py

代码语言:javascript复制
from celery_task import task1, task2

## 使用模块中的函数,和celery没有任何关系
# res = task1.add(10, 15)
# print(res)
# res2 = task2.low(10, 15)
# print(res2)

# 调用celery框架的方法,完成任务的添加
## 手动添加立即任务,调用delay就相当于将add交给celery进行调用,delay的参数与add的保持一致
# res = task1.add.delay(100, 150)
# print(res)


## 手动添加延迟任务
from datetime import datetime, timedelta


def eta_second(second):
    ctime = datetime.now()
    utc_time = datetime.utcfromtimestamp(ctime.timestamp())
    time_delay = timedelta(seconds=second)
    return utc_time   time_delay


## args就是执行add函数所需参数,eta就是延迟执行的时间
res = task1.add.apply_async(args=(200, 50), eta=eta_second(10))
print(res)

Celery获取任务


获取任务脚本

celery框架/简单使用/获取任务结果的脚本.py

代码语言:javascript复制
from celery_task.celery import app

from celery.result import AsyncResult

id = 'e4439b36-d200-4551-8307-899018ebaffb'
if __name__ == '__main__':
    async = AsyncResult(id=id, app=app)
    if async.successful():
        result = async.get()
        print(result)
    elif async.failed():
        print('任务失败')
    elif async.status == 'PENDING':
        print('任务等待中被执行')
    elif async.status == 'RETRY':
        print('任务异常后正在重试')
    elif async.status == 'STARTED':
        print('任务已经开始被执行')

Celery高级使用


自动添加任务

celery框架/高级使用/celery_task/celery.py

代码语言:javascript复制
from celery import Celery

# 通过Celery功能产生一个Celery应用
broker = 'redis://10.0.0.51:6379/7'
backend = 'redis://10.0.0.51:6379/9'
include = ['celery_task.tasks']
app = Celery(broker=broker, backend=backend, include=include)

## 启动worker:celery worker -A celery_task -l info

## 启动beat:celery  beat -A celery_task -l info
## beat也是一个socket,启动后会根据配置文件,自动添加任务(定时任务)

## app的配置文件
from datetime import timedelta

app.conf.beat_schedule = {
    # 任务名可以随意写
    'jump_task': {
        ## 指定任务源
        'task': 'celery_task.tasks.jump',
        ## 延迟时间:每三秒一次
        'schedule': timedelta(seconds=3),
        ## 任务函数传递的参数
        'args': (300, 150),
    }
}

celery框架/高级使用/celery_task/tasks.py

代码语言:javascript复制
from .celery import app


# 一个任务就是一个函数,任务的执行结果就是函数的返回值
@app.task
def jump(n1, n2):
    print('积:%s' % (n1 * n2))
    return n1 * n2


@app.task
def full(n1, n2):
    print('商:%s' % (n1 // n2))
    return n1 // n2

先启动worker,再启动beat

代码语言:javascript复制
## 进入task的上级目录
cd /Users/driverzeng/Desktop/luffy/luffyapi/scripts/celery框架/高级使用

## 启动worker
(luffy) bash-3.2$ celery worker -A celery_task -l info

## 启动beat
(luffy) bash-3.2$ celery beat -A celery_task -l info

可以看到3秒会执行一次任务


数据库相关需要配置时区

tasks.py

代码语言:javascript复制
from .celery import app


# 一个任务就是一个函数,任务的执行结果就是函数的返回值
@app.task
def jump(n1, n2):
    print('积:%s' % (n1 * n2))
    return n1 * n2


@app.task
def full(n1, n2):
    print('商:%s' % (n1 // n2))
    return n1 // n2

celery.py

代码语言:javascript复制
from celery import Celery

# 通过Celery功能产生一个Celery应用
broker = 'redis://10.0.0.51:6379/7'
backend = 'redis://10.0.0.51:6379/9'
include = ['celery_task.tasks']
app = Celery(broker=broker, backend=backend, include=include)

## 启动worker:celery worker -A celery_task -l info

## 启动beat:celery  beat -A celery_task -l info
## beat也是一个socket,启动后会根据配置文件,自动添加任务(定时任务)

## app的配置文件
# 时区
app.conf.timezone = 'Asia/Shanghai'

# 是否使用UTC
app.conf.enable_utc = False

from datetime import timedelta

app.conf.beat_schedule = {
    # 任务名可以随意写
    'jump_task': {
        ## 指定任务源
        'task': 'celery_task.tasks.jump',
        ## 延迟时间
        'schedule': timedelta(seconds=3),
        ## 任务函数传递的参数
        'args': (300, 150),
    }
}

运维最熟悉的crontab定时任务

celery.py

代码语言:javascript复制
from celery import Celery

# 通过Celery功能产生一个Celery应用
broker = 'redis://10.0.0.51:6379/7'
backend = 'redis://10.0.0.51:6379/9'
include = ['celery_task.tasks']
app = Celery(broker=broker, backend=backend, include=include)

## 启动worker:celery worker -A celery_task -l info

## 启动beat:celery  beat -A celery_task -l info
## beat也是一个socket,启动后会根据配置文件,自动添加任务(定时任务)

## app的配置文件
# 时区
app.conf.timezone = 'Asia/Shanghai'

# 是否使用UTC
app.conf.enable_utc = False

from datetime import timedelta
from celery.schedules import crontab

app.conf.beat_schedule = {
    # 任务名可以随意写
    'jump_task': {
        ## 指定任务源
        'task': 'celery_task.tasks.jump',
        ## 延迟时间: 每3秒一次
        'schedule': timedelta(seconds=3),
        ## 任务函数传递的参数
        'args': (300, 150),
    },
    'full_task': {
        ## 指定任务源
        'task': 'celery_task.tasks.full',
        ## 每周一早上8点
        'schedule': crontab(hour=14,minute=34, day_of_week=0),
        ## 任务函数传递的参数
        'args': (30, 15),
    }
}

tasks.py

代码语言:javascript复制
from .celery import app


# 一个任务就是一个函数,任务的执行结果就是函数的返回值
@app.task
def jump(n1, n2):
    print('积:%s' % (n1 * n2))
    return n1 * n2


@app.task
def full(n1, n2):
    print('商:%s' % (n1 // n2))
    return n1 // n2

Celery实战-更新接口缓存


修改celery_task

首先将celery_task目录移动到项目根目录下

luffyapi/celery_task/tasks.py

代码语言:javascript复制
from .celery import app
from home.models import Banner
from home.serializers import BannerModelSerializer
from django.core.cache import cache


# 一个任务就是一个函数,任务的执行结果就是函数的返回值
@app.task
def update_banner_list():
    # 获取最新内容
    banner_query = Banner.objects.filter(is_delete=False, is_show=True).order_by('-orders')
    # 序列化
    banner_data = BannerModelSerializer(banner_query, many=True).data
    for banner in banner_data:
        banner['image'] = 'http://127.0.0.1:8000'   banner['image']
    # 更新缓存
    cache.set('banner_list', banner_data)
    return True

luffyapi/celery_task/celery.py

代码语言:javascript复制
import os,django

os.environ.setdefault("DJANGO_SETTINGS_MODULE",'luffyapi.settings.dev')
django.setup()

from celery import Celery

# 通过Celery功能产生一个Celery应用
broker = 'redis://10.0.0.51:6379/7'
backend = 'redis://10.0.0.51:6379/9'
include = ['celery_task.tasks']
app = Celery(broker=broker, backend=backend, include=include)

## 启动worker:celery worker -A celery_task -l info

## 启动beat:celery  beat -A celery_task -l info
## beat也是一个socket,启动后会根据配置文件,自动添加任务(定时任务)

## app的配置文件
# 时区
app.conf.timezone = 'Asia/Shanghai'

# 是否使用UTC
app.conf.enable_utc = False

from datetime import timedelta
from celery.schedules import crontab

app.conf.beat_schedule = {
    # 任务名可以随意写
    'update_banner_list_task': {
        ## 指定任务源
        'task': 'celery_task.tasks.update_banner_list',
        ## 延迟时间: 每3秒一次
        'schedule': timedelta(seconds=10),
        ## 任务函数传递的参数
        # 'args': (300, 150),
    },
}

0 人点赞