Rabbit MQ可以直连队列吗
生产者和消费者使用相同的参数声明队列。重复声明不会改变队列
代码语言:javascript复制//生产者
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//发送10条消息,依次在消息后面附加1-10个点
for (int i = 6; i > 0; i--){
String message = "helloworld";
channel.basicPublish("", QUEUE_NAME,null, message.getBytes());
}
//消费者
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
QueueingConsumer consumer = new QueueingConsumer(channel);
// 指定消费队列
channel.basicConsume(QUEUE_NAME, true, consumer);
while (true)
{
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
doWork(message);
}
channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments) ; queue:队列名字 durable:队列持久化标志,true为持久化队列 exclusive:exclusive:排他队列,仅对创建的链接可见、链接中的channel都可见,其他链接不能重复声明,链接关闭队列会被自动删除 autoDelete:自动删除,如果该队列没有任何订阅的消>费者的话,该队列会被自动删除。这种队列适用于临时队列。 arguments:Map类型,队列参数设置 x-message-ttl:数字,消息队列中消息的存活时间,超过会被删除x-expires:数字,队列自身的空闲存活时间,指定时间内没有被访问,就会被删除 x-max-length和x-max-length-bytes:队列最大长度和空间,超出会删除老的数据 x-dead-letter-exchange和x-dead-letter-routing-key:设置死信 x-max-priority:队列支持的优先级别,需要生产者在发送消息时指定,消息按照优先级从高到底分发给消费者 channel.basicPublish(exchange, routingKey, mandatory, immediate,basicProperties, body); exchange: 交换机名 routingKey: 路由键 mandatory:为true时,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返还给生产者, channel.addReturnListener添加一个监听器,当broker执行basic.return方法时,会回调handleReturn方法,这样就可以处理变为死信的消息了;设为false时,出现上述情形broker会直接将消息扔掉; immediate: 3.0以前这个标志告诉服务器如果该消息关联的queue上有消费者,则马上将消息投递给它,如 果所有queue都没有消费者,直接把消息返还给生产者,不用将消息入队列等待消费者了。3.0之后取消了该 参数 basicProperties:消息的详细属性,优先级别、持久化、到期时间等,headers类型的exchange要用到的是其中的headers字段。 body:消息实体,字节数组。 QueueingConsumer:一个已经实现好了的Consumer,相比于自己实现Consumer接口,这是个比较安全快捷的方式。该类基于jdk的BlockingQueue实现,handleDelivery方法中将收到的消息封装成Delivery对象,并存放到BlockingQueue中,这相当于消费者本地存放了一个消息缓存队列。nextDelivery()方法底层调用的BlockingQueue的阻塞方法take()。channel.basicConsume(queue, autoAck, consumer); queue:队列名。 autoAck:自动应答标志,true为自动应答。 consumer:消费者对象,可以自己实现Consumer接口,建议使用QueueingConsumer。
RabbitMQ的持久化机制
- 交换机持久化:exchange_declare创建交互机时通过参数指定
- 队列持久化:queue_declare创建队列时通过参数指定
- 消息持久化:new AMQPMessage创建消息时通过参数指定
append的方式写文件,会根据大小自动生成新的文件,rabbitmq启动时会创建两个进程,一个负责持久化消息的存储,另一个负责非持久化消息的存储(内存不够时)
消息存储时会在ets表中记录消息在文件中的映射以及相关信息(包括id、偏移量,有效数据,左边文件,右边文件),消息读取时根据该信息到文件中读取、同时更新信息
消息删除时只从ets删除,变为垃圾数据,当垃圾数据超出比例(默认50%),并且文件数达到3个,触发垃圾回收,锁定左右两个文件,整理左边文件有效数据、将右边文件有效数据写入左边,更新文件信息,删除右边,完成合并。当一个文件的有用数据等于0时,删除该文件。
写入文件前先写buffer缓冲区,如果buffer已满,则写入文件(此时只是操作系统的页存)每隔25ms刷一次磁盘,不管buffer满没满,都将buffer和页存中的数据落盘每次消息写入后,如果没有后续写入请求,则直接刷盘
RabbitMQ事务消息
通过对信道设置实现
- channel.txSelect();通知服务器开启事务模式;服务端会返回Tx.Select-Ok
- channel.basicPublish;发送消息,可以是多条,可以是消费消息提交ack
- channel.txCommit()提交事务;
- channel.txRollback()回滚事务;
消费者使用事务:
- autoAck=false,手动提交ack,以事务提交或回滚为准;
- autoAck=true,不支持事务的,也就是说你即使在收到消息之后在回滚事务也是于事无补的,队列已经把消息移除了
如果其中任意一个环节出现问题,就会抛出IoException异常,用户可以拦截异常进行事务回滚,或决定要不要重复消息。事务消息会降低rabbitmq的性能.
RabbitMQ如何保证消息的可靠性传输
死信消息:
- 消息被消费方否定确认,使用 channel.basicNack 或 channel.basicReject ,并且此时requeue 属性被设置为 false 。
- 消息在队列的存活时间超过设置的TTL时间。
- 消息队列的消息数量已经超过最大队列长度。
那么该消息将成为死信消息。如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃
为每个需要使用死信的业务队列配置一个死信交换机,同一个项目的死信交换机可以共用一个,然后为每个业务队列分配一个单独的routeKey,死信队列只不过是绑定在死信交换机上的队列,死信交换机也不是什么特殊的交换机,只不过是用来接受死信的交换机,所以可以为任何类型【Direct、Fanout、Topic】
TTL:一条消息或者该队列中的所有消息的最大存活时间
如果一条消息设置了TTL属性或者进入了设置TTL属性的队列,那么这条消息如果在TTL设置的时间内没有被消费,则会成为“死信”。如果同时配置了队列的TTL和消息的TTL,那么较小的那个值将会被使用。只需要消费者一直消费死信队列里的消息
代码语言:javascript复制agruments.put("x-dead-letter-exchange", "dlx.exchange");
channel.queueDeclare(queueName, true, false, false, agruments);
channel.queueBind(queueName, exchangeName, routingKey);
channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
channel.queueDeclare("dlx.queue", true, false, false, null);
channel.queueBind("dlx.queue", "dlx.exchange", "#");
RabbitMQ的普通集群原理
image.png
元数据:
- 队列元数据:队列名称和它的属性
- 交换器元数据:交换器名称、类型和属性
- 绑定元数据:一张简单的表格展示了如何将消息路由到队列
- vhost元数据:为vhost内的队列、交换器和绑定提供命名空间和安全属性
为什么只同步元数据:
- 存储空间,每一个节点都保存全量数据,影响消息堆积能力
- 性能,消息的发布者需要将消息复制到每一个集群节点
客户端连接的是非队列数据所在节点:则该节点会进行路由转发,包括发送和消费
集群节点类型:
- 磁盘节点:将配置信息和元信息存储在磁盘上。
- 内存节点:将配置信息和元信息存储在内存中。性能优于磁盘节点。依赖磁盘节点进行持久化
RabbitMQ要求集群中至少有一个磁盘节点,当节点加入和离开集群时,必须通知磁盘节点(如果集群中唯一的磁盘节点崩溃了,则不能进行创建队列、创建交换器、创建绑定、添加用户、更改权限、添加和删除集群节点)。如果唯一磁盘的磁盘节点崩溃,集群是可以保持运行的,但不能更改任何东西。因此建议在集群中设置两个磁盘节点,只要一个可以,就能正常操作。