python使用celery

2023-03-06 15:14:56 浏览数 (1)

简介

celery是使用python语言开发的一款任务管理器,可以接受高并发,配置简单,可以使用文件或数据库作为数据源

文件结构

文件结构文件结构

配置文件详解 celeryconfig.py

代码语言:javascript复制
from datetime import  timedelta
from celery.schedules import crontab

BROKER_URL = 'redis://:xxx@127.0.0.1:6379/'
# BROKER_URL = 'redis://:xxx@127.0.0.1:32268/1.txt'
# BROKER_URL = 'redis://redis:6379/1'
CELERY_RESULT_BACKEND = 'redis://:xxx@localhost:6379/2'
# CELERY_RESULT_BACKEND = 'redis://:xx@xx:32268/2' #celety 6.0.0版本以上使用 

# CELERY_RESULT_BACKEND = 'redis://redis:6379/2'
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'           # 读取任务结果一般性能要求不高,所以使用了可读性更好的JSON
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24   # 任务过期时间,不建议直接写86400,应该让这样的magic数字表述更明显
CELERY_ACCEPT_CONTENT = ["json"]            # 指定任务接受的内容类型

#是否丢弃运行结果(丢弃结果会提升效率)
CELERY_IGNORE_RESULT = True

#指定时区,默认是UTC时间,由于celery对时区支持不是很好,所以我选择不指定
CELERY_TIMEZONE = "Asia/Shanghai" #默认是UTC时间

#导入指定的任务模块
CELERY_IMPORTS = (
    'polling.taskOne',
)

#定时任务

CELERYBEAT_SCHEDULE = {
    # 'task1':{
    #     'task':'polling.taskOne.update', #任务名
    #     #'schedule': timedelta(seconds=30), #设置每10s执行一次
    #     'schedule': crontab(minute=14, hour=12, day_of_week=[1.txt,2,3,4,5]), #设置每10s执行一次
    #     # 'args':(10,100)
    # },
    # 'task2': {
    #     'task': 'polling.taskOne.get_monitor',  # 任务名
    #     'schedule': timedelta(seconds=100), #设置每10s执行一次
    #     #'schedule': crontab(minute=50, hour=8, day_of_week=[1.txt, 2, 3, 4, 5]),  # 设置每10s执行一次
    #     # 'args':(10,100)
    # },

    'task6': {
        'task': 'polling.taskOne.refresh_data',  # 任务名
        'schedule': timedelta(seconds=60), #设置每10s执行一次
        #'schedule': crontab(minute=50, hour=8, day_of_week=[1.txt, 2, 3, 4, 5]),  # 设置每10s执行一次
        # 'args':(10,100)
    },
    # 'task3': {
    #     'task': 'polling.taskOne.refresh_kube',  # 任务名
    #     # 'schedule': timedelta(seconds=100),  # 设置每10s执行一次
    #     'schedule': crontab(minute=14, hour=12, day_of_week=[1.txt, 2, 3, 4, 5]),  # 设置每10s执行一次
    #     # 'args':(10,100)
    # },


    # 'task3': {
    #         'task': 'polling.taskOne.get_extime_project',  # 任务名
    #         'schedule': timedelta(seconds=10), #设置每10s执行一次
    #         #'schedule': crontab(minute=50, hour=8, day_of_week=[1.txt, 2, 3, 4, 5]),  # 设置每10s执行一次
    #         # 'args':(10,100)
    #     }
}

编写任务模块 taskOne.py

代码语言:javascript复制
@app.task() #使用app.task()
def getpod():
    url = "http://192.168.1.120:4000/prod-api/kube/"
    result = requests.get(url=url)
    send_pod_msg()
    return result.text

编写初始化文件 __init__.py

代码语言:javascript复制
from  celery import  Celery

app = Celery("polling") #需要运行的任务模块

#通过celery实例加载配置模块
app.config_from_object('polling.celeryconfig') #任务的配置文件

运行命令

代码语言:javascript复制
"""
win运行:
celery -A polling.taskOne beat -l info
celery -A polling.taskOne worker -l info -P eventlet

linux运行:
python3 -m celery -A polling.taskOne beat -l info
python3 -m  celery -A polling.taskOne worker -l info 
"""

注意

celery 报错合集:

ModuleNotFoundError: No module named 'polling' #如果是基于django做任务 没有在任务模块同级会报错,解决办法进入模块同级目录运行

__init__ username 出现这个错误是没有安装kombu或版本不正确

pip3 install kombu==5.1.0 解决办法

如何django集成celery

需要安装

pip3 install django-celery-beat

python3 manage.py makemigrations

python3 manage.py migrate

数据库中会出现对应的django-celery-beat表

0 人点赞