RabbitMQ,也称为开源消息代理,支持多种消息协议,可以部署在分布式系统上。它非常轻巧,可以轻松部署应用程序。它主要作为一个队列,首先可以对输入的消息进行操作。RabbitMQ可在许多操作系统和云环境中运行,并为大多数流行语言提供各种开发人员工具。它是生产者 - 消费者风格模式,生产者发送消息,消费者使用它。RabbitMQ的主要功能如下:
- 异步消息
- 分布式部署
- 管理和监督
- 企业级和云就绪型
安装
对于RabbitMQ,您首先需要在系统中安装ErLang,因为RabbitMQ程序是用ErLang编程语言编写的。在ErLang之后,您可以按照其中的说明从其主页下载最新版本的RabbitMQ。
在微服务中使用RabbitMQ
RabbitMQ是在微服务架构中实现消息队列的最简单的免费选项之一。这些队列模式可以通过在各种微服务之间进行通信来帮助扩展应用程序。我们可以将这些队列用于各种目的,例如核心微服务之间的交互,微服务的分离,实现故障转移机制以及通过消息代理发送电子邮件通知。
在两个或多个核心模块需要相互通信的地方,我们不应该进行直接的HTTP调用,因为它们可以使核心层紧密耦合,并且当每个核心模块有更多实例时很难管理。此外,每当服务关闭时,HTTP调用模式将失败,因为重新启动后,无法跟踪旧的HTTP请求调用。这导致需要RabbitMQ。
在微服务中设置RabbitMQ
在微服务架构中,对于此演示,我们将使用通过各种核心微服务发送电子邮件通知的示例模式。在这种模式中,我们将有一个生产者,任何核心微服务,它将生成电子邮件内容并将其传递给队列。然后,这个电子邮件内容由消费者使用,消费者总是在队列中收听新消息。
请注意,我们使用Spring Boot作为我们的微服务,因此我们将为Spring提供配置。
1)生产者: 该层负责生成电子邮件内容并将此内容传递给RabbitMQ中的消息代理。
a)在属性文件中,我们需要提及队列名称和交换类型以及安装RabbitMQ服务器的主机和端口。
代码语言:javascript复制queue.name=messagequeue
fanout.exchange=messagequeue-exchange
spring.rabbitmq.host: localhost
spring.rabbitmq.port: 5672
spring.rabbitmq.username: guest
spring.rabbitmq.password: guest
b)我们需要创建一个配置类,它将使用队列名称和交换类型将队列绑定到微服务模块。
代码语言:javascript复制@Configuration
public class RabbitConfiguration {
@Value("${fanout.exchange}")
private String fanoutExchange;
@Value("${queue.name}")
private String queueName;
@Bean
Queue queue() {
return new Queue(queueName, true);
}
@Bean
FanoutExchange exchange() {
return new FanoutExchange(fanoutExchange);
}
@Bean
Binding binding(Queue queue, FanoutExchange exchange) {
return BindingBuilder.bind(queue).to(exchange);
}
}
c)最后,我们需要一个util类,它将用于使用Spring框架提供的RabbitTemplate将实际的电子邮件内容发送到队列。
代码语言:javascript复制@Component
public class QueueProducer {
protected Logger logger = LoggerFactory.getLogger(getClass());
@Value("${fanout.exchange}")
private String fanoutExchange;
private final RabbitTemplate rabbitTemplate;
@Autowired
public QueueProducer(RabbitTemplate rabbitTemplate) {
super();
this.rabbitTemplate = rabbitTemplate;
}
public void produce(NotificationRequestDTO notificationDTO) throws Exception {
logger.info("Storing notification...");
rabbitTemplate.setExchange(fanoutExchange);
rabbitTemplate.convertAndSend(new ObjectMapper().writeValueAsString(notificationDTO));
logger.info("Notification stored in queue sucessfully");
}
}
d)然后,您可以从模块中的任何位置调用Produ方法。
代码语言:javascript复制{
queueProducer.produce(notificationDTO);
}
2)消费者: 该层负责使用FIFO方法从RabbitMQ消息代理消费消息,然后执行与电子邮件相关的操作。
a)在属性文件中,我们需要提到队列名称和交换类型,以及安装RabbitMQ服务器的主机和端口。
代码语言:javascript复制queue.name=messagequeue
fanout.exchange=messagequeue-exchange
spring.rabbitmq.host: localhost
spring.rabbitmq.port: 5672
spring.rabbitmq.username: guest
spring.rabbitmq.password: guest
b)我们需要创建一个配置类,它将使用队列名称和交换类型将队列绑定到微服务模块。此外,在消费者的RabbitMQ配置中,我们需要创建一个 MessageListenerAdapter
bean,它将使其充当使用者并始终在队列管道中侦听传入消息。这个MessageListenerAdapter
将有一个带有Consumer util类和defaultListenerMethod
的参数化构造函数 ,我们可以在其中指定与电子邮件相关的操作。
@Configuration
public class RabbitConfiguration {
private static final String LISTENER_METHOD = "receiveMessage";
@Value("${queue.name}")
private String queueName;
@Value("${fanout.exchange}")
private String fanoutExchange;
@Bean
Queue queue() {
return new Queue(queueName, true);
}
@Bean
FanoutExchange exchange() {
return new FanoutExchange(fanoutExchange);
}
@Bean
Binding binding(Queue queue, FanoutExchange exchange) {
return BindingBuilder.bind(queue).to(exchange);
}
@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queueName);
container.setMessageListener(listenerAdapter);
return container;
}
@Bean
MessageListenerAdapter listenerAdapter(QueueConsumer consumer) {
return new MessageListenerAdapter(consumer, LISTENER_METHOD);
}
}
c)然后,我们需要创建一个具有指定消息监听器方法的类QueueConsumer
,我们可以在其中进行实际的电子邮件发送操作。
@Component
public class QueueConsumer {
@Autowired
MailServiceImpl mailServiceImpl;
protected Logger logger = LoggerFactory.getLogger(getClass());
public void receiveMessage(String message) {
logger.info("Received (String) " message);
processMessage(message);
}
public void receiveMessage(byte[] message) {
String strMessage = new String(message);
logger.info("Received (No String) " strMessage);
processMessage(strMessage);
}
private void processMessage(String message) {
try {
MailDTO mailDTO = new ObjectMapper().readValue(message, MailDTO.class);
ValidationUtil.validateMailDTO(mailDTO);
mailServiceImpl.sendMail(mailDTO, null);
} catch (JsonParseException e) {
logger.warn("Bad JSON in message: " message);
} catch (JsonMappingException e) {
logger.warn("cannot map JSON to NotificationRequest: " message);
} catch (Exception e) {
logger.error(e.getMessage());
}
}
}
结论
使用RabbitMQ,您可以避免服务之间的直接HTTP调用,并消除核心微服务的紧密耦合。这将帮助您在更高级别扩展微服务,并在微服务之间添加故障转移机制。
原文标题《RabbitMQ in Microservices》
作者:Akash Bhingole
译者:February
不代表云加社区观点,更多详情请查看原文链接