写在前面的话:本篇文章主要来讲解,RabbitMQ的路由知识,在开始之前,笔者问了自己几个问题。笔者觉得一旦知道了这几个问题的答案,那么关于RabbitMQ的路由这部分知识基本上算是熟悉了,所以还希望大家带着这些问题来阅读本篇文章。
1. 为什么RabbitMQ需要路由呢?它们有什么用? 2.路由都有哪些分类?各自有什么特点? 3.这些路由是如何实现消息转发的?
一、RabbitMQ的路由知识介绍
1、对于生产者来说,它可以生产一类消息,也可以生产多类消息,一旦生产了多类消息,那么生产者如何将不同类的消息发送给不同的消费者呢?
对于这个问题的答案,其实最常用的办法,便是路由,可以让生产者根据不同的路由规则,将不同类型的消息转发给不同的消费者。
RabbitMQ就是采用的这种策略,不过对于生产者来说,直接连接的是路由(exchange)而不是消费者。路由会根据设置的路由规则不同,将不同类型的消息分发到不同的消息队列(queue)中,等待消费者去将自己所属队列中的消息取走。
下面是RabbitMQ 的示意图:
2、RabbitMQ这个路由规则介绍
对于RabbitMQ这个路由规则,是通过Exchange定义的类型(type)来控制的,而RabbitMQ共有四种exchange类型:direct, topic, headers, fanout。
1). Direct exchange的路由算法:就是将exchange的binding_key和消息的routing_key进行比较,如果完全匹配这说明是需要分发的队列。
2). Fanout exchange的路由算法:它将所接收到的消息广播给所有绑定的队列。
3). Topic exchange的路由算法:binding key的逻辑跟direct一样,其接收到的消息会分发到所有与其routing key相匹配的绑定队列。
(备注:Topic类型的exchange消息的routing key是有一定限制的,必须是一组使用“.”分开的单词。单词可以是任意的,但是一般来说以能准确的表达功能的为佳。如以下的例子都是合法的:"stock.usd.nyse", "nyse.vmw","quick.orange.rabbit".Routing key可以是任意多个单词组成,但其总长度不能超过255个字节。)
4). Headers Exchange的路由算法: 不依赖于 routing key 与 binding key 的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。工作流程:
1>绑定一个队列到头交换机上时,会同时绑定多个用于匹配的头(header)。
2>传来的消息会携带header,以及会有一个 “x-match” 参数。当 “x-match” 设置为 “any” 时,消息头的任意一个值被匹配就可以满足条件,而当 “x-match” 设置为 “all” 的时候,就需要消息头的所有值都匹配成功。
备注:头交换机可以视为直连交换机的另一种表现形式。但直连交换机的路由键必须是一个字符串,而头属性值则没有这个约束,它们甚至可以是整数或者哈希值(字典)等。灵活性更强(但实际上我们很少用到头交换机)。
3 、既然生产者与路由绑定,并且会根据设置的路由规则将消息放到不同的队列,那么队列是怎么与路由关联起来的呢?
答案是通过绑定关系,RabbitMQ中,路由与Queue之间连接,但是这种连接是通过一个叫做Binding(绑定)的东西关联起来的。当然这种Binding并不是随机的,而是有规则,这里的规则叫做binding_key,用到amqp中的API如下所示:
func (ch *Channel) QueueBind(name, key, exchange string, noWait bool, args Table)error
二、下面我们用例子来说明上面的几种路由类型是如何使用的。
1、direct类型例子
1)、代码:左边是生产者代码,右边是消费者代码。
对于direct来说,需要通过binding_key和消息的routing_key来进行分发消息到不同的队列。 生产者代码中,定义了exchange名字testdirect以及类型direct,然后生产数据的时候publish指定接受的exchange是testdirect,并同时指定routing_key的两种key值:hello和other。 消费者代码中,同样定义了exchange叫做testdirect和direct类型,绑定Queue的时候,只需要关联成testdirect就可以了。
2)、运行结果
同样是启动两个消费者,一个用来接收来自hello消息队列的消息,一个用来接收来自other消息队列的消息。
发送端,
2、fanout类型例子:
1)、代码:左边是消费者代码,右边是生产者代码。
对于路由类型fanout来说,代码实现比较简单。1)生产者一端需要声明exchange的名字,在生产消息publish的时候指定exchange的名字即可。2)消费者一端同样需要声明exchange的名字,这里的名字需要与生产者的是一致的,同时在绑定queue的时候,也就是QueueBind的时候指定exchange对应的名字即可。
2)、运行效果
1>打开两个终端,分别启动一个消费者,如下图所示:
通过生产者来发送消息给路由fanout类型的hellotest,发送内容如下所示。通过上图中消费者的输出可以看出,在消费者每发一次消息的时候,消费者1和消费者2都会即时收到这个消息内容。
3)、查看rabbitmq上面的数据
下面我们通过rabbitmq的命令,来查看rabbitmq sever上面的exchange,binding以及queue的对应关系,如下图所示:
通过命令显示的情况来看,1)两个消费者分别生成了两个queue,名字是amq.开头的随机码来表示的。2)hellotest 的类型是fanout。3)binding关系可以看出hellotest与这两个队列都有绑定关系。
3、topic类型
1)、代码:左边是消费者代码,右边是生产者代码。
topic类型的代码,1)生产者需要在定义exchange的时候声明topic,然后在Publish的时候,关联上exchange的名字testtopic和可以自己定义的route_key值。2)消费者需要需要声明相同的exchange,并指定类型为topic,在绑定消息队列的时候,关联对应的exchange,同时设置binding_key,这里设计成依赖与命令行输入的方式。
2)、运行结果
通过生产者依次发送4次消息,如下图所示:
消费者一:配置的route_key值是"flower.*"和"*.big",表示的是第一个字符是flower,后续字符任意匹配;第一个任意字符,第二个字符是big的两种key值,都会被写入这个消息队列。所以该消费者消费了三个消息"animal.big","flower.small"和"flower.big"。
消费者二:配置的route_key是"#",表示的是什么字符都接受,类似于fanout模式。所以该消费者会消费所有的消息。
消费者三:配置的是完全匹配格式"flower.big",所以该消费者只会命中"flower.big"这一消息。
3)、查看rabbitmq上面的数据
通过下面的绑定命令,我们可以看到这三个不同的消息队列,绑定的exchange都是testtopic这个路由,至于这个队列对应的是那个消费者,可以参看2)中的日志信息。
4、header类型
备注:因为header类型的实现与上面的direct、fanout、topic的匹配规则完全不相同,甚至可以说是另外一套实现方式,所以在本篇文章中,不做举例说明,后续会单独整理一章来介绍。
参考文档:
深入理解AMQP协议:https://blog.csdn.net/weixin_37641832/article/details/83270778
《RabbitMQ入门之Go语言教程》(5) 主题交换器:http://raylei.cn/index.php/archives/63/
作者说明:文章首发在微信公众号:灰子学技术,最近一直在学习Go语言相关的知识,同时也在整理,欢迎大家过来关注,共同学习,进步。如果喜欢,或是觉得有所帮助,还请各位顺手转发一下,谢谢。