一、使用场景:
定时触发器在生产环境经常用到,比如说定时load一段活动配置,定时做清理存储动作,定时检查进程运行健康状态,定时上报事件日志等。
定时触发器的实现原理,一般是依赖io非阻塞复用(比如epoll的定时fd)。
二、基本设计:
- 定时时间下一次时间点计算功能
- 检测函数执行是否成功,以及事后回调,事后回调必须完成是否重新调度或者删除任务
- 删除任务可由函数执行失败触发(因为一次失败的任务,下次可能还会失败),或者提供手动cancel()接口来取消任务
class TriggerFunction(object):
def try_to_call(self, trigger_finished_cb):
if self._delete:
trigger_finished_cb(self)
return
def go():
try:
r = self._func(*self._args, **self._kwargs)
if r == False:
self._delete = True
except:
log.error("Exception in trigger function %r" % self._func)
finally:
now = time.time()
self._next_call_timestamp = now self._interval
trigger_finished_cb(self)
go()
def cancel(self):
self._delete = True
self._func = None
self._args = None
self._kwargs = None
事后函数在另一个调度class,里面有这个trigger_finished方法:
代码语言:javascript复制
def trigger_finished(trigger_func):
if not trigger_func._delete:
self._schedule(trigger_func)
self._running_triggers.remove(trigger_func)
三、多线程环境下更多设计:
- 考虑到函数可能被多次同时调用(想象一下,如果每秒定时的任务队列,如果上一次的函数执行时间过长,超过1s,那么下下一秒的任务会第二次同时进入函数),所以一次调用执行过程中必须不能被打扰,必须加一个锁保护。
- 考虑不用锁的实现,在python里面有个叫greenlet协程设计
- 是否是每次都准时 1个周期的隔离点调用,还是说这个定时周期不包括函数的执行时间。
- 如果是定时间的调用,想象一下有多个定时器在同时调用,那么在同一时间可能会形成性能高峰,所以需要加入加一个随机偏差值提供给用户选择。
- 调度函数执行如果在主线程,应该不应该阻塞主线程,而是在事件线程中操作
按照设计,更新下代码
代码语言:javascript复制class TriggerFunction(object):
def try_to_call(self, trigger_finished_cb):
/*更新部分,判断是否取到锁*/
if self._lock:
return
/*更新部分*/
if self._delete:
trigger_finished_cb(self)
return
def go():
try:
/*更新部分,取到锁*/
self._lock = True
/*更新部分*/
r = self._func(*self._args, **self._kwargs)
if r == False:
self._delete = True
except:
log.error("Exception in trigger function %r" % self._func)
finally:
/*更新部分,释放锁*/
self._lock = False
/*更新部分*/
/*更新部分,是否按时执行,计算下一个时间间隔*/
now = time.time()
if self._always_on_time and self._interval:
while self._next_call_timestamp < now:
self._next_call_timestamp = self._interval
else:
self._next_call_timestamp = now self._interval
/*更新部分*/
/*更新部分,是否随机执行*/
if self._diffusion:
self._next_call_timestamp =
self._interval * random.uniform(-0.2, 0.2)
/*更新部分*/
trigger_finished_cb(self)
/*更新部分,是否随机执行*/
try:
if greenlet:
g = greenlet.greenlet(go)
g.switch()
else:
go()
/*greelet协程无锁设计*/
def cancel(self):
self._delete = True
self._func = None
self._args = None
self._kwargs = None