简介
Spring Cloud Stream 是一款基于 Spring Boot 的消息驱动微服务框架,支持多种消息中间件,如 RabbitMQ、Kafka、ActiveMQ 等。它通过抽象出消息通道(channel)和消息绑定(binding),简化了消息的生产者和消费者的开发过程。除了基本的消息通信功能,Spring Cloud Stream 还提供了一些高级特性,如消息分区、消息桥接、消息路由和过滤、消息拦截器等,以满足不同场景下的需求。本文将重点介绍 Spring Cloud Stream 中的消息拦截器。
消息拦截器是一种拦截和处理消息的机制,可以在消息发送和接收的过程中进行拦截和处理。通过消息拦截器,我们可以在消息发送和接收的过程中添加一些通用的处理逻辑,如消息头的添加、消息的日志记录、消息的加解密、消息的校验等。
Spring Cloud Stream 中的消息拦截器
Spring Cloud Stream 中的消息拦截器是通过 Spring AOP 实现的,它提供了一个名为 ChannelInterceptor
的接口,用于定义消息通道的拦截器。在 Spring Cloud Stream 中,我们可以通过配置 BindingService
来注册一个或多个 ChannelInterceptor
,从而实现消息通道的拦截器。
ChannelInterceptor
接口定义了三个方法:
preSend(Message<?> message, MessageChannel channel)
: 在消息发送到通道之前被调用,可以在此修改消息的内容或元数据,也可以在此进行消息的校验、加密等操作。postSend(Message<?> message, MessageChannel channel, boolean sent)
: 在消息发送到通道之后被调用,可以在此记录消息的日志等操作。afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex)
: 在消息发送完成后被调用,可以在此进行异常处理等操作。
除了 ChannelInterceptor
接口外,Spring Cloud Stream 还提供了一个名为 GlobalChannelInterceptor
的接口,它扩展了 ChannelInterceptor
接口,并添加了 order()
方法,用于指定拦截器的顺序。GlobalChannelInterceptor
接口中定义了一个常量 DEFAULT_ORDER
,用于指定默认的顺序。如果没有指定顺序,则使用默认的顺序。
示例
以下是一个使用消息拦截器的示例:
代码语言:javascript复制@SpringBootApplication
@EnableBinding(SampleInterceptor.class)
public class SampleInterceptorApplication {
@Autowired
private SampleInterceptor sampleInterceptor;
public static void main(String[] args) {
SpringApplication.run(SampleInterceptorApplication.class, args);
}
@StreamListener(SampleInterceptor.INPUT)
public void handle(String payload) {
System.out.println("Received: " payload);
}
@Bean
public BindingService bindingService() {
BindingService bindingService = mock(BindingService.class);
given(bindingService.bindProducer(any(), any(), anyBoolean()))
.willReturn(mock(MessageChannel.class));
given(bindingService.bindConsumer(any(), anyString(), any(), anyBoolean()))
.willReturn(mock(Binding.class));
return bindingService;
}
@Bean
public GlobalChannelInterceptor globalChannelInterceptor() {
return new SampleGlobalChannelInterceptor();
}
public static class SampleGlobalChannelInterceptor implements GlobalChannelInterceptor {
@Override
public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
System.out.println("postSend: " message);
}
}
}
在上面的示例中,我们定义了一个名为 SampleInterceptor
的消息通道,其中定义了一个名为 input
的输入通道和一个名为 output
的输出通道。我们还定义了一个名为 SampleGlobalChannelInterceptor
的全局消息拦截器,用于在消息发送之后记录消息的日志。
在 SampleGlobalChannelInterceptor
中,我们实现了 postSend
方法,并在其中打印了消息的内容。
为了启用消息拦截器,我们需要在 SampleInterceptorApplication
类上添加 @EnableBinding
注解,并指定使用 SampleInterceptor
类。在 SampleInterceptorApplication
中,我们还定义了一个名为 handle
的方法,用于处理从 input
通道接收到的消息。在这个方法中,我们只是简单地将消息的内容打印到控制台上。
在 SampleInterceptorApplication
中,我们还定义了一个名为 bindingService
的方法,用于创建一个 BindingService
的 Mock 对象,并通过 Mockito 的 given
和 willReturn
方法模拟了消息通道的绑定。通过这种方式,我们可以在测试中使用 Mock 对象来模拟消息通道的行为,从而方便地进行单元测试。