源码分析 并行分布式任务队列 Celery 之 Timer & Heartbeat
目录
- [源码分析] 并行分布式任务队列 Celery 之 Timer & Heartbeat
- 0x00 摘要
-
- 0x01 Blueprint
- 0x02 Timer Step
- 2.1 Transport
- 2.2 Thread-less VS Thread-based
- 0x03 Timer in Pool
- 3.1 gevent 和 eventlet
- 3.2 BasePool
- 0x04 kombu.Timer
- 4.1 异步
- 4.2 调用
- 4.2.1 添加 timer function
- 4.2.2 调用
- 4.3 实验
- 4.3.1 示例代码
- 4.3.2 Hub 的使用
- 0x05 timer2
- 0x06 Heart
- 6.1 Heart in Bootstep
- 6.2 Heart in Consumer
- 6.3 worker-online
- 6.4 worker-offline
- 6.5 发送心跳
- 0xEE 个人信息
- 0xFF 参考
0x00 摘要
Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度。
之前我们用了十几篇文章,介绍了 Kombu 和 Celery 的基础功能。从本文开始,我们介绍 Celery 的一些辅助功能(比如负载均衡,容错等等)。其实从某种意义上来说,这些辅助功能更加重要。
本文我们介绍 Timer 和 Heart 这两个组件。大家可以看看底层设计是如何影响上层实现的。
[源码分析] 消息队列 Kombu 之 mailbox
[源码分析] 消息队列 Kombu 之 Hub
[源码分析] 消息队列 Kombu 之 Consumer
[源码分析] 消息队列 Kombu 之 Producer
[源码分析] 消息队列 Kombu 之 启动过程
[源码解析] 消息队列 Kombu 之 基本架构
[源码解析] 并行分布式框架 Celery 之架构 (1)
[源码解析] 并行分布式框架 Celery 之架构 (2)
[源码解析] 并行分布式框架 Celery 之 worker 启动 (1)
[源码解析] 并行分布式框架 Celery 之 worker 启动 (2)
[源码解析] 分布式任务队列 Celery 之启动 Consumer
[源码解析] 并行分布式任务队列 Celery 之 Task是什么
[从源码学设计]celery 之 发送Task & AMQP
[源码解析] 并行分布式任务队列 Celery 之 消费动态流程
[源码解析] 并行分布式任务队列 Celery 之 多进程模型
[源码分析] 分布式任务队列 Celery 多线程模型 之 子进程
[源码分析]并行分布式任务队列 Celery 之 子进程处理消息
0x01 Blueprint
Celery 的 Worker初始化过程中,其内部各个子模块的执行顺序是由一个BluePrint类定义,并且根据各个模块之间的依赖进行排序(实际上把这种依赖关系组织成了一个 DAG)执行。
Celery worker 的 Blueprint 如下,我们可以看到 Timer,Hub 是 Celery Worker 的两个基本组件,提到 hub 是因为后面讲解需要用到。
代码语言:javascript复制class Blueprint(bootsteps.Blueprint):
"""Worker bootstep blueprint."""
name = 'Worker'
default_steps = {
'celery.worker.components:Hub', # 这里是 Hub
'celery.worker.components:Pool',
'celery.worker.components:Beat',
'celery.worker.components:Timer', # 这里是 Timer
'celery.worker.components:StateDB',
'celery.worker.components:Consumer',
'celery.worker.autoscale:WorkerComponent',
}
0x02 Timer Step
我们首先来到 Timer Step。
从 Timer 组件 的定义中可以看到,Timer 组件 会根据当前worker是否使用事件循环机制来决定创建什么类型的timer。
- 如果使用 eventloop,则使用
kombu.asynchronous.timer.Timer as _Timer
,这里具体等待动作由用户自己完成。 - 否则使用 Pool 内部的Timer类(就是
timer_cls='celery.utils.timer2.Timer'
),timer2 自己做了一个线程来做定时等待;
定义如下:
代码语言:javascript复制from kombu.asynchronous.timer import Timer as _Timer
class Timer(bootsteps.Step):
"""Timer bootstep."""
def create(self, w):
if w.use_eventloop: # 检查传入的Worker是否使用了use_eventloop
# does not use dedicated timer thread.
w.timer = _Timer(max_interval=10.0) # 直接使用kombu的timer做定时器
else:
if not w.timer_cls: # 如果配置文件中没有配置timer_clas
# Default Timer is set by the pool, as for example, the
# eventlet pool needs a custom timer implementation.
w.timer_cls = w.pool_cls.Timer # 使用缓冲池中的Timer
w.timer = self.instantiate(w.timer_cls,
max_interval=w.timer_precision,
on_error=self.on_timer_error,
on_tick=self.on_timer_tick) # 导入对应的类并实例化
起初看代码时候很奇怪,为什么要再单独定义一个 timer2?
原因推断是(因为对 Celery 的版本发展历史不清楚,所以此处不甚确定,希望有同学可以指正):依据 底层 Transport 的设计来对 Timer 做具体实现调整。
2.1 Transport
大家知道,Celery 是依赖于 Kombu,而在 Kombu 体系中,用 transport 对所有的 broker 进行了抽象,为不同的 broker 提供了一致的解决方案。通过Kombu,开发者可以根据实际需求灵活的选择或更换broker。
我们再回顾下具体 Kombu 的概念:
- Connection 是 AMQP 对 连接的封装;
- Channel 是 AMQP 对 MQ 操作的封装;
那么两者的关系就是对 MQ 的操作(Channel)必然离不开连接(Connection),但是 Kombu 并不直接让 Channel 使用 Connection 来发送 / 接受请求,而是引入了一个新的抽象 Transport。Transport 负责具体的 MQ 的操作,也就是说 Channel 的操作都会落到 Transport 上执行;
Transport 代表真实的 MQ 连接,也是真正连接到 MQ( redis / rabbitmq )的实例。就是存储和发送消息的实体,用来区分底层消息队列是用 amqp、Redis 还是其它实现的。
具体 Kombu 逻辑如下图,Transport 在左下角处 :
2.2 Thread-less VS Thread-based
对于 Transport,某些 rate-limit implementation(比如 RabbitMQ / Redis ) 为了减少开销,采用了event-loop(底层使用了 Epoll),是 thread-less and lock-free。
而其他旧类型的 Transport 就是 Thread based,比如 Mongo。因此,
即,选用 timer 的哪种实现,看是否需要等待来决定,就是谁来完成 “等待” 这个动作。
翻了翻 Celery 2.4.7 的代码,发现在这个版本,确实只有 Thread-based timer,其代码涵盖了 目前的 timer 2 和 kombu.asynchronous.timer.Timer
大部分功能。应该是从 3.0.2 之后,把部分代码分离到了 kombu.asynchronous.timer.Timer
,实现了 Thread-less 和 Thread-based 两个不同的实现。
具体可以参见下面源码中的注释:
代码语言:javascript复制- RabbitMQ/Redis: thread-less and lock-free rate-limit implementation.
This means that rate limits pose minimal overhead when used with
RabbitMQ/Redis or future transports using the event-loop,
and that the rate-limit implementation is now thread-less and lock-free.
The thread-based transports will still use the old implementation for
now, but the plan is to use the timer also for other
broker transports in Celery 3.1.
0x03 Timer in Pool
注意,上面的是 Timer Step,是一个启动的阶段,其目的是生成 Timer 组件 给 其他组件使用,并不是 Timer 功能类。
我们其次来看看 Timer 功能类 在 线程池 Pool 中的使用,就对应了前面 Blueprint step 之中的两种不同 cases。
分别也对应了两种应用场景(或者说是线程池实现):
- gevent 和 eventlet 使用
kombu.asynchronous.timer.Timer
。 - BasePool(以及其他类型线程池)使用了
timer2.Timer。
初步来分析,gevent 和 eventlet 都是用协程来模拟线程,所以本身具有Event loop,因此使用 kombu.asynchronous.timer.Timer
也算顺理成章。
3.1 gevent 和 eventlet
对于 gevent,eventlet 这种情况,使用了 class Timer(_timer.Timer) 作为 Timer 功能类。
从代码中可以看到,class Timer 扩展了 kombu.asynchronous.timer.Timer
。
from kombu.asynchronous import timer as _timer
class Timer(_timer.Timer):
def __init__(self, *args, **kwargs):
from gevent import Greenlet, GreenletExit
class _Greenlet(Greenlet):
cancel = Greenlet.kill
self._Greenlet = _Greenlet
self._GreenletExit = GreenletExit
super().__init__(*args, **kwargs)
self._queue = set()
def _enter(self, eta, priority, entry, **kwargs):
secs = max(eta - monotonic(), 0)
g = self._Greenlet.spawn_later(secs, entry)
self._queue.add(g)
g.link(self._entry_exit)
g.entry = entry
g.eta = eta
g.priority = priority
g.canceled = False
return g
def _entry_exit(self, g):
try:
g.kill()
finally:
self._queue.discard(g)
def clear(self):
queue = self._queue
while queue:
try:
queue.pop().kill()
except KeyError:
pass
@property
def queue(self):
return self._queue
3.2 BasePool
而 BasePool 采用了 timer2 . Timer
作为 Timer 功能类。
from celery.utils import timer2
class BasePool:
"""Task pool."""
Timer = timer2.Timer
下面我们具体看看 Timer 功能类 如何实现。
0x04 kombu.Timer
4.1 异步
kombu.asynchronous.timer.Timer
实现了异步Timer。
由其注释可以,kombu.asynchronous.timer.Timer 在调用者每次得到下一次entry时,会给出tuple of (wait_seconds, entry)
,调用者应该进行等待相应时间。
即,kombu.Timer是调用者等待,普通timer是timer自己启动线程等待。
代码语言:javascript复制"""Iterate over schedule.
This iterator yields a tuple of ``(wait_seconds, entry)``,
where if entry is :const:`None` the caller should wait
for ``wait_seconds`` until it polls the schedule again.
"""
定义如下:
代码语言:javascript复制class Timer:
"""Async timer implementation."""
Entry = Entry
on_error = None
def __init__(self, max_interval=None, on_error=None, **kwargs):
self.max_interval = float(max_interval or DEFAULT_MAX_INTERVAL)
self.on_error = on_error or self.on_error
self._queue = []
4.2 调用
4.2.1 添加 timer function
用户通过 call_repeatedly 来添加 timer function。
代码语言:javascript复制def call_repeatedly(self, secs, fun, args=(), kwargs=None, priority=0):
kwargs = {} if not kwargs else kwargs
tref = self.Entry(fun, args, kwargs)
@wraps(fun)
def _reschedules(*args, **kwargs):
last, now = tref._last_run, monotonic()
lsince = (now - tref._last_run) if last else secs
try:
if lsince and lsince >= secs:
tref._last_run = now
return fun(*args, **kwargs) # 调用用户方法
finally:
if not tref.canceled:
last = tref._last_run
next = secs - (now - last) if last else secs
self.enter_after(next, tref, priority)
tref.fun = _reschedules
tref._last_run = None
return self.enter_after(secs, tref, priority)
4.2.2 调用
Timer通过apply_entry进行调用。
代码语言:javascript复制def apply_entry(self, entry):
try:
entry()
except Exception as exc:
if not self.handle_error(exc):
logger.error('Error in timer: %r', exc, exc_info=True)
在获取下一次entry时,会返回等待时间。
代码语言:javascript复制def __iter__(self, min=min, nowfun=monotonic,
pop=heapq.heappop, push=heapq.heappush):
"""Iterate over schedule.
This iterator yields a tuple of ``(wait_seconds, entry)``,
where if entry is :const:`None` the caller should wait
for ``wait_seconds`` until it polls the schedule again.
"""
max_interval = self.max_interval
queue = self._queue
while 1:
if queue:
eventA = queue[0]
now, eta = nowfun(), eventA[0]
if now < eta:
yield min(eta - now, max_interval), None
else:
eventB = pop(queue)
if eventB is eventA:
entry = eventA[2]
if not entry.canceled:
yield None, entry
continue
else:
push(queue, eventB)
else:
yield None, None
4.3 实验
我们做实验看看 timer 功能类 的 使用。
4.3.1 示例代码
下面代码来自https://github.com/liuliqiang/blog_codes/tree/master/python/celery/kombu,特此感谢。
代码语言:javascript复制def main(arguments):
hub = Hub()
exchange = Exchange('asynt')
queue = Queue('asynt', exchange, 'asynt')
def send_message(conn):
producer = Producer(conn)
producer.publish('hello world', exchange=exchange, routing_key='asynt')
print('message sent')
def on_message(message):
print('received: {0!r}'.format(message.body))
message.ack()
# hub.stop() # <-- exit after one message
conn = Connection('redis://localhost:6379')
conn.register_with_event_loop(hub)
def p_message():
print('redis://localhost:6379')
with Consumer(conn, [queue], on_message=on_message):
send_message(conn)
hub.timer.call_repeatedly(
3, p_message
)
hub.run_forever()
if __name__ == '__main__':
sys.exit(main(sys.argv[1:]))
这里,Hub 就是 timer 的客户。
得到Stack如下,可以看到 hub 使用 timer 做了消息循环,于是我们需要看看 hub:
代码语言:javascript复制p_message
_reschedules, timer.py:127
__call__, timer.py:65
fire_timers, hub.py:142
create_loop, hub.py:300
run_once, hub.py:193
run_forever, hub.py:185
main, testUb.py:46
<module>, testUb.py:50
启动时候的逻辑如下,hub 通过 hub.timer.call_repeatedly 设置了需要调用的用户函数 fun,在 Timer 内部,fun 被包装设置为 _reschedules。
代码语言:javascript复制 Hub
| ----------------------------------
| | kombu.asynchronous.timer.Timer |
| | |
| call_repeatedly(fun) | |
| | |
----------------------------------------------> _reschedules [@wraps(fun)] |
| | |
| | |
| | |
| ----------------------------------
|
|
v
4.3.2 Hub 的使用
以下代码是Hub类,在这里,Hub 就是 timer 的用户。
可以看到,hub 建立了message_loop。在 loop 中,hub 会:
- 使用 fire_timers 进行 timer 处理,会设置下一次 timer。
- 得到 poll_timeout 后,会进行处理或者 sleep。
下面是简化版代码。
代码语言:javascript复制def create_loop():
while 1:
poll_timeout = fire_timers(propagate=propagate) if scheduled else 1
if readers or writers:
events = poll(poll_timeout)
for fd, event in events or ():
if event & READ:
try:
cb, cbargs = readers[fd]
try:
cb(*cbargs)
except Empty:
pass
else:
# no sockets yet, startup is probably not done.
sleep(min(poll_timeout, 0.1))
yield
我们再看看 fire_timers,这就是调用用户方法。
代码语言:javascript复制def fire_timers(self, min_delay=1, max_delay=10, max_timers=10,
propagate=()):
timer = self.timer
delay = None
if timer and timer._queue:
for i in range(max_timers):
delay, entry = next(self.scheduler)
if entry is None:
break
entry()# 调用用户方法
return min(delay or min_delay, max_delay)
使用Entry调用用户方法
代码语言:javascript复制class Entry:
"""Schedule Entry."""
def __call__(self):
return self.fun(*self.args, **self.kwargs)# 调用用户方法
具体逻辑如下:
代码语言:javascript复制 --------------------------
| |
| Hub |
| |
| | | ----------------------------------
| | | | kombu.asynchronous.timer.Timer |
| | | | |
| | | call_repeatedly(fun) | |
| | | | |
| ----------------------------------------> _reschedules [@wraps(fun)] |
| | | | |
| | | | |
| | | | |
| | | ----------------------------------
| create_loop |
| | ^
| | | |
| | | |
| v | |
| | |
| ---> message_loop | |
| | | |
| | | | |
| | v | iter(self.timer) |
| | fire_timers --------------------------------------
| | |
| | | |
| | v |
| | poll |
| | |
| | | |
| | v |
| | sleep |
| | |
| | | |
| ----------- |
--------------------------
0x05 timer2
在celery/utils/timer2.py
中定义了Timer
类实例,可以看出其继承了threading.Thread,但是居然也用kombu.asynchronous.timer
。
在源码注释中有:This is only used for transports not supporting AsyncIO
。
其实,就是 timer2 自己做了一个线程来做定时sleep等待,然后调用用户方法而已。
代码语言:javascript复制from kombu.asynchronous.timer import Entry
from kombu.asynchronous.timer import Timer as Schedule
from kombu.asynchronous.timer import logger, to_timestamp
class Timer(threading.Thread): # 扩展了 线程
"""Timer thread.
Note:
This is only used for transports not supporting AsyncIO.
"""
Entry = Entry
Schedule = Schedule
running = False
on_tick = None
_timer_count = count(1)
在run方法中,会定期sleep。
代码语言:javascript复制def run(self):
try:
self.running = True
self.scheduler = iter(self.schedule)
while not self._is_shutdown.isSet():
delay = self._next_entry()
if delay:
if self.on_tick:
self.on_tick(delay)
if sleep is None: # pragma: no cover
break
sleep(delay)
try:
self._is_stopped.set()
except TypeError: # pragma: no cover
# we lost the race at interpreter shutdown,
# so gc collected built-in modules.
pass
except Exception as exc:
sys.stderr.flush()
os._exit(1)
在_next_entry方法中,调用用户方法,这是通过kombu.asynchronous.timer
完成的。
def _next_entry(self):
with self.not_empty:
delay, entry = next(self.scheduler)
if entry is None:
if delay is None:
self.not_empty.wait(1.0)
return delay
return self.schedule.apply_entry(entry)
__next__ = next = _next_entry # for 2to3
0x06 Heart
Timer 类主要是做一些定时调度方面的工作。
Heart 组件 就是使用 Timer组件 进行定期调度,发送心跳 Event,告诉其他 Worker 这个 Worker 还活着。
同时,当本worker 启动,停止时候,也发送 worker-online,worker-offline 这两种消息。
6.1 Heart in Bootstep
位置在:celery/worker/consumer/heart.py。
其作用就是启动 heart 功能类。
代码语言:javascript复制class Heart(bootsteps.StartStopStep):
"""Bootstep sending event heartbeats.
This service sends a ``worker-heartbeat`` message every n seconds.
Note:
Not to be confused with AMQP protocol level heartbeats.
"""
requires = (Events,)
def __init__(self, c,
without_heartbeat=False, heartbeat_interval=None, **kwargs):
self.enabled = not without_heartbeat
self.heartbeat_interval = heartbeat_interval
c.heart = None
super().__init__(c, **kwargs)
def start(self, c):
c.heart = heartbeat.Heart(
c.timer, c.event_dispatcher, self.heartbeat_interval,
)
c.heart.start()
def stop(self, c):
c.heart = c.heart and c.heart.stop()
shutdown = stop
6.2 Heart in Consumer
位置在:celery/worker/heartbeat.py。可以看到就是从启动之后,使用 call_repeatedly 定期发送心跳。
代码语言:javascript复制class Heart:
"""Timer sending heartbeats at regular intervals.
Arguments:
timer (kombu.asynchronous.timer.Timer): Timer to use.
eventer (celery.events.EventDispatcher): Event dispatcher
to use.
interval (float): Time in seconds between sending
heartbeats. Default is 2 seconds.
"""
def __init__(self, timer, eventer, interval=None):
self.timer = timer
self.eventer = eventer
def _send(self, event, retry=True):
return self.eventer.send(event, freq=self.interval, ...)
def start(self):
if self.eventer.enabled:
self.tref = self.timer.call_repeatedly(
self.interval, self._send, ('worker-heartbeat',),
)
此时变量为:
代码语言:javascript复制self = {Heart} <celery.worker.heartbeat.Heart object at 0x000001D377636408>
eventer = {EventDispatcher} <celery.events.dispatcher.EventDispatcher object at 0x000001D37765B308>
interval = {float} 2.0
timer = {Timer: 0} <Timer(Timer-1, stopped daemon)>
tref = {NoneType} None
_send_sent_signal = {NoneType} None
6.3 worker-online
当启动时候,发送 worker-online 消息。
代码语言:javascript复制 def start(self):
if self.eventer.enabled:
self._send('worker-online')
self.tref = self.timer.call_repeatedly(
self.interval, self._send, ('worker-heartbeat',),
)
6.4 worker-offline
当停止时候,发送 worker-offline 消息。
代码语言:javascript复制 def stop(self):
if self.tref is not None:
self.timer.cancel(self.tref)
self.tref = None
if self.eventer.enabled:
self._send('worker-offline', retry=False)
6.5 发送心跳
Heart组件会调用 eventer 来群发心跳:
- eventer 是 celery.events.dispatcher.EventDispatcher;
- 心跳是 'worker-heartbeat' 这个 Event;
所以我们下文就要分析 celery.events.dispatcher.EventDispatcher。
代码语言:javascript复制 def _send(self, event, retry=True):
if self._send_sent_signal is not None:
self._send_sent_signal(sender=self)
return self.eventer.send(event, freq=self.interval,
active=len(active_requests),
processed=all_total_count[0],
loadavg=load_average(),
retry=retry,
**SOFTWARE_INFO)