33.2 AMQP
高级消息队列协议(AMQP)是面向消息的中间件的平台中立的线级协议。Spring AMQP项目将核心Spring概念应用于基于AMQP的消息传递
解决方案的开发。Spring Boot为通过RabbitMQ使用AMQP提供了一些便利,包括 spring-boot-starter-amqp “Starter”。
33.2.1 RabbitMQ支持
RabbitMQ是一个基于AMQP协议的轻量级,可靠,可扩展且可移植的消息代理。Spring使用 RabbitMQ 通过AMQP协议进行通信。
RabbitMQ配置由 spring.rabbitmq.* 中的外部配置属性控制。例如,您可以在 application.properties 中声明以下部分:
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=secret
如果上下文中存在 ConnectionNameStrategy bean,它将自动用于命名由自动配置的 ConnectionFactory 创建的连接。有
关 RabbitProperties 更多支持的选项,请参阅 。
有关详细信息,请参阅 了解AMQP,RabbitMQ使用的协议。
33.2.2发送消息
Spring的 AmqpTemplate 和 AmqpAdmin 是自动配置的,您可以将它们直接自动装入您自己的beans,如以下示例所示:
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
private final AmqpAdmin amqpAdmin;
private final AmqpTemplate amqpTemplate;
@Autowired
public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {
this.amqpAdmin = amqpAdmin;
this.amqpTemplate = amqpTemplate;
}
RabbitMessagingTemplate 可以以类似的方式注射。如果定义了 MessageConverter bean,它将自动关联到自动配置
的 AmqpTemplate 。
如有必要,任何定义为bean的 org.springframework.amqp.core.Queue 都将自动用于在RabbitMQ实例上声明相应的队列。
要重试操作,可以在 AmqpTemplate 上启用重试(例如,在代理连接丢失的情况下):
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=2s
默认情况下禁用重试。您还可以通过声明 RabbitRetryTemplateCustomizer bean以编程方式自定义 RetryTemplate 。
33.2.3接收消息
当Rabbit基础结构存在时,任何bean都可以使用 @RabbitListener 进行注释以创建侦听器端点。如果未定
义 RabbitListenerContainerFactory ,则会自动配置默认值 SimpleRabbitListenerContainerFactory ,您可以使
用 spring.rabbitmq.listener.type 属性切换到直接容器。如果定义了 MessageConverter 或 MessageRecoverer bean,它将自动与默认
工厂关联。
以下示例组件在 someQueue 队列上创建一个侦听器端点:
@Component
public class MyBean {
@RabbitListener(queues = "someQueue")
public void processMessage(String content) {
// ...
}
}
有关详细信息,请参阅 @EnableRabbit 的Javadoc。
如果您需要创建更多 RabbitListenerContainerFactory 个实例,或者如果要覆盖默认值,Spring Boot会提
供 SimpleRabbitListenerContainerFactoryConfigurer 和 DirectRabbitListenerContainerFactoryConfigurer ,您可以使用它来初
始化 SimpleRabbitListenerContainerFactory 和 DirectRabbitListenerContainerFactory 具有与自动配置使用的工厂相同的设置。
您选择的容器类型无关紧要。这两个beans由自动配置公开。
例如,以下配置类公开了另一个使用特定 MessageConverter 的工厂:
@Configuration
static class RabbitConfiguration {
@Bean
public SimpleRabbitListenerContainerFactory myFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer) {
SimpleRabbitListenerContainerFactory factory =
new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
factory.setMessageConverter(myMessageConverter());
return factory;
}
}
然后你可以在任何 @RabbitListener - 注释方法中使用工厂,如下所示:
@Component
public class MyBean {
@RabbitListener(queues = "someQueue", containerFactory="myFactory")
public void processMessage(String content) {
// ...
}
}
您可以启用重试来处理侦听器抛出异常的情况。默认情况下,使用 RejectAndDontRequeueRecoverer ,但您可以定义自己
的 MessageRecoverer 。当重试耗尽时,如果代理配置了这样做,则拒绝该消息并将其丢弃或路由到死信交换。默认情况下,禁用重试。您还可
以通过声明 RabbitRetryTemplateCustomizer bean以编程方式自定义 RetryTemplate 。
重要
默认情况下,如果禁用重试并且侦听器抛出异常,则会无限期地重试传递。您可以通过两种方式修改此行为:
将 defaultRequeueRejected 属性设置为 false ,以便尝试零重新传递或抛出 AmqpRejectAndDontRequeueException 以表示
应拒绝该消息。后者是启用重试并且达到最大传递尝试次数时使用的机制。