文章目录
- 概述
- 添加依赖
- 配置文件配置RabbitMQ的地址信息
- 接口定义
- 接收方 @EnableBinding @StreamListener
- 测试
- 消费组
- 发送复杂对象
- 消息回执
- 代码
概述
官网 : https://spring.io/projects/spring-cloud-stream
概括来说,Spring Cloud Stream 进一步封装了消息队列,可以做到代码层面对消息队列无感知。
这里我们仅仅是做个入门级别的介绍,更多用法还是参考官网上的指导说明,毕竟最权威了。
添加依赖
无需多说,要想使用Spring Cloud Stream ,第一步肯定是添加依赖了 ,如下
这里使用的消息队列是 RabbitMQ ,如果你是用的是kafka,换成对应的spring-cloud-starter-stream-kafka
依赖即可
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
配置文件配置RabbitMQ的地址信息
spring-cloud-starter-stream-rabbit是Spring Cloud Stream对RabbitMQ的封装,包含了对RabbitMQ的自动化配置,比如连接的RabbitMQ的默认地址localhost,默认端口5672,默认用户guest,默认密码guest,如果采用的是如上默认配置,可以不用修改配置。
这里我把配置文件放到了远端的Git,通过config server 拉取配置。
RabbitMQ的安装 ,这里我选择了使用Docker镜像,安装如下
在Docker CE中安装RabbitMQ
接口定义
可知: Sink和Source两个接口分别定义了输入通道和输出通道,Processor通过继承Source和Sink,同时具有输入通道和输出通道。这里我们就模仿Sink和Source,自定义一个消息通道。
代码语言:javascript复制package com.artisan.order.message;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
public interface ArtisanSink {
// 同一个服务里面的通道名字不能一样,在不同的服务里可以相同名字的通道
// 否则启动抛出如下异常 bean definition with this name already exists
String INPUT = "MyMsgInput";
@Input(ArtisanSink.INPUT)
SubscribableChannel input();
}
如上定义了一个名为MyMsgInput的消息输入通道,@Input注解的参数则表示了消息通道的名称
接收方 @EnableBinding @StreamListener
StreamReceive 用来接收RabbitMQ发送来的消息
代码语言:javascript复制package com.artisan.order.message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Component;
/**
* 接收方
*/
@Component
// Step1 注解 绑定刚才的接口
@EnableBinding(ArtisanSink.class)
@Slf4j
public class StreamReceive {
// Step2 @StreamListener 绑定对象的名称
@StreamListener(ArtisanSink.INPUT)
public void processStreamMsg(Object msg){
log.info("StreamReceive: {}",msg);
}
}
- 第一步: 使用了
@EnableBinding
注解实现对消息通道的绑定,我们在该注解中还传入了一个参数ArtisanSink.class,ArtisanSink是一个自定义接口,主要功能是实现对输入消息通道绑定的定义。 - 第二步:在StreamReceive 类中定义了processStreamMsg方法,重点是在该方法上添加了
@StreamListener
注解,该注解表示该方法为消息中间件上数据流的事件监听器,ArtisanSink.INPUT参数表示这是input消息通道上的监听处理器。
测试
模拟发送发发送消息,方便起见,我们直接在Controller层写个方法吧
代码语言:javascript复制package com.artisan.order.controller;
import com.artisan.order.message.Sink;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@Slf4j
public class MsgStreamController {
@Autowired
private ArtisanSink sink;
@GetMapping("/sendMsgByStream")
public void sendMsgByStream(){
String message = "I am one msg sent by Spring Cloud Stream";
sink.input().send(MessageBuilder.withPayload(message).build());
}
}
通过 @Autowired自动注入刚才的Sink接口,然后调用 sink.input().send方法发送消息即可。
启动服务,观察RabbitMQ上的队列 ,自动创建了一个
点进去看下
MyMsgInput和 在接口中的定义一致 。
访问: http://localhost:8081/sendMsgByStream
观察日志:
代码语言:javascript复制2019-04-13 10:56:32.749 INFO 820 --- [nio-8081-exec-4] com.artisan.order.message.StreamReceive : StreamReceive: I am one msg sent by Spring Cloud Stream
接收方收到了一条消息如上,OK。
消费组
需求: 由于服务可能会有多个实例同时在运行,我们只希望消息被一个实例所接收
先来改造下项目,启动多个服务实例
为了多启动几个节点,我们需要把定义在远端Git上的要加载到bootstrap.yml中的端口信息给注释掉,否则第二个端口因端口冲突起不来。
然后通过如下方式在JVM参数中指定启动端口
第一个app 启动端口 -Dserver.port=8082
第一个app 启动端口 -Dserver.port=5656
启动后查看在Eureka Server上的注册情况
再看看RabbitMQ的消息队列情况,两个 OK
旧版本中 ,如果不做任何设置,此时发送一条消息将会被所有的实例接收到 ,但是可以通过消息分组来解决 。 具体可参考: https://segmentfault.com/a/1190000011796459
主要是配置分组
代码语言:javascript复制spring:
cloud:
stream:
bindings:
# MyMsgInput 自定义 order消费组
MyMsgInput:
# 消息组的名称
group: order
#输入通道的主题名
destination: MyMsgInput
#存在消息队列中的消息,如果是复杂对象,则以JSON的形式展示
content-type: application/json
新版本: Spring Boot : 2.0.3.RELEASE Spring Cloud : Finchley.RELEASE
经过测试 不存在这个问题
把这俩节点的日志信息都清空掉,重新发送个消息
我们就用5656这个节点好了 ,http://localhost:5656/sendMsgByStream 经过验证只有5656这一个节点收到了消息。无需设置分组。
发送复杂对象
上面的例子中我们发送的是一个字符串,
如果是复杂对象呢? 来测试下
代码语言:javascript复制 @GetMapping("/sendMsgByStream2")
public void sendMsgByStream2(){
OrderDTO orderDTO = new OrderDTO();
orderDTO.setOrderId("11111");
orderDTO.setOrderAmount(new BigDecimal(9999));
sink.input().send(MessageBuilder.withPayload(orderDTO).build());
}
启动5656端口的服务,访问 http://localhost:5656/sendMsgByStream2
观察日志:
代码语言:javascript复制2019-04-13 17:06:47.438 INFO 13764 --- [nio-5656-exec-1] com.artisan.order.message.StreamReceive : StreamReceive: OrderDTO(orderId=11111, buyerName=null, buyerPhone=null, buyerAddress=null, buyerOpenid=null, orderAmount=9999, orderStatus=null, payStatus=null, orderDetailList=null)
OK。
这是我们如果把消息消费方注释掉,让消息累计在消息队列中,我们去看下消息队列中存储的复杂对象的格式
启动5656端口的服务,访问 http://localhost:5656/sendMsgByStream2
代码语言:javascript复制org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
消息回执
消费者收到消息后给发送方一个ACK确认,该如何做呢?
比如接收到消息后,返回给ArtisanSource.OUTPUT一个消息,直接使用@SendTo直接即可,就会将返回的字符串发送给ArtisanSource.OUTPUT通道
定义一个
代码语言:javascript复制package com.artisan.order.message;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
public interface ArtisanSource {
String OUTPUT = "MyMsgOutput";
@Output(ArtisanSource.OUTPUT)
MessageChannel output();
}
写一个该消息的接收方
代码语言:javascript复制package com.artisan.order.message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;
/**
* 接收方
*/
@Component
// Step1 注解 绑定刚才的接口
@EnableBinding(ArtisanSource.class)
@Slf4j
public class StreamReceive2 {
// Step2 @StreamListener 绑定对象的名称
@StreamListener(ArtisanSource.OUTPUT)
public void processStreamMsg2(String msg){
log.info("OUTPUT StreamReceive: {}",msg);
}
}
启动微服务,访问 http://localhost:5656/sendMsgByStream2
代码语言:javascript复制2019-04-13 18:06:51.817 INFO 972 --- [nio-5656-exec-1] com.artisan.order.message.StreamReceive : INPUT StreamReceive: OrderDTO(orderId=11111, buyerName=null, buyerPhone=null, buyerAddress=null, buyerOpenid=null, orderAmount=9999, orderStatus=null, payStatus=null, orderDetailList=null)
2019-04-13 18:06:51.823 INFO 972 --- [nio-5656-exec-1] c.artisan.order.message.StreamReceive2 : OUTPUT StreamReceive: received OK
代码
https://github.com/yangshangwei/springcloud-o2o/tree/master/artisan_order