缘起
前面几章我们基本了解了RabbitMQ的基本概念,以及RabbitMQ是如何保证消息的可靠性的,那么本章开始,将真正用java代码去连接使用一些RabbitMQ,通过阅读本章内容,你会明白如何在java springboot的项目中使用RabbitMQ。
阅读人群
项目采用springboot搭建,所以你对springboot需要有一个基本的了解,并且我们假设已经在你的服务器或本机安装了RabbitMQ,所以本章不会涉及关于如何安装RabbitMQ知识。
前置条件
在前面我们说到,对于Exchanges来说,他的路由规则有以下三种
- Direct exchange:完全根据key进行投递的叫做Direct交换机。如果Routing key匹配, 那么Message就会被传递到相应的queue中。其实在queue创建时,它会自动的以queue的名字作为routing key来绑定那个exchange。例如,绑定时设置了Routing key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。
- Fanout exchange:不需要key的叫做Fanout交换机。它采取广播模式,一个消息进来时,投递到与该交换机绑定的所有队列。
- Topic exchange:对key进行模式匹配后进行投递的叫做Topic交换机。比如符号”#”匹配一个或多个词,符号””匹配正好一个词。例如”abc.#”匹配”abc.def.ghi”,”abc.”只匹配”abc.def”。
项目采用springboot搭建,基础配置文件信息为
代码语言:txt复制spring.application.name=spirng-boot-rabbitmq-sender
spring.rabbitmq.host=192.168.23.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
对于这三种方式,我们的代码实现也会有所不同,我们一一讲解。
Direct exchange
Config
代码语言:txt复制@Configuration
public class RabbitConfig {
@Bean
public Queue helloQueue(){
return new Queue("hello");
}
}
producer
代码语言:txt复制@Component
public class HelloSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send(){
String context = "hello " new Date();
System.out.println("Sender : " context);
this.rabbitTemplate.convertAndSend("hello", context);
}
}
consumer
代码语言:txt复制@Component
@RabbitListener(queues = "hello")
public class HelloReceiver {
@RabbitHandler
public void process(String hello){
System.out.println("Receiver : " hello);
}
}
测试类
代码语言:txt复制@SpringBootTest
public class Test {
@Autowired
private HelloSender helloSender;
@org.junit.jupiter.api.Test
void contextLoads(){
helloSender.send();
}
}
运行结果
Topic exchange
Config
代码语言:txt复制@Configuration
public class TopicRabbitConfig {
final static String message = "topic.message";
final static String messages = "topic.messages";
/**
* 创建队列
* @return
*/
@Bean
public Queue queueMessage() {
return new Queue(TopicRabbitConfig.message);
}
/**
* 创建队列
* @return
*/
@Bean
public Queue queueMessages() {
return new Queue(TopicRabbitConfig.messages);
}
/**
* 将对列绑定到Topic交换器
* @return
*/
@Bean
TopicExchange exchange() {
return new TopicExchange("topicExchange");
}
/**
* 将queueMessage队列绑定到Topic交换器并监听为topic.message的routingKey
* 也就是说,当你发送消息时的key为topic.message的话,他就会被投递到queueMessage队列中,这个队列名字需要与上面声明的方法名相同
*
* public Queue queueMessage() {
* return new Queue(TopicRabbitConfig.message);
* }
*
* @param queueMessage
* @param exchange
* @return
*/
@Bean
Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
}
/**
* 将队列绑定到Topic交换器 采用#的方式,与上述方法类似,只是本处采用通配符。也就是说,所有以topic.开头的消息都会被投递到queueMessages队列中
* @param queueMessages
* @param exchange
* @return
*/
@Bean
Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
}
}
producer
代码语言:txt复制@Component
public class TopicSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send1() {
String context = "hi, i am message 1";
System.out.println("Sender : " context);
this.rabbitTemplate.convertAndSend("topicExchange", "topic.message", context);
}
public void send2() {
String context = "hi, i am messages 2";
System.out.println("Sender : " context);
this.rabbitTemplate.convertAndSend("topicExchange", "topic.ffsda", context);
}
}
Consumer
代码语言:txt复制@Component
@RabbitListener(queues = "topic.message")
public class TopicReceiver {
@RabbitHandler
public void process(String message) {
System.out.println("Topic Receiver1 : " message);
}
}
Consumer2
代码语言:txt复制@Component
@RabbitListener(queues = "topic.messages")
public class TopicReceiver2 {
@RabbitHandler
public void process(String message) {
System.out.println("Topic Receiver2 : " message);
}
}
测试类
代码语言:txt复制@SpringBootTest
public class Test {
@Autowired
private TopicSender topicSender;
@org.junit.jupiter.api.Test
public void send1() {
topicSender.send1();
}
@org.junit.jupiter.api.Test
public void send2() {
topicSender.send2();
}
}
运行结果
send1()
send1()的key为topic.message,所以queueMessage队列会被放入,queueMessages队列监听的key为通配符topic.#,所以也会被放入,所以消费者两个都会消费这条消息。
send2()
send2()的key为topic.ffsda,由于queueMessage队列监听的key为topic.message,所以不会被放入。queueMessages队列监听的key为通配符topic.#,所以会被放入,所以消费者两只有一个会消费这条消息。
Fanout exchange
略