前言
上篇我们说到了消息队列RabbitMQ的模式概念,那么这里将会针对模式使用SpringBoot联合RabbitMQ做一个案例,实现消息的生产和消费。
这一篇也是这个主题的最后一篇了,建议配合着看。助于理解。
博主会将Demo工程放在Gitee上,有兴趣的可以拉下来自己试试。
Gitee地址:https://gitee.com/lemon_ant/os.git
正文
准备工作
新建SpringBoot项目
添加配置文件
代码语言:javascript复制server.port=8080
spring.application.name=cl
#RabbitMq所在服务器IP
spring.rabbitmq.host=127.0.0.1
#连接端口号
spring.rabbitmq.port=5672
#用户名
spring.rabbitmq.username=root
#用户密码
spring.rabbitmq.password=123456
# 开启发送确认
spring.rabbitmq.publisher-confirm-type=correlated
# 开启发送失败退回
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.virtual-host=/
添加pom文件
代码语言:javascript复制<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
启动类
代码语言:javascript复制@SpringBootApplication
public class OsApplication {
public static void main(String[] args) {
SpringApplication.run(OsApplication.class, args);
}
}
点对点模式
队列初始化
代码语言:javascript复制//当没有这个队列的时候会自动创建
@Configuration
public class PointInitialization {
@Bean
Queue toPoint(){
Queue queue = new Queue("point.to.point",true);
return queue;
}
}
生产者
代码语言:javascript复制@Component
public class PointProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(String name){
String sendMsg = "点对点队列:" name " " new Date();
//指定队列
this.rabbitTemplate.convertAndSend("point.to.point",sendMsg);
}
}
消费者
代码语言:javascript复制@Component
public class PointConsumer {
//监听的队列名
@RabbitListener(queues = "point.to.point")
public void processOne(String name) {
System.out.println("point.to.point:" name);
}
}
测试类(模仿控制层)
代码语言:javascript复制@RestController
@RequestMapping("/Point")
public class PointController {
@Autowired
private PointProducer sayProducer;
@RequestMapping("/point/{name}")
public String send(@PathVariable String name){
sayProducer.send(name);
return "发送成功";
}
}
使用postman模拟请求
控制台结果
work模式
队列初始化
代码语言:javascript复制@Configuration
public class WorkInitialization {
//当没有这个队列的时候会自动创建
@Bean
Queue work(){
Queue queue = new Queue("WorkingMode",true);
return queue;
}
}
生产者
代码语言:javascript复制@Component
public class WorkProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(String name){
String sendMsg = "工作模式:" name " " new Date();
//指定队列
this.rabbitTemplate.convertAndSend("WorkingMode",sendMsg);
}
}
消费者
代码语言:javascript复制//三个队列同时监听
@Component
public class WorkConsumer {
@RabbitListener(queues = "WorkingMode")
public void processOne(String name) {
System.out.println("WorkingMode1:" name);
}
@RabbitListener(queues = "WorkingMode")
public void processTwo(String name) {
System.out.println("WorkingMode2:" name);
}
@RabbitListener(queues = "WorkingMode")
public void processThree(String name) {
System.out.println("WorkingMode3:" name);
}
}
测试类(模仿控制层)
代码语言:javascript复制@RestController
@RequestMapping("/work")
public class WorkController {
@Autowired
private WorkProducer sayProducer;
@RequestMapping("/work/{name}")
public String send(@PathVariable String name){
sayProducer.send(name);
return "发送成功";
}
}
使用postman模拟请求
控制台结果
注意看时间,说明消息是轮询分发的,一个消息只由一个消费者消费。
发布/订阅者模式(Publish/Subscribe)
队列初始化
代码语言:javascript复制//类型为fanout
@Configuration
public class PublishInitialization {
//当没有这个队列的时候会自动创建
@Bean
Queue publishOne(){
Queue queue = new Queue("queue.publish.one",true);
return queue;
}
@Bean
Queue publishTwo(){
Queue queue = new Queue("queue.publish.two",true);
return queue;
}
@Bean
Queue publishThree(){
Queue queue = new Queue("queue.publish.three",true);
return queue;
}
//创建交换器
@Bean
FanoutExchange pulishExchange(){
FanoutExchange directExchange = new FanoutExchange("publishExchange");
return directExchange;
}
//绑定队列(不用指定routing key),参数名字要和bean名字一致
@Bean
Binding bindingPublishOne(Queue publishOne,FanoutExchange pulishExchange){
Binding binding = BindingBuilder.bind(publishOne).to(pulishExchange);
return binding;
}
@Bean
Binding bindingPublishTwo(Queue publishTwo,FanoutExchange pulishExchange){
Binding binding = BindingBuilder.bind(publishTwo).to(pulishExchange);
return binding;
}
@Bean
Binding bindingPublishThree(Queue publishThree,FanoutExchange pulishExchange){
Binding binding = BindingBuilder.bind(publishThree).to(pulishExchange);
return binding;
}
}
生产者
代码语言:javascript复制@Component
public class PublishProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(String name){
String sendMsg = "发布订阅模式:" name " " new Date();
//指定队列
this.rabbitTemplate.convertAndSend("publishExchange","",sendMsg);
}
}
消费者
代码语言:javascript复制@Component
public class PublishConsumer {
@RabbitListener(queues = "queue.publish.one")
public void processOne(String name) {
System.out.println("queue.publish.one:" name);
}
@RabbitListener(queues = "queue.publish.two")
public void processTwo(String name) {
System.out.println("queue.publish.two:" name);
}
@RabbitListener(queues = "queue.publish.three")
public void processThree(String name) {
System.out.println("queue.publish.three:" name);
}
}
测试类(模仿控制层)
代码语言:javascript复制@RestController
@RequestMapping("/Publish")
public class PublishController {
@Autowired
private PublishProducer sayProducer;
@RequestMapping("/publish/{name}")
public String send(@PathVariable String name){
sayProducer.send(name);
return "发送成功";
}
}
使用postman模拟请求
控制台结果
注意看时间,交换机会将消息推送到所有绑定到它的队列。
路由模式
队列初始化
代码语言:javascript复制//类型为direct
@Configuration
public class RoutingInitialization {
//当没有这个队列的时候会自动创建
@Bean
Queue routingOne(){
Queue queue = new Queue("queue.routing.one",true);
return queue;
}
@Bean
Queue routingTwo(){
Queue queue = new Queue("queue.routing.two",true);
return queue;
}
@Bean
Queue routingThree(){
Queue queue = new Queue("queue.routing.three",true);
return queue;
}
//创建交换器
@Bean
DirectExchange routingExchange(){
DirectExchange directExchange = new DirectExchange("routingExchange");
return directExchange;
}
//绑定队列
@Bean
Binding bindingRoutingOne(Queue routingOne,DirectExchange routingExchange){
Binding binding = BindingBuilder.bind(routingOne).to(routingExchange).with("1");
return binding;
}
@Bean
Binding bindingRoutingTwo(Queue routingTwo,DirectExchange routingExchange){
Binding binding = BindingBuilder.bind(routingTwo).to(routingExchange).with("2");
return binding;
}
@Bean
Binding bindingRoutingThree(Queue routingThree,DirectExchange routingExchange){
Binding binding = BindingBuilder.bind(routingThree).to(routingExchange).with("3");
return binding;
}
}
生产者
代码语言:javascript复制@Component
public class RoutingProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(String type){
String sendMsg = "路由模式:" type " " new Date();
//指定队列
if (type.equals("1")){
this.rabbitTemplate.convertAndSend("routingExchange","1",sendMsg);
}
if (type.equals("2")){
this.rabbitTemplate.convertAndSend("routingExchange","2",sendMsg);
}
if (type.equals("3")){
this.rabbitTemplate.convertAndSend("routingExchange","3",sendMsg);
}
}
}
消费者
代码语言:javascript复制@Component
public class RoutingConsumer {
@RabbitListener(queues = "queue.routing.one")
public void processOne(String name) {
System.out.println("queue.routing.one:" name);
}
@RabbitListener(queues = "queue.routing.two")
public void processTwo(String name) {
System.out.println("queue.routing.two:" name);
}
@RabbitListener(queues = "queue.routing.three")
public void processThree(String name) {
System.out.println("queue.routing.three:" name);
}
}
测试类(模仿控制层)
代码语言:javascript复制@RestController
@RequestMapping("/Routing")
public class RoutingController {
@Autowired
private RoutingProducer sayProducer;
@RequestMapping("/routing/{name}")
public String send(@PathVariable String name){
sayProducer.send(name);
return "发送成功";
}
}
使用postman模拟请求
我这里测试传的就是routing key,方便看。
控制台结果
这里用时间来区别。
主题模式(Topic)
队列初始化
代码语言:javascript复制//类型为topic
@Configuration
public class TopicInitialization {
//当没有这个队列的时候会自动创建
@Bean
Queue topicOne(){
Queue queue = new Queue("queue.topic.one",true);
return queue;
}
@Bean
Queue topicTwo(){
Queue queue = new Queue("queue.topic.two",true);
return queue;
}
@Bean
Queue topicThree(){
Queue queue = new Queue("queue.topic.three",true);
return queue;
}
//创建交换器
@Bean
TopicExchange topicExchange(){
TopicExchange directExchange = new TopicExchange("topicExchange");
return directExchange;
}
//绑定队列
@Bean
Binding bindingTopicOne(Queue topicOne,TopicExchange topicExchange){
Binding binding = BindingBuilder.bind(topicOne).to(topicExchange).with("#.error");
return binding;
}
@Bean
Binding bindingTopicTwo(Queue topicTwo,TopicExchange topicExchange){
Binding binding = BindingBuilder.bind(topicTwo).to(topicExchange).with("#.log");
return binding;
}
@Bean
Binding bindingTopicThree(Queue topicThree,TopicExchange topicExchange){
Binding binding = BindingBuilder.bind(topicThree).to(topicExchange).with("good.#.timer");
return binding;
}
}
生产者
代码语言:javascript复制@Component
public class TopicProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(String routing){
String sendMsg = "主题模式:" routing " " new Date();
//指定队列
this.rabbitTemplate.convertAndSend("topicExchange",routing,sendMsg);
}
}
消费者
代码语言:javascript复制@Component
public class TopicConsumer {
@RabbitListener(queues = "queue.topic.one")
public void processOne(String name) {
System.out.println("queue.topic.one:" name);
}
@RabbitListener(queues = "queue.topic.two")
public void processTwo(String name) {
System.out.println("queue.topic.two:" name);
}
@RabbitListener(queues = "queue.topic.three")
public void processThree(String name) {
System.out.println("queue.topic.three:" name);
}
}
测试类(模仿控制层)
代码语言:javascript复制@RestController
@RequestMapping("/Topic")
public class TopicController {
@Autowired
private TopicProducer sayProducer;
@RequestMapping("/topic/{type}")
public String send(@PathVariable String type){
sayProducer.send(type);
return "发送成功";
}
}
请求以及对应结果
注意看请求的key和打印日志的对应关系。
尾言
消息队列在这里基本就结束了,结合前面两篇基本就能够了解队列的基本概念和用法了。