python测试开发django-161.Celery 定时任务保存到数据库 (djcelery)

2021-11-05 10:07:32 浏览数 (1)

前言

接着前面Celery 定时任务,这篇使用Celery djcelery 把定时任务存到数据库。

djcelery 环境准备

定时任务基础环境准备,就不多说了,接着前面一篇https://www.cnblogs.com/yoyoketang/p/15432907.html. Celery的使用方式有两种:

  • Celery 只用Celery,本身自带worker 和 beat (定时任务)功能,定时任务在setting配置
  • Celery djcelery 使用了djcelery,可以在任务中方便的直接操作 Django 数据库,而且最终的任务可以在 Django 的后台中查看和修改相关的任务。

多安装一个 djcelery 主要是把定时任务放到数据库中,方便配置和管理。

pip 安装django-celery

代码语言:javascript复制
pip install django-celery==3.3.1

在 setting 里面配置 INSTALLED_APPS,添加’djcelery’

代码语言:javascript复制
INSTALLED_APPS = [
    ......
    'djcelery'
]

同步数据库

代码语言:javascript复制
python manage.py makemigrations
python manage.py migrate

执行完成后会看到djcelery相关的几张表

CELERY 配置

在setting.py 添加CELERY 相关配置

代码语言:javascript复制
import djcelery
djcelery.setup_loader()

CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = True

BROKER_URL = 'redis://192.168.1.1:6379'
# RESULT_BACKEND 结果保存数据库
CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend'
# SCHEDULER 定时任务保存数据库
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'

相对于前面一篇,修改了以下内容

代码语言:javascript复制
# 新增这2句
import djcelery
djcelery.setup_loader()

# RESULT_BACKEND 结果保存数据库
CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend'
# SCHEDULER 定时任务保存数据库
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'

tasks任务

在app下新建tasks.py,必须要是tasks.py文件名称,django会自动查找到app下的该文件

代码语言:javascript复制
from __future__ import absolute_import
from celery import shared_task

@shared_task
def add(x, y):
    print("task----------111111----------")
    return x   y

@shared_task
def mul(x, y):
    print("task----------22222----------")
    return x * y

views视图创建任务

创建视图,把定时任务信息写入数据库

代码语言:javascript复制
from django.http import JsonResponse
import datetime
import json
from djcelery.models import PeriodicTask, CrontabSchedule

def create_task(request):
    task_name = "test"     # 唯一值,自定义不能重复, 这里是测试下,先写死
    task = 'yoyo.tasks.add'  # 任务的注册路径
    # task_args = [10, 11]   # 任务参数
    task_kwargs = {'x': 10, 'y': 11}  # 关键字参数
    # 定时任务规则
    crontab_time = {
        'minute': '*/2',       # 每2分钟执行一次
        'hour': '*',
        'day_of_week': '*',
        'day_of_month': '*',
        'month_of_year': '*'
    }
    # 写入 schedule表
    schedule = CrontabSchedule.objects.create(**crontab_time)
    # 任务和 schedule 关联
    task, created = PeriodicTask.objects.get_or_create(
        name=task_name,        # 名称保持唯一
        task=task,
        crontab=schedule,
        enabled=True,      # 是否开启任务
        # args=json.dumps(task_args),
        kwargs=json.dumps(task_kwargs),
        # 任务过期时间,设置当前时间往后1天
        expires=datetime.datetime.now() datetime.timedelta(days=1)
    )
    if created:
        return JsonResponse({"code": 0, "msg": "success"})
    else:
        return JsonResponse({"code": 111, "msg": "create failed"})

分别启动django, worker 和 beat服务

代码语言:javascript复制
python manage.py runserver 0.0.0.0:8000
celery -A MyDjango beat -l info
celery -A MyDjango worker -l info

访问接口触发后,数据库 djcelery_crontabschedule 表会写入定时信息

djcelery_periodictask 表记录任务信息

0 人点赞