系统任务和普通任务都是通过任务管理器调度的。它们的区别是:系统任务在程序运行后即不会被修改,而普通任务则会被修改。(转载请指明出于breaksoftware的csdn博客)
为什么要有这样的设计?因为我希望它是一个可以不用停止服务就可以更新相关配置的系统。比如我们现在要加一个普通任务,我们只要修改下普通任务配置文件即可。再比如我们需要修改数据库中表结构,我们也不用停止服务修改代码来保证数据格式的一致性。
我们程序需要知道配置文件是否被修改。如何去做?一种方法是借用一些系统方法监听相应配置文件的修改,一旦文件有变化,马上通知我们的主程序去处理。另一种则是采用轮询检查机制,即定期去生成差异结果。为了不让这个系统更加复杂,我选择后者。而它就是我所谓的系统任务。
不管是系统任务还是普通任务,实现的类都要继承于job_base
代码语言:javascript复制from abc import ABCMeta,abstractmethod
class job_base:
__metaclass__ = ABCMeta
@abstractmethod
def run(self):
pass
有这个限制主要是为了保证每个任务都是run方法。调度框架将执行该方法以完成任务执行。
在《码农技术炒股之路——架构和设计》一文中,介绍了我们将基于APScheduler实现任务调度功能。首先我们需要启动BackgroundScheduler对象
代码语言:javascript复制from apscheduler.schedulers.background import BackgroundScheduler
@singleton
class job_center():
def __init__(self):
self._sched = None
self._job_conf_path = ""
self._job_id_handle = {}
self._static_job_id_handle = {}
def start(self):
self._sched = BackgroundScheduler()
self._sched.start()
当我们需要加入任务时,则调用下面这个方法
代码语言:javascript复制 def add_jobs(self, jobs_info, is_static = False):
if None == self._sched:
LOG_WARNING("job center must call start() first")
return
for (job_name,job_info) in jobs_info.items():
if is_static and job_name in self._static_job_id_handle.keys():
continue
job_type = job_info["type"]
class_name = job_info["class"]
job_handle = self._get_obj(class_name)
if is_static:
self._static_job_id_handle[job_name] = job_handle
else:
self._job_id_handle[job_name] = job_handle
cmd = "self._sched.add_job(job_handle.run, job_type, id = job_name"
params = self._join_params(job_info)
if 0 != len(params):
cmd = " , "
cmd = params
cmd = ")"
#print cmd
eval(cmd)
jobs_info保存的是任务配置文件中任务信息,我们看个样例
代码语言:javascript复制[update_share_base_info]
type=cron
class=update_stock_base_info
day_of_week=1-5
hour=9
minute=30
second=10
timezone = Asia/Shanghai
这个配置中除了type和class,其他都是APScheduler框架中add_job方法中的参数。上面配置意思是:以上海时间,从周一到周五,早上9点30分10秒执行一次。
add_jobs中通过class字段,获取该class对应的一个对象
代码语言:javascript复制 def _get_obj(self, _cls_name):
_packet_name = _cls_name
_module_home = __import__(_packet_name,globals(),locals(),[_cls_name])
obj = getattr(_module_home,_cls_name)
class_obj = obj()
return class_obj
这儿又要提到我之前特别强调过的单例使用方式。经过测试,《码农技术炒股之路——配置管理器、日志管理器》中单例的实现可以保证上面这个方法获取的是同一个对象,而网上其他单例模式则不行。 获取对象后,我们要组装出要执行的命令。cmd = "self._sched.add_job(job_handle.run, job_type, id = job_name"中job_handle就是上面获取的对象,而run则是每个job都要有的方法。这也是为什么要求每个任务类都要继承于job_base的原因。
之后调用_join_params将配置文件中其他信息组装成参数拼接出完整命令
代码语言:javascript复制 def _join_params(self, job_info):
params = ""
param = ""
job_type = job_info["type"]
for key in job_info.keys():
if key in conf_keys.job_conf_info_dict[job_type]:
if 0 != len(params):
params = ' , '
value = job_info[key]
if value.isdigit():
param = key " = " value
else:
param = key " = '" value "'"
if 0 != len(param):
params = param
return params
如此我们配置中的任务就会被加入到APScheduler调度队列中。
我们再看下如何删除一个任务
代码语言:javascript复制 def remove_jobs(self, jobs_info):
if None == self._sched:
LOG_WARNING("job center must call start() first")
return
for job_name in jobs_info.keys():
self._sched.remove_job(job_name)
self._job_id_handle.pop(job_name)
第7行通过任务名称在APScheduler中把任务删除。第8行将任务对应的对象从列表中删除。为什么要使用_job_id_handle去保存这些任务对象呢?因为如果不在一个更大的生命周期内保存它,它就会被认为是一个局部变量,从而被释放,导致之后APScheduler再也调用不了它。 我们看个管理普通任务的系统任务代码
代码语言:javascript复制@singleton
class j_load_job_conf(job_base):
def __init__(self):
self._pre_jobs_info = {}
self._frame_conf_inst = scheduler_frame_conf_inst()
self._job_center = job_center()
def run(self):
section_name = "strategy_job"
option_name = "conf_path"
if False == self._frame_conf_inst.has_option(section_name, option_name):
LOG_WARNING("no %s %s" % (section_name, option_name))
return
conf_path = self._frame_conf_inst.get(section_name, option_name)
LOG_DEBUG("Load %s %s %s" % (section_name, option_name, conf_path))
job_conf_parser_obj = job_conf_parser()
jobs_info = job_conf_parser_obj.parse(conf_path)
self._execute_jobs(jobs_info)
def _execute_jobs(self, jobs_info):
add_dict = {}
remove_dict = {}
modify_dict = {}
frame_tools.dict_diff(jobs_info, self._pre_jobs_info, add_dict, remove_dict, modify_dict)
add_jobs_info = dict(add_dict, **modify_dict)
remove_jobs_info = {}
for item in modify_dict.keys():
remove_jobs_info[item] = self._pre_jobs_info[item]
LOG_INFO("add jobs %s" % (json.dumps(add_jobs_info)))
LOG_INFO("remove jobs %s" % (json.dumps(remove_jobs_info)))
if 0 == len(add_jobs_info) and 0 == len(remove_jobs_info):
return
self._pre_jobs_info = jobs_info
self._job_center.remove_jobs(remove_jobs_info)
self._job_center.add_jobs(add_jobs_info)
run方法将会定期执行。它会从固定目录读取普通任务配置文件信息。然后在_execute_jobs方法中,通过和上一次读取的任务信息对比,生成三个字典:需要删除的任务、需要新增的任务和需要修改的任务。需要修改的任务将变成先删除后新增的方式实现修改。所以最后操作的将是两个字段信息。 普通任务本文就不介绍了,之后介绍的每个抓取和离线计算业务都是普通任务。