1.1 简介
1.1.1 概述
在一个系统中我们可能包含前端页面、接口服务、大数据层,可能在接口服务中使用的是 RabbitMQ 而在大数据层中使用的是 Kafka,那么我只会 RabbitMQ 不会 Kafka 岂不是还要去学习,白天 996 晚上 007 简直要命。那么有没有一个像 JDBC 一样的能够屏蔽细节让我们可以迅速切换。 Spring Cloud Stream 是一个构建消息驱动微服务应用的框架。它基于 Spring Boot 构建独立的、生产级的 Spring 应用,并使用 Spring Integration 为消息代理提供链接。应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream 中 binder 交互,通过我们配置来 binding ,而 Spring Cloud Stream 的 binder 负责与中间件交互。所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。 Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。目前只实现了 Kafka 和 RabbitMQ 的 Binder。
1.1.2 设计思想
在没有 binder(绑定器) 这个概念的情况下,我们的 Spring Boot 应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性,这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候 Spring Cloud Stream 提供了一种解耦合的方式。通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。Stream 对消息中间件的进一步封装,可以做到代码层面对中间件的无感知,甚至于动态的切换中间件(RabbitMQ 切换为 Kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程。Spring Cloud Stream 遵循发布-订阅模式(在 RabbitMQ 就是 Exchange,在 Kakfa 中就是Topic),INPUT 对应于消费者,OUTPUT 对应于生产者。
Binder
:绑定器,Spring Cloud 提供了 Binder 抽象接口以及 KafKa 和 Rabbit MQ 的 Binder 的实现,可以做到代码层面对中间件的无感知,甚至于动态的切换中间件,使得微服务开发的高度解耦,服务可以关注更多自己的业务流程。Channel
:通道,是队列 Queue 的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过 Channel 对队列进行配置。Source
:Source 是一个接口,该接口是 Spring Cloud Stream 中默认实现的对输出消息通道绑定的定义。Sink
:Sink 是一个接口,该接口是 Spring Cloud Stream 中默认实现的对输入消息通道绑定的定义。
1.1.3 相关依赖
代码语言:javascript复制<!-- 集成 RabbitMQ -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<!-- 集成 Kafka -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
1.1.4 常用注解
注解 | 描述 |
---|---|
@Input | 注解标识输入通道,通过该输入通道收到的信息进入应用程序 |
@Output | 注解标识输出通道,发布的消息将通过该通道离开应用程序 |
@StreamListener | 监听队列,用于消费者的队列消息接收 |
@EnableBinding | 指信道 chennel 和 exchange 绑定在一起 |
1.2 消息生产者
1.2.1 配置文件
代码语言:javascript复制server:
port: 8081
spring:
application:
name: cloud-stream-provider
cloud:
stream:
# 在此处配置要绑定的 RabbitMQ 的服务信息
binders:
# 表示定义的名称,用于 binding 的整合
defaultRabbit:
# 消息中间件类型
type: rabbit
# 设置 RabbitMQ 的相关环境配置
environment:
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
# 服务的整合处理
bindings:
# 通道的名称
output:
# 表示要使用的 exchange 名称定义
destination: myExchange
# 设置消息类型,本次为 json,文本则设为 text/plain
content-type: application/json
# 设置要绑定的消息服务的具体设置, 可能会报红但是没影响
binder: defaultRabbit
1.2.2 启动类
代码语言:javascript复制/**
* Created with IntelliJ IDEA.
*
* @author Demo_Null
* @date 2020/11/17
* @description
*/
@SpringBootApplication
public class ProviderApplication {
public static void main(String[] args) {
SpringApplication.run(ProviderApplication.class, args);
}
}
1.2.3 生产者
代码语言:javascript复制/**
* Created with IntelliJ IDEA.
*
* @author Demo_Null
* @date 2020/11/17
* @description service 接口
*/
public interface SendMessage {
public String send();
}
代码语言:javascript复制/**
* Created with IntelliJ IDEA.
*
* @author Demo_Null
* @date 2020/11/17
* @description service 实现类
*/
// @EnableBinding 注解实现对消息通道的绑定
@EnableBinding(Source.class)
public class SendMessageImpl implements SendMessage {
@Autowired
private MessageChannel output;
@Override
public String send() {
String str = UUID.randomUUID().toString();
output.send(MessageBuilder.withPayload(str).build());
System.out.println("我发出了:" str);
return null;
}
}
代码语言:javascript复制/**
* Created with IntelliJ IDEA.
*
* @author Demo_Null
* @date 2020/11/17
* @description controller
*/
@RestController
@RequestMapping("/provider")
public class ProviderController {
@Autowired
private SendMessage sendMessage;
@GetMapping("/send")
public Object send() {
String send = sendMessage.send();
System.out.println(send);
return null;
}
}
1.2.4 启动
启动服务之后,可以在 Rabbit MQ 控制台找到一个名为 myExchange 的 Exchange,需要注意的是 Spring Cloud Stream 遵循发布-订阅模式,只要订阅了 myExchange 的就会收到消息【通过分组解决】。
1.3 消息消费者
1.3.1 配置文件
代码语言:javascript复制server:
port: 8901
spring:
application:
name: cloud-stream-consumer_01
cloud:
stream:
binders:
defaultRabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
bindings:
# 通道名称
input:
destination: myExchange
content-type: application/json
binder: defaultRabbit
1.3.2 启动类
代码语言:javascript复制/**
* Created with IntelliJ IDEA.
*
* @author Demo_Null
* @date 2020/11/17
* @description
*/
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
}
1.3.3 消费者
代码语言:javascript复制/**
* Created with IntelliJ IDEA.
*
* @author gaohu9712@163.com
* @date 2020/11/17
* @description
*/
@EnableBinding(Sink.class)
public class ConsumerController {
// 该注解表示该方法为消息中间件上数据流的事件监听器,Sink.INPUT 参数表示这是 input 消息通道上的监听处理器
@StreamListener(Sink.INPUT)
public void listener(Message<String> message) {
System.out.println("我是 8902 我收到了:" message.getPayload());
}
}
1.3.4 测试
这里我们创建两个测试类来进行测试,首先请求消息生产者发出消息,然后可以看到两个消息消费者都受到了同一条消息。。
1.4 分组
1.4.1 概述
上面的测试中我们发现只要订阅了主题的消费者都会收到消息进行消费,但是我们有些时候仅仅只需要一个消费者去消费。stream 消息分组可以完美解决这个问题,处于一个分组内的消费者属于竞争关系,消息只会被一个消费者消费。
1.4.2 配置文件
代码语言:javascript复制spring:
cloud:
stream:
bindings:
input:
# 通过 group 配置分组信息
group: GroupA
1.4.3 测试
持久化 如果有分组的服务,重启之后则可以消费待消费的消息,如果没有分组的服务,不能消费重启之前的消息,会造成消息丢失。
1.5 自定义通道
1.5.1 消息输入通道
代码语言:javascript复制/**
* Created with IntelliJ IDEA.
*
* @author Demo_Null
* @date 2020/11/17
* @description 消费者通道
*/
public interface MySink {
String INPUT = "mychannel";
@Input(INPUT)
SubscribableChannel input();
}
1.5.2 消息输出通道
代码语言:javascript复制/**
* Created with IntelliJ IDEA.
*
* @author Demo_Null
* @date 2020/11/17
* @description 生产者通道
*/
public interface MySource {
@Output(MySink.INPUT)
MessageChannel output();
}
1.5.3 生产者
代码语言:javascript复制/**
* Created with IntelliJ IDEA.
*
* @author Demo_Null
* @date 2020/11/17
* @description 绑定自定义通道的生产者
*/
@EnableBinding(MySource.class)
public class SendMessageImpl implements SendMessage {
@Autowired
private MySource mySource;
@Override
public String send() {
String str = UUID.randomUUID().toString();
mySource.output().send(MessageBuilder.withPayload(str).build());
return "我发出了:" str;
}
}
1.5.4 生产者配置文件
代码语言:javascript复制spring:
application:
name: cloud-stream-provider
cloud:
stream:
# 在此处配置要绑定的 RabbitMQ 的服务信息
binders:
# 表示定义的名称,用于 binding 的整合
defaultRabbit:
# 消息中间件类型
type: rabbit
# 设置 RabbitMQ 的相关环境配置
environment:
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
# 服务的整合处理
bindings:
mychannel:
content-type: application/json
binder: defaultRabbit
group: GroupA
1.5.5 消费者
代码语言:javascript复制/**
* Created with IntelliJ IDEA.
*
* @author Demo_Null
* @date 2020/11/17
* @description 绑定自定义通道的消费者
*/
@EnableBinding(MySink.class)
public class ConsumerController {
@StreamListener(MySink.INPUT)
public void listener(Message<String> message) {
System.out.println("我是 8901 我收到了:" message.getPayload());
}
}
1.5.6 消费者配置文件
代码语言:javascript复制spring:
application:
name: cloud-stream-consumer_01
cloud:
stream:
binders:
defaultRabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
bindings:
mychannel:
content-type: application/json
binder: defaultRabbit
group: GroupA
1.5.7 启动
启动之后我们就可以看到 Rabbit MQ 中出现了我们自定义的 mychannel