[源码分析] 消息队列 Kombu 之 mailbox
0x00 摘要
本系列我们介绍消息队列 Kombu。Kombu 的定位是一个兼容 AMQP 协议的消息队列抽象。通过本文,大家可以了解 Kombu 中的 mailbox 概念,顺便可以把之前几篇文章内容再次梳理下。
0x01 示例代码
本文实例代码来自 https://liqiang.io/post/celery-source-analysis-remote-manager-control,深表感谢。
示例代码分为两部分˛
Node可以理解为广播Consumer。Client可以认为是广播发起者。
1.1 Node
代码语言:javascript复制import sys
import kombu
from kombu import pidbox
hostname = "localhost"
connection = kombu.Connection('redis://localhost:6379')
mailbox = pidbox.Mailbox("testMailbox", type="direct")
node = mailbox.Node(hostname, state={"a": "b"})
node.channel = connection.channel()
def callback(body, message):
print(body)
print(message)
def main(arguments):
consumer = node.listen(callback=callback)
try:
while True:
print('Consumer Waiting')
connection.drain_events()
finally:
consumer.cancel()
if __name__ == '__main__':
sys.exit(main(sys.argv[1:]))
1.2 client
代码语言:javascript复制import sys
import kombu
from kombu import pidbox
def callback():
print("callback")
def main(arguments):
connection = kombu.Connection('redis://localhost:6379')
mailbox = pidbox.Mailbox("testMailbox", type="direct")
bound = mailbox(connection)
bound._broadcast("print_msg", {'msg': 'Message for you'})
if __name__ == '__main__':
sys.exit(main(sys.argv[1:]))
0x02 核心思路
广播功能是利用了Redis的 pubSub 机制完成。
2.1 Redis PubSub
为了支持消息多播,Redis单独使用了一个模块来支持消息多播,也就是PubSub。
Redis作为消息发布和订阅之间的服务器,起到桥梁的作用,在Redis里面有一个channel的概念,也就是频道,发布者通过指定发布到某个频道,只要有订阅者订阅了该频道,该消息就会发送给订阅者。
消费者可以启动多个,PubSub会保证它们收到的都是相同的消息序列。
2.2 概述
在 Kombu 的 mailbox 实现中,分为 Consumer 和 Producer两部分。
Consumer 模块,在 Kombu 的 Channel 类中,当注册 listener 时候,实际就是利用了 redis 驱动的 PubSub功能,把 consumer 注册订阅到了一个 key 上。从而 Consumer 的 queue 和 回调函数 就通过 Channel 与 redis 联系起来。这样后续就可以从 Redis 读取消息。
代码语言:javascript复制psubscribe, client.py:3542
_subscribe, redis.py:664
_register_LISTEN, redis.py:322
get, redis.py:375
drain_events, base.py:960
drain_events, connection.py:318
main, node.py:24
<module>, node.py:29
0x03 Consumer
下面我们就依据示例代码,一步一步剖析如何完成广播功能。
3.1 建立Connection
当完成以下代码之后,系统建立了Connection。
代码语言:javascript复制connection = kombu.Connection('redis://localhost:6379')
具体如下图,我们把问题域分为用户领域和Kombu领域两部分,以便大家理解:
代码语言:javascript复制user scope kombu scope
|
|
------------ | --------------------------------------
| connection | ----------------------> | Connection: redis://localhost:6379// |
------------ | --------------------------------------
|
|
|
|
|
3.2 建立mailbox
当完成以下代码之后,系统建立了Connection和mailbox。
代码语言:javascript复制connection = kombu.Connection('redis://localhost:6379')
mailbox = pidbox.Mailbox("testMailbox", type="fanout")
但是此时两者没有建立联系,而mailbox的某些成员变量也没有实际含义。
mailbox变量举例如下:
代码语言:javascript复制mailbox = {Mailbox} <kombu.pidbox.Mailbox object at 0x7fea4b81df28>
accept = {list: 1} ['json']
clock = {LamportClock} 0
connection = {NoneType} None
exchange = {Exchange} Exchange testMailbox.pidbox(fanout)
exchange_fmt = {str} '%s.pidbox'
namespace = {str} 'testMailbox'
node_cls = {type} <class 'kombu.pidbox.Node'>
oid = {str} '9386a23b-ae96-3c6c-b036-ae7646455ebb'
producer_pool = {NoneType} None
queue_expires = {NoneType} None
queue_ttl = {NoneType} None
reply_exchange = {Exchange} Exchange reply.testMailbox.pidbox(direct)
reply_exchange_fmt = {str} 'reply.%s.pidbox'
reply_queue = {Queue} <unbound Queue 9386a23b-ae96-3c6c-b036-ae7646455ebb.reply.testMailbox.pidbox -> <unbound Exchange reply.testMailbox.pidbox(direct)> -> 9386a23b-ae96-3c6c-b036-ae7646455ebb>
reply_queue_expires = {float} 10.0
reply_queue_ttl = {NoneType} None
serializer = {NoneType} None
type = {str} 'fanout'
unclaimed = {defaultdict: 0} defaultdict(<class 'collections.deque'>, {})
此时逻辑如下:
代码语言:javascript复制user scope kombu scope
|
|
------------ | --------------------------------------
| Connection|-----------> | Connection: redis://localhost:6379// |
------------ | --------------------------------------
|
|
| ----------------------------
| | Exchange |
| -------------------------- --> | |
--------- | | Mailbox | | | testMailbox.pidbox(fanout) |
| mailbox|--------------> | | | ----------------------------
--------- | | | |
| | exchange ----------- ---------------------------------
| | | | Exchange |
| | reply_exchange --------> | |
| | | | reply.testMailbox.pidbox(direct)|
| | reply_queue --------- ------------------- -------------
| | | | ^
| | | | -------- |
| -------------------------- --> | Queue ----------
| --------
|
3.3 建立Node
当完成以下代码之后,系统建立了Connection,mailbox和node。
Node是mailbox中的概念,可以理解为是具体的邮箱。
代码语言:javascript复制connection = kombu.Connection('redis://localhost:6379')
mailbox = pidbox.Mailbox("testMailbox", type="fanout")
node = mailbox.Node(hostname, state={"a": "b"})
node变量举例如下:
代码语言:javascript复制node = {Node} <kombu.pidbox.Node object at 0x7fea4b8bffd0>
channel = {NoneType} None
handlers = {dict: 0} {}
hostname = {str} 'localhost'
mailbox = {Mailbox} <kombu.pidbox.Mailbox object at 0x7fea4b81df28>
state = {dict: 1} {'a': 'b'}
逻辑如下图:
代码语言:javascript复制user scope
|
|
------------ | --------------------------------------
| Connection|-----------> | Connection: redis://localhost:6379// |
------------ | --------------------------------------
|
|
| ----------------------------
| | Exchange |
| ------------------------------ --> | |
--------- | | Mailbox | | | testMailbox.pidbox(fanout) |
| mailbox|--------------> | | | ----------------------------
--------- | | | |
| | exchange --------------- ---------------------------------
| | | | Exchange |
| | reply_exchange ------------> | |
| | | | reply.testMailbox.pidbox(direct)|
| | reply_queue ------------- ------------------- -------------
| | | | ^
| | | | -------- |
| ------------------------- ---- --> | Queue ----------
| ^ --------
| |
| --------------------- |
----- | | | |
|node | ----------------> Node channel | |
----- | | | |
| | mailbox -----
| | |
---------------------
3.4 建立channel
经过如下代码之后,这才建立channel。
代码语言:javascript复制connection = kombu.Connection('redis://localhost:6379')
mailbox = pidbox.Mailbox("testMailbox", type="fanout")
node = mailbox.Node(hostname, state={"a": "b"})
node.channel = connection.channel()
3.4.1 联系
这里关于channel,Connection 与 Transport 的联系解释如下:
- Connection:对 MQ 连接的抽象,一个 Connection 就对应一个 MQ 的连接;Connection 是 AMQP 对 连接的封装;
- Channel:与AMQP中概念类似,可以理解成共享一个Connection的多个轻量化连接;Channel 是 AMQP 对 MQ 的操作的封装;
- Transport:kombu 支持将不同的消息中间件以插件的方式进行灵活配置,使用transport这个术语来表示一个具体的消息中间件,可以认为是对broker的抽象:
- 对 MQ 的操作必然离不开连接,但是,Kombu 并不直接让 Channel 使用 Connection 来发送/接受请求,而是引入了一个新的抽象 Transport,Transport 负责具体的 MQ 的操作,也就是说 Channel 的操作都会落到 Transport 上执行。引入transport这个抽象概念可以使得后续添加对non-AMQP的transport非常简单;
- Transport是真实的 MQ 连接,也是真正连接到 MQ(redis/rabbitmq) 的实例,区分底层消息队列的实现;
- 当前Kombu中build-in支持有Redis、Beanstalk、Amazon SQS、CouchDB,、MongoDB,、ZeroMQ,、ZooKeeper、SoftLayer MQ和Pyro;
3.4.2 Channel
在 Transport 有两个channels 列表:
代码语言:javascript复制self._avail_channels
self.channels
如果_avail_channels 有内容则直接获取,否则生成一个新的Channel。
在真正连接时候,会调用 establish_connection 放入self._avail_channels。
代码语言:javascript复制def establish_connection(self):
# creates channel to verify connection.
# this channel is then used as the next requested channel.
# (returned by ``create_channel``).
self._avail_channels.append(self.create_channel(self))
return self # for drain events
Channel关键初始化代码如下:
代码语言:javascript复制class Channel(virtual.Channel):
"""Redis Channel."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._queue_cycle = cycle_by_name(self.queue_order_strategy)()
self.Client = self._get_client()
self.active_fanout_queues = set()
self.auto_delete_queues = set()
self._fanout_to_queue = {}
self.handlers = {'BRPOP': self._brpop_read, 'LISTEN': self._receive}
self.connection.cycle.add(self) # add to channel poller. # 加入消息循环
if register_after_fork is not None:
register_after_fork(self, _after_fork_cleanup_channel)
3.4.3 MultiChannelPoller
MultiChannelPoller 定义如下,可以理解为执行engine,主要作用是:
- 收集channel;
- 建立fd到channel的映射;
- 建立channel到socks的映射;
- 使用poll;
class MultiChannelPoller:
def __init__(self):
# active channels
self._channels = set()
# file descriptor -> channel map.
self._fd_to_chan = {}
# channel -> socket map
self._chan_to_sock = {}
# poll implementation (epoll/kqueue/select)
self.poller = poll()
# one-shot callbacks called after reading from socket.
self.after_read = set()
def add(self, channel):
self._channels.add(channel)
最后Channel变量举例如下:
代码语言:javascript复制self = {Channel} <kombu.transport.redis.Channel object at 0x7ffc6d9c5fd0>
Client = {type} <class 'redis.client.Redis'>
Message = {type} <class 'kombu.transport.virtual.base.Message'>
QoS = {type} <class 'kombu.transport.redis.QoS'>
active_fanout_queues = {set: 0} set()
active_queues = {set: 0} set()
async_pool = {ConnectionPool} ConnectionPool<Connection<host=localhost,port=6379,db=0>>
auto_delete_queues = {set: 0} set()
body_encoding = {str} 'base64'
channel_id = {int} 1
client = {Redis} Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>
connection = {Transport} <kombu.transport.redis.Transport object at 0x7ffc6d9c5f60>
cycle = {FairCycle} <FairCycle: 0/0 []>
exchange_types = {dict: 3} {'direct': <kombu.transport.virtual.exchange.DirectExchange object at 0x7ffc6d9c5f98>, 'topic': <kombu.transport.virtual.exchange.TopicExchange object at 0x7ffc6d9c5d68>, 'fanout': <kombu.transport.virtual.exchange.FanoutExchange object at 0x7ffc6d9d2b70>}
handlers = {dict: 2} {'BRPOP': <bound method Channel._brpop_read of <kombu.transport.redis.Channel object at 0x7ffc6d9c5fd0>>, 'LISTEN': <bound method Channel._receive of <kombu.transport.redis.Channel object at 0x7ffc6d9c5fd0>>}
health_check_interval = {int} 25
keyprefix_fanout = {str} '/0.'
keyprefix_queue = {str} '_kombu.binding.%s'
pool = {ConnectionPool} ConnectionPool<Connection<host=localhost,port=6379,db=0>>
priority_steps = {list: 4} [0, 3, 6, 9]
qos = {QoS} <kombu.transport.redis.QoS object at 0x7ffc6d9fbc88>
queue_order_strategy = {str} 'round_robin'
state = {BrokerState} <kombu.transport.virtual.base.BrokerState object at 0x7ffc6d969e10>
subclient = {PubSub} <redis.client.PubSub object at 0x7ffc6d9fbd68>
最后Transport变量举例如下:
代码语言:javascript复制connection = {Transport} <kombu.transport.redis.Transport object at 0x7ffc6d9c5f60>
Channel = {type} <class 'kombu.transport.redis.Channel'>
Cycle = {type} <class 'kombu.utils.scheduling.FairCycle'>
Management = {type} <class 'kombu.transport.virtual.base.Management'>
channels = {list: 1} [<kombu.transport.redis.Channel object at 0x7ffc6da8c748>]
client = {Connection} <Connection: redis://localhost:6379// at 0x7ffc6d5f0e80>
connection_errors = {tuple: 8} (<class 'amqp.exceptions.ConnectionError'>, <class 'kombu.exceptions.InconsistencyError'>, <class 'OSError'>, <class 'OSError'>, <class 'OSError'>, <class 'redis.exceptions.ConnectionError'>, <class 'redis.exceptions.AuthenticationError'>, <class 'redis.exceptions.TimeoutError'>)
cycle = {MultiChannelPoller} <kombu.transport.redis.MultiChannelPoller object at 0x7ffc6d9d2198>
after_read = {set: 0} set()
eventflags = {int} 25
fds = {dict: 0} {}
poller = {_poll} <kombu.utils.eventio._poll object at 0x7ffc6d9d21d0>
driver_name = {str} 'redis'
driver_type = {str} 'redis'
implements = {Implements: 3} {'asynchronous': True, 'exchange_type': frozenset({'direct', 'fanout', 'topic'}), 'heartbeats': False}
manager = {Management} <kombu.transport.virtual.base.Management object at 0x7ffc6da8c6d8>
state = {BrokerState} <kombu.transport.virtual.base.BrokerState object at 0x7ffc6d969e10>
此时逻辑如下:
代码语言:javascript复制 ----------------------- -----------------------
| Transport | | MultiChannelPoller |
user scope | | | |
| -------------------------------------- | cycle -------> | _channels ----
| | | | | ----------------------- |
------------ | | Connection: redis://localhost:6379// | | channels -------- |
| Connection|-----------> | | | | | |
------------ | | | | _avail_channels --------- |
| | connection -------> | | | |
| | | ----------------------- | v
| -------------------------------------- | ----------------- ---
| -----> Channel | -----------
| ---------------------------- | cycle ------> | FairCycle |
| | Exchange | | | | |
| ------------------------------ --> | | | | -----------
--------- | | Mailbox | | | testMailbox.pidbox(fanout) | | handlers -----
| mailbox|--------------> | | | ---------------------------- ---- ---------------- |
--------- | | | | ^ |
| | exchange --------------- --------------------------------- | v
| | | | Exchange | | --------------- ---------------
| | reply_exchange ------------> | | | | 'BRPOP': Channel._brpop_read |
| | | | reply.testMailbox.pidbox(direct)| | | |
| | reply_queue ------------- ------------------- ------------- | | 'LISTEN': Channel._receive |
| | | | ^ | | |
| | | | -------- | | -------------------------------
| ------------------------- ---- --> | Queue ---------- |
| ^ -------- |
| | |
| --------------------- | |
----- | | | | |
|node | ----------------> Node channel -------------------------------------------------------------------
----- | | | |
| | mailbox -----
| | |
---------------------
3.5 建立 Consumer
如下代码建立一个Consumer,也建立了对应的Queue。就是说,广播还是需要依赖Consumer完成,或者说是借助Consumer功能。
代码语言:javascript复制def main(arguments):
consumer = node.listen(callback=callback)
listen代码如下:
代码语言:javascript复制def listen(self, channel=None, callback=None):
consumer = self.Consumer(channel=channel,
callbacks=[callback or self.handle_message],
on_decode_error=self.on_decode_error)
consumer.consume()
return consumer
此时对应Queue变量如下:
代码语言:javascript复制queue = {Queue} <unbound Queue localhost.testMailbox.pidbox -> <unbound Exchange testMailbox.pidbox(fanout)> -> >
ContentDisallowed = {type} <class 'kombu.exceptions.ContentDisallowed'>
alias = {NoneType} None
auto_delete = {bool} True
binding_arguments = {NoneType} None
bindings = {set: 0} set()
can_cache_declaration = {bool} False
channel = {str} 'line 178, in _getPyDictionaryn attr = getattr(var, n)n File "
consumer_arguments = {NoneType} None
durable = {bool} False
exchange = {Exchange} Exchange testMailbox.pidbox(fanout)
逻辑如下:
代码语言:javascript复制 ----------------------- -----------------------
| Transport | | MultiChannelPoller |
user scope | | | |
| -------------------------------------- | cycle -------> | _channels ----
| | | | | ----------------------- |
------------ | | Connection: redis://localhost:6379// | | channels -------- |
| Connection|-----------> | | | | | |
------------ | | | | _avail_channels --------- |
| | connection -------> | | | |
| | | ----------------------- | v
| -------------------------------------- | ----------------- ---
| -----> Channel | -----------
| ---------------------------- | cycle ------> | FairCycle |
| | Exchange | | | | |
| ------------------------------ --> | | <----- | | -----------
--------- | | Mailbox | | | testMailbox.pidbox(fanout) | | | handlers -----
| mailbox|--------------> | | | ---------------------------- | - -- ---------------- |
--------- | | | | | ^ ^ |
| | exchange --------------- --------------------------------- | | | v
| | | | Exchange | | | | --------------- ---------------
| | reply_exchange ------------> | | | | | | 'BRPOP': Channel._brpop_read |
| | | | reply.testMailbox.pidbox(direct)| | | | | |
| | reply_queue ------------- ------------------- ------------- | | | | 'LISTEN': Channel._receive |
| | | | ^ | | | | |
| | | | -------- | | | | -------------------------------
| ------------------------- ---- --> | Queue ---------- | | |
| ^ -------- | | |
| | | | |
| --------------------- | | | |
----- | | | | | | |
|node | ----------------> Node channel -------------------------------------------------------------------
----- | | | | | |
| | mailbox ----- | |
| | | ----------------------------------------------------
| --------------------- | |
| | |
| | |
| ------------------------ | -----------------------------------------------------------------------------
---------- | | | | | Queue | |
| consumer | | | Consumer channel ------- | |
---------- | | | | exchange |
| | queues ---------------> | |
| | | | |
----------- | | callbacks | | <localhost.testMailbox.pidbox -> Exchange testMailbox.pidbox(fanout)> |
| callback | | | | | |
------ ---- | ------------------------ -----------------------------------------------------------------------------
^ | |
| | |
--------------------------------------
|
3.5.1 binding 写入Redis
此时会把binding关系写入Redis,这样后续就可以利用这个binding来进行路由。
具体堆栈如下:
代码语言:javascript复制sadd, client.py:2243
_queue_bind, redis.py:817
queue_bind, base.py:568
bind_to, entity.py:674
queue_bind, entity.py:662
_create_queue, entity.py:617
declare, entity.py:606
declare, messaging.py:417
revive, messaging.py:404
__init__, messaging.py:382
Consumer, pidbox.py:78
listen, pidbox.py:91
main, node.py:20
<module>, node.py:29
逻辑如图,此时出现了Redis。
代码语言:javascript复制 user scope Kombu ----------------------- ----------------------- redis
| | Transport | | MultiChannelPoller | |
| | | | | |
| -------------------------------------- | cycle -------> | _channels ---- |
| | | | | ----------------------- | |
------------ | | Connection: redis://localhost:6379// | | channels -------- | |
| Connection|-----------> | | | | | | |
------------ | | | | _avail_channels --------- | |
| | connection -------> | | | | |
| | | ----------------------- | v |
| -------------------------------------- | ----------------- --- |
| -----> Channel | ----------- |
| ---------------------------- | cycle ------> | FairCycle | |
| | Exchange | | | | | |
| ------------------------------ --> | | <----- | | ----------- |
--------- | | Mailbox | | | testMailbox.pidbox(fanout) | | | handlers ----- |
| mailbox|--------------> | | | ---------------------------- | - -- ---------------- | |
--------- | | | | | ^ ^ | |
| | exchange --------------- --------------------------------- | | | v |
| | | | Exchange | | | | --------------- --------------- |
| | reply_exchange ------------> | | | | | | 'BRPOP': Channel._brpop_read | |
| | | | reply.testMailbox.pidbox(direct)| | | | | | |
| | reply_queue ------------- ------------------- ------------- | | | | 'LISTEN': Channel._receive | |
| | | | ^ | | | | | |
| | | | -------- | | | | ------------------------------- |
| ------------------------- ---- --> | Queue ---------- | | | |
| ^ -------- | | | |
| | | | | | ----------------------------------------------------
| --------------------- | | | | | | _kombu.binding.testMailbox.pidbox |
----- | | | | | | | | | |
|node | ----------------> Node channel ------------------------------------------------------------------- | | |
----- | | | | | | | | "x06x16x06x16localhost.testMailbox.pidbox" |
| | mailbox ----- | | | | |
| | | ---------------------------------------------------- | --------- ------------------------------------------
| --------------------- | | | ^
| | | | |
| | | | |
| ------------------------ | ----------------------------------------------------------------------------- | |
---------- | | | | | Queue | | | |
| consumer | | | Consumer channel ------- | | | |
---------- | | | | exchange | | |
| | queues ---------------> | | | |
| | | | | -------------------
----------- | | callbacks | | <localhost.testMailbox.pidbox -> Exchange testMailbox.pidbox(fanout)> | |
| callback | | | | | | |
------ ---- | ------------------------ ----------------------------------------------------------------------------- |
^ | | |
| | | |
--------------------------------------
|
手机如下:
3.5.2 配置
代码来到了kombu/transport/virtual/base.py,这里工作如下:
- 把 consumer 的 queue 加入到 Channel;
- 把回调函数加入到 Channel;
- 把 Consumer 加入循环;
这样,Comuser 的 queue 和 回调函数 就通过 Channel 联系起来。
代码如下:
代码语言:javascript复制 def basic_consume(self, queue, no_ack, callback, consumer_tag, **kwargs):
"""Consume from `queue`."""
self._tag_to_queue[consumer_tag] = queue
self._active_queues.append(queue)
def _callback(raw_message):
message = self.Message(raw_message, channel=self)
if not no_ack:
self.qos.append(message, message.delivery_tag)
return callback(message)
self.connection._callbacks[queue] = _callback
self._consumers.add(consumer_tag)
self._reset_cycle()
调用堆栈如下:
代码语言:javascript复制basic_consume, base.py:635
basic_consume, redis.py:598
consume, entity.py:738
_basic_consume, messaging.py:594
consume, messaging.py:473
listen, pidbox.py:92
main, node.py:20
<module>, node.py:29
此时依然在Channel
代码语言:javascript复制self = {Channel} <kombu.transport.redis.Channel object at 0x7fc252239908>
Client = {type} <class 'redis.client.Redis'>
Message = {type} <class 'kombu.transport.virtual.base.Message'>
active_fanout_queues = {set: 1} {'localhost.testMailbox.pidbox'}
active_queues = {set: 0} set()
async_pool = {ConnectionPool} ConnectionPool<Connection<host=localhost,port=6379,db=0>>
auto_delete_queues = {set: 1} {'localhost.testMailbox.pidbox'}
body_encoding = {str} 'base64'
channel_id = {int} 1
client = {Redis} Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>
closed = {bool} False
codecs = {dict: 1} {'base64': <kombu.transport.virtual.base.Base64 object at 0x7fc25218f5c0>}
connection = {Transport} <kombu.transport.redis.Transport object at 0x7fc2522295f8>
cycle = {FairCycle} <FairCycle: 0/1 ['localhost.testMailbox.pidbox']>
deadletter_queue = {NoneType} None
default_priority = {int} 0
do_restore = {bool} True
exchange_types = {dict: 3} {'direct': <kombu.transport.virtual.exchange.DirectExchange object at 0x7fc252239fd0>, 'topic': <kombu.transport.virtual.exchange.TopicExchange object at 0x7fc252239f60>, 'fanout': <kombu.transport.virtual.exchange.FanoutExchange object at 0x7fc252239f28>}
handlers = {dict: 2} {'BRPOP': <bound method Channel._brpop_read of <kombu.transport.redis.Channel object at 0x7fc252239908>>, 'LISTEN': <bound method Channel._receive of <kombu.transport.redis.Channel object at 0x7fc252239908>>}
keyprefix_fanout = {str} '/0.'
keyprefix_queue = {str} '_kombu.binding.%s'
pool = {ConnectionPool} ConnectionPool<Connection<host=localhost,port=6379,db=0>>
priority_steps = {list: 4} [0, 3, 6, 9]
qos = {QoS} <kombu.transport.redis.QoS object at 0x7fc252264320>
queue_order_strategy = {str} 'round_robin'
state = {BrokerState} <kombu.transport.virtual.base.BrokerState object at 0x7fc25218f6a0>
subclient = {PubSub} <redis.client.PubSub object at 0x7fc252264400>
具体循环如下:
代码语言:javascript复制def _reset_cycle(self):
self._cycle = FairCycle(
self._get_and_deliver, self._active_queues, Empty)
FairCycle定义如下:
代码语言:javascript复制class FairCycle:
"""Cycle between resources.
Consume from a set of resources, where each resource gets
an equal chance to be consumed from.
Arguments:
fun (Callable): Callback to call.
resources (Sequence[Any]): List of resources.
predicate (type): Exception predicate.
"""
def __init__(self, fun, resources, predicate=Exception):
self.fun = fun
self.resources = resources
self.predicate = predicate
self.pos = 0
def _next(self):
while 1:
try:
resource = self.resources[self.pos]
self.pos = 1
return resource
except IndexError:
self.pos = 0
if not self.resources:
raise self.predicate()
def get(self, callback, **kwargs):
"""Get from next resource."""
for tried in count(0): # for infinity
resource = self._next()
try:
return self.fun(resource, callback, **kwargs)
except self.predicate:
# reraise when retries exchausted.
if tried >= len(self.resources) - 1:
raise
回调函数如下:
代码语言:javascript复制fun = {method} <bound method AbstractChannel._get_and_deliver of <kombu.transport.redis.Channel object at 0x7fc252239908>>
resources = {list: 1} ['localhost.testMailbox.pidbox']
逻辑如下:
代码语言:javascript复制 user scope Kombu ----------------------- ----------------------- redis
| | Transport | | MultiChannelPoller | |
| | | | | |
| -------------------------------------- | cycle -------> | _channels ---- |
| | | | | ----------------------- | |
------------ | | Connection: redis://localhost:6379// | | channels -------- v |
| Connection|-----------> | | | | | ----------------- --- |
------------ | | | | _avail_channels --------- | Channel | <------------ |
| | connection -------> | | | | | | |
| | | ----------------------- | | _active_queues ------------------------ |
| -------------------------------------- | | | | | |
| -----> | --------- - | |
| ---------------------------- | cycle ------> | FairCycle | | |
| | Exchange | | | | | | |
| ------------------------------ --> | | <----- | | ----------- | |
--------- | | Mailbox | | | testMailbox.pidbox(fanout) | | | handlers ----- | |
| mailbox|--------------> | | | ---------------------------- | - -- ---------------- | | |
--------- | | | | | ^ ^ | | |
| | exchange --------------- --------------------------------- | | | v | |
| | | | Exchange | | | | --------------- --------------- | |
| | reply_exchange ------------> | | | | | | 'BRPOP': Channel._brpop_read | | |
| | | | reply.testMailbox.pidbox(direct)| | | | | | | |
| | reply_queue ------------- ------------------- ------------- | | | | 'LISTEN': Channel._receive | | |
| | | | ^ | | | | | | |
| | | | -------- | | | | ------------------------------- | |
| ------------------------- ---- --> | Queue ---------- | | | | |
| ^ -------- | | | | |
| | | | | | | ----------------------------------------------------
| --------------------- | | | | | | | _kombu.binding.testMailbox.pidbox |
----- | | | | | | | | | | |
|node | ----------------> Node channel ------------------------------------------------------------------- | | | |
----- | | | | | | | | | "x06x16x06x16localhost.testMailbox.pidbox" |
| | mailbox ----- | | | | | |
| | | ---------------------------------------------------- | | --------- ------------------------------------------
| --------------------- | | | | ^
| | | | | |
| | | | | |
| ------------------------ | ----------------------------------------------------------------------------- | | |
---------- | | | | | Queue | | | | |
| consumer | | | Consumer channel ------- | | <----- | |
---------- | | | | exchange | | |
| | queues ---------------> | | | |
| | | | | ------------------------
----------- | | callbacks | | <localhost.testMailbox.pidbox -> Exchange testMailbox.pidbox(fanout)> | |
| callback | | | | | | |
------ ---- | ------------------------ ----------------------------------------------------------------------------- |
^ | | |
| | | |
--------------------------------------
|
手机如下
3.5.3 配置负载均衡
回到 Channel 类,这里最后会配置负载均衡,就是具体下一次使用哪一个 Queue 的消息。
代码语言:javascript复制def basic_consume(self, queue, *args, **kwargs):
if queue in self._fanout_queues:
exchange, _ = self._fanout_queues[queue]
self.active_fanout_queues.add(queue)
self._fanout_to_queue[exchange] = queue
ret = super().basic_consume(queue, *args, **kwargs)
# Update fair cycle between queues.
#
# We cycle between queues fairly to make sure that
# each queue is equally likely to be consumed from,
# so that a very busy queue will not block others.
#
# This works by using Redis's `BRPOP` command and
# by rotating the most recently used queue to the
# and of the list. See Kombu github issue #166 for
# more discussion of this method.
self._update_queue_cycle()
return ret
def _update_queue_cycle(self):
self._queue_cycle.update(self.active_queues)
堆栈如下:
代码语言:javascript复制update, scheduling.py:75
_update_queue_cycle, redis.py:1018
basic_consume, redis.py:610
consume, entity.py:738
_basic_consume, messaging.py:594
consume, messaging.py:473
listen, pidbox.py:92
main, node.py:20
<module>, node.py:29
策略如下:
代码语言:javascript复制class round_robin_cycle:
"""Iterator that cycles between items in round-robin."""
def __init__(self, it=None):
self.items = it if it is not None else []
def update(self, it):
"""Update items from iterable."""
self.items[:] = it
def consume(self, n):
"""Consume n items."""
return self.items[:n]
def rotate(self, last_used):
"""Move most recently used item to end of list."""
items = self.items
try:
items.append(items.pop(items.index(last_used)))
except ValueError:
pass
return last_used
逻辑如下:
代码语言:javascript复制 user scope Kombu ----------------------- ----------------------- redis
| | Transport | | MultiChannelPoller | |
| | | | | |
| -------------------------------------- | cycle -------> | _channels ---- |
| | | | | ----------------------- | |
------------ | | Connection: redis://localhost:6379// | | channels -------- v |
| Connection|-----------> | | | | | ----------------- --- |
------------ | | | | _avail_channels --------- | Channel | <------------ |
| | connection -------> | | | | | | |
| | | ----------------------- | | _active_queues ------------------------ |
| -------------------------------------- | | | | | |
| -----> cycle ------> -------- -- | |
| ---------------------------- | | | FairCycle | | |
| | Exchange | | | ----------- | |
| ------------------------------ --> | | <----- | _queue_cycle ----------- | |
--------- | | Mailbox | | | testMailbox.pidbox(fanout) | | | | | | |
| mailbox|--------------> | | | ---------------------------- | | handlers | v | |
--------- | | | | | | | round_robin_cycle| |
| | exchange --------------- --------------------------------- | - -- ---------------- | |
| | | | Exchange | | ^ ^ | | |
| | reply_exchange ------------> | | | | | | | |
| | | | reply.testMailbox.pidbox(direct)| | | | | | |
| | reply_queue ------------- ------------------- ------------- | | | v | |
| | | | ^ | | | ----- ------------------------- | |
| | | | -------- | | | | | 'BRPOP': Channel._brpop_read | | |
| ------------------------- ---- --> | Queue ---------- | | | | | | |
| ^ -------- | | | | 'LISTEN': Channel._receive | | |
| | | | | | | | | ----------------------------------------------------
| --------------------- | | | | ------------------------------- | | | _kombu.binding.testMailbox.pidbox |
----- | | | | | | | | | | |
|node | ----------------> Node channel ------------------------------------------------------------------- | | | |
----- | | | | | | | | | "x06x16x06x16localhost.testMailbox.pidbox" |
| | mailbox ----- | | | | | |
| | | ---------------------------------------------------- | | --------- ------------------------------------------
| --------------------- | | | | ^
| | | | | |
| | | | | |
| ------------------------ | ----------------------------------------------------------------------------- | | |
---------- | | | | | Queue | | | | |
| consumer | | | Consumer channel ------- | | <----- | |
---------- | | | | exchange | | |
| | queues ---------------> | | | |
| | | | | ------------------------
----------- | | callbacks | | <localhost.testMailbox.pidbox -> Exchange testMailbox.pidbox(fanout)> | |
| callback | | | | | | |
------ ---- | ------------------------ ----------------------------------------------------------------------------- |
^ | | |
| | | |
--------------------------------------
|
3.6 消费
3.2.1 消费主体
如下代码完成消费。
代码语言:javascript复制def main(arguments):
consumer = node.listen(callback=callback)
try:
while True:
print('Consumer Waiting')
connection.drain_events()
finally:
consumer.cancel()
具体就是使用 drain_events 里读取消息,其代码如下:
代码语言:javascript复制def drain_events(self, connection, timeout=None):
time_start = monotonic()
get = self.cycle.get
polling_interval = self.polling_interval
if timeout and polling_interval and polling_interval > timeout:
polling_interval = timeout
while 1:
try:
get(self._deliver, timeout=timeout)
except Empty:
if timeout is not None and monotonic() - time_start >= timeout:
raise socket.timeout()
if polling_interval is not None:
sleep(polling_interval)
else:
break
3.2.2 业务逻辑
3.2.2.1 注册
get方法功能如下(需要注意的是,每次消费都要使用一次get函数,即,都要进行注册,消费....):
- 注册响应方式;
- 进行poll操作,这是通用操作,或者 BRPOP,或者 LISTEN;
- 调用 handle_event 进行读取redis,具体消费;
def get(self, callback, timeout=None):
self._in_protected_read = True
try:
for channel in self._channels:
if channel.active_queues: # BRPOP mode?
if channel.qos.can_consume():
self._register_BRPOP(channel)
if channel.active_fanout_queues: # LISTEN mode?
self._register_LISTEN(channel)
events = self.poller.poll(timeout)
if events:
for fileno, event in events:
ret = self.handle_event(fileno, event) # 具体读取redis,进行消费
if ret:
return
# - no new data, so try to restore messages.
# - reset active redis commands.
self.maybe_restore_messages()
raise Empty()
finally:
self._in_protected_read = False
while self.after_read:
try:
fun = self.after_read.pop()
except KeyError:
break
else:
fun()
因为这里利用了pubsub,所以调用到 channel._subscribe 来注册订阅,具体如下:
代码语言:javascript复制def _register_LISTEN(self, channel):
"""Enable LISTEN mode for channel."""
if not self._client_registered(channel, channel.subclient, 'LISTEN'):
channel._in_listen = False
self._register(channel, channel.subclient, 'LISTEN')
if not channel._in_listen:
channel._subscribe() # send SUBSCRIBE
具体类如下:
代码语言:javascript复制self = {MultiChannelPoller} <kombu.transport.redis.MultiChannelPoller object at 0x7fc2522297f0>
_register会把channel,socket fd的信息结合起来,作用就是:如果对应的socket fd有poll,就会调用对应的channel。
代码语言:javascript复制def _register(self, channel, client, type):
if (channel, client, type) in self._chan_to_sock:
self._unregister(channel, client, type)
if client.connection._sock is None: # not connected yet.
client.connection.connect()
sock = client.connection._sock
self._fd_to_chan[sock.fileno()] = (channel, type)
self._chan_to_sock[(channel, client, type)] = sock
self.poller.register(sock, self.eventflags)
具体_subscribe就是与具体redis联系,进行注册。
这样,对于 consumer 来说,redis 也联系上了,poll 也联系上了,下面就可以消费了。
代码语言:javascript复制def _subscribe(self):
keys = [self._get_subscribe_topic(queue)
for queue in self.active_fanout_queues]
if not keys:
return
c = self.subclient
if c.connection._sock is None:
c.connection.connect()
self._in_listen = c.connection
c.psubscribe(keys)
堆栈如下:
代码语言:javascript复制_subscribe, redis.py:663
_register_LISTEN, redis.py:322
get, redis.py:375
drain_events, base.py:960
drain_events, connection.py:318
main, node.py:24
<module>, node.py:29
相应变量如下,这里 client 是 redis 驱动的 PubSub 对象:
代码语言:javascript复制c = {PubSub} <redis.client.PubSub object at 0x7fc252264400>
keys = {list: 1} ['/0.testMailbox.pidbox']
self = {Channel} <kombu.transport.redis.Channel object at 0x7fc252239908>
此时逻辑如下:
代码语言:javascript复制
user scope Kombu | redis
| psubscribe |
| | ----------------------------
---------------------> drain_events ---------------------------------------------------------------------------------------------------------------------> | '/0.testMailbox.pidbox' |
| | | ----------------------------
| | |
| | ----------------------- ----------------------- |
| | | Transport | | MultiChannelPoller | |
| | | | | | |
| | -------------------------------------- | cycle -------> | _channels ---- |
| | | | | | ----------------------- | |
------ ----- | | Connection: redis://localhost:6379// | | channels -------- v |
| Connection|-----------> | | | | | ----------------- --- |
------------ | | | | _avail_channels --------- | Channel | <------------ |
| | connection -------> | | | | | | |
| | | ----------------------- | | _active_queues ------------------------ |
| -------------------------------------- | | | | | |
| -----> cycle ------> -------- -- | |
| ---------------------------- | | | FairCycle | | |
| | Exchange | | | ----------- | |
| ------------------------------ --> | | <----- | _queue_cycle ----------- | |
--------- | | Mailbox | | | testMailbox.pidbox(fanout) | | | | | | |
| mailbox|--------------> | | | ---------------------------- | | handlers | v | |
--------- | | | | | | | round_robin_cycle| |
| | exchange --------------- --------------------------------- | - -- ---------------- | |
| | | | Exchange | | ^ ^ | | |
| | reply_exchange ------------> | | | | | | | |
| | | | reply.testMailbox.pidbox(direct)| | | | | | |
| | reply_queue ------------- ------------------- ------------- | | | v | |
| | | | ^ | | | ----- ------------------------- | |
| | | | -------- | | | | | 'BRPOP': Channel._brpop_read | | |
| ------------------------- ---- --> | Queue ---------- | | | | | | |
| ^ -------- | | | | 'LISTEN': Channel._receive | | |
| | | | | | | | | ----------------------------------------------------
| --------------------- | | | | ------------------------------- | | | _kombu.binding.testMailbox.pidbox |
----- | | | | | | | | | | |
|node | ----------------> Node channel ------------------------------------------------------------------- | | | |
----- | | | | | | | | | "x06x16x06x16localhost.testMailbox.pidbox" |
| | mailbox ----- | | | | | |
| | | ---------------------------------------------------- | | --------- ------------------------------------------
| --------------------- | | | | ^
| | | | | |
| | | | | |
| ------------------------ | ----------------------------------------------------------------------------- | | |
---------- | | | | | Queue | | | | |
| consumer | | | Consumer channel ------- | | <----- | |
---------- | | | | exchange | | |
| | queues ---------------> | | | |
| | | | | ------------------------
----------- | | callbacks | | <localhost.testMailbox.pidbox -> Exchange testMailbox.pidbox(fanout)> | |
| callback | | | | | | |
------ ---- | ------------------------ ----------------------------------------------------------------------------- |
^ | | |
| | | |
--------------------------------------
|
手机如下
3.2.2.2 消费
前小节提到了,handle_event 之中会具体读取redis,进行消费。
当接受到信息之后,会调用如下:
代码语言:javascript复制def _deliver(self, message, queue):
try:
callback = self._callbacks[queue]
except KeyError:
self._reject_inbound_message(message)
else:
callback(message)
堆栈如下:
代码语言:javascript复制_deliver, base.py:975
_receive_one, redis.py:721
_receive, redis.py:692
on_readable, redis.py:358
handle_event, redis.py:362
get, redis.py:380
drain_events, base.py:960
drain_events, connection.py:318
main, node.py:24
<module>, node.py:29
此时变量如下,就是 basic_consume 之中的 _callback :
代码语言:javascript复制self._callbacks = {dict: 1}
'localhost.testMailbox.pidbox' = {function} <function Channel.basic_consume.<locals>._callback at 0x7fc2522c1840>
继续调用,处理信息
代码语言:javascript复制def receive(self, body, message):
"""Method called when a message is received.
This dispatches to the registered :attr:`callbacks`.
Arguments:
body (Any): The decoded message body.
message (~kombu.Message): The message instance.
Raises:
NotImplementedError: If no consumer callbacks have been
registered.
"""
callbacks = self.callbacks
[callback(body, message) for callback in callbacks]
堆栈如下:
代码语言:javascript复制receive, messaging.py:583
_receive_callback, messaging.py:620
_callback, base.py:630
_deliver, base.py:980
_receive_one, redis.py:721
_receive, redis.py:692
on_readable, redis.py:358
handle_event, redis.py:362
get, redis.py:380
drain_events, base.py:960
drain_events, connection.py:318
main, node.py:24
<module>, node.py:29
变量如下:
代码语言:javascript复制body = {dict: 5} {'method': 'print_msg', 'arguments': {'msg': 'Message for you'}, 'destination': None, 'pattern': None, 'matcher': None}
message = {Message} <Message object at 0x7fc2522e20d8 with details {'state': 'RECEIVED', 'content_type': 'application/json', 'delivery_tag': '7dd6ad01-4162-42c3-b8db-bb40dc7dfda0', 'body_length': 119, 'properties': {}, 'delivery_info': {'exchange': 'testMailbox.pidbox', 'rout
self = {Consumer} <Consumer: [<Queue localhost.testMailbox.pidbox -> <Exchange testMailbox.pidbox(fanout) bound to chan:1> -> bound to chan:1>]>
最后调用到用户方法:
代码语言:javascript复制callback, node.py:15
<listcomp>, messaging.py:586
receive, messaging.py:586
_receive_callback, messaging.py:620
_callback, base.py:630
_deliver, base.py:980
_receive_one, redis.py:721
_receive, redis.py:692
on_readable, redis.py:358
handle_event, redis.py:362
get, redis.py:380
drain_events, base.py:960
drain_events, connection.py:318
main, node.py:24
<module>, node.py:29
这样,mailbox 的 consumer 端就分析完毕。
0x04 Producer
Producer 就是发送邮件,此处逻辑要简单许多。
代码如下:
代码语言:javascript复制def main(arguments):
connection = kombu.Connection('redis://localhost:6379')
mailbox = pidbox.Mailbox("testMailbox", type="fanout")
bound = mailbox(connection)
bound._broadcast("print_msg", {'msg': 'Message for you'})
4.1 Mailbox
现在位于Mailbox,可以看到就是调用 _publish。
代码语言:javascript复制def _broadcast(self, command, arguments=None, destination=None,
reply=False, timeout=1, limit=None,
callback=None, channel=None, serializer=None,
pattern=None, matcher=None):
arguments = arguments or {}
reply_ticket = reply and uuid() or None
chan = channel or self.connection.default_channel
# Set reply limit to number of destinations (if specified)
if limit is None and destination:
limit = destination and len(destination) or None
serializer = serializer or self.serializer
self._publish(command, arguments, destination=destination,
reply_ticket=reply_ticket,
channel=chan,
timeout=timeout,
serializer=serializer,
pattern=pattern,
matcher=matcher)
if reply_ticket:
return self._collect(reply_ticket, limit=limit,
timeout=timeout,
callback=callback,
channel=chan)
变量如下:
代码语言:javascript复制arguments = {dict: 1} {'msg': 'Message for you'}
self = {Mailbox} <kombu.pidbox.Mailbox object at 0x7fccf19514e0>
继续调用 _publish,其中如果需要回复,则做相应设置,否则直接调用 producer 进行发送。
代码语言:javascript复制def _publish(self, type, arguments, destination=None,
reply_ticket=None, channel=None, timeout=None,
serializer=None, producer=None, pattern=None, matcher=None):
message = {'method': type,
'arguments': arguments,
'destination': destination,
'pattern': pattern,
'matcher': matcher}
chan = channel or self.connection.default_channel
exchange = self.exchange
if reply_ticket:
maybe_declare(self.reply_queue(channel))
message.update(ticket=reply_ticket,
reply_to={'exchange': self.reply_exchange.name,
'routing_key': self.oid})
serializer = serializer or self.serializer
with self.producer_or_acquire(producer, chan) as producer:
producer.publish(
message, exchange=exchange.name, declare=[exchange],
headers={'clock': self.clock.forward(),
'expires': time() timeout if timeout else 0},
serializer=serializer, retry=True,
)
此时变量如下:
代码语言:javascript复制exchange = {Exchange} Exchange testMailbox.pidbox(fanout)
message = {dict: 5} {'method': 'print_msg', 'arguments': {'msg': 'Message for you'}, 'destination': None, 'pattern': None, 'matcher': None}
4.2 producer
下面产生了producer。于是由producer进行操作。
代码语言:javascript复制def _publish(self, body, priority, content_type, content_encoding,
headers, properties, routing_key, mandatory,
immediate, exchange, declare):
channel = self.channel
message = channel.prepare_message(
body, priority, content_type,
content_encoding, headers, properties,
)
# handle autogenerated queue names for reply_to
reply_to = properties.get('reply_to')
if isinstance(reply_to, Queue):
properties['reply_to'] = reply_to.name
return channel.basic_publish(
message,
exchange=exchange, routing_key=routing_key,
mandatory=mandatory, immediate=immediate,
)
4.3 Channel
继续执行到 Channel,就是要对 redis 进行处理了。
代码语言:javascript复制def basic_publish(self, message, exchange, routing_key, **kwargs):
"""Publish message."""
self._inplace_augment_message(message, exchange, routing_key)
if exchange:
return self.typeof(exchange).deliver( # 这里
message, exchange, routing_key, **kwargs
)
# anon exchange: routing_key is the destination queue
return self._put(routing_key, message, **kwargs)
4.4 FanoutExchange
直接用 Exchange 进行发送。
代码语言:javascript复制class FanoutExchange(ExchangeType):
"""Fanout exchange.
The `fanout` exchange implements broadcast messaging by delivering
copies of all messages to all queues bound to the exchange.
To support fanout the virtual channel needs to store the table
as shared state. This requires that the `Channel.supports_fanout`
attribute is set to true, and the `Channel._queue_bind` and
`Channel.get_table` methods are implemented.
"""
type = 'fanout'
def lookup(self, table, exchange, routing_key, default):
return {queue for _, _, queue in table}
def deliver(self, message, exchange, routing_key, **kwargs):
if self.channel.supports_fanout:
self.channel._put_fanout(
exchange, message, routing_key, **kwargs)
4.5 Channel
流程进入到 Channel,这时候调用 redis 驱动进行发送。
代码语言:javascript复制def _put_fanout(self, exchange, message, routing_key, **kwargs):
"""Deliver fanout message."""
with self.conn_or_acquire() as client:
client.publish(
self._get_publish_topic(exchange, routing_key),
dumps(message),
)
4.6 redis 驱动
最后,redis 驱动进行发送。
代码语言:javascript复制def publish(self, channel, message):
"""
Publish ``message`` on ``channel``.
Returns the number of subscribers the message was delivered to.
"""
return self.execute_command('PUBLISH', channel, message)
关键变量如下:
代码语言:javascript复制channel = {str} '/0.testMailbox.pidbox'
message = {str} '{"body": "eyJtZXRob2QiOiAicHJpbnRfbXNnIiwgImFyZ3VtZW50cyI6IHsibXNnIjogIk1lc3NhZ2UgZm9yIHlvdSJ9LCAiZGVzdGluYXRpb24iOiBudWxsLCAicGF0dGVybiI6IG51bGwsICJtYXRjaGVyIjogbnVsbH0=", "content-encoding": "utf-8", "content-type": "application/json", "headers": {"cloc
self = {Redis} Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>