0x00 摘要
Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度。本文介绍 Celery 的Lamport 逻辑时钟 & Mingle。
本文为 Celery 最后一篇。接下来有几篇独立文章,然后会开一个新系列,敬请期待。
全部连接如下:
[源码分析] 消息队列 Kombu 之 mailbox
[源码分析] 消息队列 Kombu 之 Hub
[源码分析] 消息队列 Kombu 之 Consumer
[源码分析] 消息队列 Kombu 之 Producer
[源码分析] 消息队列 Kombu 之 启动过程
[源码解析] 消息队列 Kombu 之 基本架构
[源码解析] 并行分布式框架 Celery 之架构 (1)
[源码解析] 并行分布式框架 Celery 之架构 (2)
[源码解析] 并行分布式框架 Celery 之 worker 启动 (1)
[源码解析] 并行分布式框架 Celery 之 worker 启动 (2)
[源码解析] 分布式任务队列 Celery 之启动 Consumer
[源码解析] 并行分布式任务队列 Celery 之 Task是什么
[从源码学设计]celery 之 发送Task & AMQP
[源码解析] 并行分布式任务队列 Celery 之 消费动态流程
[源码解析] 并行分布式任务队列 Celery 之 多进程模型
[源码分析] 分布式任务队列 Celery 多线程模型 之 子进程
[源码分析]并行分布式任务队列 Celery 之 子进程处理消息
[源码分析] 并行分布式任务队列 Celery 之 Timer & Heartbeat
[源码解析] 并行分布式任务队列 Celery 之 EventDispatcher & Event 组件
[源码解析] 并行分布式任务队列 Celery 之 负载均衡
[源码解析] 并行分布式框架 Celery 之 容错机制
[源码解析] 并行分布式框架 Celery 之 Lamport 逻辑时钟 & Mingle
0x01 逻辑时钟
1.1 来由
分布式系统解决了传统单体架构的单点问题和性能容量问题,另一方面也带来了很多的问题,其中一个问题就是多节点的时间同步问题:不同机器上的物理时钟难以同步,导致无法区分在分布式系统中多个节点的事件时序。
1978年 Lamport 提出了逻辑时钟的概念,来解决分布式系统中区分事件发生的时序问题。
1.2 什么是逻辑时钟
逻辑时钟是为了区分现实中的物理时钟提出来的概念,一般情况下我们提到的时间都是指物理时间,但实际上很多应用中,只要所有机器有相同的时间就够了,这个时间不一定要跟实际时间相同。
更进一步,如果两个节点之间不进行交互,那么它们的时间甚至都不需要同步。因此问题的关键点在于节点间的交互要在事件的发生顺序上达成一致,而不是对于时间达成一致。
综上,逻辑时钟指的是分布式系统中用于区分事件的发生顺序的时间机制。
1.3 为什么需要逻辑时钟
时间是在现实生活中是很重要的概念,有了时间我们就能比较事情发生的先后顺序。如果是单个计算机内执行的事务,由于它们共享一个计时器,所以能够很容易通过时间戳来区分先后。同理在分布式系统中也通过时间戳的方式来区分先后行不行?
答案是NO,因为在分布式系统中的不同节点间保持它们的时钟一致是一件不容易的事情。因为每个节点的CPU都有自己的计时器,而不同计时器之间会产生时间偏移,最终导致不同节点上面的时间不一致。
那么是否可以通过某种方式来同步不同节点的物理时钟呢?答案是有的,NTP就是常用的时间同步算法,但是即使通过算法进行同步,总会有误差,这种误差在某些场景下(金融分布式事务)是不能接受的。
因此,Lamport提出逻辑时钟就是为了解决分布式系统中的时序问题,即如何定义a在b之前发生。
当且仅当事件A是由事件B引起的时候,事件A和B之间才存在一个先后关系。两个事件可以建立因果关系的前提是:两个事件之间可以用等于或小于光速的速度传递信息。 值得注意的是这里的因果关系指的是时序关系,即时间的前后,并不是逻辑上的原因和结果。
在分布式系统中,网络是不可靠的,所以我们去掉可以和速度的约束,得到两个事件可以建立因果(时序)关系的前提是:两个事件之间是否发生过信息传递。在分布式系统中,进程间通信的手段(共享内存、消息发送等)都属于信息传递。
1.4 Lamport 逻辑时钟
分布式系统中按是否存在节点交互可分为三类事件,一类发生于节点内部,二是发送事件,三是接收事件。
逻辑时钟定义
- Clock Condition:对于任意事件a, b:如果a -> b(->表示a先于b发生),那么C(a) < C(b),反之不然,因为有可能是并发事件。
- 如果a和b都是进程Pi里的事件,并且a在b之前,那么Ci(a) < Ci(b) 。
- 如果a是进程Pi里关于某消息的发送事件,b是另一进程Pj里关于该消息的接收事件,那么Ci(a) < Cj(b)
Lamport 逻辑时钟原理如下:
- 每个事件对应一个Lamport时间戳,初始值为0
- 如果事件在节点内发生,时间戳加1
- 如果事件属于发送事件,时间戳加1并在消息中带上该时间戳
- 如果事件属于接收事件,时间戳 = Max(本地时间戳,消息中的时间戳) 1
假设有事件a、b,C(a)、C(b)分别表示事件a、b对应的Lamport时间戳,如果a发生在b之前(happened before),记作 a -> b,则有C(a) < C(b),例如图中有 C1 -> B1,那么 C(C1) < C(B1)。通过该定义,事件集中Lamport时间戳不等的事件可进行比较,我们获得事件的偏序关系(partial order)。注意:如果C(a) < C(b),并不能说明a -> b,也就是说C(a) < C(b)是a -> b的必要不充分条件
如果C(a) = C(b),那a、b事件的顺序又是怎样的?值得注意的是当C(a) = C(b)的时候,它们肯定不是因果关系,所以它们之间的先后其实并不会影响结果,我们这里只需要给出一种确定的方式来定义它们之间的先后就能得到全序关系。注意:Lamport逻辑时钟只保证因果关系(偏序)的正确性,不保证绝对时序的正确性。
0x02 Lamport 时钟 in Kombu
在 Kombu 中,就有 Lamport 时钟 的实现。
具体定义如下,我们可以知道:
- 当发送消息时候,使用 forward API 来增加时钟;
- 当收到消息时候,使用 adjust 来调整本地时钟;
class LamportClock:
"""Lamport's logical clock.
A Lamport logical clock is a monotonically incrementing software counter
maintained in each process. It follows some simple rules:
* A process increments its counter before each event in that process;
* When a process sends a message, it includes its counter value with
the message;
* On receiving a message, the receiver process sets its counter to be
greater than the maximum of its own value and the received value
before it considers the message received.
Conceptually, this logical clock can be thought of as a clock that only
has meaning in relation to messages moving between processes. When a
process receives a message, it resynchronizes its logical clock with
the sender.
*Usage*
When sending a message use :meth:`forward` to increment the clock,
when receiving a message use :meth:`adjust` to sync with
the time stamp of the incoming message.
"""
#: The clocks current value.
value = 0
def __init__(self, initial_value=0, Lock=Lock):
self.value = initial_value
self.mutex = Lock()
def adjust(self, other):
with self.mutex:
value = self.value = max(self.value, other) 1
return value
def forward(self):
with self.mutex:
self.value = 1
return self.value
def sort_heap(self, h):
if h[0][0] == h[1][0]:
same = []
for PN in zip(h, islice(h, 1, None)):
if PN[0][0] != PN[1][0]:
break # Prev and Next's clocks differ
same.append(PN[0])
# return first item sorted by process id
return sorted(same, key=lambda event: event[1])[0]
# clock values unique, return first item
return h[0]
def __str__(self):
return str(self.value)
def __repr__(self):
return f'<LamportClock: {self.value}>'
0x03 使用 clock
3.1 Kombu mailbox
比如在 Kombu mailbox 之中,发送时候就需要携带本地的clock。
代码语言:javascript复制producer.publish(
reply, exchange=exchange, routing_key=routing_key,
declare=[exchange], headers={
'ticket': ticket, 'clock': self.clock.forward(),
}, retry=True,
**opts
)
在收到消息时,就相应调整本地时钟
代码语言:javascript复制def _collect(self, ticket,
limit=None, timeout=1, callback=None,
channel=None, accept=None):
adjust_clock = self.clock.adjust
def on_message(body, message):
header = message.headers.get
adjust_clock(header('clock') or 0)
3.2 Celery 应用
Celery 应用本身就有一个 LamportClock 变量。
代码语言:javascript复制class Celery:
self.clock = LamportClock()
3.3 EventDispatcher
在 EventDispatcher 发送 Event 时候,就会使用 LamportClock 的时钟。
代码语言:javascript复制def publish(self, type, fields, producer,
blind=False, Event=Event, **kwargs):
clock = None if blind else self.clock.forward()
event = Event(type, hostname=self.hostname, utcoffset=utcoffset(),
pid=self.pid, clock=clock, **fields)
with self.mutex:
return self._publish(event, producer,
routing_key=type.replace('-', '.'), **kwargs)
0x04 Mingle
在 Celery 的介绍中,Mingle 主要用在启动或者重启的时候,它会和其他的 worker 交互,从而进行同步。同步的数据有:
- 其他 worker 的 clock
- 其他 worker 已经处理掉的 tasks
同步 clock 比较好理解,但是为什么要同步 其他worker已经处理完的 task 呢?因为这个场景是启动或者重启。
如果我们在 Celery 之中设置一个节点为task_acks_late=True
之后,那么这个节点上正在执行的任务若是遇到断电,运行中被结束等情况,这些任务会被重新分发到其他节点进行重试。
所以当某个节点重启期间,可能本来由本 worker 负责的 task 会已经被其他 worker 处理掉,为了避免重复处理,就需要同步一下。
4.1 定义
Mingle 定义如下:
代码语言:javascript复制class Mingle(bootsteps.StartStopStep):
"""Bootstep syncing state with neighbor workers.
At startup, or upon consumer restart, this will:
- Sync logical clocks.
- Sync revoked tasks.
"""
label = 'Mingle'
requires = (Events,)
compatible_transports = {'amqp', 'redis'}
def start(self, c):
self.sync(c)
4.2 Sync 过程
启动即同步,代码逻辑如下:
- Mingle 向 每一个 Worker 发送 hello
- 每个 Worker 都向 Mingle 回复自己的信息(clock 和 tasks)
- Mingle 更新自己的信息
这需要注意的是:没有回调函数,直接 send_hello 就返回了其他 worker 的结果,这是用异步来模拟的一个同步过程。
而 在 send_hello返回时候,因为这时候收到了所有 worker 的回复,也包括自己,所以需要把自己host对应的回复删除。
对应代码如下:
代码语言:javascript复制 def sync(self, c):
replies = self.send_hello(c)
if replies:
[self.on_node_reply(c, nodename, reply)
for nodename, reply in replies.items() if reply]
else:
info('mingle: all aone')
4.2.1 发起同步
首先,Mingle 会向 每一个 Worker 发送 hello。
代码语言:javascript复制 def send_hello(self, c):
inspect = c.app.control.inspect(timeout=1.0, connection=c.connection)
our_revoked = c.controller.state.revoked
replies = inspect.hello(c.hostname, our_revoked._data) or {}
replies.pop(c.hostname, None) # delete my own response
return replies
此时相关变量如下:
代码语言:javascript复制c.controller.state = {module} <module 'celery.worker.state' >
c.controller.state.revoked = {LimitedSet: 0} <LimitedSet(0): maxlen=50000, expires=10800, minlen=0>
c.controller = {Worker} celery@DESKTOP-0GO3RPO
c = {Consumer}
4.2.1.1 revoked task
我们可以看到,Mingle 会从 c.controller.state.revoked
之中获取 内容,即 当前 worker 记录的已被完成的 tasks。然后发送给其他 worker。
4.2.1.2 inspect.hello
这里是使用了 celery.app.control.Control 的 inspect 功能进行广播发送。
代码语言:javascript复制 def _request(self, command, **kwargs):
return self._prepare(self.app.control.broadcast(
command,
arguments=kwargs,
destination=self.destination,
callback=self.callback,
connection=self.connection,
limit=self.limit,
timeout=self.timeout, reply=True,
pattern=self.pattern, matcher=self.matcher,
))
4.2.2 其他worker 回复
celery.app.control.Control 之中,会使用 _prepare 来处理其他 worker 的返回。
代码语言:javascript复制 def _prepare(self, reply):
if reply:
by_node = flatten_reply(reply)
if (self.destination and
not isinstance(self.destination, (list, tuple))):
return by_node.get(self.destination)
if self.pattern:
pattern = self.pattern
matcher = self.matcher
return {node: reply for node, reply in by_node.items()
if match(node, pattern, matcher)}
return by_node
4.2.3 收到后同步
在收到其他worker回复之后会进行同步,我们可以看到其同步了时钟 和 tasks。
具体 task 的更新,是由 state 完成的。
代码语言:javascript复制 def sync_with_node(self, c, clock=None, revoked=None, **kwargs):
self.on_clock_event(c, clock)
self.on_revoked_received(c, revoked)
def on_clock_event(self, c, clock):
c.app.clock.adjust(clock) if clock else c.app.clock.forward()
def on_revoked_received(self, c, revoked):
if revoked:
c.controller.state.revoked.update(revoked)
4.2.4 如何使用 revoked
当发布任务时候,如果发现该任务已经被设置为 revoked,则不会发布该任务。
代码语言:javascript复制def default(task, app, consumer,
info=logger.info, error=logger.error, task_reserved=task_reserved,
to_system_tz=timezone.to_system, bytes=bytes,
proto1_to_proto2=proto1_to_proto2):
"""Default task execution strategy.
Note:
Strategies are here as an optimization, so sadly
it's not very easy to override.
"""
.....
revoked_tasks = consumer.controller.state.revoked
def task_message_handler(message, body, ack, reject, callbacks,
to_timestamp=to_timestamp):
......
if (req.expires or req.id in revoked_tasks) and req.revoked():
return
......
if callbacks:
[callback(req) for callback in callbacks]
handle(req)
return task_message_handler
0xFF 参考
分布式系统:Lamport 逻辑时钟
5: 远程控制管理
6: Events 的实现
7: Worker 之间的交互
8: State 和 Result