定时触发函数的Python实现

2021-01-28 01:36:30 浏览数 (1)

一、使用场景:

定时触发器在生产环境经常用到,比如说定时load一段活动配置,定时做清理存储动作,定时检查进程运行健康状态,定时上报事件日志等。

定时触发器的实现原理,一般是依赖io非阻塞复用(比如epoll的定时fd)。

二、基本设计:

  • 定时时间下一次时间点计算功能
  • 检测函数执行是否成功,以及事后回调,事后回调必须完成是否重新调度或者删除任务
  • 删除任务可由函数执行失败触发(因为一次失败的任务,下次可能还会失败),或者提供手动cancel()接口来取消任务
代码语言:javascript复制
class TriggerFunction(object):
    def try_to_call(self, trigger_finished_cb):

        if self._delete:
            trigger_finished_cb(self)
            return
            
        def go():
            try:
                r = self._func(*self._args, **self._kwargs)
                if r == False:
                    self._delete = True
            except:
                log.error("Exception in trigger function %r" % self._func)
            finally:

                now = time.time()
                self._next_call_timestamp = now   self._interval

                trigger_finished_cb(self)

         go()
         
    def cancel(self):
        self._delete = True
        self._func = None
        self._args = None
        self._kwargs = None

事后函数在另一个调度class,里面有这个trigger_finished方法:

代码语言:javascript复制

  def trigger_finished(trigger_func):
      if not trigger_func._delete:
          self._schedule(trigger_func)
      self._running_triggers.remove(trigger_func)

三、多线程环境下更多设计:

  • 考虑到函数可能被多次同时调用(想象一下,如果每秒定时的任务队列,如果上一次的函数执行时间过长,超过1s,那么下下一秒的任务会第二次同时进入函数),所以一次调用执行过程中必须不能被打扰,必须加一个锁保护。
  • 考虑不用锁的实现,在python里面有个叫greenlet协程设计
  • 是否是每次都准时 1个周期的隔离点调用,还是说这个定时周期不包括函数的执行时间。
  • 如果是定时间的调用,想象一下有多个定时器在同时调用,那么在同一时间可能会形成性能高峰,所以需要加入加一个随机偏差值提供给用户选择。
  • 调度函数执行如果在主线程,应该不应该阻塞主线程,而是在事件线程中操作

按照设计,更新下代码

代码语言:javascript复制
class TriggerFunction(object):
    def try_to_call(self, trigger_finished_cb):
        /*更新部分,判断是否取到锁*/
        if self._lock:
            return
        /*更新部分*/

        if self._delete:
            trigger_finished_cb(self)
            return
            
        def go():
            try:
                /*更新部分,取到锁*/
                self._lock = True
                /*更新部分*/
                r = self._func(*self._args, **self._kwargs)
                if r == False:
                    self._delete = True
            except:
                log.error("Exception in trigger function %r" % self._func)
            finally:
                /*更新部分,释放锁*/
                self._lock = False
                /*更新部分*/

                /*更新部分,是否按时执行,计算下一个时间间隔*/
                now = time.time()
                if self._always_on_time and self._interval:
                    while self._next_call_timestamp < now:
                        self._next_call_timestamp  = self._interval
                else:
                    self._next_call_timestamp = now   self._interval
                /*更新部分*/

                /*更新部分,是否随机执行*/
                if self._diffusion:
                    self._next_call_timestamp  = 
                        self._interval * random.uniform(-0.2, 0.2)
                /*更新部分*/

                trigger_finished_cb(self)

        /*更新部分,是否随机执行*/
        try:
            if greenlet:
                g = greenlet.greenlet(go)
                g.switch()
            else:
                go()
        /*greelet协程无锁设计*/
         
    def cancel(self):
        self._delete = True
        self._func = None
        self._args = None
        self._kwargs = None

0 人点赞