码农技术炒股之路——任务管理器

2019-01-16 15:50:50 浏览数 (1)

        系统任务和普通任务都是通过任务管理器调度的。它们的区别是:系统任务在程序运行后即不会被修改,而普通任务则会被修改。(转载请指明出于breaksoftware的csdn博客)

        为什么要有这样的设计?因为我希望它是一个可以不用停止服务就可以更新相关配置的系统。比如我们现在要加一个普通任务,我们只要修改下普通任务配置文件即可。再比如我们需要修改数据库中表结构,我们也不用停止服务修改代码来保证数据格式的一致性。

        我们程序需要知道配置文件是否被修改。如何去做?一种方法是借用一些系统方法监听相应配置文件的修改,一旦文件有变化,马上通知我们的主程序去处理。另一种则是采用轮询检查机制,即定期去生成差异结果。为了不让这个系统更加复杂,我选择后者。而它就是我所谓的系统任务。

        不管是系统任务还是普通任务,实现的类都要继承于job_base

代码语言:javascript复制
from abc import ABCMeta,abstractmethod
class job_base:
    __metaclass__ = ABCMeta
    @abstractmethod
    def run(self):
        pass

        有这个限制主要是为了保证每个任务都是run方法。调度框架将执行该方法以完成任务执行。

        在《码农技术炒股之路——架构和设计》一文中,介绍了我们将基于APScheduler实现任务调度功能。首先我们需要启动BackgroundScheduler对象

代码语言:javascript复制
from apscheduler.schedulers.background import BackgroundScheduler

@singleton
class job_center():
    def __init__(self):
        self._sched = None
        self._job_conf_path = ""
        self._job_id_handle = {}
        self._static_job_id_handle = {}
    
    def start(self):
        self._sched = BackgroundScheduler()
        self._sched.start()

        当我们需要加入任务时,则调用下面这个方法

代码语言:javascript复制
    def add_jobs(self, jobs_info, is_static = False):
        if None == self._sched:
            LOG_WARNING("job center must call start() first")
            return

        for (job_name,job_info) in jobs_info.items():
            if is_static and job_name in self._static_job_id_handle.keys():
                continue
            job_type = job_info["type"]
            class_name = job_info["class"]
            job_handle = self._get_obj(class_name)
            if is_static:
                self._static_job_id_handle[job_name] = job_handle
            else:
                self._job_id_handle[job_name] = job_handle
            cmd = "self._sched.add_job(job_handle.run, job_type, id = job_name"
            params = self._join_params(job_info)
            if 0 != len(params):
                cmd  = " , "
                cmd  = params
            cmd  = ")"
            #print cmd
            eval(cmd)

        jobs_info保存的是任务配置文件中任务信息,我们看个样例

代码语言:javascript复制
[update_share_base_info]
type=cron
class=update_stock_base_info
day_of_week=1-5
hour=9
minute=30
second=10
timezone = Asia/Shanghai

        这个配置中除了type和class,其他都是APScheduler框架中add_job方法中的参数。上面配置意思是:以上海时间,从周一到周五,早上9点30分10秒执行一次。

        add_jobs中通过class字段,获取该class对应的一个对象

代码语言:javascript复制
    def _get_obj(self, _cls_name):  
        _packet_name = _cls_name  
        _module_home = __import__(_packet_name,globals(),locals(),[_cls_name])
        obj =  getattr(_module_home,_cls_name)  
        class_obj = obj()
        return class_obj

        这儿又要提到我之前特别强调过的单例使用方式。经过测试,《码农技术炒股之路——配置管理器、日志管理器》中单例的实现可以保证上面这个方法获取的是同一个对象,而网上其他单例模式则不行。         获取对象后,我们要组装出要执行的命令。cmd = "self._sched.add_job(job_handle.run, job_type, id = job_name"中job_handle就是上面获取的对象,而run则是每个job都要有的方法。这也是为什么要求每个任务类都要继承于job_base的原因。

        之后调用_join_params将配置文件中其他信息组装成参数拼接出完整命令

代码语言:javascript复制
    def _join_params(self, job_info):
        params = ""
        param = ""
        job_type = job_info["type"]
        for key in job_info.keys():
            if key in conf_keys.job_conf_info_dict[job_type]:
                if  0 != len(params):
                    params  = ' , '
                value = job_info[key]
                if value.isdigit():
                    param = key   " = "   value
                else:
                    param = key   " = '"   value   "'"
                if 0 != len(param):
                    params  = param
        return params

        如此我们配置中的任务就会被加入到APScheduler调度队列中。

        我们再看下如何删除一个任务

代码语言:javascript复制
    def remove_jobs(self, jobs_info):
        if None == self._sched:
            LOG_WARNING("job center must call start() first")
            return

        for job_name in jobs_info.keys():
            self._sched.remove_job(job_name)
            self._job_id_handle.pop(job_name)

        第7行通过任务名称在APScheduler中把任务删除。第8行将任务对应的对象从列表中删除。为什么要使用_job_id_handle去保存这些任务对象呢?因为如果不在一个更大的生命周期内保存它,它就会被认为是一个局部变量,从而被释放,导致之后APScheduler再也调用不了它。         我们看个管理普通任务的系统任务代码

代码语言:javascript复制
@singleton
class j_load_job_conf(job_base):
    def __init__(self):
        self._pre_jobs_info = {}
        self._frame_conf_inst = scheduler_frame_conf_inst()
        self._job_center = job_center()

    def run(self):
        section_name = "strategy_job"
        option_name = "conf_path"
        if False == self._frame_conf_inst.has_option(section_name, option_name):
            LOG_WARNING("no %s %s" % (section_name, option_name))
            return
        conf_path = self._frame_conf_inst.get(section_name, option_name)
        LOG_DEBUG("Load %s %s %s" % (section_name, option_name, conf_path))
        
        job_conf_parser_obj = job_conf_parser()
        jobs_info = job_conf_parser_obj.parse(conf_path)
        self._execute_jobs(jobs_info)

    def _execute_jobs(self, jobs_info):
        add_dict = {}
        remove_dict = {}
        modify_dict = {}

        frame_tools.dict_diff(jobs_info, self._pre_jobs_info, add_dict, remove_dict, modify_dict)

        add_jobs_info = dict(add_dict, **modify_dict)

        remove_jobs_info = {}
        for item in modify_dict.keys():
            remove_jobs_info[item] = self._pre_jobs_info[item]

        LOG_INFO("add jobs %s" % (json.dumps(add_jobs_info)))
        LOG_INFO("remove jobs %s" % (json.dumps(remove_jobs_info)))
        
        if 0 == len(add_jobs_info) and 0 == len(remove_jobs_info):
            return

        self._pre_jobs_info = jobs_info

        self._job_center.remove_jobs(remove_jobs_info)
        self._job_center.add_jobs(add_jobs_info)

        run方法将会定期执行。它会从固定目录读取普通任务配置文件信息。然后在_execute_jobs方法中,通过和上一次读取的任务信息对比,生成三个字典:需要删除的任务、需要新增的任务和需要修改的任务。需要修改的任务将变成先删除后新增的方式实现修改。所以最后操作的将是两个字段信息。         普通任务本文就不介绍了,之后介绍的每个抓取和离线计算业务都是普通任务。

0 人点赞