Spring Boot是很优秀的框架,它的出现简化了新Spring应用的初始搭建以及开发过程,大大减少了代码量,目前已被大多数企业认可和使用。这个专栏将对Spring Boot框架从浅入深,从实战到进阶,不但我们要懂得如何去使用,还要去剖析框架源码,学习其优秀的设计思想。
汇总目录链接:【Spring Boot实战与进阶】学习目录
文章目录- 一、简介
- 二、集成RabbitMQ的简单例子
- 1、引入依赖
- 2、配置RabbitMQ连接信息
- 3、创建RabbitMQ配置类和消息队列
- 4、创建生产者
- 5、创建消费者
- 6、创建测试类
- 7、运行项目
- 三、项目实战
- 1、引入依赖
- 2、配置RabbitMQ连接信息
- 3、创建RabbitMQ配置类和消息队列
- 4、MsgConsumerListener(消费者)
- 5、TestController(消息生产和推送的接口)
- 6、调用接口推送消息
- 1、引入依赖
- 2、配置RabbitMQ连接信息
- 3、创建RabbitMQ配置类和消息队列
- 4、创建生产者
- 5、创建消费者
- 6、创建测试类
- 7、运行项目
- 1、引入依赖
- 2、配置RabbitMQ连接信息
- 3、创建RabbitMQ配置类和消息队列
- 4、MsgConsumerListener(消费者)
- 5、TestController(消息生产和推送的接口)
- 6、调用接口推送消息
一、简介
RabbitMQ是采用 Erlang语言实现AMQP协议的消息中间件,AMQP全称是 Advanced Message Queue Protocolo,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。
常用的交换机有以下三种:
1、Direct Exchange(直连型交换机)
根据消息携带的路由键(routing key)将消息投递给对应队列,direct exchange 适用于消息的单播发送。 工作流程如下:
- 将一个队列绑定到某个交换机上,同时赋予该绑定 一个 route key。
- 当一个携带 route key为R 的消息被发送到 direct exchange 时,exchange 会将消息路由到 绑定值同样为 R 的队列。注意Route Key和绑定值要完全匹配才行。 direct exchange 经常用于在 多个 worker 中分配任务,当这样做时,需注意,在AMQP 0-9-1中,消息的负载均衡发生在 consumer之间,而不是在 queue之间。
2、Fanout Exchange(扇型交换机) 一个 fanout exchange 会将消息分发给所有绑定到此 exchange 的queue中,不管 queue中的 route key。如果有 N 个 Queue 绑定到 一个 fanout exchange 时,那么此时 exchange 收到消息时,会将此消息分发到 这 N 个 queue中,由于此性质, fanout exchange 也常用消息的广播。
3、Topic Exchange(主题交换机)
topic exchange 会根据 route key 将消息分发到与此消息的 route key 相匹配的并且绑定此exchange的一个或多个 queue。这里的相匹配与 direct exchange的完全匹配的路由规则不一样,topic exchange 在匹配规则上进行了扩展,规则如下:
- RoutingKey(路由键)为一个点号 “.” 分隔的字符串,如 “com.rabbitmq.client”、“java.util.concurrent”、"com.hidden.client"等
- BindingKey(绑定键) 和 RoutingKey一样也是点号 “.” 分隔的字符串
- BindingKey (绑定键) 中可以存在两种 特殊字符串 “*” 和 “#” ,用于做模糊匹配,其中 " # " 用于匹配一个单词," * "用于匹配多个单词
topic exchange 经常用于实现 publish/subscribe模型,即消息的多播模型。这里的Topic Exchange就适用于发布/订阅模型。RabbitMQ的一个原则就是,消息不能直接投递到 Queue中,必须先将消息投递到 Exchange中,然后由Exchange 按照路由规则将消息投递到对应的 Queue中。
二、集成RabbitMQ的简单例子
1、引入依赖
代码语言:javascript复制<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、配置RabbitMQ连接信息
代码语言:javascript复制spring:
rabbitmq:
host: 127.0.0.1
port: 5672
password: guest
username: guest
3、创建RabbitMQ配置类和消息队列
代码语言:javascript复制@Configuration
public class RabbitConfig {
@Bean
public Queue getQueue() {
return new Queue("QA");
}
}
4、创建生产者
代码语言:javascript复制@Component
public class MsgProducer {
@Resource
private AmqpTemplate rabbitTemplate;
public void send() {
rabbitTemplate.convertAndSend("QA", "这是一条最新消息");
}
}
5、创建消费者
代码语言:javascript复制@Component
@RabbitListener(queues = "QA")
public class MsgConsumer1 {
@RabbitHandler
public void process(String msg) {
System.out.println("【消费者接收的消息】: " msg);
}
}
6、创建测试类
代码语言:javascript复制@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqApplicationTests {
@Resource
private MsgProducer msgProducer;
// 发送单条消息
@Test
public void contextLoads() {
msgProducer.send();
}
}
7、运行项目
控制台输出:
代码语言:javascript复制【消费者接收的消息】: 这是一条最新消息
RabbitMQ后台管理端:
三、项目实战
1、引入依赖
代码语言:javascript复制<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、配置RabbitMQ连接信息
代码语言:javascript复制spring:
rabbitmq:
host: 127.0.0.1
port: 5672
password: guest
username: guest
mq:
email:
queue-name: email-queue
exchange-name: email-exchange
routing-key-name: email-routing-key
3、创建RabbitMQ配置类和消息队列
代码语言:javascript复制@Configuration
@Slf4j
public class RabbitMQConfig {
@Resource
private CachingConnectionFactory connectionFactory;
@Value("${mq.email.queue-name}")
private String queueName;
@Value("${mq.email.exchange-name}")
private String exchangeName;
@Value("${mq.email.routing-key-name}")
private String routingKeyName;
/**
* 单一消费者
*
* @return
*/
@Bean(name = "singleListenerContainer")
public SimpleRabbitListenerContainerFactory listenerContainer() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setConcurrentConsumers(1);
factory.setMaxConcurrentConsumers(1);
factory.setPrefetchCount(1);
factory.setTxSize(1);
factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
return factory;
}
@Bean
public RabbitTemplate rabbitTemplate() {
connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> log.info("消息发送成功: correlationData({}), ack({}), cause({})", correlationData, ack, cause));
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> log.info("消息丢失: exchange({}), route({}), replyCode({}), replyText({}), message:{}", exchange, routingKey, replyCode, replyText, message));
return rabbitTemplate;
}
@Bean
public Queue emailQueue() {
return new Queue(queueName, true);
}
@Bean
public DirectExchange emailExchange() {
return new DirectExchange(exchangeName, true, false);
}
@Bean
public Binding emailBinding() {
return BindingBuilder.bind(emailQueue()).to(emailExchange()).with(routingKeyName);
}
}
4、MsgConsumerListener(消费者)
代码语言:javascript复制@Component
public class MsgConsumerListener {
@RabbitListener(queues = "${mq.email.queue-name}", containerFactory = "singleListenerContainer")
public void process(String msg) {
System.out.println("【消费者接收的消息】: " msg);
}
}
5、TestController(消息生产和推送的接口)
代码语言:javascript复制@RestController
public class TestController {
@Value("${mq.email.exchange-name}")
private String exchangeName;
@Value("${mq.email.routing-key-name}")
private String routingKeyName;
@Resource
private RabbitTemplate rabbitTemplate;
@GetMapping("/test")
public void test() {
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.setExchange(exchangeName);
rabbitTemplate.setRoutingKey(routingKeyName);
rabbitTemplate.convertAndSend("我是队列里的一条消息");
}
}
6、调用接口推送消息
调用接口http://localhost:8080/test
控制台输出:
代码语言:javascript复制消息发送成功: correlationData(null), ack(true), cause(null)
【消费者接收的消息】: 我是队列里的一条消息