Python定时器APScheduler

2022-05-10 13:55:39 浏览数 (1)

简介:APScheduler是python的一个定时任务调度框架,能实现类似linux下crontab类型的任务,使用起来比较方便。它提供基于固定时间间隔、日期以及crontab配置类似的任务调度。

一、调度方法

安装: pip install apscheduler

1、BackgroundScheduler调度器

调用start函数后会阻塞当前线程。当调度器是你应用中唯一要运行的东西时使用

2、BlockingScheduler调度器

调用start后主线程不会阻塞。当你不运行任何其他框架时使用,并希望调度器在你应用的后台执行

二、举个例子

代码语言:javascript复制
from apscheduler.schedulers.background import BackgroundScheduler

aps = BackgroundScheduler()

def RunMonitor():
    # 第一个参数为目标函数,第二个为内置的一个名称,seconds为执行的间隔
    aps.add_job(RunCaseEnv, 'interval', seconds=40 * 60, args=['monitor'])

    # 启动线程任务
    aps.start()

def RunCaseEnv(env):
    print('*****************************',env)

def StopMonitor():
    ## 结束进程
    aps.shutdown(wait=False)
    aps.remove_all_jobs()

以BackgroundScheduler调度为例 1、新增一个定时任务:每隔40分钟执行一次RunCaseEnv函数,传参为args=['monitor'] 但由于是BackgroundScheduler调度方式,所以每次定时任务并不会立即执行,而是等到40分钟后开始执行

2、结束定时任务 默认情况下调度器会等待所有正在运行的作业完成后,关闭所有的调度器和作业存储。如果不想等待,可以使用wait=False

3、暂停和重启定时任务 暂停任务:

代码语言:javascript复制
apscheduler.job.Job.pause()
apscheduler.schedulers.base.BaseScheduler.pause_job()

恢复任务:

代码语言:javascript复制
apscheduler.job.Job.resume()
apscheduler.schedulers.base.BaseScheduler.resume_job()

三、django_apscheduler

在使用Django框架开发web项目时,通过前端页面灵活设置定时活动的框架,使用方法与APScheduler相同 安装: pip install django-apscheduler

先在settings.py文件的INSTALLED_APPS中加入django-apscheduler应用

迁移数据库表 python manage.py migrate

views.py文件中添加开启监控的方法

代码语言:javascript复制
from apscheduler.schedulers.background import BackgroundScheduler
from django_apscheduler.jobstores import DjangoJobStore, register_job

scheduler = BackgroundScheduler()
scheduler.add_jobstore(DjangoJobStore(), 'default')


# 创建任务
def send_email_adjec(request):
    '''
    :param request:
    :return: 手动发送邮件
    '''

    # result = sendEmail.caseOverview(True)
    # sendEmail.SendDingTalk(result['sum'], result['fail'])

    # 创建任务
    # jobtest是要定时执行的任务
    scheduler.add_job(jobtest, 'interval', seconds=10, args='s')

    scheduler.start()

    return HttpResponseRedirect('/ZDH/android/')

# 具体要执行的代码
def jobtest(s):
    print('jobtest')

0 人点赞