微服务架构之Spring Boot(五十六)

2022-05-23 15:38:03 浏览数 (1)

33.2 AMQP

高级消息队列协议(AMQP)是面向消息的中间件的平台中立的线级协议。Spring AMQP项目将核心Spring概念应用于基于AMQP的消息传递

解决方案的开发。Spring Boot为通过RabbitMQ使用AMQP提供了一些便利,包括 spring-boot-starter-amqp “Starter”。

33.2.1 RabbitMQ支持

RabbitMQ是一个基于AMQP协议的轻量级,可靠,可扩展且可移植的消息代理。Spring使用 RabbitMQ 通过AMQP协议进行通信。

RabbitMQ配置由 spring.rabbitmq.* 中的外部配置属性控制。例如,您可以在 application.properties 中声明以下部分:

spring.rabbitmq.host=localhost

spring.rabbitmq.port=5672

spring.rabbitmq.username=admin

spring.rabbitmq.password=secret

如果上下文中存在 ConnectionNameStrategy bean,它将自动用于命名由自动配置的 ConnectionFactory 创建的连接。有

关 RabbitProperties 更多支持的选项,请参阅 。

有关详细信息,请参阅 了解AMQP,RabbitMQ使用的协议。

33.2.2发送消息

Spring的 AmqpTemplate 和 AmqpAdmin 是自动配置的,您可以将它们直接自动装入您自己的beans,如以下示例所示:

import org.springframework.amqp.core.AmqpAdmin;

import org.springframework.amqp.core.AmqpTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

@Component

public class MyBean {

private final AmqpAdmin amqpAdmin;

private final AmqpTemplate amqpTemplate;

@Autowired

public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {

this.amqpAdmin = amqpAdmin;

this.amqpTemplate = amqpTemplate;

}

RabbitMessagingTemplate 可以以类似的方式注射。如果定义了 MessageConverter bean,它将自动关联到自动配置

的 AmqpTemplate 。

如有必要,任何定义为bean的 org.springframework.amqp.core.Queue 都将自动用于在RabbitMQ实例上声明相应的队列。

要重试操作,可以在 AmqpTemplate 上启用重试(例如,在代理连接丢失的情况下):

spring.rabbitmq.template.retry.enabled=true

spring.rabbitmq.template.retry.initial-interval=2s

默认情况下禁用重试。您还可以通过声明 RabbitRetryTemplateCustomizer bean以编程方式自定义 RetryTemplate 。

33.2.3接收消息

当Rabbit基础结构存在时,任何bean都可以使用 @RabbitListener 进行注释以创建侦听器端点。如果未定

义 RabbitListenerContainerFactory ,则会自动配置默认值 SimpleRabbitListenerContainerFactory ,您可以使

用 spring.rabbitmq.listener.type 属性切换到直接容器。如果定义了 MessageConverter 或 MessageRecoverer bean,它将自动与默认

工厂关联。

以下示例组件在 someQueue 队列上创建一个侦听器端点:

@Component

public class MyBean {

@RabbitListener(queues = "someQueue")

public void processMessage(String content) {

// ...

}

}

有关详细信息,请参阅 @EnableRabbit 的Javadoc。

如果您需要创建更多 RabbitListenerContainerFactory 个实例,或者如果要覆盖默认值,Spring Boot会提

供 SimpleRabbitListenerContainerFactoryConfigurer 和 DirectRabbitListenerContainerFactoryConfigurer ,您可以使用它来初

始化 SimpleRabbitListenerContainerFactory 和 DirectRabbitListenerContainerFactory 具有与自动配置使用的工厂相同的设置。

您选择的容器类型无关紧要。这两个beans由自动配置公开。

例如,以下配置类公开了另一个使用特定 MessageConverter 的工厂:

@Configuration

static class RabbitConfiguration {

@Bean

public SimpleRabbitListenerContainerFactory myFactory(

SimpleRabbitListenerContainerFactoryConfigurer configurer) {

SimpleRabbitListenerContainerFactory factory =

new SimpleRabbitListenerContainerFactory();

configurer.configure(factory, connectionFactory);

factory.setMessageConverter(myMessageConverter());

return factory;

}

}

然后你可以在任何 @RabbitListener - 注释方法中使用工厂,如下所示:

@Component

public class MyBean {

@RabbitListener(queues = "someQueue", containerFactory="myFactory")

public void processMessage(String content) {

// ...

}

}

您可以启用重试来处理侦听器抛出异常的情况。默认情况下,使用 RejectAndDontRequeueRecoverer ,但您可以定义自己

的 MessageRecoverer 。当重试耗尽时,如果代理配置了这样做,则拒绝该消息并将其丢弃或路由到死信交换。默认情况下,禁用重试。您还可

以通过声明 RabbitRetryTemplateCustomizer bean以编程方式自定义 RetryTemplate 。

重要

默认情况下,如果禁用重试并且侦听器抛出异常,则会无限期地重试传递。您可以通过两种方式修改此行为:

将 defaultRequeueRejected 属性设置为 false ,以便尝试零重新传递或抛出 AmqpRejectAndDontRequeueException 以表示

应拒绝该消息。后者是启用重试并且达到最大传递尝试次数时使用的机制。

0 人点赞