简介
celery是使用python语言开发的一款任务管理器,可以接受高并发,配置简单,可以使用文件或数据库作为数据源
文件结构
配置文件详解 celeryconfig.py
代码语言:javascript复制from datetime import timedelta
from celery.schedules import crontab
BROKER_URL = 'redis://:xxx@127.0.0.1:6379/'
# BROKER_URL = 'redis://:xxx@127.0.0.1:32268/1.txt'
# BROKER_URL = 'redis://redis:6379/1'
CELERY_RESULT_BACKEND = 'redis://:xxx@localhost:6379/2'
# CELERY_RESULT_BACKEND = 'redis://:xx@xx:32268/2' #celety 6.0.0版本以上使用
# CELERY_RESULT_BACKEND = 'redis://redis:6379/2'
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json' # 读取任务结果一般性能要求不高,所以使用了可读性更好的JSON
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间,不建议直接写86400,应该让这样的magic数字表述更明显
CELERY_ACCEPT_CONTENT = ["json"] # 指定任务接受的内容类型
#是否丢弃运行结果(丢弃结果会提升效率)
CELERY_IGNORE_RESULT = True
#指定时区,默认是UTC时间,由于celery对时区支持不是很好,所以我选择不指定
CELERY_TIMEZONE = "Asia/Shanghai" #默认是UTC时间
#导入指定的任务模块
CELERY_IMPORTS = (
'polling.taskOne',
)
#定时任务
CELERYBEAT_SCHEDULE = {
# 'task1':{
# 'task':'polling.taskOne.update', #任务名
# #'schedule': timedelta(seconds=30), #设置每10s执行一次
# 'schedule': crontab(minute=14, hour=12, day_of_week=[1.txt,2,3,4,5]), #设置每10s执行一次
# # 'args':(10,100)
# },
# 'task2': {
# 'task': 'polling.taskOne.get_monitor', # 任务名
# 'schedule': timedelta(seconds=100), #设置每10s执行一次
# #'schedule': crontab(minute=50, hour=8, day_of_week=[1.txt, 2, 3, 4, 5]), # 设置每10s执行一次
# # 'args':(10,100)
# },
'task6': {
'task': 'polling.taskOne.refresh_data', # 任务名
'schedule': timedelta(seconds=60), #设置每10s执行一次
#'schedule': crontab(minute=50, hour=8, day_of_week=[1.txt, 2, 3, 4, 5]), # 设置每10s执行一次
# 'args':(10,100)
},
# 'task3': {
# 'task': 'polling.taskOne.refresh_kube', # 任务名
# # 'schedule': timedelta(seconds=100), # 设置每10s执行一次
# 'schedule': crontab(minute=14, hour=12, day_of_week=[1.txt, 2, 3, 4, 5]), # 设置每10s执行一次
# # 'args':(10,100)
# },
# 'task3': {
# 'task': 'polling.taskOne.get_extime_project', # 任务名
# 'schedule': timedelta(seconds=10), #设置每10s执行一次
# #'schedule': crontab(minute=50, hour=8, day_of_week=[1.txt, 2, 3, 4, 5]), # 设置每10s执行一次
# # 'args':(10,100)
# }
}
编写任务模块 taskOne.py
代码语言:javascript复制@app.task() #使用app.task()
def getpod():
url = "http://192.168.1.120:4000/prod-api/kube/"
result = requests.get(url=url)
send_pod_msg()
return result.text
编写初始化文件 __init__.py
代码语言:javascript复制from celery import Celery
app = Celery("polling") #需要运行的任务模块
#通过celery实例加载配置模块
app.config_from_object('polling.celeryconfig') #任务的配置文件
运行命令
代码语言:javascript复制"""
win运行:
celery -A polling.taskOne beat -l info
celery -A polling.taskOne worker -l info -P eventlet
linux运行:
python3 -m celery -A polling.taskOne beat -l info
python3 -m celery -A polling.taskOne worker -l info
"""
注意
celery 报错合集:
ModuleNotFoundError: No module named 'polling' #如果是基于django做任务 没有在任务模块同级会报错,解决办法进入模块同级目录运行
__init__ username 出现这个错误是没有安装kombu或版本不正确
pip3 install kombu==5.1.0 解决办法
如何django集成celery
需要安装
pip3 install django-celery-beat
python3 manage.py makemigrations
python3 manage.py migrate
数据库中会出现对应的django-celery-beat表