通过分析SpringCloud Stream 消费者端的工作流程,涉及到的主要依赖有:
spring-cloud-stream
spring-rabbit
spring-amqp
spring-messaging
amqp-client
消息驱动
1 分析过程
1.1 准备工作
案例中通过rabbitMQ作为消息中间件,完成SpringCloud Stream消息驱动的分析
1.2 消息生产者
1.2.1 创建工程引入依赖
代码语言:txt复制<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
</dependencies>
1.2.2 定义BINGDING
发送消息时需要定义一个接口,不同的是接口方法的返回对象是 MessageChannel,下面是 Spring Cloud Stream 内置的接口:
代码语言:txt复制public interface Source {
String OUTPUT = "output";
@Output(Source.OUTPUT)
MessageChannel output();
}
这就接口声明了一个 binding 命名为 “output”。这个binding 声明了一个消息输出流,也就是消息的生产者。
1.2.3 配置APPLICATION.YML
代码语言:txt复制server:
port: 7001 #服务端口
spring:
application:
name: stream_producer #指定服务名
rabbitmq:
addresses: 192.168.142.128
username: root
password: 123456
virtual-host: /test
cloud:
stream:
bindings:
output:
destination: root-default #指定消息发送的目的地,在rabbitmq中,发送到一个root-default的exchange中
binders: #配置绑定器
defaultRabbit:
type: rabbit
1.2.4 测试发送消息
代码语言:txt复制* 启动类
* 入门案例:
* 1.引入依赖
* 2.配置application.yml文件
* 3.发送消息的话,定义一个通道接口,通过接口中内置的messagechannel
* SpringCloudStream中内置接口 Source
* 4.@EnableBinding : 绑定对应通道
* 5.发送消息的话,通过MessageChannel发送消息
* 如果需要MessageChannel --> 通过绑定的内置接口获取
*
* @author
*/
@SpringBootApplication
@EnableBinding(Source.class)
public class ProducerApplicationDemo implements CommandLineRunner {
@Autowired
private MessageChannel output;
public static void main(String[] args) {
SpringApplication.run(ProducerApplicationDemo.class);
}
@Override
public void run(String... args) throws Exception {
//发送MQ消息
//messagesBuilder:工具类:创建消息
output.send(MessageBuilder.withPayload("hello world").build());
}
}
1.3 消息消费者
1.3.1 创建工程引入依赖
代码语言:txt复制<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
</dependencies>
1.3.2 定义BINGDING
同发送消息一致,在Spring Cloud Stream中接受消息,需要定义一个接口,如下是内置的一个接口。
代码语言:txt复制public interface Sink {
String INPUT = "input";
@Input(Sink.INPUT)
SubscribableChannel input();
}
注释 @Input 对应的方法,需要返回 SubscribableChannel ,并且参入一个参数值。
这就接口声明了一个 binding 命名为 “input” 。
1.3.3 配置APPLICATION.YML
代码语言:txt复制server:
port: 7002 #服务端口
spring:
application:
name: rabbitmq-consumer #指定服务名
rabbitmq:
addresses: 192.168.142.128
username: wgy
password: 123456
virtual-host: /test
cloud:
stream:
bindings:
input: #内置的获取消息的通道 , 从wgy-default中获取消息
destination: wgy-default
binders:
defaultRabbit:
type: rabbit
1.3.4 测试
代码语言:txt复制/**
* 启动类
* 入门案例:
* 1.引入依赖
* 2.配置application.yml
* 3.需要配置一个通道的接口
* 内置获取消息的通道接口 sink
* 4.绑定通道
* 5.配置一个监听方法 : 当程序从中间件获取数据之后,执行的业务逻辑方法
* 需要在监听方法上配置@StreamListener
*
* @author
*/
@SpringBootApplication
@EnableBinding(Sink.class)
public class ConsumerApplicationDemo {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplicationDemo.class);
}
/**
* 监听binding中的消息
* @param message
*/
@StreamListener(Sink.INPUT)
public void input(String message) {
System.out.println("获取到消息:" message);
}
}
1.4 定义工具类
1.4.1 消息生产者
代码语言:txt复制/**
* 负责向中间件发送数据
*
* @author
*/
@Component
@EnableBinding(Source.class)
public class MessageSender {
@Autowired
private MessageChannel output;
/**
* 发送消息
*
* @param obj
*/
public void send(Object obj) {
output.send(MessageBuilder.withPayload(obj).build());
}
}
1.4.2 消息消费者
代码语言:txt复制/**
* 负责向中间件获取数据
*
* @author
*/
@Component
@EnableBinding(Sink.class)
public class MessageListener {
/**
* 监听binding中的消息
*
* @param message
*/
@StreamListener(Sink.INPUT)
public void input(String message) {
System.out.println("获取到消息:" message);
}
}
1.4.3 测试
代码语言:txt复制/**
* 测试类
*
* @author
*/
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class ProducerTest {
@Autowired
private MessageSender messageSender;
@Test
public void testSend() {
messageSender.send("hello 工具类");
}
}
具体如下图所示:
2 自定义消息通道
Spring Cloud Stream 内置了两种接口,分别定义了 binding 为 “input” 的输入流,和 “output” 的输出流,而在我们实际使用中,往往是需要定义各种输入输出流。使用方法也很简单。
代码语言:txt复制/**
* 自定义的消息通道
*
* @author
*/
public interface MyProcessor {
/**
* 消息生产者的配置
*/
String MYOUTPUT = "myoutput";
/**
* 消息消费者的配置
*/
String MYINPUT = "myinput";
@Output("myoutput")
MessageChannel myoutput();
@Input("myinput")
SubscribableChannel myinput();
}
- 一个接口中,可以定义无数个输入输出流,可以根据实际业务情况划分。上述的接口,定义了一个订单输入,和订单输出两个 binding。
- 使用时,需要在@EnableBinding注解中,添加自定义的接口。
- 使用@StreamListener做监听的时候,需要指定MyProcessor.MYINPUT
2.1 消息生产者
代码语言:txt复制/**
* 负责向中间件发送数据
*
* @author
*/
@Component
@EnableBinding(MyProcessor.class)
public class MessageSender {
@Autowired
@Qualifier(value = "myoutput")
private MessageChannel myoutput;
/**
* 发送消息
*
* @param obj
*/
public void send(Object obj) {
myoutput.send(MessageBuilder.withPayload(obj).build());
}
}
代码语言:txt复制server:
port: 7001 #服务端口
spring:
application:
name: stream_producer #指定服务名
rabbitmq:
addresses: 192.168.142.128
username: root
password: 123456
virtual-host: /test
cloud:
stream:
bindings:
output:
destination: root-default #指定消息发送的目的地,在rabbitmq中,发送到一个root-default的exchange中
myoutput:
destination: root-custom-output
binders: #配置绑定器
defaultRabbit:
type: rabbit
2.2 消息消费者
代码语言:txt复制/**
* 负责向中间件获取数据
*
* @author
*/
@Component
@EnableBinding(MyProcessor.class)
public class MessageListener {
/**
* 监听binding中的消息
*
* @param message
*/
@StreamListener(MyProcessor.MYINPUT)
public void input(String message) {
System.out.println("获取到消息:" message);
}
}
代码语言:txt复制server:
port: 7002 #服务端口
spring:
application:
name: rabbitmq-consumer #指定服务名
rabbitmq:
addresses: 192.168.142.128
username: root
password: 123456
virtual-host: /test
cloud:
stream:
bindings:
input: #内置的获取消息的通道 , 从root-default中获取消息
destination: root-default
myinput:
destination: root-custom-output
binders:
defaultRabbit:
type: rabbit
3 消息分组
通常在生产环境,我们的每个服务都不会以单节点的方式运行在生产环境,当同一个服务启动多个实例的时候,这些实例都会绑定到同一个消息通道的目标主题(Topic)上。默认情况下,当生产者发出一条消息到绑定通道上,这条消息会产生多个副本被每个消费者实例接收和处理,但是有些业务场景之下,我们希望生产者产生的消息只被其中一个实例消费,这个时候我们需要为这些消费者设置消费组来实现这样的功能。
实现的方式非常简单,我们只需要在服务消费者端设置属性即可,如以下实现:
代码语言:txt复制server:
port: 7002 #服务端口
spring:
application:
name: rabbitmq-consumer #指定服务名
rabbitmq:
addresses: 192.168.142.128
username: root
password: 123456
virtual-host: /test
cloud:
stream:
bindings:
input: #内置的获取消息的通道 , 从root-default中获取消息
destination: root-default
myinput:
destination: root-custom-output
group: group1 #设置消息的组名称(同名组中的多个消费者,只会有一个去消费消息)
binders:
defaultRabbit:
type: rabbit
在同一个group中的多个消费者只有一个可以获取到消息并消费
4 消息分区
有一些场景需要满足, 同一个特征的数据被同一个实例消费, 比如同一个id的传感器监测数据必须被同一个实例统计计算分析, 否则可能无法获取全部的数据。又比如部分异步任务,首次请求启动task,二次请求取消task,此场景就必须保证两次请求至同一实例.
4.1 消息消费者0
代码语言:txt复制server:
port: 7002 #服务端口
spring:
application:
name: rabbitmq-consumer #指定服务名
rabbitmq:
addresses: 192.168.142.128
username: root
password: 123456
virtual-host: /test
cloud:
stream:
instanceCount: 2 #消费者总数
instanceIndex: 0 #当前消费者的索引
bindings:
input: #内置的获取消息的通道 , 从root-default中获取消息
destination: root-default
myinput:
destination: root-custom-output
group: group1 #设置消息的组名称(同名组中的多个消费者,只会有一个去消费消息)
consumer:
partitioned: true #开启分区支持
binders:
defaultRabbit:
type: rabbit
从上面的配置中,我们可以看到增加了这三个参数:
- spring.cloud.stream.bindings.input.consumer.partitioned :通过该参数开启消费者分区功能;
- spring.cloud.stream.instanceCount:该参数指定了当前消费者的总实例数量;
- spring.cloud.stream.instanceIndex :该参数设置当前实例的索引号,从0开始,最大值为spring.cloud.stream.instanceCount 参数 - 1。我们试验的时候需要启动多个实例,可以通过运行参数来为不同实例设置不同的索引值。
4.2 消息消费者1
代码语言:txt复制server:
port: 7003 #服务端口
spring:
application:
name: rabbitmq-consumer #指定服务名
rabbitmq:
addresses: 192.168.142.128
username: root
password: 123456
virtual-host: /test
cloud:
stream:
instanceCount: 2 #消费者总数
instanceIndex: 1 #当前消费者的索引
bindings:
input: #内置的获取消息的通道 , 从root-default中获取消息
destination: root-default
myinput:
destination: root-custom-output
group: group1 #设置消息的组名称(同名组中的多个消费者,只会有一个去消费消息)
consumer:
partitioned: true #开启分区支持
binders:
defaultRabbit:
type: rabbit
4.3 消息生产者
代码语言:txt复制server:
port: 7001 #服务端口
spring:
application:
name: stream_producer #指定服务名
rabbitmq:
addresses: 192.168.142.128
username: root
password: 123456
virtual-host: /test
cloud:
stream:
bindings:
output:
destination: rootdefault #指定消息发送的目的地,在rabbitmq中,发送到一个root-default的exchange中
myoutput:
destination: root-custom-output
producer:
partition-key-expression: payload #分区关键字 对象中的id,对象
partition-count: 2 #分区大小
binders: #配置绑定器
defaultRabbit:
type: rabbit
从上面的配置中,我们可以看到增加了这两个参数:
- spring.cloud.stream.bindings.output.producer.partitionKeyExpression :通过该参数指定了分区键的表达式规则,我们可以根据实际的输出消息规则来配置SpEL来生成合适的分区键;
- spring.cloud.stream.bindings.output.producer.partitionCount :该参数指定了消息分区的数量。
到这里消息分区配置就完成了,我们可以再次启动这两个应用,同时消费者启动多个,但需要注意的是要为消费者指定不同的实例索引号,这样当同一个消息被发给消费组时,我们可以发现只有一个消费实例在接收和处理这些相同的消息。