SpringBoot整合RabbitMQ
- 环境准备
- 生产者构建
- 消费者构建
环境准备
首先我们需要去创建模块。我们先创建一个生产者的模块。模块pom如下。 主要摘录
代码语言:javascript复制 <parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.5</version>
</parent>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>
<dependencies>
<dependency>
<!-- rabbitmq的依赖-->
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.springframework.boot</groupId>-->
<!-- <artifactId>spring-boot-starter-web</artifactId>-->
<!-- </dependency>-->
</dependencies>
引入springboot的父级启动依赖我们就不用担心后面依赖版本的问题了。
我们后面的生产者主要采用了单元测试区启动,所以后面也引入流量单元测试的依赖。
后面的消费者也是这样的依赖。当然maven怎么写能用就行。
生产者构建
首先我们需要一个yml的配置文件,这里面主要写RabbitMQ的连接信息
代码语言:javascript复制spring:
# profiles:
# active: dev
rabbitmq:
host: .... #远程主机外网地址
username: shabi #远程用户名
password: 123456 #密码
virtual-host: shabi #虚拟机名称
port: 5672 #远程主机端口名称
当然前提是远程主机将RabbitMQ都配置好了。本人用docker容器配置的 具体看之前的文章里面有一段简洁的介绍 RabbitMQ初识以及简单模式初步 端口一定要开放,阿里云的防火墙面板也要开放这个端口。
这样做好后。我们利用springboot自动生成的模板来操作。有启动类和测试类。在这之前,我们还需要写一个配置类,主要配置队列啊,绑定,以及路由工作模式等等这些。
具体写法如下。
代码语言:javascript复制package com.jgdabc.rabbitconfig;
import com.rabbitmq.client.ConnectionFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
//交换机
public static final String Exchange_Name = "boot_rabbit_topic_ee";
public static final String Queue_Name = "boot_rabbit_topic_qqq";
@Bean("bootExchange") //交换机的创建
public Exchange bootExchange()
{
return ExchangeBuilder.topicExchange(Exchange_Name).durable(true).build(); //绑定一个topic类型的交换机,持久化并构建
}
@Bean("bootQueue") //队列的创建
public Queue bootQueue()
{
return QueueBuilder.durable(Queue_Name).build();
}
// 队列和交换机的绑定关系
// 哪个队列
// 哪个交换机
// routing key
@Bean //这里其实进行一个绑定
public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue,@Qualifier("bootExchange") Exchange exchange)
{
//下面指定了和交换机和
return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
}
}
这个配置类主要做了交换机和队列的创建以及绑定,具体的路由规则我们也指定了。在哪个虚拟机下面我们在yml文件里面进行了指定了。
然后启动类就不说了,一定要注意它的放置位置,涉及一个自动扫描的问题,我们尽量去把它扫描的范围放大,但是如果你再建立一个包把它放到里面可能就扫描不到相关的配置了,那么你就需要自己指定注解去扫描了。 另外如果用到测试的话也是一样的道理。
我们具体的消息发送方法写到测试里面,具体如下。
代码语言:javascript复制package com.jgdabc;
import com.jgdabc.rabbitconfig.RabbitConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Import;
import org.springframework.test.context.junit4.SpringRunner;
@SpringBootTest
@RunWith(SpringRunner.class)
public class DemoApplicationTests {
// 注入RabbitTemplate
@Autowired
private RabbitTemplate template;
@Test
public void testSend()
{
template.convertAndSend(RabbitConfig.Exchange_Name,"boot.haha","hi");
}
}
然后去启动
但是其实这里testpassed并不意味着你的项目正确运行了。有时候一些资源扫描没有正确加载的话,也是没有问题的,所以其实自己要注意看日志信息。当然看效果就知道了。特别要注意启动类扫描的问题。
这是本次文件层次结构
我这里连续发了几回,如果没有消费者取走,会一直在这里。运行后我们去RabbitMQ Management里面去看。
我测试生成了几回所以会有这些数据。如果你运行成功的话,这里会自动创建出来队列显示,并ready下面有收到传送消息次数的数字。
可以点进这个队列去查看数据
可以看到可视化的绑定关系,里面有我之前绑定创建的。
这样就Ok了。当然还可以看其他的数据。
消费者构建
很简单 配置文件yml基本是一样的,username和密码。不一样是可以的,只要可以验证登录就行 。其他都一样。pom依赖也一样。
我们创建这样一个类用于获取数据消息
代码语言:javascript复制package com.jgdabc.boot_rabbit_consumer;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class ConsumerSpringbootApplication{
@RabbitListener(queues = "boot_rabbit_topic_qq") //指定要消费消息的队列
public void ListenerQueue(Message message)
{
System.out.println(message);
}
}
然后我们去启动我们的启动类也就是这个
之前我们生产者不是把消息存储到消息队列了们,我们消费者启动,就会把这些消息消费掉。那么队列就会空掉。
启动。这些数据句获取到了。
然后我们去可视化界面看。你看这里的五条消息就没了。
这就是完整的整合入门。