RabbitMQ Fedration插件

2021-01-13 15:42:58 浏览数 (1)

一、Fedration插件介绍

Fedration插件用来在不同的RabbitMQ集群之间复制队列消息,集群可以是内网也可以是公网,而这些对应用来说是透明的,即应用不会感知到,也不需要编写相关代码。

设想这样一个场景:系统有2个RabbitMQ/应用集群,一个部署在中国,另一个部署在美国,系统有发送短信的功能,出于性能和数据隐私的需要,中国集群只能发送给中国的手机号发短信,而海外的只能给中国以外的手机号发短信,但你们公司有中国、美国的雇员,他们都要登录一个内部系统,其中这个内部系统需要短信登录,这样的场景就可以通过Fedration插件将相关消息复制到对方集群中,由对方集群来完成相应功能。

二、Fedration相关术语

为了表达准确,我会尽量用英文原文表达相关概念。

1、Upstream(上游)、Downstream(下游)

上面有2个RabbitMQ集群,A和B,如果我们希望每次发到A的某个Exchange或队列的信息也发送到B,则A是Upstream,B是Upstream,即数据流向是从Upstream流向Downstream的。

不过Rabbit MQ里操作比较特殊,添加Upstream要在Downstream中加,而不是反过来在Upstream加Downstream。

2、Policy(策略)

即用来控制Fedration如何生效的,一个Fedration涉及到以下因素:

A、Upstream

因为是在Downstream中添加的,所以添加的时候需要指定Upstream的地址

B、应用到Exchange还是Queue(队列)

Fedration有几种类型,分别是应用到Exchange和Queue还是全部。

C、其它一些参数

这里先不介绍

RabbitMQ是用Policy来表达一个Fedration相关参数的,当然Policy还可以用来表达镜像队列控制的,这里不做介绍。

三、安装Fedration插件

首先是安装Erlang和RabbitMQ,这里不详述,注意两个版本对应上,我开发机上版本如下:

RabbitMQ:3.5.6

Erlang:18.1

启用插件:

代码语言:javascript复制
./sbin/rabbitmq-plugins enable rabbitmq_management 
./sbin/rabbitmq-plugins enable rabbitmq_federation
./sbin/rabbitmq-plugins enable rabbitmq_federation_management

四、创建Fedration

做实验之前,先规划下:

A、Upstream节点

172.21.107.236

B、Downstream节点

172.21.107.77

因为Fedration有分Exchange和Queue的,Queue就简单了,而Exchange分几种类型:Topic、Fanout、Direct。

为了方便实验我们只测试Topic类型的,Exchange和Queue的信息如下:

Exchange

Type:Topic

Name:oneplus_exchange

Routing Key:sms

Queue:

Name:sms

我们希望的结果是发送到236机器的oneplus_exchange这个Exchange的routing key为sms的消息都转发到77上,并且在236上不保存消息。

1、先添加Upstream

登录172.21.107.77后台:http://172.21.107.77:15672/#

点击Admin——Fedration Upstreams

添加界面如下:

Name根据命名规范自己定义,比较重要的参数如下:

URI:常用的格式如下:

amqp://{用户名}:{密码}@{ip}

以下为例子:

amqp://guest:guest@172.21.107.236

Prefetch count:和队列的这个参数一样,可以取多少条消息而不回复,和下面的Acknowledgement Mode配合使用,Acknowledgement Mode一般设为On Confirm,即确认才不重发。

其它参数先默认就OK了。

2、在Downstream上添加policy

注意是在Downstream上添加

还是在Admin标签上

重要的参数:

Pattern:匹配的正则,遵守左前缀匹配原则,如我想针对所有oneplus_exchange开头的这里就写oneplus_exchange

Apply to:应用到Exchange还是Queue还是两者都应用

我们实验是针对Exchange,所以只选Exchange。

federation-upstream:要应用到哪个Upstream上

3、在77上添加Exchange和Queue

Exchange

Queue

然后绑定Exchange和Queue:

4、在236上添加Exchange和Queue

添加之后有个灰的Exchange

注意:Exchange不要绑定Queue

Queue也有个灰色的

4、发送消息

代码语言:javascript复制
$client = new Client('172.21.107.236', 5672, 'guest', 'guest');
$exchangeName = 'oneplus_exchange';
$exchangeType = 'topic';
$exchange     = new Exchange($client, $exchangeName, $exchangeType);
$exchange->setDurable(true);

$routingKey = 'sms';
$message = new Message("hello" . str_repeat('123456789', 13));
$res     = $exchange->publish($message, $routingKey);
var_dump($res);

执行之后到236(Upstream)上看:

再到77(Donwstream)上看:

可以看到236上队列上没消息,77上有消息了,整个过程中236相当于只是转发消息了。

五、总结

1、topic类型Exchange

发送给Upstream:

消息会复制给Downstream,如果Upstream也绑定了routing key,则Upstream也会保存1份消息。

发送给Downstream:

消息只在Downstream上有,不会复制到Upstream。

2、direct类型Exchange

发送给Upstream:

消息会复制给Downstream,如果Upstream也绑定了Queue,则Upstream也会保存1份消息。

发送给Downstream:

消息只在Downstream上有,不会复制到Upstream。

3、fanout类型Exchange

发送给Upstream:

消息会复制给Downstream,如果Upstream也绑定了Queue,则Upstream也会保存1份消息。

发送给Downstream:

消息只在Downstream上有,不会复制到Upstream。

即数据流向是单边的,只能从Upstream复制到Downstream,默认两边都会有1份,如果Upstream不想要则可以不绑定Queue。

MyBatis3使用

MyBatis源码分析一:核心组件

Codis源码分析之Slots迁移异步篇

nginx动态proxy_pass

0 人点赞