文章目录
1. RabbitMQ【消息中间件】
1.1. 介绍
1.2. 安装
1.3. 四种类型的交换器(exchange)
1.3.1. direct(点对点、单播,直连)
1.3.2. fanout(扇形,广播模式,订阅模式)
1.3.3. topic(主题)
1.3.4. header(头,首部)
1.3.5. 交换机属性
1.4. Queue【队列】
1.5. springBoot整合RabbitMQ
1.5.1. 入门
1.5.2. RabbitTemplate
1.5.2.1. 方法
1.5.3. RabbitAdmin
1.5.4. 消息监听
1.5.4.1. @RabbitListener
1.5.4.2. @RabbitHandler
1.6. 消息确认(SpringBoot整合)
1.6.1. confirm模式
1.6.1.1. 发送消息的确认
1.6.1.2. 消费消息的确认
1.7. 参考文章
RabbitMQ【消息中间件】
介绍
- RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。RabbitMQ主要是为了实现系统之间的双向解耦而实现的。当生产者大量产生数据时,消费者无法快速消费,那么需要一个中间层。保存这个数据。
- AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
- RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
安装
docker pull rabbitmq:3.7-management
docker run --name rabbitmq -p 15672:15672 -p 5672:5672 -d df80af9ca0c9
- 安装运行成功之后访问:
http://[ip]:15672
即可登录
四种类型的交换器(exchange)
- https://baijiahao.baidu.com/s?id=1577456875919174629&wfr=spider&for=pc
direct(点对点、单播,直连)
- 直连型交换机(direct exchange)是根据消息携带的路由键(routing key)将消息投递给对应队列的
fanout(扇形,广播模式,订阅模式)
- 扇型交换机(funout exchange)将消息路由给绑定到它身上的所有队列。不同于直连交换机,路由键在此类型上不启任务作用。如果N个队列绑定到某个扇型交换机上,当有消息发送给此扇型交换机时,交换机会将消息的发送给这所有的N个队列
- 路由键对这个交换机 不起作用,只要发送给扇形交换机的消息,那么都会发送给和其绑定的所有队列
topic(主题)
- 直连交换机的
routing_key
方案非常简单,如果我们希望一条消息发送给多个队列,那么这个交换机需要绑定上非常多的routing_key
,假设每个交换机上都绑定一堆的routing_key
连接到各个队列上。那么消息的管理就会异常地困难。 - 所以
RabbitMQ
提供了一种主题交换机,发送到主题交换机上的消息需要携带指定规则的routing_key
,主题交换机会根据这个规则将数据发送到对应的(多个)队列上。 主题交换机的routing_key
需要有一定的规则,交换机和队列的binding_key
需要采用*.#.*.....
的格式,每个部分用.
分开,其中:*
表示一个单词rabbit.*
能够匹配到rabbit.new
rabbit.*
不能够匹配到rabbit.new.old
#
表示任意数量(零个或多个)单词。rabbit.#
能够匹配到rabbit.new
rabbit.#
能够匹配到rabbit.new.old
- 假设有一条消息的
routing_key
为fast.rabbit.white
,那么带有这样binding_key
的几个队列都会接收这条消息
header(头,首部)
- 类似主题交换机,但是头交换机使用多个消息属性来代替路由键建立路由规则。通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。
- 此交换机有个重要参数:”x-match”
- 当”x-match”为“any”时,消息头的任意一个值被匹配就可以满足条件
交换机属性
- 除交换机类型外,在声明交换机时还可以附带许多其他的属性,其中最重要的几个分别是:
Name
:交换机名称Durability
:是否持久化。如果持久性,则RabbitMQ重启后,交换机还存在Auto-delete
:当所有与之绑定的消息队列都完成了对此交换机的使用后,删掉它
Queue【队列】
- 基本的属性如下:
name
:名称durable
:是否持久化,如果不持久化,那么重启后将会不存在exclusive
:独享(只被一个连接(connection)使用,而且当连接关闭后队列即被删除)autoDelete
:自动删除,当最后一个消费者退订后即被删除arguments
:其他
springBoot整合RabbitMQ
入门
- 添加依赖
<!-- rabbitmq启动器 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 配置连接信息
spring.rabbitmq.host=192.168.0.86 ## 主机地址
spring.rabbitmq.port=5672 ## 端口
spring.rabbitmq.username=admin ## 用户名
spring.rabbitmq.password=123456 ## 密码
spring.rabbitmq.virtual-host=/ ## 虚拟主机,这里的用户名和密码一定要对这个虚拟主机有权限
- 配置一个Topic交换机和对应的队列,配置类如下,会自动创建
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Topic交换机的配置类
* 1、配置完成之后,当使用到的时候会自动创建,不需要手动的创建,当然使用rabbitAdmin也是可以手动创建的
*/
@Configuration //指定这是一个配置类
public class TopicConfig {
/**
* 创建队列 queue1
* @return
*/
@Bean
public Queue queue1(){
return new Queue("queue_1",true);
}
/**
* 创建队列 queue2
* @return
*/
@Bean
public Queue queue2(){
//指定名称和持久化
return new Queue("queue_2",true);
}
/**
* 创建topic交换机
*/
@Bean
public TopicExchange topic1(){
return new TopicExchange("topic_1");
}
/**
* 将交换机topic1和队列queue1通过路邮键message_1绑定在一起
* @param topic1 交换机1 ,这里通过名称匹配,因为是通过@Bean自动注入的
* @param queue1 队列1 这里通过名称匹配,因为是通过@Bean自动注入的
* @return
*/
@Bean
public Binding bindTopic1AndQueu1(TopicExchange topic1,Queue queue1 ){
return BindingBuilder.bind(queue1).to(topic1).with("message_1");
}
/**
* 将交换机topic1和队列queue2通过路邮键message_2绑定在一起
* @param topic1
* @param queue1
* @return
*/
@Bean
public Binding bindTopic1AndQueu2(TopicExchange topic1,Queue queue2 ){
return BindingBuilder.bind(queue2).to(topic1).with("message_2");
}
}
- 启动类添加注解
@EnableRabbit
@EnableRabbit //开启rabbitmq
@SpringBootApplication
public class DemoApplication extends SpringBootServletInitializer {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
//继承SpringBootServletInitializer实现war包的发布
@Override
protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
return application.sources(DemoApplication.class);
}
}
- 发送消息和接受消息
@Service
public class RabbitServiceImpl implements RabbitServie {
@Resource
private RabbitTemplate rabbitTemplate发送消息;
//使用rabbitTemplate发送消息
@Override
public void send() {
Map<String, Object> map=new HashedMap();
map.put("name", "陈加兵");
rabbitTemplate.convertAndSend("topic_1", "message_1", map);
}
//使用rabbitTemplate接收消息
@Override
public void get() {
Map<String, Object> map=(Map<String, Object>) rabbitTemplate.receiveAndConvert("queue_1");
System.out.println(map);
}
}
RabbitTemplate
- springBoot自动注入,直接使用即可
- 实体类发送消息之前一定需要序列化
- 用于发送和接收消息
方法
void convertAndSend(String exchange, String routingKey, final Object object)
:发送消息exchange
:交换机routingKey
:路由键object
:需要发送的对象
Object receiveAndConvert(String queueName)
:接收指定队列的消息queueName
:消息队列的名字
RabbitAdmin
- springBoot已经为我们自动注入了AmqpAdmin,用于创建交换机、队列、绑定
@Bean
@ConditionalOnSingleCandidate(ConnectionFactory.class)
@ConditionalOnProperty(prefix = "spring.rabbitmq", name = "dynamic", matchIfMissing = true)
@ConditionalOnMissingBean
public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
}
- 测试类如下:
@RunWith(SpringRunner.class)
@SpringBootTest // springBoot测试类,可以自定义测试类,不过需要引用这两个注解
public class RabbitMqTest {
@Resource
private AmqpAdmin amqpAdmin; //自动注入即可
@Test
public void test4() {
DirectExchange directExchange = new DirectExchange("test_direct");
Queue queue = new Queue("direct_1", true);
// 创建一个直连的交换机
amqpAdmin.declareExchange(directExchange);
// 创建一个队列
amqpAdmin.declareQueue(queue);
//创建绑定关系
amqpAdmin.declareBinding(BindingBuilder.bind(queue)
.to(directExchange).with("direct_message"));
}
}
消息监听
- 消息监听使用的注解是
@RabbitListener
,可以监听指定的队列,一旦这个队列中有消息了,那么就会执行 - 在启动类上添加
@EnableRabbit
开启基于注解的rabbit的消息监听
@RabbitListener
- 消息监听的注解,可以监听一个或者多个队列,一旦队列中有了信息,那么就会执行,一旦被执行就意味着这条消息被消费了(不一定,后面会讲到消息确认机制,这里是默认会被消费的)
/**
* rabbitmq的消息处理类
* @author 陈加兵
*/
@Component //注入
public class MessageHandler {
/**
* 使用@RabbitListener这个注解监听指定的队列,一旦这个队列有了消息,那么将会执行
* @param log 消息的内容,如果接收的消息内容是log对象,那么将会被反序列化,存入这个log中
* 消息一旦被监听到了并且被执行了,那么这条队列的消息将会被删除了
*/
@RabbitListener(queues={"direct_1"})
public void received(Log log){
System.out.println("------接收到消息----");
System.out.println("消息内容为:" log);
}
/**
* 使用org.springframework.amqp.core.Message对象来接收消息,可以显示消息头一些信息
* @param message
*/
@RabbitListener(queues={"direct_1"})
public void received1(Message message){
System.out.println("------接收到消息1----");
byte[] body=message.getBody();
System.out.println(message.getMessageProperties());
}
}
@RabbitHandler
@RabbitListener
可以标注在类上面,需配合 @RabbitHandler 注解一起使用@RabbitListener
标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理,具体使用哪个方法处理,根据 MessageConverter 转换后的参数类型
/**
* 处理消息的类,使用@RabbitListener监听队列,结合@RabbitHandler处理不同内容类型的消息
* @author Administrator
*
*/
@RabbitListener(queues={"direct_1"}) //监听direct_1这个队列的消息
@Component //注入
public class RabbitMessage {
/**
* 定义处理的方法是接收内容的类型为Log类型
* @param log
*/
@RabbitHandler
public void receivedLog(Log log){
System.out.println("接收了log对象");
System.out.println(log);
}
/**
* 定义接收内容为User类型的消息
* @param user
*/
@RabbitHandler
public void receivedMap(User user){
System.out.println("接收了user对象");
System.out.println(user);
}
}
消息确认(SpringBoot整合)
- 消息确认可以分为事务模式(类似jdbc的操作),confirm模式(可以使用异步回调模式,更加高效)
- rabbitmq默认是自动确认的,即是一条消息被发送了或者被消费了,无论你生产者或者消费者有没有发送或者消费成功,那么都是自动确认为已发送或者已接收了,但是在业务中接收了一条消息不一定就是成功消费了,如果这个业务没有正常完成,我们希望的是能够消息回滚,就像是mysql的事务机制,因此此时我们就需要手动确认这条消息被消费了,而不是自动确认
- 消息确认可以分为事务模式(类似jdbc的操作),confirm模式,具体的可以参考https://blog.csdn.net/u013256816/article/details/55515234
confirm模式
- confirm不同于事务模式的地方是可以使用异步的确认模式
- 在配置文件中配置,如下:
# 开启发送确认
spring.rabbitmq.publisher-confirms=true
# 开启发送失败退回
spring.rabbitmq.publisher-returns=true
# 开启ACK,开启之后只有手动提交才会消费消息
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual
发送消息的确认
ConfirmCallback
: 这个是rabbitmq的确认的回调接口,当消息发送之后,会异步调用实现这个接口的方法ReturnCallback
:这个是rabbitmq的失败回调接口,当消息发送失败之后,会异步调用实现这个接口的方法- 一个消息的发送的业务类如下:
/**
* 消息发送的业务层
* SendMessageService : 发送消息的接口
* ConfirmCallback : 消息发送成功的回调接口
* ReturnCallback : 消息发送失败的回调接口(找不到对应的路由或者因为各种原因消息没有成功投递到rabbitmq中都会出发回调)
* @author 陈加兵
* @since 2018年11月15日 下午4:45:37
*/
@Service
public class SendMessageServiceImpl implements SendMessageService,ConfirmCallback,ReturnCallback {
@Resource
private RabbitTemplate rabbitTemplate; //注入rabbitMq的template,用于发送和消费消息
private Logger logger=LoggerFactory.getLogger(SendMessageServiceImpl.class); //日志
/**
* 消息发送失败的回调方法,实现ReturnCallback接口的方法
* 1、消息没有投递成功,包括没有找到对应的队列或者路由键
*/
@Override
public void returnedMessage(Message message, int replyCode,
String replyText, String exchange, String routingKey) {
logger.info("返回的失败代码=" replyCode " 返回的失败信息=" replyText);
logger.info("交换机=" exchange " 绑定的路由键=" routingKey);
}
/**
* 消息发送确认的回调方法
* 如果消息没有到exchange,则confirm回调,ack=false
* 如果消息到达exchange,则confirm回调,ack=true
* 判断消息有没有成功发送,只需要判断ack的值,correlationData是发送消息的时候传递过来的值(String)
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack,
String cause) {
//如果ack==true,表示消息发送成功
if (ack) {
logger.info("消息发送成功,下面开始处理业务。。。。。。。。。。。。。。。");
logger.info("correlationData=" correlationData);
}else {
logger.info("消息发送失败。。。。。。。。。。。。。。。。");
logger.info("cause=" cause);
}
}
/**
* 发送消息的方法
*/
@Override
public void sendMessage(Log log) throws Exception {
rabbitTemplate.setConfirmCallback(this); //设置
rabbitTemplate.setReturnCallback(this);
CorrelationData data=new CorrelationData();
data.setId("success"); //定义内容,在消息发送成功的回调方法中可以获取这个值
rabbitTemplate.convertAndSend("amq.direct", "message", log,data); //发送消息
}
}
消费消息的确认
- 开启ack之后,默认是不会自动消费的,只有手动ack才会被消费
- 手动ack和nack使用的类是
com.rabbitmq.client.Channel
channel.basicAck()
:手动ackdeliveryTag
:该消息的indexmultiple
:是否批量,如果为true
将一次性ack所有小于deliveryTag的消息,如果为false
,那么将ack当前的消息
channel.basicNack(deliveryTag, multiple, requeue)
deliveryTag
:该消息的indexmultiple
:是否批量,如果为true
将一次性ack所有小于deliveryTag的消息,如果为false
,那么将ack当前的消息requeue
:被丢弃消息是否重新进入队列,如果是true将会重新进入队列
- 实例如下
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
import com.techwells.demo.domain.Log;
/**
* 监听队列direct_1
* @author 陈加兵
* @since 2018年11月15日 下午6:55:56
*/
@Component
@RabbitListener(queues={"direct_1"})
public class ReceivedMessageHandler {
private Logger logger=LoggerFactory.getLogger(ReceivedMessageHandler.class);
/**
* 接收消息类型为Log的消息
* 如果不手动提交的话,默认是不会被自动确认消费的,只有手动提交了,才会被真正的消费
* @param log 消息的实体类
* @param channel
* @param message rabbitmq的Message类
* @throws IOException
*/
@RabbitHandler //处理消息
public void handleMessage(Log log,Channel channel,Message message) throws IOException{
logger.info("成功接收到消息........" log);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //手动提交ack,消费消息
// channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, true);
logger.info("成功被消费。。。。。。。。。。");
}
/**
* 处理消息类型为String类型的消息
* @param str
* @param channel
* @param message
* @throws IOException
*/
@RabbitHandler //处理消息
public void handleStringMessage(String str,Channel channel,Message message) throws IOException{
logger.info("成功接收到消息........" str);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); //nack,不消费这条消息,一般是业务失败才会不消费
}
}
参考文章
1、https://www.kancloud.cn/yunxifd/rabbitmq/96997
2、中文文档
3、https://www.cnblogs.com/ityouknow/p/6120544.html
4、事务
5、ACK