什么是消息中间件
是指应用程序对应用程序的一种高效可靠的消息传递的通信方法,通过提供消息传递和消息排序模型,他可以在分布式环境下扩展进程间的通信,典型的生产者和消费者的代表
消息中间件的作用
- 解耦
- 削峰
- 冗余
- 可扩展
- 可恢复
- 顺序保证
- 缓冲
- 异步通讯
RabbitMQ起源
在之前的mq都是商业实现,但是他们各自为政,各个消息中间件不能互通,因此为了打破这个壁垒,先是有了JMS,他视图通过公共的接口隐藏单独MQ产品供应商的实际接口,从而跨越了壁垒,从而解决互通问题,java应用程序只要针对JMS API接口,其他的由JMS处理就好了,但是使用单独标准化接口来适配众多不同的接口,最终还是有问题出现的,程序变得非常复杂而脆弱,
然后又提出了另外一个协议AMQP的公开标准,他面对消息的中间件设计,基于协议的客户端与消息中间间可传达消息,并不受产品和开发语言的条件限制,
最后,Rabbitmq就出现了,他是实现了AMQP协议的mq。
特性 | 描述 |
---|---|
可靠性 | 使用一些机制保证了持久化,传输确认,发布确认 |
灵活的路由 | 消息进入队列之前,可通过交换机路由消息 |
扩展性 | 多个rabbbitmq搭建一个集群 |
高可用性 | 队列可以在集群中的机器上设置镜像,是的部分节点挂了,照样使用 |
多种协议 | 除了支持原生的AMQP,还支持STOMP.MQTT |
多种语言 | 支持多种语言,Java python |
管理界面 | 后台有一个用户界面,是的用户可以监控和管理消息 |
插件机制 | 支持许多插件,实现多方面扩展,也可自定义插件 |
RabbitMQ整体模型架构
- producer,生产者,就是投递消息的一方
- consumer,消费者,就是接受消息的一方
- broker,消息中间件的服务节点,简单看做一个RabbitMQ服务节点
- queue,队列,用于存储消息,
- exchange,交换机,生产者将消息发到交换器中,由交换器去分发消息。
- Routingkey,路由键,用于指定消息的路由规则
- Bindingkey,用于把交换机和队列关联起来
交换机分为几类
- fanout 他会把所有发送给交换机的消息路由到所有与交换器绑定的队列中
- direct 他会把消息发送到路由键和绑定键完全匹配的队列中
- topic 他会根据路由键和绑定建进行模糊匹配,把消息发送给匹配成功的队列
- headers 不依赖路由键和绑定键规则来路由消息,而是根据消息的headers属性进行匹配,当发送消息到交换器时候,rabbitmq会把该消息的headers(一对键值对的形式)和路由键和交换器绑定指定的键值对进行匹配,如果匹配则消息路由到该队列,否则不会路由到队列,性能很差,几乎没有使用
rabbitmq的队列可以自动创建,有谁创建呢
如果队列不存在,消费者没有消息消费,生产者会丢失消息,因此两者都要进行创建队列,但是已经存在了队列,不管是谁二次创建,都不会影响队列的属性。
Rabbitmq有哪些方式可以是不可达的消息返回给生产者的方式
- 发布消息的参数设置mandatory为true,当交换器根据自身的类型无法匹配具体的队列,则就会返回给生产者,这个时候生产者可以调用channel.addReturnListener添加RenturnListener监听器实现获取没有正确路由的消息
- 当设置inmediate参数为true,当消息发现路由匹配的队列都没有一个消费者,则这条消息将不会进入队列,否则直接投递,但是此参数已经不再使用建议使用
什么是备用交换器
当我们不想使用上面说的消息没有被路由的情况下,可以使用备用交换器再去处理这些消息,也叫备胎交换器,在声明交换器的时候添加alternate-exchange参数
代码语言:javascript复制
Map<String,Object> args=new HashMap();
args.put("alternate-exchange","MyAge");
channel.exchangeDeclare("normalExchange","direct",true,false,args)
TTL过期时间
消息的过期时间
设置过期有两种方式一种是队列属性设置,一种是每条消息设置,实现方式如下
代码语言:javascript复制
//声明队列的时候设置
Map<String,Object> args=new HashMap();
args.put("x-message-ttl",60000);
channel.queueDeclare(queueName,durable,exclusive,autoDelete,args)
//针对每条消息设置
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.deliveryMode(2);//持久化
builder.expiration("60000")
AMQP.BasicProperties properties = builder.build();
channel.basicPublish("exchange.persistent", "persistent",properties, "persistent_test_message".getBytes());
队列中不设置消息过期时间,说明消息不会过期,如果设置为0,则表示此时可以直接将消息投递到消费者,否则该消息将会立刻丢弃,
如果两者都设置,则取较小的设置,对于队列的设置ttl的方式一旦消息过期,就会从队列中删除,第二种是即使消息过期,也不会立刻删除,因为每条消息的过期是在即将投递的时候判定的(这是因为如果不这样做,就会扫描整个队列,性能比较差)
队列的过期时间
在声明队列中使用x-expires设置
代码语言:javascript复制
Map<String,Object> args=new HashMap();
args.put("x-expires",180000);
channel.queueDeclare(queueName,false,false,false,args)
控制队列在被自动删除之前没有任何消费者,队列没有被重新声明,并且在过期时间段内未调用过Basic.get命令,则达到过期时间的时候就会被删除。不能设置为0.
死信队列
DLX(Dead-Letter-Exchange),称之为死信交换器,死信邮箱,当一个消息在队列中成为死信之后,就会被重新发型到一个交换器中,这个交换器就是死信交换器,和他绑定的队列就是死信队列,成为死信队列的原因有三点
- 消息过期
- 消息被拒绝,且设置requeue参数为false(不在回到队列)
- 队列达到最大长度
channel.exchangeDeclare("exchange.dlx","direct",true)
channel.exchangeDeclare("exchange.normal","fanout",true)
Map<String,Object> map =new HashMap<>();
map.put("x-message-ttl",60000);
map.put.("x-dead-letter-exchange","exchange.dlx")
map.put.("x-dead-letter-routing-key","routingkey")
channel.queueDeclare("queue.normal",true,false,false,args)
channel.queueBind("queue.normal","exchange.normal","")
channel.queueDeclare("queue.dlx",true,false,false,args)
channel.queueBind("queue.dlx","exchange.dlx","")
channel.basicPublish("exchange.normal","rk",MessageProperties.PERSISTENT_TEXT_PLAIN,
"dlx".getBytes())
- 生产者发送一条消息到交换器中
- 消息直接路由到队列中
- 由于队列中的消息设置了过期时间,
- 消息过期的时候,就会路由到死信交换器中
- 死信交换器在路由到死信队列中
其实我们可以发现死信队列不仅仅可以处理异常情况下,没有处理的消息,而且还可以做一个延迟队列的机制。
延迟队列
延迟队列使用场景
- 在订单系统中,一个用户下单之后通常有30分钟的时间进行支付,如果30分钟没有支付就按照异常处理
- 用户希望通过自己的手机控制家里的智能设备在指定时间工作
根据不同需求,消息分别被路由到不同的延迟队列中,每个队列设置了不同的过期时间,当消息过期的时候,就会把消息路由到队列绑定的死信交换器,然后由死信队列进行消费,从而达到延迟的效果
持久化
如果我们把交换器和队列和消息都设置了持久化是否能保证消息百分百不丢失
但是否定的
- 消费者在消费消息的时候,此时不幸的是服务宕机了,这个时候还没有处理,这条消息就会丢失,
- 当生产者把消息发送给rabbitmq之后,还有需要有一段时间存入磁盘,如果这段时间内rabbitmq服务节点宕机,重启,就会丢失
如何解决上面问题呢
- 消费者丢失消息,可以设置手动确认机制
- 生产者丢失消息,可以引入镜像队列或者引入事务机制,或发送方确认机制保证消息不丢失
生产者确认机制
我们之前使用持久化是因为服务器异常崩溃导致的消息丢失,但是当消息还没有进入服务器的时候,服务宕机,就会导致消息丢失,因此rabbitmq使用两种方式实现消息不丢失
- 通过事务机制
- 通过发送发确认机制
事务机制相关方法有三个
代码语言:javascript复制
channel.txselect,将当前信道设置为事务模式
channel.txcommit,提交事务
channel.txRollback,回滚事务
代码语言:javascript复制try{
channel.txselect()
channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,Messagepropertis.
Persistent_text_plain,"tansaction message".getBytes())
channel.txcommit
}catch(Exception e){
channel.txRollback();
}
- 客户端发送tx.select设置信道为事务模式
- broker恢复tx.select-ok,确认信道设置为事务模式
- 发送消息之后,客户发送提交事物commit
- broker回复tx.commit-ok,确认事务提交
- 但是如果发生异常,就会捕获异常
- 捕获异常的同时提交事物回滚
- broker回复事物回滚成功
发送方确认,有三种方式处理
- 普通的confirm
- 批量的confirm
- 异步的confirm
普通的confirm
代码语言:javascript复制try{
channel.confirmselect()//设置信道为publish confirm模式
//正常发消息
channel.basicPublish("exchange","routionkey",null,"message".getbytes())
//等待服务放确认
if(chanel.waitForConfirms()){
System.out.println("send messae fail")
}
}catch(InterruptedException e){
e.printStackRrace();
}
生产则调用channeL.confirmselect方法设置信道为confirm模式,之后rabbitmq返回confirm.select-ok命令表示成功将信道设置为confirm,所有被发送的消息都会被ack或nack一次。不会出现消息级被ack也被nack
如果想要发送多条消息,可以进行for循环,就可以了,不过不需要把confrm.confirmselec包在循环体内,
注意事项
- 事务机制和确认机制互斥,否则报错
- 事务机制和确认机制确保的是消息能够正确的发送rabbitmq中,这里指的是交换机,如果交换器没有绑定队列那么消息也会丢失。
批量confirm
confirm的优势是,并不一定需要同步确认,我们每发送一批数据之后,在调用channel.waitForConfirms方法等待服务器确认返回
代码语言:javascript复制try{
channel.confirmselect();
int msgcount=0;
while(true){
channel.basicPublish("exchange","routingkey",null,"messge".getBytes())
if( msgcount>Batch_count){
msgcount=0;
try{
if(channel.waitForConfirms()){
//将缓存中的消息清空
continue;
}
//缓存中的消息重新放
}catch(Exception e){
e.printExceptionTrace()
//缓存中的消息重新放
}
}
}
}catch(Exception e){
e.printExceptionTrace()
}
程序需要定量发送,然后调用waitForConfirms来等待rabbitmq确认返回,相比普通的confirm性能得到了提高,但是如果失败或拒绝就需要批量的消息重新发送,会带来重复消息数量,当消息经常丢失的场景,这种的性能也不是很高。
异步confirm
客户端channel接口提供了addConfirmListener,他可以添加一个confirmListener回调,这个ConfirmListener接口提供了两个接口handleAck和handleNack,分别处理Basic.ack和Basic.nack.
两个接口的都有一个参数deliveryTag(标记消息的唯一序列号),当然我要维护一个集合,建议是首映有序集合,当每发送一条消息,这个集合元素加1,每当代用handleack或handleNack的方法时候,集合中删除一条(multiple=false)或者多条(multiple=true),
代码语言:javascript复制channel.confirmselect()
channel.addConfirmListener(new ConfirmListener(){
public void handleAck(long deliveryTag,boolean multiple){
if(multiple){
confirmset.headSet(deliveryTag 1).clear;
}else{
confirmset.remove(deliveryTag);
}
}
public void handleNack(long deliveryTag,boolean multiple){
if(multiple){
confirmset.headSet(deliveryTag 1).clear;
}else{
confirmset.remove(deliveryTag);
}
//这里重新发送消息场景
}
})
//循环发送消息
while(true){
long nextSeqNo=channel.getNextPublishSeqNo();
channel.basicPublish(ConfrimConfig,exchangeName,ConfirmConfig.routingKey
MessageProperties.PERSISTENT_TEXT_PLAIN,
ConfirmConfig.msg.getBytes());
confirmset.add(nextSeqNo)
}
消息分发
当队列有多个消费者的时候,队列根据轮询的方式进行分发给消费者,但是如果现在负载过重,需要更多消费者消费的时候,此时很多时候轮询的方式导致有些消费者可能负载过重,有些消费者空闲,这样就会导致整体应用的吞吐量下降
此时我们可以用channel.basicQos,限制每个消费者的消费大小,例如设置为5,当消费者接受到的消息达到5条的时候,就不会想消费者继续分发消息,此时当消费者消费一条之后,继续分发一条,这中机制就相当于一个滑动窗口,注意的是basicQos对拉模式不生效
代码语言:javascript复制
basicQos(int prefetchCount)
basicQos(int prefetchCount,boolena global)
basicQos(int prefetchSize,int prefetchCount,boolean global
prefetchCount:设置预期的上限
prefetchSize:消费者所能接收未确认消息的总体大小的上限
global:true代表信道上所有消费者遵从prefetchCount的上限,false,信道上新的消费者遵从prefetchCount的上限
代码语言:javascript复制
Consumer consumer1= ...
Consumer consumer2= ...
channel.basicQos(3,flase);//每个消费这的最大限制
channel.basicQos(5,true);//每个信道的最大限制
channel.basicConsume(queue1,false,consumer1);
channel.basicConsume(queue2,false,consumer2);
上面则代表,每个消费者最多消费3个未确认的消息,两个消费者能收到未确认的消息个数和的上限是5.
消息的传输保障
- at most once,最多一次,消息可能丢失,但绝不会重复
- at least once,最少一次,消息不会丢失,但是户重复
- Exactly once,恰好一次,每条消息肯定会被传输一次且仅传输一次
rabbitmq支持最多一次,和最少一次,其中最少一次要考虑下面
- 消息生产者需要开启事务机制,confirm机制,以确保消息可以可靠的传输到rabbitmq
- 消息生产者需要配合参数mandatory或者备份交换器,保证消息在路由到队列中保证不会丢弃
- 消息和队列都进行持久化,保证rabbitmq服务器在遇到异常情况下不会造成消息丢失
- 消费者消费消息的同时需要设置参数autoACK=false,通过手动确认消息正确消费,避免在消费端引起必须要的消息丢失
而最多一次的方式,不需要考虑上面那些方面,生产者和消费者随意处理
恰好一次,rabbitmq是无法保证的,因此我们处理重复的消息需要在业务客户端进行处理,比如引入GUID的概念,可以让消息本省就具备幂等性,或借助redis去重处理。
vhost
每一个rabbitMQ服务器都能创建虚拟的消息服务器,我们叫做虚拟主机,简称vhost,每个vhost本质上是一个独立的rabbitmq服务器,拥有独立的队列,交换器和绑定关系,且拥有自己的权限,vhost就想虚拟机与物理机服务器一样,他在各个实例提供逻辑上的分离,为不同程序安全保密的运行数据,将多个客户区分开,有可以避免队列和交换器名字冲突,vhost之间是绝对的隔离。
简单的命令
代码语言:javascript复制
rabbitmqctl add_vhost vhost1 建立新的vhost
rabbitmqctl list_vhosts [name tracing]罗列当前vhost的相关信息
rabbitmqctl set_permissions [-p vhost] {user} 为指定用户授予权限
rabbitmqctl clear_permissions [-p vhost1] {user} 为执行用户删除权限
rabbitmqctl add_user {username}{password} 添加用户名和密码
rabbitmqctl change_password {username}{password} 修改指定用户的密码
rabbitmqctl list_users 罗列当前的所有用户
rabbitmqctl stop [pid_file] 停止运行rabbitmq的Erlang和Rabbitmq服务应用
rabbitmqctl shutdown 停止运行rabbitmq的Erlang和Rabbitmq服务应用,和上一个区别就是不需要
指定pid_file而可以阻塞等待指定进程的关闭
rabbitmqctl stop_app 停止rabbitmq服务应用,但是Erlang虚拟机还是处于运行状态
rabbitmqctl start_app 启动rabbitmq应用
rabbitmqctl reset 将节点重置还远最初状态
rabbitmqctl join_cluster {cluster_node} [--ram] 将节点加入指定集群
rabbitmqctl cluster_status 显示集群状态
rabbitmqctl forget_cluster_node 将节点从集群中删除允许离线执行
rabbitmq节点有几种类型,分别有啥作用
节点类型分为两种,一种是磁盘节点(disc)一种是内存节点(ram)
代码语言:javascript复制[{nodes},[{disc,[rabbit@rabbit@node1]},{ram,[rabbit@rabbit@node2]}]]
不管是单一节点还是集群节点,要么是内存节点,要么是磁盘节点,内存节点将所有队列,交换器,绑定关系,用户,权限存在内存中,磁盘节点则将这些信息放到磁盘中,单节点必然是磁盘节点,否则重新启动之后,系统配置的信息都会丢失,而在集群中可以配置部分节点为内存节点,可以获得更好的性能,rabbitmq只要求集群至少有一个磁盘节点,其他节点可以是内存节点,从而节点离开集群,他们必须将变更通知至少一个磁盘节点。
如果集群中只有一个磁盘节点,而不幸的是此节点也崩溃了,此时集群可以中产发送或接受消息,但是不能创建队列和交换器,绑定关系,用户等信息,因此建议生产环境保证有两个以上的磁盘节点。
rabbitmq日志
如果rabbitmq服务出现了异常,此时日志就起到了关键的作用,rabbitmq日志默认的存放的目录在$RABBITMQ_HOMT/var/log/rabbitmq文件内,这文件夹里放了两个日志文件,RABBITMQ_NODENAME-sasl.log和RABBITMQ_NODENAME.log.
RABBITMQ_NODENAME-sasl.log存放的是Erlang相关信息,比如Erlang崩溃的报告,有助于调式无法启动的rabbitmq
RABBITMQ_NODENAME.log存放的是应用服务的日志
当我们要查看日志的时候我们经常使用下面命令
代码语言:javascript复制tail -f $RABBITMQ_HOME/var/log/rabbitmq/rabbit@$HOSTNAME.log -n 200
但是如果rabbitmq是持久性运行,打印了太多日志,我们可以使用切分日志,轮换日志,如下面命令
代码语言:javascript复制
rabbitmqctl rotate_logs {suffix}
比如使用下面命令
rabbitmqctl rotate_logs .bak
我们再次查看日志
ls -al
rabbit@node1.log
rabbit@node1.log.bak
之后看到日志目录会新建日志文件,并且将老的日志文件添加.bak
集群迁移
集群迁移使用来解决集群故障短时间内不能修复的情况,此时就要把所有的数据,客户端连接等迁移到新的集群中,以确保集群的可用性。
集群的迁移包括元数据重建,数据迁移,以及客户端连接切换
元数据重建
我们可以通过手动创建或者客户端创建,但是元数据的重建对于手工是一件分厂繁琐的事情,因此我们可以使用web管理界面,从旧的集群中导出元数据,然后在把元数据导入新的集群,
但是,使用web界面导入元数据,要考虑三件事情,
- 元集群突发故障或者rabbitmq management插件节点机器不可用,就无法导出元数据,因此我们可以周期性的将新的metadata.json进行备份,这样再次遇到就可以直接拿到新的元数据
- rabbitmq版本不一致的情况,存在数据不兼容的情况,一般情况下,高版本的可以向下兼容,如果是低版本的向高版本迁移,小问题可以手动处理,如密码加密方式改变,我可以手动处理,还有就是对于队列,交换器,绑定关系的数据,我们可以手动复制到新的元数据文件中,然后重新建立元数据
- 如果按照上面方式重建元数据,会导致所有的元数据只会落到集群的一个节点,其他节点处于空闲状态,此时我们只能通过程序解析低版本的元数据json,然后重新创建
数据迁移和客户端连接的切换
元数据的重建是一个前期准备工作,而数据迁移和客户端连接切换才是主要工作.
首先就是把生产者的客户端和元集群断开连接,然后在于新的集群建立连接,这样就可把新的消息介入到新的集群
然后就是等待原集群的未处理的消息,处理完之后,在建立新集群的消费者连接,但是如果原集群已经不可用,此时就要立刻把消息切换到新的集群,而原集群的未处理的消息需要进一步处理,如果能修复,后期在将数据迁移到新集群,如果修复不了,数据就会丢失.
自动化迁移
要实现自动化迁移,需要在使用相关资源时候就做好准备,方便在迁移过程中无缝切换,生产者和消费者要能及时的感知到交换器,队列,集群的信息,为此我们就可以把相关信息放到zookeeper,正如上图所示,主要三个组件集群,zookeeper,以及生产者和消费者客户端,当我们建立连接的时候,具体的操作如下
- 首先在zookeeper建立相应的元数据,如交换器和队列以及集群信息
- 此时生产者客户端连接zookeeper,然后根据生产者对应的交换器找到zookeeper中的交换器节点,读取对应的集群1,同时添加监听watch
- 然后在根据集群名称1找到zookeeper找到集群节点对应的集群信息,如果ip列表,此时我们就获取生产者的所有的连接信息,接下来就可以发送消息了
- 此时消费者根据指定的队列在zookeeper中找到队列节点,继而找到集群节点,最终找到所需的所有连接信息,同时要对队列建立监听watch.
此时要进行集群迁移,我们要把cluster1迁移到cluster2中,首先我们需要在clusrer2中建立元数据,然后修改zk中的channel和queue的元数据信息,比如将zookeeper中的队列和交换器中的cluster=cluster1修改成cluser=cluster2,此时客户端就可以感知到变化,然后迅速关闭当前连接之后再与新集群cluser2建立连接,此时生产者和消费者就可以正常在集群中消费消息和生产消息,需要考虑的是如果元集群中的消息没有消费完成,此时就要使用数据迁移方式进行同步数据到集群cluster2中,
当然上面方式是建立在有空闲集群的情况下,如果给每个集群都配置一个空闲集群其实是一种浪费,因此我可以使用下面方式进行备份,如下图
如上四个集群,两两进行备份
元数据管理和监控
试想如果我们的生产环境,别开发不小心的删除了队列,导致消息无法消费,对于核心的业务场景是一个不可估量的影响,因此对于消息中间件的元数据的修改是要进行监控
首先我们可以对绝大部分人进行授权仅仅是读写权限,这样就可以减少风险,但是也不能完全的避免超级用户不小心操作,导致元数据的更改,因此我们要对元数据进一步监控
我们每次修改元数据要进行申请,之后在数据库和集群中建立元数据,在数据库和集群之间有一个校验元数据一致性的系统,如果存在不一直,就会通知监控系统,然后通知到管理员,管理员可以手动的修改或者删除元数据,这样就可以及时的知道系统是否被人修改,从而尽快处理,防止出现灾难性问题。
Federation
Federation插件设计的目的就是使rabbitmq在不同的broker节点之间进行消息传递而无须建立集群,该功能很多场景都非常有用
- Federation插件可以在不同管理域(可能设置了不同的用户和vhost,也可能运行在不同版本Rabbitmq和Erlang上)中的Broker或者集群之间传递消息
- Federation插件基于AMQP 0-9-1协议在不同的Broker之间进行通讯,并设计成能够容忍部位定的网络连接情况
- 一份Broker节点中可以同时存在联邦交换器或队列,或本地交换器和本地队列,只要在特定的交换器或队列创建Federation link
- FederationB不需要在N个Broker节点之间创建0(N^2)个连接。意味着Feferation在使用时更容易扩展
联邦交换器
现在有一个广州的业务ClientA需要连接广州broker3,并向交换器exchageA发送消息,此时的网络延迟小,但是当北京ClientB要连接广州broker3发送消息,但是北京和广州举例远,延迟比较严重,此时我如何优化呢,如下图
如上图,我们在broker3的exchangeA和broker1之间建立一条连接Federation link,此时Federation插件会在broker1建立一个同名的exchangeA,同时会建立一个内部交换器,名字为excahngeA->broker3B(broker3的集群名,也可以进行修改),此时可以把broker1中的exchangeA和内部交换器进行绑定,同时Federatin插件也会在broker1北京建立一份队列federation:exchangeA->boker3B,
同时Federation插件也会把broker1中的Federation:exchangeA->broker3B和broker3建立一条AMQP连接来实时消费队列federation:excahngeA->broker3b的消息。
此时我们如果北京的clientB连接broker1并向exchangeA发送消息,这样ClientB发送完消息进行确认,同时Federation link会转发消息到ExchangeA,最终消息会路由到queueA,完美的解决了网络延迟的问题
联邦队列
联邦队列可在多个broker节点之前为单个队列提供负载的功能
如上图,队列queue1和queue2原本在broker2中,因为某种需求将其配置成联邦队列,并将broker1建立为upstream,Federation插件会在broker1建立同名的队列queue1和queue2,同时和broker2建立link,当一个clientA连接broker2并消费队列中的消息,但是当clientA消费完了本队列消息之后,Federation link会拉取broker1拉取消息到本地,再由消费者clientA进行消费。
消费者即可以消费broker2中的队列,又可以消费broker1中的队列,Federation的这种分布式队列的部署可以提升单个队列,如果broker1中队列的消息来不及消费,那么broker2一端部署的消费者可以为其消费,达到某种意义上的负载均衡。还需要注意的是,当broker2中的队列消息消费完之后,不能使用Basic.get进行消费broker1消息,因为Basic.get是一个异步方法,如果要从broker1中的队列拉去消息,必须等待阻塞通过Federation link拉取消息存入broker2中的队列之后在消费消息.所以federatin queue而言只能使用Basic.consume进行消费.
注意的是联邦队列没有传递性如下图
queue1和queue2建立联邦,而queue2和queue3建立联邦,但是queue1和queue3没有任何关系。且与联邦交换器不一样,一条消息可以在联邦队列中转发无限次,
Shovel
和Federation具备同样的功能就是数据的转发,Shovel能够可靠的持续的从一个broker中的队列(源端)拉取数据并转发到另外一份broker中的交换器(目的端),作为源端的队列和目的端的交换器可以在一个broker中,也可以在不同的broker
- 松耦合,可以移动位于不同管理域中的broker的消息这些broker可以包含不同的用户和vhost,也可以是不同的rabbitmq版本和erlang版本
- 支持广域网,Shovel同样被设计成基于AMQP协议在不同的broker之间进行通信,可以容忍时断时续的网络,保证消息可靠性
- 高度订制,Shovel连接成功之后,可以进行配置以执行相关的AMQP命令。
如上图,有两个broker,broker1位于192.168.1.1,broker2位于192.168.1.2,exchange1和queue通过rk1进行绑定,excahnge2和queue2通过rk2进行绑定,此时在queue1和exchange2之间用shovel进行连接,
当客户端发送消息到exchange1会路由到queue1队列,最后在流转到exchange2中,最后转到queue2队列中,如果Shovel设置了add_forward_headers参数为true,流转的消息headers会有特殊标记
其实我们也可以直接把queue2当做目的端,虽然看起来时直接路由消息到queue2中,其实是通过默认的交换器进行转发的。
当然我也可以直接配置交换器就是源端也是可行,实际上是在exchange1是通过shovel建立一个队列,并绑定了exchange1,然后通过shovel拉取消息进而转发到交换器exchange2,
shovel可以为源端和目的端配置多个broker地址。这样即使源端或目的端失效,也可以重新连接其他broker,可以设置参数reconnect_delay参数以避免由于重连行为导致的网络泛红,或者可以重连失败后直接停止连接,
在生产环境中,消息堆积是一件很正常的事情我就可以使用shovel解决,缓解消息堆积严重的集群,当集群的消息有所缓解就可以再把消息转发到原集群。
存储机制
不管是持久化的消息还是非持久化的消息都可以落入磁盘,如果是持久化的消息会直接落入磁盘,非持久化的消息会在内存吃紧的情况下,进入磁盘,
持久化是一个逻辑上的概念, 他可以分为两部分,一部分是队列索引(rabbit_queue_index)和消息存储(rabbit_msg_store)
rabbit_queue_index,负责维护队列落盘消息的消息,包括消息的存储地点,是否已经交付给消费者,是否已经被消费者ack等,每个队列对应一个队列索引
rabbit_msg_store,还可以分为两部分,msg_store_persistent和msg_store_tansient,msg_store_presistent负责持久化消息的持久化,msg_store_tansient负责非持久化消息的持久化,重启之后消息丢失。
rabbit_queue_index存储较大消息,这个消息大小可以使用queue_index_embed_msgs_below配置,默认4096B,他是以顺序的段文件来进行存储,后缀为.idx,每个段文件存储的消息条数为16384,使用参数SEGMENT_ENTRY_COUNT设置,
rabbit_msg_store处理的消息会以追加的方式记录在文件中,当文件大小超过指定的大小,会重新建立一个文件,重新写入,文件名以.rdq从0开始累加,且在记录消息存储的时候rabbitmq会在ETS表中记录消息的在文件中的位置映射和文件的先关信息。
队列的结构
通常队列有两部分组成,rabbit_amqqueue_process和backing_queue,rabiit_amqqueue_process负责协议相关的消息处理,即接受生产的发布的消息,小消费者交付消息,处理消息的确认,backing_queue是消息存储的形式与引擎,向rabbit_amqqueue_process提供相关接口。
如果消息投递的队列是空队列,并且消费者订阅了此队列,则消息不需要经过队列直接交给消费者,如果消息无法投递给消费者,则消息会在队列中存储,而消息会随着系统的负载转换不通的状态
- alpha,消息内容和消息索引存储在内存
- beta,消息内容存在磁盘,消息索引保存在内存中
- gamma,消息内容保存在磁盘中,消息索引在磁盘和内存中都会存在
- delta,消息内容和消息索引都会在磁盘中
rabbitmq在运行的时候会计算当期内存能够保存的最大消息数量,如果alpha状态消息大于最大数量的值(target_ram_count),此时就会因为消息的转态转移,多余的消息会变成beta,gamma,delta不同的状态alpha状态最消耗内存,但是少消耗CPU,delta基本不消耗内存但消耗Io和cpu,delta状态的消息需要两次Io操作才会拿到消息,一次是拿到消息索引,第二次才会拿到消息内容.beta和gamma一次Io就可以拿到消息
一般情况下,消息会按照Q1-Q2-delta-Q3-Q4进行流转,但是这个不一定都是这样,还是要根据系统的负载,如果负载很高就会把一部分消息保存在磁盘节省内存空间,如果负载低,就会重新回到内存中,使整个队列有很好的弹性。
惰性队列
此队列是在3.6.0版本引入的,是为了尽可能的将消息存入磁盘中,而在消费者消费到消息的时候才会把消息加载到内存中,他的目的就是是队列尽可能的存储更多到的消息
一般情况下,消息尽可能的放到内存,这样可以加快将消息发送给消费者,即使持久化的消息,也是把一部分备份在内存中,当rabbitmq在释放内存你的时候,会把内存中的消息置换到磁盘,此过程是一个非常耗时的过程,会导致阻塞队列进而无法接受新的消息,
不管是持久化消息还是非持久化消息都会进入磁盘,但是如果是非持久化消息在重启之后依然会丢失,如果是持久化消息他的Io操作是无法避免,因此持久化消息和惰性队列是最佳的拍档,
惰性队列和普通队列相比,只有很小的内存开销,例如我发送一千万消息,每条消息1KB,并且此时没有任何消费者,那么普通队列消耗的内存你是1.2G,而惰性队列只消耗1.5M内存,
如果普通队列要转变成惰性队列,那么我们需要忍受同样的性能消耗,首先需要把内存中的消息换页至磁盘,然后才能接受消息,反之,将一个惰性队列转变成普通队列,会批量的把消息导入到内存中.
流控
是在2.8.0版本引入的,是防止生产速度过快导致服务器难以支撑,一旦触发会阻塞集群中所有的Connection,
Erlang进程之间不共享内存是通过消息传递进行通信,每个进程有一个自己的进程邮箱,默认是没有对进程邮箱大小进行限制,所以当大量消息持续发送某个进程,会导致该进程邮箱过大,最终导致内存溢出并崩溃,因此没有流控的支持,很快就会达到内存的阀码,
流控是基于信用证算法实现的,他是通过监控进程邮箱,当某个进程负载过高而来不及处理消息时候,这个进程的进程有效就会堆积消息,当堆积到一定程度,就会阻塞而不接受上游的新消息,从而慢慢的上游进程也会堆积消息,最后导致负责网络数据包接收的进程阻塞而暂停接收新的数据.
上图就是信用证算法,如processB ,其中{{credit_from,C},value}代表向process C能发送多少消息,每发一条就减1,直到为0,就不会再发送消息给processC,{{credit_to,A},value}代表的是在接受多少条就向processA发送增加credit值的通知,process A接收到通知后,就会增加对应value的值,继续发送消息,
当上游的发送速度大于下游的接受速度,最后就会导致信用值用完,此时进程就会阻塞,一直阻塞传递到上游,当下游通知上游增加信用值的时候,此时上游进程处于阻塞状态将会接触,然后一个个传递到能够解除最上游的阻塞状态。
流控不仅仅可以作用在connection而且会作用于信道channel,队列,对于上图,形成一个流控链,对于处于整个流控链中的任意进程,只要改该进程阻塞,上游进程必定被阻塞,即某个进程达到性能瓶颈就会导致上游阻塞,所以我们可以利用这个机制找到瓶颈之所在。
镜像队列
如果rabbitmq集群只有一个broker,当这个broker宕机的时候,服务就会临时不可用,并且可能导致消息丢失,虽然我们可以使用消息和队列持久化,但是由于缓存的原因,消息在放之后和别写入磁盘中还有一段时间坑产生问题,即使可以通过publisher confirm机制知道确保客户端知道哪些消息已经入磁盘,尽管如此,我们也不希望因单节点故障导致服务不可用,、
如果rabbitmq集群是多个broker,那么服务的整体可用性对于单节点是有弹性的,但是我们要注意的是,尽管交换器和绑定关系能够在单节点中不会出现问题,但是队列和其上的消息却不行,这是因为队列进程及内存仅仅维护在单个节点上,所以一个节点的失效表现队列不可用。
因此镜像队列就是为了解决上面问题
镜像队列就是在集群的其他broker节点也有一个同样的队列,当集群的一个节点失效,队列会自动切换到镜像队列的另外一个节点保证服务可用性,
slave会准确的按照master执行命令的命令进行动作,因此master和slave的转态是一只的,当master失效的时候,会选举出一个最老的slave成为新的master,发送到镜像队列的消息会同时发送到master和slave,如果master挂了,消息还会在slave上,提升为新的master,消息依然不会丢失,主要的是除了发送消息外的所有动作都会向master发送,然后又master将命令执行结果同步给slave.
镜像队列的backing_queuq比较特殊,其实现并非是rabbit_variable_queuq,他内部包含了普通backing_queue进行本地消息消息持久化处理,而且增加了将消息和ack复制到所有镜像的功能,
所有对rabbit_mirror_queue_master的操作都会通过GM的方式同步给各个slave,GM负责消息的广播,rabbit_mirror_queue_slave负责回调处理,而master回调通过coordinator负责完成,正如前面说的,除了发布消息Basic.Publish,其他所有的操作都由master完成,然后将结果通过GM同步给其他slave.slave收到消息后,通过回调交由rabbit_mirror_queuq_slave进行实际的处理,