Spring Cloud【Finchley】- 21 Spring Cloud Stream 构建消息驱动微服务

2021-08-17 16:27:37 浏览数 (1)

文章目录

  • 概述
  • 添加依赖
  • 配置文件配置RabbitMQ的地址信息
  • 接口定义
  • 接收方 @EnableBinding @StreamListener
  • 测试
  • 消费组
  • 发送复杂对象
  • 消息回执
  • 代码

概述

官网 : https://spring.io/projects/spring-cloud-stream

概括来说,Spring Cloud Stream 进一步封装了消息队列,可以做到代码层面对消息队列无感知。

这里我们仅仅是做个入门级别的介绍,更多用法还是参考官网上的指导说明,毕竟最权威了。


添加依赖

无需多说,要想使用Spring Cloud Stream ,第一步肯定是添加依赖了 ,如下

这里使用的消息队列是 RabbitMQ ,如果你是用的是kafka,换成对应的spring-cloud-starter-stream-kafka依赖即可

代码语言:javascript复制
		<dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>		

配置文件配置RabbitMQ的地址信息

spring-cloud-starter-stream-rabbit是Spring Cloud Stream对RabbitMQ的封装,包含了对RabbitMQ的自动化配置,比如连接的RabbitMQ的默认地址localhost,默认端口5672,默认用户guest,默认密码guest,如果采用的是如上默认配置,可以不用修改配置。

这里我把配置文件放到了远端的Git,通过config server 拉取配置。

RabbitMQ的安装 ,这里我选择了使用Docker镜像,安装如下

在Docker CE中安装RabbitMQ


接口定义

可知: Sink和Source两个接口分别定义了输入通道和输出通道,Processor通过继承Source和Sink,同时具有输入通道和输出通道。这里我们就模仿Sink和Source,自定义一个消息通道。

代码语言:javascript复制
package com.artisan.order.message;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface ArtisanSink {

    // 同一个服务里面的通道名字不能一样,在不同的服务里可以相同名字的通道
    // 否则启动抛出如下异常  bean definition with this name already exists
    String INPUT = "MyMsgInput";


    @Input(ArtisanSink.INPUT)
    SubscribableChannel input();

}

如上定义了一个名为MyMsgInput的消息输入通道,@Input注解的参数则表示了消息通道的名称


接收方 @EnableBinding @StreamListener

StreamReceive 用来接收RabbitMQ发送来的消息

代码语言:javascript复制
package com.artisan.order.message;


import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Component;

/**
 * 接收方
 */

@Component
// Step1 注解  绑定刚才的接口
@EnableBinding(ArtisanSink.class)
@Slf4j
public class StreamReceive {


    // Step2  @StreamListener 绑定对象的名称
    @StreamListener(ArtisanSink.INPUT)
    public void processStreamMsg(Object msg){
        log.info("StreamReceive: {}",msg);
    }

}
  • 第一步: 使用了@EnableBinding注解实现对消息通道的绑定,我们在该注解中还传入了一个参数ArtisanSink.class,ArtisanSink是一个自定义接口,主要功能是实现对输入消息通道绑定的定义。
  • 第二步:在StreamReceive 类中定义了processStreamMsg方法,重点是在该方法上添加了@StreamListener注解,该注解表示该方法为消息中间件上数据流的事件监听器,ArtisanSink.INPUT参数表示这是input消息通道上的监听处理器。

测试

模拟发送发发送消息,方便起见,我们直接在Controller层写个方法吧

代码语言:javascript复制
package com.artisan.order.controller;

import com.artisan.order.message.Sink;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@Slf4j
public class MsgStreamController {

    @Autowired
    private ArtisanSink sink;

    @GetMapping("/sendMsgByStream")
    public void sendMsgByStream(){
        String message = "I am one msg sent by Spring Cloud Stream";
        sink.input().send(MessageBuilder.withPayload(message).build());
    }
}

通过 @Autowired自动注入刚才的Sink接口,然后调用 sink.input().send方法发送消息即可。

启动服务,观察RabbitMQ上的队列 ,自动创建了一个

点进去看下

MyMsgInput和 在接口中的定义一致 。

访问: http://localhost:8081/sendMsgByStream

观察日志:

代码语言:javascript复制
2019-04-13 10:56:32.749  INFO 820 --- [nio-8081-exec-4] com.artisan.order.message.StreamReceive  : StreamReceive: I am one msg sent by Spring Cloud Stream

接收方收到了一条消息如上,OK。


消费组

需求: 由于服务可能会有多个实例同时在运行,我们只希望消息被一个实例所接收

先来改造下项目,启动多个服务实例

为了多启动几个节点,我们需要把定义在远端Git上的要加载到bootstrap.yml中的端口信息给注释掉,否则第二个端口因端口冲突起不来。

然后通过如下方式在JVM参数中指定启动端口 第一个app 启动端口 -Dserver.port=8082 第一个app 启动端口 -Dserver.port=5656

启动后查看在Eureka Server上的注册情况

再看看RabbitMQ的消息队列情况,两个 OK


旧版本中 ,如果不做任何设置,此时发送一条消息将会被所有的实例接收到 ,但是可以通过消息分组来解决 。 具体可参考: https://segmentfault.com/a/1190000011796459

主要是配置分组

代码语言:javascript复制
spring:
  cloud:
    stream:
      bindings:
        # MyMsgInput 自定义   order消费组
        MyMsgInput:
          # 消息组的名称
          group: order
          #输入通道的主题名
          destination: MyMsgInput
          #存在消息队列中的消息,如果是复杂对象,则以JSON的形式展示
          content-type: application/json

新版本: Spring Boot : 2.0.3.RELEASE Spring Cloud : Finchley.RELEASE

经过测试 不存在这个问题

把这俩节点的日志信息都清空掉,重新发送个消息

我们就用5656这个节点好了 ,http://localhost:5656/sendMsgByStream 经过验证只有5656这一个节点收到了消息。无需设置分组。


发送复杂对象

上面的例子中我们发送的是一个字符串,

如果是复杂对象呢? 来测试下

代码语言:javascript复制
 @GetMapping("/sendMsgByStream2")
    public void sendMsgByStream2(){
        OrderDTO orderDTO = new OrderDTO();
        orderDTO.setOrderId("11111");
        orderDTO.setOrderAmount(new BigDecimal(9999));
        sink.input().send(MessageBuilder.withPayload(orderDTO).build());
    }

启动5656端口的服务,访问 http://localhost:5656/sendMsgByStream2

观察日志:

代码语言:javascript复制
2019-04-13 17:06:47.438  INFO 13764 --- [nio-5656-exec-1] com.artisan.order.message.StreamReceive  : StreamReceive: OrderDTO(orderId=11111, buyerName=null, buyerPhone=null, buyerAddress=null, buyerOpenid=null, orderAmount=9999, orderStatus=null, payStatus=null, orderDetailList=null)

OK。

这是我们如果把消息消费方注释掉,让消息累计在消息队列中,我们去看下消息队列中存储的复杂对象的格式

启动5656端口的服务,访问 http://localhost:5656/sendMsgByStream2

代码语言:javascript复制
org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers

消息回执

消费者收到消息后给发送方一个ACK确认,该如何做呢?

比如接收到消息后,返回给ArtisanSource.OUTPUT一个消息,直接使用@SendTo直接即可,就会将返回的字符串发送给ArtisanSource.OUTPUT通道

定义一个

代码语言:javascript复制
package com.artisan.order.message;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface ArtisanSource {

    String OUTPUT = "MyMsgOutput";

    @Output(ArtisanSource.OUTPUT)
    MessageChannel output();
}

写一个该消息的接收方

代码语言:javascript复制
package com.artisan.order.message;


import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

/**
 * 接收方
 */

@Component
// Step1 注解  绑定刚才的接口
@EnableBinding(ArtisanSource.class)
@Slf4j
public class StreamReceive2 {


    // Step2  @StreamListener 绑定对象的名称
    @StreamListener(ArtisanSource.OUTPUT)
    public void processStreamMsg2(String msg){
        log.info("OUTPUT StreamReceive: {}",msg);
    }

}

启动微服务,访问 http://localhost:5656/sendMsgByStream2

代码语言:javascript复制
2019-04-13 18:06:51.817  INFO 972 --- [nio-5656-exec-1] com.artisan.order.message.StreamReceive  : INPUT StreamReceive: OrderDTO(orderId=11111, buyerName=null, buyerPhone=null, buyerAddress=null, buyerOpenid=null, orderAmount=9999, orderStatus=null, payStatus=null, orderDetailList=null)
2019-04-13 18:06:51.823  INFO 972 --- [nio-5656-exec-1] c.artisan.order.message.StreamReceive2   : OUTPUT StreamReceive: received OK 

代码

https://github.com/yangshangwei/springcloud-o2o/tree/master/artisan_order

0 人点赞