1、Spring Boot 与 RabbitMQ 整合
接着上一篇的 RabbitMQ 的 6 种工作模式,现在开始项目中的实战了(上一篇也挺重要的,用法基本上都在上篇)。
这份整合方案我想是全网最独特的了,而且参考的是最新版本的教程。
因为 Spring 已经为我们封装好了 RabbitMQ 的一些配置,所以我们直接用就好了。
pom包
代码语言:html复制<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.4.5</version>
</dependency>
1.1、配置
我们可以看下 Spring 自动装配为我们做了什么。
我们找到 spring-boot-autoconfigure-2.4.5.jar 这个 jar 包,展开后 amqp 包下的东西就是一些 RabbitMQ 的条件配置。
咳咳,具体有啥东西可以自己去看,我就说一下铺垫后我想说的东西,就是 RabbitProperties 这个类,它扫描了项目配置文件下的 RabbitMQ 配置(spring.rabbitmq),所以一些 RabbitMQ 的连接配置比如地址啊端口之类的,我们可以直接配置在 yml 文件中,项目启动的时候就会自动去扫描并且创建一些 Connection 之类的。
在 yml 配置中我配置了一个全局的手动 ACK。
代码语言:yaml复制spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual
然后补充一点就是配置 virtual-host 有什么作用。每个 virtual-host 路径里面所有东西都是隔离的,比如队列啊交换器之列的。
在一些队列和交换器的定义上,因为我觉得在定义这些东西上代码有些重复,所以我就加以改造,只需要定义好相关的队列和交换器枚举,然后在项目启动的时候就会自动创建 Bean 了。
我挑一些范例来讲一下。就比如创建交换器。
代码语言:text复制@Getter
@AllArgsConstructor
public enum ExchangeEnum {
/**
* 交换器列表
*/
DEFAULT_EXCHANGE(BuiltinExchangeType.DIRECT,"default_exchange",1,0,"默认交换器","defaultExchange"),
FANOUT_EXCHANGE(BuiltinExchangeType.FANOUT,"fanout_exchange",1,0,"fanout交换器","fanoutExchange"),
TOPIC_EXCHANGE(BuiltinExchangeType.TOPIC,"topic_exchange",1,0,"topic交换器","topicExchange");
private BuiltinExchangeType exchangeType;
/**
* 交换器名称
*/
private String name;
/**
* 是否持久化
*/
private int durable;
/**
* 是否自动删除
*/
private int autoDelete;
/**
* 描述
*/
private String digest;
/**
* bean名称
*/
private String beanName;
}
public class RabbitMqBeanDefinitionRegistrar implements ImportBeanDefinitionRegistrar {
...
/**
* 注册交换器
*
* @param registry BeanDefinitionRegistry
*/
private void registerExchange(BeanDefinitionRegistry registry) {
for (ExchangeEnum value : ExchangeEnum.values()) {
BeanDefinitionBuilder rootBeanDefinition = null;
switch (value.getExchangeType()) {
case FANOUT:
rootBeanDefinition = BeanDefinitionBuilder.rootBeanDefinition(FanoutExchange.class);
break;
case TOPIC:
rootBeanDefinition = BeanDefinitionBuilder.rootBeanDefinition(TopicExchange.class);
break;
default:
rootBeanDefinition = BeanDefinitionBuilder.rootBeanDefinition(DirectExchange.class);
}
rootBeanDefinition.addConstructorArgValue(value.getName());
rootBeanDefinition.addConstructorArgValue(getBooleanValue(value.getDurable()));
rootBeanDefinition.addConstructorArgValue(getBooleanValue(value.getAutoDelete()));
registry.registerBeanDefinition(value.getBeanName(), rootBeanDefinition.getBeanDefinition());
}
}
这里我使用了 Spring 的一个扩展点,ImportBeanDefinitionRegistrar 拿到了 BeanDefinitionRegistry,然后我将需要的 Bean 的定义信息放置到里面。
最关键的地方来了,因为在创建交换器的时候我肯定需要设置一些构造参数啊,但是在之前的用法中创建的都是无参构造器 Bean,就算有参的也是用 Bean 注解去定义的。
为这个问题我绞尽脑汁,后来还是被我点点点给猜出来了,那就是 addConstructorArgValue,看名字就猜出来它的作用了,添加构造器参数。这个方法的用法就是按照你添加参数的顺序去匹配构造器。其他的地方就是差不多的意思了。
代码语言:java复制@Component
public class DirectMqListener {
@RabbitListener(queues = QueueConstant.EMAIL_PUSH_QUEUE)
public void emailLister1(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
try {
System.out.println("监听1:");
System.out.println(message);
channel.basicQos(1);
channel.basicAck(tag, false);
} catch (IOException e) {
e.printStackTrace();
}
}
@RabbitListener(queues = QueueConstant.EMAIL_PUSH_QUEUE)
public void emailLister2(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
System.out.println("监听2:");
System.out.println(message);
channel.basicAck(tag, false);
}
@RabbitListener(queues = QueueConstant.SMS_PUSH_QUEUE)
public void smsLister(String message) {
System.out.println(message);
}
}
以上分别是限制接收消息的数量和多个消费者监听的情况下执行的日志。
我正在参与2023腾讯技术创作特训营第三期有奖征文,组队打卡瓜分大奖!