1.安装
pip install schedule
2.文档
https://schedule.readthedocs.io/en/stable/faq.html#how-to-execute-jobs-in-parallel
3.官网使用demo
import schedule
import time
def job():
    """Demo job: print a fixed progress message to stdout."""
    print("I'm working...")
# Register the same job on several different schedules.
schedule.every(10).minutes.do(job)               # every 10 minutes
schedule.every().hour.do(job)                    # every hour
schedule.every().day.at("10:30").do(job)         # every day at 10:30
schedule.every(5).to(10).minutes.do(job)         # every 5 to 10 minutes
schedule.every().monday.do(job)                  # every Monday
schedule.every().wednesday.at("13:15").do(job)   # every Wednesday at 13:15
schedule.every().minute.at(":17").do(job)        # every minute, at second 17

# Poll once per second and run whichever jobs are due.  This loop never
# exits on its own; stop the script to end it.
while True:
    schedule.run_pending()
    time.sleep(1)
4.我的schedule使用demo
import sys
import time
import schedule
import os
import logging
# Directory holding the debug log written by this script.
LOG_DIR = '/var/log/video_download'

# Create the directory EAFP-style: a separate exists() check followed by
# makedirs() can race with another process creating the same directory.
try:
    os.makedirs(LOG_DIR)
except OSError:
    # An already-existing directory is fine; anything else (for example a
    # permissions problem) is a real error and must surface.
    if not os.path.isdir(LOG_DIR):
        raise

# Configure the root logger to record everything from DEBUG upwards.
log = logging.getLogger()
log.setLevel(logging.DEBUG)
fmt = logging.Formatter(
    "%(asctime)s %(pathname)s %(filename)s %(funcName)s %(lineno)s %(levelname)s - %(message)s",
    "%Y-%m-%d %H:%M:%S")
# NOTE: despite its name this is a FileHandler.  The date in the file name is
# computed once, when this module is loaded, so a long-running process keeps
# writing to the file named after its start date.
stream_handler = logging.FileHandler(
    os.path.join(LOG_DIR, 'debug-%s.log' % time.strftime('%Y-%m-%d')))
stream_handler.setLevel(logging.DEBUG)
stream_handler.setFormatter(fmt)
log.addHandler(stream_handler)
def handler():
    """Scheduled job body: print a fixed marker line to stdout."""
    print("this is handler")
def main():
    """Validate the command line, then run the scheduler loop.

    Exactly one argument is expected, the literal ``hour``.  Any other
    invocation prints the usage line and exits with status 1.  With ``hour``,
    ``handler`` is registered to run every day at 00:00 and once every hour,
    and the scheduler is then polled once per second in a loop that never
    returns.
    """
    usage = 'python video_data.py hour'

    # Guard clause: reject bad invocations before touching the scheduler.
    if len(sys.argv) != 2 or sys.argv[1] != 'hour':
        print(usage)
        # Exit non-zero so shells and supervisors can tell the call was wrong
        # (a bare sys.exit() would report success).
        sys.exit(1)

    log.debug("enter main")
    schedule.every().day.at("00:00").do(handler)
    schedule.every().hour.do(handler)
    while True:
        schedule.run_pending()
        time.sleep(1)


if __name__ == "__main__":
    main()
5.拓展:
并行执行任务
(1)默认情况下,schedule按顺序执行所有作业。这背后的原因是很难找到一个让每个人都开心的并行执行模型。如果需要并行执行,可以让每个作业在各自的线程中运行:
import threading
import time
import schedule
def job():
    """Demo job: print which thread is executing it."""
    print("I'm running on thread %s" % threading.current_thread())
def run_threaded(job_func):
    """Start ``job_func`` on a new thread and return without waiting for it.

    Args:
        job_func: zero-argument callable to run on the new thread.
    """
    job_thread = threading.Thread(target=job_func)
    job_thread.start()
# Register five identical jobs.  Each run is handed to run_threaded, which
# starts the job on its own thread.
for _ in range(5):
    schedule.every(10).seconds.do(run_threaded, job)

# Poll once per second and run whichever jobs are due; never exits on its own.
while True:
    schedule.run_pending()
    time.sleep(1)
(2)如果需要控制线程数,就需要用queue
try:
    import queue  # Python 3 name
except ImportError:  # Python 2 ships the same module as ``Queue``
    import Queue as queue
import time
import threading
import traceback
import schedule


def job():
    """Demo job: print a fixed message to stdout."""
    print("I'm working")


def worker_main():
    """Take job callables off ``jobqueue`` forever and run them one at a time."""
    while True:
        job_func = jobqueue.get()
        try:
            job_func()
        except Exception:
            # Report the failure but keep going: this is the only consumer,
            # so if it died every later job would sit in the queue unrun.
            traceback.print_exc()
        finally:
            # Always mark the item as handled, even when the job failed.
            jobqueue.task_done()


jobqueue = queue.Queue()

# The scheduler only enqueues the job; the worker thread executes it.
for _ in range(5):
    schedule.every(10).seconds.do(jobqueue.put, job)

worker_thread = threading.Thread(target=worker_main)
# Daemon thread: the worker blocks in jobqueue.get() forever, and a
# non-daemon thread would keep the process alive after the main loop stops.
worker_thread.daemon = True
worker_thread.start()

# Poll once per second and enqueue whichever jobs are due.
while True:
    schedule.run_pending()
    time.sleep(1)
(3)捕获异常
import functools
def catch_exceptions(cancel_on_failure=False):
    """Build a decorator that stops a job's exceptions from propagating.

    The wrapped job's return value is passed through when it succeeds.  When
    it raises, the traceback is printed to stdout and the exception is
    swallowed.

    Args:
        cancel_on_failure: when true, a failed run returns
            ``schedule.CancelJob`` instead of ``None``.

    Returns:
        A decorator to apply to the job function.
    """
    def catch_exceptions_decorator(job_func):
        @functools.wraps(job_func)
        def wrapper(*args, **kwargs):
            try:
                return job_func(*args, **kwargs)
            except Exception:
                # ``Exception`` rather than a bare ``except``: KeyboardInterrupt
                # and SystemExit must still be able to stop the program.
                import traceback
                print(traceback.format_exc())
                if cancel_on_failure:
                    return schedule.CancelJob
                return None
        return wrapper
    return catch_exceptions_decorator
@catch_exceptions(cancel_on_failure=True)
def bad_task():
    """Always fail with ZeroDivisionError, to exercise the decorator."""
    return 1 / 0


schedule.every(5).minutes.do(bad_task)
(4)只运行一次
def job_that_executes_once():
    """Do the work once, then return ``schedule.CancelJob`` so it is not run again."""
    # Do some work ...
    return schedule.CancelJob


schedule.every().day.at('22:30').do(job_that_executes_once)
(5)一次取消多个任务
def greet(name):
    """Print a greeting for ``name``."""
    print('Hello {}'.format(name))
# Schedule four greetings.  Each job gets two tags: one for how often it
# runs and one for who is being greeted.
andrea_job = schedule.every().day.do(greet, 'Andrea')
andrea_job.tag('daily-tasks', 'friend')

john_job = schedule.every().hour.do(greet, 'John')
john_job.tag('hourly-tasks', 'friend')

monica_job = schedule.every().hour.do(greet, 'Monica')
monica_job.tag('hourly-tasks', 'customer')

derek_job = schedule.every().day.do(greet, 'Derek')
derek_job.tag('daily-tasks', 'guest')

# Cancel every job tagged 'daily-tasks' in a single call.
schedule.clear('daily-tasks')
(6)在任务中加入日志功能
import functools
import time
import schedule
# This decorator can be applied to any job function to print a log line
# before it runs and after it completes.
def with_logging(func):
    """Wrap ``func`` so each call is announced before and after it runs.

    Args:
        func: the job function to wrap.

    Returns:
        A wrapper that prints ``LOG: Running job "<name>"``, calls ``func``
        with the same arguments, prints ``LOG: Job "<name>" completed`` and
        returns ``func``'s result.  If ``func`` raises, the second line is
        not printed and the exception propagates.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        print('LOG: Running job "%s"' % func.__name__)
        result = func(*args, **kwargs)
        print('LOG: Job "%s" completed' % func.__name__)
        return result
    return wrapper
@with_logging
def job():
    """Demo job: print a greeting; the decorator adds the log lines around it."""
    print('Hello, World.')


schedule.every(3).seconds.do(job)

# Poll once per second and run the job whenever it is due; never exits on its own.
while True:
    schedule.run_pending()
    time.sleep(1)
(7)以随机间隔运行任务
def my_job():
    """Demo job: print ``Foo`` to stdout."""
    # This job will execute every 5 to 10 seconds.
    print('Foo')
# Build the "every 5 to 10 seconds" schedule first, then attach my_job to it.
random_interval = schedule.every(5).to(10).seconds
random_interval.do(my_job)
(8)传参给作业函数
def greet(name):
    """Print ``Hello`` followed by ``name``."""
    print('Hello', name)
# Extra keyword arguments given to do() are passed on to the job function.
for interval, who in ((2, 'Alice'), (4, 'Bob')):
    schedule.every(interval).seconds.do(greet, name=who)