一、Fedration插件介绍
Fedration插件用来在不同的RabbitMQ集群之间复制队列消息,集群可以是内网也可以是公网,而这些对应用来说是透明的,即应用不会感知到,也不需要编写相关代码。
设想这样一个场景:系统有2个RabbitMQ/应用集群,一个部署在中国,另一个部署在美国,系统有发送短信的功能,出于性能和数据隐私的需要,中国集群只能发送给中国的手机号发短信,而海外的只能给中国以外的手机号发短信,但你们公司有中国、美国的雇员,他们都要登录一个内部系统,其中这个内部系统需要短信登录,这样的场景就可以通过Fedration插件将相关消息复制到对方集群中,由对方集群来完成相应功能。
二、Fedration相关术语
为了表达准确,我会尽量用英文原文表达相关概念。
1、Upstream(上游)、Downstream(下游)
上面有2个RabbitMQ集群,A和B,如果我们希望每次发到A的某个Exchange或队列的信息也发送到B,则A是Upstream,B是Upstream,即数据流向是从Upstream流向Downstream的。
不过Rabbit MQ里操作比较特殊,添加Upstream要在Downstream中加,而不是反过来在Upstream加Downstream。
2、Policy(策略)
即用来控制Fedration如何生效的,一个Fedration涉及到以下因素:
A、Upstream
因为是在Downstream中添加的,所以添加的时候需要指定Upstream的地址
B、应用到Exchange还是Queue(队列)
Fedration有几种类型,分别是应用到Exchange和Queue还是全部。
C、其它一些参数
这里先不介绍
RabbitMQ是用Policy来表达一个Fedration相关参数的,当然Policy还可以用来表达镜像队列控制的,这里不做介绍。
三、安装Fedration插件
首先是安装Erlang和RabbitMQ,这里不详述,注意两个版本对应上,我开发机上版本如下:
RabbitMQ:3.5.6
Erlang:18.1
启用插件:
代码语言:javascript复制./sbin/rabbitmq-plugins enable rabbitmq_management
./sbin/rabbitmq-plugins enable rabbitmq_federation
./sbin/rabbitmq-plugins enable rabbitmq_federation_management
四、创建Fedration
做实验之前,先规划下:
A、Upstream节点
172.21.107.236
B、Downstream节点
172.21.107.77
因为Fedration有分Exchange和Queue的,Queue就简单了,而Exchange分几种类型:Topic、Fanout、Direct。
为了方便实验我们只测试Topic类型的,Exchange和Queue的信息如下:
Exchange
Type:Topic
Name:oneplus_exchange
Routing Key:sms
Queue:
Name:sms
我们希望的结果是发送到236机器的oneplus_exchange这个Exchange的routing key为sms的消息都转发到77上,并且在236上不保存消息。
1、先添加Upstream
登录172.21.107.77后台:http://172.21.107.77:15672/#
点击Admin——Fedration Upstreams
添加界面如下:
Name根据命名规范自己定义,比较重要的参数如下:
URI:常用的格式如下:
amqp://{用户名}:{密码}@{ip}
以下为例子:
amqp://guest:guest@172.21.107.236
Prefetch count:和队列的这个参数一样,可以取多少条消息而不回复,和下面的Acknowledgement Mode配合使用,Acknowledgement Mode一般设为On Confirm,即确认才不重发。
其它参数先默认就OK了。
2、在Downstream上添加policy
注意是在Downstream上添加
还是在Admin标签上
重要的参数:
Pattern:匹配的正则,遵守左前缀匹配原则,如我想针对所有oneplus_exchange开头的这里就写oneplus_exchange
Apply to:应用到Exchange还是Queue还是两者都应用
我们实验是针对Exchange,所以只选Exchange。
federation-upstream:要应用到哪个Upstream上
3、在77上添加Exchange和Queue
Exchange
Queue
然后绑定Exchange和Queue:
4、在236上添加Exchange和Queue
添加之后有个灰的Exchange
注意:Exchange不要绑定Queue
Queue也有个灰色的
4、发送消息
代码语言:javascript复制$client = new Client('172.21.107.236', 5672, 'guest', 'guest');
$exchangeName = 'oneplus_exchange';
$exchangeType = 'topic';
$exchange = new Exchange($client, $exchangeName, $exchangeType);
$exchange->setDurable(true);
$routingKey = 'sms';
$message = new Message("hello" . str_repeat('123456789', 13));
$res = $exchange->publish($message, $routingKey);
var_dump($res);
执行之后到236(Upstream)上看:
再到77(Donwstream)上看:
可以看到236上队列上没消息,77上有消息了,整个过程中236相当于只是转发消息了。
五、总结
1、topic类型Exchange
发送给Upstream:
消息会复制给Downstream,如果Upstream也绑定了routing key,则Upstream也会保存1份消息。
发送给Downstream:
消息只在Downstream上有,不会复制到Upstream。
2、direct类型Exchange
发送给Upstream:
消息会复制给Downstream,如果Upstream也绑定了Queue,则Upstream也会保存1份消息。
发送给Downstream:
消息只在Downstream上有,不会复制到Upstream。
3、fanout类型Exchange
发送给Upstream:
消息会复制给Downstream,如果Upstream也绑定了Queue,则Upstream也会保存1份消息。
发送给Downstream:
消息只在Downstream上有,不会复制到Upstream。
即数据流向是单边的,只能从Upstream复制到Downstream,默认两边都会有1份,如果Upstream不想要则可以不绑定Queue。
MyBatis3使用
MyBatis源码分析一:核心组件
Codis源码分析之Slots迁移异步篇
nginx动态proxy_pass