痛点
如果日常工作需求对定时器功能的依赖,
比如:
1、自动化脚本定时采集性能
2、在flask后端服务中,需要定时同步数据
3、定时启动某些程序
基于上述需要或者痛点,需要找一个定时器功能实现.
定时器功能第一时间想到的是linux自带的cron功能.
例子: crontab -e命令,新增一个如下命令,保存即可.
代码语言:javascript复制*/1 * * * * ~/pythonProject/dist/PingLocalMac >> ~/Desktop/PingLocalMac.txt
cron也有缺点: 1、不适合复杂的定时任务 2、定时任务修改,需要重启crontab管理 3、定时任务,没有状态存储,也不是知道是否执行了
如果你的需求正好是cron不能实现的,那给你推荐一款python轻量级定时器"apscheduler"
简介
APScheduler是python的一个定时任务调度框架,能实现类似linux下crontab类型的任务,使用起来比较方便。它提供基于固定时间间隔、日期以及crontab配置类似的任务调度。
安装
代码语言:javascript复制pip install apscheduler
基本概念介绍
触发器(triggers):
触发器包含调度逻辑,描述一个任务何时被触发,按日期或按时间间隔或按 cronjob 表达式三种方式触发。每个作业都有它自己的触发器,除了初始配置之外,触发器是完全无状态的。
作业存储器(job stores):
作业存储器指定了作业被存放的位置,默认情况下作业保存在内存,也可将作业保存在各种数据库中,当作业被存放在数据库中时,它会被序列化,当被重新加载时会反序列化。作业存储器充当保存、加载、更新和查找作业的中间商。在调度器之间不能共享作业存储。
执行器(executors):
执行器是将指定的作业(调用函数)提交到线程池或进程池中运行,当任务完成时,执行器通知调度器触发相应的事件。
调度器(schedulers):任务调度器,属于控制角色,通过它配置作业存储器、执行器和触发器,添加、修改和删除任务。调度器协调触发器、作业存储器、执行器的运行,通常只有一个调度程序运行在应用程序中,开发人员通常不需要直接处理作业存储器、执行器或触发器,配置作业存储器和执行器是通过调度器来完成的。
调度器的工作流程
入门脚本
代码语言:javascript复制启动一个阻塞性脚本,每隔3s打印一次
# -*- coding: utf-8 -*-
# Time: 2018/10/13 19:01:30
# File Name: ex_interval.py
from datetime import datetime
import os
from apscheduler.schedulers.blocking import BlockingScheduler
def tick():
print('Tick! The time is: %s' % datetime.now())
if __name__ == '__main__':
scheduler = BlockingScheduler()
scheduler.add_job(tick, 'interval', seconds=3)
print('Press Ctrl {0} to exit'.format('Break' if os.name == 'nt' else 'C '))
try:
scheduler.start()
except (KeyboardInterrupt, SystemExit):
pass
print("#######################")
block定时器
使用配置
代码语言:javascript复制jobstores = {
#'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite'),
# 本地数据库地址
# 使用内存作为数据库
'default': MemoryJobStore()
}
executors = {
'default': ThreadPoolExecutor(20), # 线程池
'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
'coalesce': False,
'max_instances': 3 # 实力个数
}
cron 任务
代码语言:javascript复制定时 cron 任务也非常简单,直接给触发器 trigger 传入 'cron' 即可。hour =19 ,minute =23 这里表示每天的19:23 分执行任务。这里可以填写数字,也可以填写字符串
# -*- coding: utf-8 -*-
# Time: 2018/10/13 19:21:09
# File Name: ex_cron.py
from datetime import datetime
import os
from apscheduler.schedulers.blocking import BlockingScheduler
def tick():
print('Tick! The time is: %s' % datetime.now())
if __name__ == '__main__':
scheduler = BlockingScheduler()
scheduler.add_job(tick, 'cron', hour=19,minute=23)
print('Press Ctrl {0} to exit'.format('Break' if os.name == 'nt' else 'C '))
try:
scheduler.start()
except (KeyboardInterrupt, SystemExit):
pass
代码语言:javascript复制hour =19 , minute =23
hour ='19', minute ='23'
minute = '*/3' 表示每 5 分钟执行一次
hour ='19-21', minute= '23' 表示 19:23、 20:23、 21:23 各执行一次任务
background定时器
代码语言:javascript复制"""
后台性定时器
"""
import os
import time
import datetime
from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
import logging
logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.DEBUG)
def my_job(id='my_job_id'):
print(id, '-->', datetime.datetime.now())
#scheduler.pause() # 暂停
#scheduler.resume() # 恢复
#scheduler.start(paused=True)
print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), id)
scheduler = BackgroundScheduler()
scheduler.add_job(my_job, trigger='interval',seconds=1, id='my_job_id_test')
scheduler.start()
配置调度器
代码语言:javascript复制jobstores = {
# 'mongo': MongoDBJobStore(),
#'mongo': {'type': 'mongodb'},
#'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite'),
# 使用数据库作为存储器
'default': MemoryJobStore()
}
executors = {
'default': ThreadPoolExecutor(20),
'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
'coalesce': False,
'max_instances': 3
}
scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults)
其他方法
代码语言:javascript复制scheduler.remove_job('my_job_id')
# 移除任务
print(scheduler.get_jobs())
# 获取任务列表
print(scheduler.get_job(job_id="my_job_id"))
# 获取任务详情
temp_dict = {"seconds": 5}
temp_trigger = scheduler._create_trigger(trigger='interval', trigger_args=temp_dict)
result = scheduler.modify_job(job_id='my_job_id_test', trigger=temp_trigger)
# 修改任务
result = scheduler.reschedule_job(job_id='my_job_id_test', trigger='interval', seconds=4)
# 修改任务
scheduler.pause()
# 暂停作业
scheduler.resume()
# 恢复作业
监听事件
代码语言:javascript复制def my_listener(event):
if event.exception:
print(event.exception)
# 打印异常信息
print('The job crashed')
else:
print('The job worked'
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)