SpringCloud Stream消息驱动

2023-10-25 11:22:10 浏览数 (2)

消息驱动概述

什么是消息驱动?

  • 什么是SpringCloudStream
  • 官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架。
  • 应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream中binder对象交互。
  • 通过我们配置来binding(绑定) ,而 Spring Cloud Stream 的 binder对象负责与消息中间件交互。
  • 所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。
  • 通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。
  • Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。
  • 目前仅支持RabbitMQ、Kafka。
  • 屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型
  • 官网

Spring Cloud Stream是用于构建与共享消息传递系统连接的高度可伸缩的事件驱动微服务框架,该框架提供了一个灵活的编程模型,它建立在已经建立和熟悉的Spring熟语和最佳实践上,包括支持持久化的发布/订阅、消费组以及消息分区这三个核心概念

  • 参考文档
  • 中文指导手册

设计思想

标准MQ
  • 生产者/消费者之间靠消息媒介传递信息内容
    • Message
  • 消息必须走特定的通道
    • 消息通道MessageChannel
  • 消息通道里的消息如何被消费呢,谁负责收发处理
    • 消息通道MessageChannel的子接口SubscribableChannel,由MessageHandler消息处理器所订阅
为什么是Cloud Stream?

比方说我们用到了RabbitMQ和Kafka,由于这两个消息中间件的架构上的不同, 像RabbitMQ有exchange,kafka有Topic和Partitions分区

这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候springcloud Stream给我们提供了一种解耦合的方式。

  • stream凭什么可以统一底层差异?
    • 在没有绑定器这个概念的情况下,我们的SpringBoot应用要直接与消息中间件进行信息交互的时候,
    • 由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性
    • 通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。
    • 通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现。
    • 通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。
  • Binder(对象)是什么?

在没有绑定器这个概念的情况下,我们的SpringBoot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性,通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。 Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知,甚至于动态的切换中间件(rabbitmq切换为kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程

  • 通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。
  • Binder可以生成Binding,Binding用来绑定消息容器的生产者和消费者,它有两种类型,INPUT和OUTPUT,INPUT对应于消费者,OUTPUT对应于生产者。
Stream中的消息通信方式遵循了发布-订阅模式
  • Topic主题进行广播
    • 在RabbitMQ就是Exchange
    • 在Kakfa中就是Topic

Spring Cloud Stream标准流程套路

  • Binder
    • 很方便的连接中间件,屏蔽差异
  • Channel
    • 通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置
  • Source和Sink
    • 简单的可理解为参照对象是Spring Cloud Stream自身, 从Stream发布消息就是输出,接受消息就是输入。

编码API与常用注解

案例说明

上文中《SpringCloud Bus消息总线》已经完成对RabbitMQ环境的本地搭建

  • 以下构建3个模块对SpringCloud Stream消息驱动整合RabbitMQ功能的演示
    • cloud-stream-rabbitmq-provider8801, 作为生产者进行发消息模块
    • cloud-stream-rabbitmq-consumer8802,作为消息接收模块
    • cloud-stream-rabbitmq-consumer8803 作为消息接收模块
      • 8802/8803构建一致,参考构建即可

消息驱动之生产者

搭建环境

  • 新建Module,cloud-stream-rabbitmq-provider8801
  • pom文件
代码语言:javascript复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <parent>
    <artifactId>cloud2023</artifactId>
    <groupId>top.ljzstudy.springcloud</groupId>
    <version>1.0-SNAPSHOT</version>
  </parent>
  <modelVersion>4.0.0</modelVersion>

  <artifactId>cloud-stream-rabbitmq-provider8801</artifactId>

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    <!--基础配置-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-devtools</artifactId>
      <scope>runtime</scope>
      <optional>true</optional>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <optional>true</optional>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>
</project>
  • application.yaml文件
代码语言:javascript复制
server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
#          environment: # 设置rabbitmq的相关的环境配置
#            spring:
#              rabbitmq:
#                host: localhost
#                port: 5672
#                username: guest
#                password: guest
      bindings: # 服务的整合处理
        output: # 这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
#           binder: defaultRabbit # 设置要绑定的消息服务的具体设置

eureka:
  client: # 客户端进行Eureka注册的配置
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
    lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
    instance-id: send-8801.com  # 在信息列表时显示主机名称
    prefer-ip-address: true     # 访问的路径变为IP地址
management:
  health:
    rabbit:
      enabled: false
  • 主启动类
代码语言:javascript复制
package top.ljzstudy.springcloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class StreamMQMain8801 {
    public static void main(String[] args) {
        SpringApplication.run(StreamMQMain8801.class, args);
    }
}

编写业务

  • 编写发送消息接口Send()
代码语言:javascript复制
package top.ljzstudy.springcloud.serivice;

public interface MyMessageProvider {
    String send();
}
  • 接口实现类
代码语言:javascript复制
package top.ljzstudy.springcloud.serivice.impl;

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import top.ljzstudy.springcloud.serivice.MyMessageProvider;

import javax.annotation.Resource;
import java.util.UUID;

@Slf4j
@EnableBinding(Source.class)//定义消息推送通道,开启Channel和Exchange进行绑定
public class MyMessageProviderImpl implements MyMessageProvider {

    @Resource
    private MessageChannel output;//消息发出管道
    @Override
    public String send() {
        String serial = UUID.randomUUID().toString();
        output.send(MessageBuilder.withPayload(serial).build());
        log.info("******serial" serial);
        return serial;
    }
}
  • Controller
代码语言:javascript复制
package top.ljzstudy.springcloud.controller;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import top.ljzstudy.springcloud.serivice.MyMessageProvider;

import javax.annotation.Resource;

@RestController
public class SendMessageController {
    @Resource
    private MyMessageProvider myMessageProvider;
    @GetMapping(value="/sendMessage")
    public String sendMessage(){
        return myMessageProvider.send();
    }
}

测试

  • 启动Eureka7001注册中心
  • 启动RabbitMQ
    • DOS窗口下进入sbin路径,执行命令rabbitmq-plugins enable rabbitmq_management
    • 浏览器访问http://localhost:15672/,输入默认账密guest进入监控中心
  • 启动消息生产者8801
  • 访问http://localhost:8801/sendMessage
  • 大量访问,观察RabbitMQ监控中心曲线,同时观察控制台输出日志

消息驱动之消费者

搭建环境

  • 新建Module,cloud-stream-rabbitmq-consumer8802
  • pom文件
代码语言:javascript复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <parent>
    <artifactId>cloud2023</artifactId>
    <groupId>top.ljzstudy.springcloud</groupId>
    <version>1.0-SNAPSHOT</version>
  </parent>
  <modelVersion>4.0.0</modelVersion>

  <artifactId>cloud-provider-payment8002</artifactId>

  <dependencies>
    <!--eureka-client-->
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    <dependency><!-- 引入自己定义的api通用包,可以使用Payment支付Entity -->
      <groupId>top.ljzstudy.springcloud</groupId>
      <artifactId>cloud-api-commons</artifactId>
      <version>1.0-SNAPSHOT</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
      <groupId>org.mybatis.spring.boot</groupId>
      <artifactId>mybatis-spring-boot-starter</artifactId>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>druid-spring-boot-starter</artifactId>
      <version>1.1.10</version>
    </dependency>
    <!--mysql-connector-java-->
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
    </dependency>
    <!--jdbc-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-jdbc</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-devtools</artifactId>
      <scope>runtime</scope>
      <optional>true</optional>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <optional>true</optional>
    </dependency>
  </dependencies>

</project>
  • application.yaml文件
代码语言:javascript复制
server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
      #          environment: # 设置rabbitmq的相关的环境配置
      #            spring:
      #              rabbitmq:
      #                host: localhost
      #                port: 5672
      #                username: guest
      #                password: guest
      bindings: # 服务的整合处理
        input: # 这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
#           binder: defaultRabbit # 设置要绑定的消息服务的具体设置

eureka:
  client: # 客户端进行Eureka注册的配置
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
    lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
    instance-id: receive-8802.com  # 在信息列表时显示主机名称
    prefer-ip-address: true     # 访问的路径变为IP地址
management:
  health:
    rabbit:
      enabled: false
  • 主启动类
代码语言:javascript复制
package top.ljzstudy.springcloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class StreamMQMain8802 {
    public static void main(String[] args) {
        SpringApplication.run(StreamMQMain8802.class, args);
    }
}

编写业务

  • 编写监听器,监听生产者的消息信道
代码语言:javascript复制
package top.ljzstudy.springcloud.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListener {
    @Value("${server.port}")
    private String serverPort;

    @StreamListener(Sink.INPUT)
    public void input(Message<String> message){
        log.info("消费者1号,--->接受到的消息: " message.getPayload() "t port: " serverPort);
    }
}

测试

  • 依次启动Eureka7001,消息生产者8801和消息消费者8802
  • 访问http://localhost:8801/sendMessage,模拟8801向8802发消息
  • 观察RabbitMQ监控中心
  • 观察idea控制台Console输入日志

分组消费与持久化

构建消费者集群模拟生产环境

  • 仿造消费者8802,clone一份消费者8803模块
  • 依次启动RabbitMQ,注册中心7001,消息生产者8801,消息消费者8802/8803
  • 访问http://localhost:8801/sendMessage,向8802/8803发消息,观察8802/8803控制台输出

默认配置中生产环境下暴露的问题

运行后有两个问题
  • 有重复消费问题,目前是8802/8803同时都收到了,存在重复消费问题
  • 消息持久化问题
生产实际案例

  • 比如在如下场景中,订单系统我们做集群部署,都会从RabbitMQ中获取订单信息,
  • 那如果一个订单同时被两个服务获取到,那么就会造成数据错误,我们得避免这种情况。
  • 这时我们就可以使用Stream中的消息分组来解决

  • 注意在Stream中处于同一个group中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次。
  • 不同组是可以全面消费的(重复消费),
  • 同一组内会发生竞争关系,只有其中一个可以消费。

配置分组消费

分组原理

  • 微服务应用放置于同一个group中,就能够保证消息只会被其中一个应用消费一次。
  • 不同的组是可以消费的,同一个组内会发生竞争关系,只有其中一个可以消费。
8802/8803都变成不同组,group两个不同
  • 修改8802/8803,application.yaml文件
    • 8802
代码语言:javascript复制
# 新增配置
spring:
  cloud:
    stream:
      bindings: # 服务的整合处理
        input: # 这个名字是一个通道的名称
          group: ljzstudy_group1 
  • 8803
代码语言:javascript复制
# 新增配置
spring:
  cloud:
    stream:
      bindings: # 服务的整合处理
        input: # 这个名字是一个通道的名称
          group: ljzstudy_group2
  • 重启8802/8803服务,查看RabbitMQ监控中心配置
  • 访问http://localhost:8801/sendMessage,向8802/8803发消息,观察8802/8803控制台输出
  • 结论==>仍是出现重复消费现象,但由于是自定义的分组,消息持久化已经实现
  • 小结==>8802/8803实现了轮询分组,每次只有一个消费者,8801模块的发的消息只能被8802或8803其中一个接收到,这样避免了重复消费。
8802/8803都变成相同组,group两个相同
  • 修改8802/8803,application.yaml文件
代码语言:javascript复制
# 新增配置
spring:
  cloud:
    stream:
      bindings: # 服务的整合处理
        input: # 这个名字是一个通道的名称
          group: ljzstudy
  • 重启服务,观察控制台
  • 结论==>同一个组的多个微服务实例,每次只会有一个拿到

指定分组消费兼具消息持久化

  • 通过上述,解决了重复消费问题,再看看持久化
  • 停止8802/8803并去除掉8802的分组group: ljzstudy
  • 8803的分组group: ljzstudy没有去掉
  • 8801先发送4条消息到rabbitmq
  • 先启动8802,无分组属性配置,后台没有打出来消息
  • 程序加载完毕,未见日志消息
  • 结论==>默认分组回出现消息丢失现象
  • 再启动8803,有分组属性配置,后台打出来了MQ上的消息
  • 反观8803,在程序完全加载完毕之前就已经获取了,8801发送的消息
  • 结论==>自定义分组不仅可以避免重复消费问题,同时兼具消息持久化功能

0 人点赞