03.理解RabbitMQ消息通信中的基本概念

2019-08-14 14:21:05 浏览数 (1)

当提到消息通信时,我们脑海里最先浮现的可能是邮箱和即时通信(IM),不过这些模型并非我们讨论的RabbitMQ消息通信。比如说,虽然AMQP(高级消息队列协议)像邮箱那样为离线消费者存储消息,但是这些根据标签路由的消息更为灵活。同时和邮件不同的是,这些消息没有固定的结构,甚至可以直接存储二进制数据。同时也不同于即时通信(IM)协议,AMQP隐去了消息的发送方和接收方。AMQP消息能以一对多的广播方式进行路由,也可以选择以一对一的方式路由。在IM中,你只能一对一通信。

由于AMQP消息通信与其他通信协议不同,因此接下来我们将要解释AMQP中的术语和构造。

首先我们看下什么是消费者和生产者。

消费者和生产者

RabbitMQ在应用程序和服务器之间扮演着路由器的角色。所以当应用程序连接到RabbitMQ时,他就必须决定:我是在发送还是在接收呢?或者从AMQP的角度思考,我是一个生产者还是一个消费者呢?

生产者(producer)创建消息,然后发布(发送)到代理服务器(RabbitMQ)。

那么,消息又是什么呢?

消息包括两部分:有效载荷(payload)和标签(label)。有效载荷就是你想要传输的数据。它可以是任何内容,一个JSON数组或者是你喜欢的高清无码动作片。RabbitMQ不会在意这些。那么,标签又是做什么用的呢?它描述了有效载荷,并且RabbitMQ用它来决定谁将获得消息的拷贝。

消费者连接到代理服务器,并订阅到队列(queue)上。如果把消息队列想象成一个邮箱。每当消息达到特定的邮箱时,RabbitMQ会将其发送给其中一个订阅或监听的消费者那里,当消费者接收到消息时,它只得到消息的一部分:有效载荷。在消息路由过程中,消息的标签并没有随有效载荷一同传递。RabbitMQ甚至不会告诉你是谁生产/发送了消息。如果想明确知道是谁生产了此消息的话,就要看生产者是否把发送方消息放入到有效载荷中。

下图描述的是生产者到消费者的消息流

其实,整个过程很简单。生产者创建消息,消费者接受这些消息。你的应用程序可以作为生产者,向其他应用程序发送消息。或者作为一个消费者,接受消息。也可以同时是消费者也是生产者。不过,在这之前,它必须先建立一条信道(channel)。

那么,什么是信道呢?

信道是建立在“真实的”TCP连接内的虚拟连接。当你连接到Rabbit时,你的应用程序和Rabbit代理服务器之间就会创建一条TCP连接。一旦TCP连接打开(你通过了认证),应用程序就会创建一条AMQP信道。不论是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成的。

那我们为什么不直接通过TCP连接发送AMQP命令呢?

主要原因在于对操作系统来说建立和销毁TCP会话是非常昂贵的开销。假设我们从队列中消费消息,并根据服务需求合理调度线程。如果你只进行TCP连接,即每个线程都要自行连接到Rabbit。也就是说高峰期每秒都会有成千上万条连接。这不仅造成了TCP连接的巨大浪费,而且操作系统每秒也就只能建立这么点数量的连接。因此,你可能会很快就遇到性能瓶颈了。

然而,如果我们为所有线程只使用一条TCP连接以满足性能方面的要求,又能保证每个线程的私密性,就像拥有了独立连接一样,岂不很完美。这就是要引入信道概念的原因。

线程启动后,会在现成的连接上创建一条信道,也就获得了连接到Rabbit上的私密通信路径,而不会给操作系统的TCP栈造成额外负担。在一条TCP连接上创建多少条信道是没有限制的。你也可以把它想象成一束光纤电缆。TCP连接就像电缆,而AMQP信道就像一条条独立光纤束。

队列

从概念上来讲,AMQP消息路由必须有三部分:交换机、队列和绑定。生产者把消息发布到交换机上;消息最终达到队列,并被消费者接收;绑定决定了消息如何从路由器路由到特定的队列。我们在研究交换机和绑定之前,需要先理解队列的概念和工作原理。

队列就如同具名邮箱。消息最终到达队列中并等待消费。那么消费者是如何从特定的队列中接收消息的呢?

消费者主要通过两种方式从特定的队列中接收消息。

(1)通过AMQP的basic.consume命令订阅。此时会将信道设置为接收模式,直到取消队列的订阅为止。订阅了消息后,消费者在消费或拒绝了最近接收的那条消息后,就能从队列中自动的接收下一条消息。 (2)通过AMQP的basic.get命令请求单条消息。如果要获得更多消息的话,需要再次发送basic.get命令。切记,不要将basic.get放在一个循环里来替代basic.consume。因为这会严重影响Rabbit的性能。你可以大致理解为,basic.get命令会订阅消息,获取单条消息后,然后取消订阅。消费者理应始终使用basic.consume来实现高吞吐量。

当有消费者订阅了队列,如果有消息的话,消息会立即发送给这些订阅的消费者。如果消息到达了无人订阅的队列上,消息将会在队列中等待,直到有消费者订阅该队列。

那么,当有多个消费者订阅到同一个队列上时,消息又是如何分发的呢?

当Rabbit队列拥有多个消费者时,队列收到的消息将以循环的方式发送给消费者。每条消息只会发送给一个订阅的消费者。当消费者确认接收到了消息后,Rabbit将会把消息从队列中删除。

你可能注意到了,刚才提到了对消息进行确认。是的,消费者接收到的每一条消息都必须进行确认。消费者必须通过AMQP的basic.ack命令显式地向RabbitMQ发送一个确认,或者在订阅到队列的时候就将auto_ack参数设置为true。当设置了auto_ack时,一旦消费者接收消息,RabbitMQ会自动视其确认了消息。

如果消费者收到一条消息,然后确认之前从Rabbit断开连接/从队列上取消订阅,RabbitMQ会认为这条消息没有分发,然后重新分发给下一个订阅的消费者。如果你的应用程序崩溃了,这样做可以确保消息会被发送给另一个消费者进行处理。

另一方面,如果应用程序有bug而忘记确认消息啦,Rabbit将不会给该消费者发送更多的消息。这是因为在上一条消息被确认之前,Rabbit会认为这个消费者并没有准备好接收下一条消息。

在收到消息后,如果你想要明确拒绝而不是确认收到该消息的话,该如何做呢?比如说,你在处理消息的时候遇到了不可恢复的错误,或者是格式错误的消息等。

只要消息尚未确认,你则有以下两种选择: (1)把消费者从Rabbit服务器断开连接,这会导致Rabbit自动重新把消息入队,并且发给另一个消费者。这样连接/断开的方式会额外增加Rabbit的负担,如果所有消费者处理消息时都会遇到错误的话,会导致潜在的重大负荷。 (2)RabbitMQ 2.0.0以及更新的版本,那就可以使用AMQP的basic.reject命令。顾名思义,basic.reject允许消费者拒绝RabbitMQ发送的消息。如果把reject命令的requeue参数设置为true的话,RabbitMQ会将消息重新发送给下一个订阅的消费者。如果设置为false的话,Rabbit MQ会把消息从队列中移除,而不会把它发送给新的消费者。如果你检测到一条格式错误的消息而任何一个消费者都无法处理的时候,这样做就十分有用。

还有一件更重要的事情,如何创建队列。消费者和生产者都能使用queue.declare命令来创建队列。当创建队列时,你常常想要指定队列名称。消费者订阅队列时需要队列名称,并在创建绑定时也需要指定队列名称。如果不指定队列名称的话,Rabbit会分配一个随机名称并在queue.declare命令的响应中返回(对于构建AMQP上的RPC应用来说,使用临时“匿名”队列很有用)。

在队列设置中,有一些有用的参数,比如:

  • exclusive 如果设置为true的话,队列将变成私有的,此时只有你的应用程序才能消费队列消息。当你想要限制一个队列只有一个消费者的时候很有帮助。
  • auto-delete 当最后一个消费者取消订阅的时候,队列就会自动移除。如果你需要临时队列只为一个消费者服务的话,请结合auto-deleteexclusive。当消费者断开连接时,队列就被移除了。

如果你尝试声明一个已经存在的队列会发生什么呢?只要声明参数完全匹配现存的话,Rabbit就什么都不做,并成功返回,就好像这个队列已经创建成功一样,如果参数不匹配的话,队列声明尝试会失败。如果你只是想检测队列是否存在,则可以设置queue.declarepassive选项为true。在该设置下,如果队列存在,那么queue.declare命令会成功返回;如果队列不存在的话,queue.declare命令不会创建队列而会返回一个错误。

当设计应用程序时,是该由生产者还是消费者来创建所需的队列呢?看起来最自然的答案是由消费者来创建队列。毕竟,消费者才需要订阅队列,而且总不能订阅一个不存在的队列,是吧?但是,先别这么快下结论。

你首先选哟想清楚消息的生产者能否承担得起丢失消息。发送出去的消息如果路由到了不存在的队列上的话,Rabbit会忽略他们。因此,如果你不能承担得起消息进入“黑洞”而丢失的话,你的生产者和消费者就都应该尝试去创建队列。另一方面,如果你能承担得起丢失消息,或者你实现了一种方法来重新发布未处理的消息的话,你可以只让自己的消费者来声明队列。

总的来说,队列是AMQP消息通信的基础模块:

  • 为消息提供了住所,消息在此等待消费。
  • 对负载均衡来说,队列是绝佳方案。只需附加一堆消费者,并让RabbitMQ以循环的方式均匀地分配发来的消息。
  • 队列是Rabbit中消息的最后的终点,除非消息进入了“黑洞”。

我们了解了队列之后,那么消息又是如何到达队列的呢?接下来,让我们认识一下AMQP的交换机和绑定吧。

交换机和绑定

当你想要将消息投递到队列时,你通过把消息发送给交换机来完成。然后,根据确定的规则,Rabbit MQ 将会决定消息该投递到哪个队列。这些规则被称为路由键。队列通过路由键绑定到交换机。当你把消息发送到代理服务器时,消息将拥有一个路由键–即便是空的–RabbitMQ也会将其和绑定使用的路由键进行匹配。如果相匹配的话,那么消息将会投递到该队列。如果路由的消息不匹配任何绑定模式的话,消息将进入“黑洞”。

在AMQP中你还可以直接将队列绑定到交换机上,而不使用路由键,然后你发送给交换机的每一条没有路由键的消息,都会投递到上述队列中去。

服务器会根据路由键将消息从交换机路由到队列,但是它是如何处理投递到多个队列的情况的呢?

协议中定义的不同类型交换机发挥了作用。以供四种类型:direct、fanout、topic 和 headers。每一种类型实现了不同的路由算法。headers交换机允许你匹配AMQP消息的header而非路由键。除此之外,headers交换机和direct交换机完全一致,但性能会差很多。因此它并不太实用,而且几乎再也用不到了。所以,接下来,我们看下除了headers交换机外的其他三种。

默认交换机(一种特殊的直连交换机)

在消息代理服务器实现direct类型交换机时,就已经包含了一个空白字符串名称的默认交换机(default exchange)。它有一个特殊的属性使得它对于简单应用特别有用处:那就是每个新建队列都会自动绑定到默认交换机上,绑定的路由键(routing key)名称与队列名称相同。

举个栗子:当你声明了一个名为"search-indexing-online"的队列,AMQP代理会自动将其绑定到默认交换机上,绑定(binding)的路由键名称也是为"search-indexing-online"。因此,当携带着名为"search-indexing-online"的路由键的消息被发送到默认交换机的时候,此消息会被默认交换机路由至名为"search-indexing-online"的队列中。换句话说,默认交换机看起来貌似能够直接将消息投递给队列,尽管技术上并没有做相关的操作。

当默认的交换机无法满足应用程序的需求时,你可以声明你自己的直连交换机。

直连交换机

直连型交换机(direct exchange)是根据消息携带的路由键(routing key)将消息投递给对应队列的。直连交换机主要用来处理消息的单播路由(unicast routing)

  • 将一个队列绑定到某个交换机上,同时赋予该绑定一个路由键(routing key)
  • 当一个携带着路由键为RK的消息被发送给直连交换机时,交换机会把它路由给绑定值同样为RK的队列。

直连交换机经常用来循环分发任务给多个工作者(workers)。当这样做的时候,我们需要明白一点,在AMQP 0-9-1中,消息的负载均衡是发生在消费者(consumer)之间的,而不是队列(queue)之间。

接下来,我们看下主要处理消息的广播路由的扇形交换机。

扇型交换机

扇型交换机(funout exchange)将消息路由给绑定到它身上的所有队列,而不理会绑定的路由键。如果N个队列绑定到某个扇型交换机上,当有消息发送给此扇型交换机时,交换机会将消息的拷贝分别发送给这所有的N个队列。扇型用来交换机处理消息的广播路由(broadcast routing)。

举例来说,一个Web应用程序可能需要在用户上传新的图片时,用户相册必须清除缓存,同时用户应该得到些积分奖励。你可以将两个队列绑定到图片上传交换机上。一个用于清除缓存,另一个用于增加用户积分。从这个场景中你可以了解到.使用交换机、绑定和队列比直接向指定的队列发送消息要有优势。假设应用程序的第一个需求是在图片上传到网站上后,需要清除用户相册缓存。你可以通过只使用一个队列就能轻易完成。但是当产品负责人让你实现一个新功能,即在上传完成后给用户一点奖励,你该怎么办呢?如果你是直接将消息发送给队列的话,就不得不修改发送方的代码,以将消息发送给新的用户积分(points)队列。如果你使用的是fanout交换机的话,你唯一需要做的就是为新的消费者写一段代码,然后声明新的队列并将其绑定到fanout交换机上。就如同我们之前讲的那样,发送方的代码和消费者的代码两者之间完全解藕了,这允许你轻而易 举地添加应用程序的功能。

最后,我们来看下主要处理消息的多播路由的主题(topic)交换机

主题交换机

主题交换机(topic exchanges)通过对消息的路由键和队列到交换机的绑定模式之间的匹配,将消息路由给一个或多个队列。主题交换机经常用来实现各种分发/订阅模式及其变种。主题交换机通常用来实现消息的多播路由(multicast routing)。

主题交换机拥有非常广泛的用户案例。无论何时,当一个问题涉及到那些想要有针对性的选择需要接收消息的 多消费者/多应用(multiple consumers/applications) 的时候,主题交换机都可以被列入考虑范围。

下面我们用Web应用程序日志系统为例。

其中,有两个生产者发布消息,分别绑定topic交换机,路由键分别为log.criticalalert.critical。 下面有三个消费者分别声明了三个队列。分别是:

  • 绑定topic交换机,路由键前缀为log.开头
  • 绑定topic交换机,路由键后缀为.critical结束
  • 绑定topic交换机,路由键前缀为alert.开头

从上面可以看到,单个.把路由键分为了几部分,*匹配特定位置的任意文本。如果要匹配所有规则,你可以使用#

我们在理解了交换机、绑定和队列之后,你可能会认为自己已经掌握了RabbitMQ的所有特性。但是,随着深入使用,你会发现有一个概念我们尚未讨论:虚拟主机:vhost

虚拟主机和隔离

每一个RabbitMQ服务器都能创建虚拟消息服务器,我们称之为虚拟主机(vhost)。每一个vhost本质上都是一个mini版的RabbitMQ服务器,拥有自己的队列、交换机和队列。更重要的是,它拥有自己的权限机制。这使得你能够安全地使用一个RabbitMQ服务器来服务众多的应用程序。

vhost之于Rabbit就像虚拟机之于物理服务器一样:它们通过在各个实例间提供逻辑上的分离,允许你为不同应用程序安全保密地运行数据。这很有用.它既能将同一Rabbit的众多客户区分开来,又可以避免队列和交换机的命名冲突。否则你可能不得不运行多个Rabbit,并忍受随之而来头疼的管理问题。相反.你可以只运行一个Rabbit,然后按需启动或关闭vhost。

vhost是AMQP概念的基础,你必须在连接时进行指定。由于RabbitMQ包含了开箱即用的默认vhost:"/",因此使用起来非常简单。如果你不需要多个vhost的话,那么就使用默认的吧。通过使用缺省的guest用户名和密码guest就可以访问默认vhost。为安全起见,你应该更改它。AMQP的一个有趣的地方在于它并没有指定权限控制是在vhost级别还是在服务器端级别实现。这留给了服务器的开发者去决定。在RabbitMQ的例子中,权限控制是以vhost为单位的。

当你在Rabbit里创建一个用户时,用户通常会被指派给至少一个vhost,并且只能访问被指派vhost内的队列、交换机和绑定。当你在设计消息通信架构时,记住vhost之间是绝对隔离的。你无法将vhost banana tree上的交换机绑定到vhostoak tre。中的队列去。事实上,这既保证了安全性,又确保了可移植性。

我们可以看到vhost带来的巨大益处,那么如何创建它们呢?vhost和权限控制非常独特,不同于队列、交换机和绑定,它们是AMQP中唯一无法通过AMQP协议创建的基元。对于RabbitMQ来说,你需要通过RabbitMQ的安装路径下./sbin/目录下的rabbitmqctl工具来创建。

  • rabbitmqctl add_vhost [vhost_name] 创建一个vhost
  • rabbitmqctl delete_vhost [vhost_name] 删除一个vhost
  • rabbitmqctl list_vhost Rabbit服务器上运行着哪些vhost

但是,在平常工作中,我们经常会使用Web管理界面进行操作。

到目前为止呢,通过vhost你保障了队列和交换机的安全。现在我们来讨论下当Rabbit崩溃或者重启时,如何确保关键信息不丢失。

持久化和策略

默认情况下,重启RabbitMQ服务器后,那些队列和交换机就都消失了(随同里面的消息)。原因在于每个队列和交换机的durable属性。该属性默认清况为false,它决定了abbitMQ是否需要在崩溃或者重启之后重新创建队列(或者交换机)。将它设置为true,这样你就不需要在服务器断电后重新创建队列和交换机了。你也许会认为把队列和交换机的durable属性设置为true就足够可以让消息幸免于重启.但是你错了。队列和交换机当然必须被设置成true,但光这样做还不够。

能从AMQP服务器崩溃中恢复的消息,我们称之为持久化消息、。在消息发布前,通过把它的“投递模式”( delivery mode)选项设置为2 (AMQP客户端可能会使用人性化的常量来代替数值)来把消息标记成持久化。到目前为止,消息还只是被表示为持久化的,但是它还必须被发布到持久化的交换机中并到达持久化的队列中才行。如果不是这样的话,则包含持久化消息的队列(或者交换机)会在Rabbit崩溃重启后不复存在,从而导致消息成为孤儿。因此,如果消息想要从Rabbit崩溃中恢复,那么消息必须:

  • 把它的投递模式选项设置为2(持久)
  • 发送到持久化的交换机
  • 到达持久化的队列

做到了以上三点,你就不用担心你的关键信息无缘无故的失踪啦。

那么,RabbitMQ确保持久性消息又是怎么从服务器重启中恢复过来的呢?

是因为他们把消息写入磁盘上的一个持久化日志文件中了。当发布一条持久性消息到持久交换机上时,Rabbit会在消息提交到日志文件后才发送响应。记住,之后这条消息如果路由到了非持久队列的话,它会自动从持久性日志中移除.并且无法从服务器重启中恢复。如果你使用持久性消息的话.则确保之前提到的持久性消息的那三点都必须做到位(我们再怎么强调也不为过)。一旦你从持久化队列中消费了一条持久性消息的话(并且确认了它).RabbitMQ会在持久化日志中把这条消息标记为等待垃圾收集。在你消费持久性消息前,如果RabbitMQ重启的话,服务器会自动重建交换机和队列(以及绑定),重播持久性日志文件中的消息到合适的队列或者交换机上(取决于Rabbit服务器宕机的时候,消息处在路由过程的哪个环节))。

你可能认为自己应该为所有的消息都启用持久化消息通信。你可以这样做,但同时你也要为此付出代价:性能。写人磁盘要比存入内存中慢不止一点点,而且会极大地减少RabbitMQ服务器每秒可处理的消息总数。使用持久化机制而导致消息吞吐量降低至少10倍的情况并不少见(将RabbitMQ的消息存储置于SSD上的话,就可以极大地提升持久化消息通信的性能。)。另外还有一点就是,持久性消息在RabbitMQ内建集群环境下工作得并不好。虽然RabbitMQ集群允许你和集群中的任何节点的任一队列进行通信,但是事实上那些队列均匀地分布在各个节点而没有冗余(在集群中任何一个队列都没有备份的拷贝)。如果运行seed bin队列的集群节点崩溃了.那么直到节点恢复前,这个队列也就从整个集群中消失了(如果队列是可持久化的)。更重要的是.当节点宕机时,其上的队列也都不可用了,而且持久化队列也无法重建。这就会导致消息丢失。我们会在下次分享的时候更详细地讨论这一情况,并给出替代的集群方法来解决这个问题。

在我们刚开始讨论MQ的时候,就已经说过了MQ有一个致命的缺点就是:上游无法知道下游的执行结果。由于发布操作不返回任何消息给生产者,那你怎么知道服务器是否已经持久化了持久消息到硬盘呢?服务器可能会在把消息写入磁盘前就宕机了,消息因此丢失,而你却不知道。 而这就是事务发挥作用的地方。

在AMQP中,在把信道设置成事务模式后。你通过信道发送那些想要确认的消息,之后还有多个其他AMQP命令。这些命令是执行还是忽略,取决于第一条消息发送是否成功。一旦你发送完所有命令,就可以提交事务了。如果事务中的首次发布成功了,那么信道会在事务中完成其他AMQP命令。如果发送失败的话,其他AMQP命令将不会执行。

RabbitMQ中与事务机制有关的方法有三个,分别是Channel里面的txSelect(),txCommit()以及txRollback()。

  • txSelect用于将当前Channel设置成transaction模式
  • txCommit用于提交事务
  • txRollback用于回滚事务

在通过txSelect开启事务之后,我们便可以发布消息给broker代理服务器了,如果txCommit提交成功了,则消息一定是到达broker了,如果在txCommit执行之前broker异常奔溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过txRollback回滚事务了。

虽然事务是正式AMQP 0-9-1规范的一部分,但是它却有致命的缺陷:几乎吸干了Rabbit的性能。使用事务不但会降低大约2-10倍的消息吞吐量,而且会使生产者应用程序产生同步。而你使用消息通信就是想要避免同步。知晓了所有这一切之后,RabbitMQ团队决定拿出更好的方案来保证消息投递:发送方确认模式

和事务相仿,你需要告诉Rabbit将信道设置成为confirm模式.而且你只能通过重新创建信道来关闭该设置。一旦信道进人confirm模式.所有在信道上发布的消息都会被指派一个唯一的ID号(从1开始)。一旦消息被投递给所有匹配的队列后,信道会发送一个发送方确认模式给生产者应用程序(包含消息的唯一ID )。这使得生产者知晓消息已经安全到达目的队列了。如果消息和队列是可持久化的,那么确认消息只会在队列将消息写入磁盘后才会发出。发送方确认模式的最大好处是它们是异步的。一旦发布了一条消息,生产者应用程序就可以在等待确认的同时继续发送下一条。当确认消息最终收到的时候,生产者应用的回调方法就会被触发来处理该确认消息。如果Rabbit发生了内部错误从而导致了消息的丢失,Rabbit会发送一条nack ( not acknowledged,未确认)消息。就像发送方确认消息那样,只不过这次说明的是消息已经丢失了。同时.由于没有消息回滚的概念(同事务相比),因此发送方确认模式更加轻量级,同时对Rabbit代理服务器的性能影响几乎可以忽略不计。

0 人点赞