spring cloud stream 介绍(照搬)
Spring Cloud Stream 是一个用于构建基于消息的微服务应用框架。它基于 SpringBoot 来创建具有生产级别的单机 Spring 应用,并且使用
Spring Integration与 Broker 进行连接。 Spring Cloud Stream 提供了消息中间件配置的统一抽象,推出了 publish-subscribe、consumer groups、partition 这些统一的概念。 Spring Cloud Stream 内部有两个概念:Binder 和 Binding。Binder: 跟外部消息中间件集成的组件,用来创建 Binding,各消息中间件都有自己的 Binder 实现。
比如
Kafka的实现KafkaMessageChannelBinder,RabbitMQ的实现RabbitMessageChannelBinder以及RocketMQ的实现RocketMQMessageChannelBinder。Binding: 包括 Input Binding 和 Output Binding。Binding 在消息中间件与应用程序提供的 Provider 和 Consumer 之间提供了一个桥梁,实现了开发者只需使用应用程序的 Provider 或 Consumer 生产或消费数据即可,屏蔽了开发者与底层消息中间件的接触。
版本选择
因为不是用于开发,仅供学习用所以我参照了下阿里的版本,选用了最新的,具体依据自身项目做参考
阿里git版本说明传送门
阿里组件
毕业版本依赖
一. pom.xml 配置
代码语言:txt复制<?xml version="1.0" encoding="UTF-8"?>代码语言:txt复制<project xmlns="http://maven.apache.org/POM/4.0.0"代码语言:txt复制 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"代码语言:txt复制 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">代码语言:txt复制 <modelVersion>4.0.0</modelVersion>代码语言:txt复制 <parent>代码语言:txt复制 <groupId>org.springframework.boot</groupId>代码语言:txt复制 <artifactId>spring-boot-starter-parent</artifactId>代码语言:txt复制 <version>2.4.2</version>代码语言:txt复制 </parent>代码语言:txt复制 <groupId>org.example</groupId>代码语言:txt复制 <artifactId>spring-alibaba-cloud-rocketmq-studytest</artifactId>代码语言:txt复制 <version>1.0-SNAPSHOT</version>代码语言:txt复制 <description>阿里巴巴cloud-rocketmq学习</description>代码语言:txt复制 <properties>代码语言:txt复制 <java.version>1.8</java.version>代码语言:txt复制 </properties>代码语言:txt复制 <dependencyManagement>代码语言:txt复制 <dependencies>代码语言:txt复制 <!-- spring cloud 版本依赖-->代码语言:txt复制 <dependency>代码语言:txt复制 <groupId>org.springframework.cloud</groupId>代码语言:txt复制 <artifactId>spring-cloud-dependencies</artifactId>代码语言:txt复制 <version>2020.0.0</version>代码语言:txt复制 <type>pom</type>代码语言:txt复制 <scope>import</scope>代码语言:txt复制 </dependency>代码语言:txt复制 <!-- spring alibaba cloud 版本依赖-->代码语言:txt复制 <dependency>代码语言:txt复制 <groupId>com.alibaba.cloud</groupId>代码语言:txt复制 <artifactId>spring-cloud-alibaba-dependencies</artifactId>代码语言:txt复制 <version>2021.1</version>代码语言:txt复制 <type>pom</type>代码语言:txt复制 <scope>import</scope>代码语言:txt复制 </dependency>代码语言:txt复制 </dependencies>代码语言:txt复制 </dependencyManagement>代码语言:txt复制 <dependencies>代码语言:txt复制 <!-- rocketmq 依赖-->代码语言:txt复制 <dependency>代码语言:txt复制 <groupId>com.alibaba.cloud</groupId>代码语言:txt复制 <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>代码语言:txt复制 </dependency>代码语言:txt复制 <!-- spring boot web依赖-->代码语言:txt复制 <dependency>代码语言:txt复制 <groupId>org.springframework.boot</groupId>代码语言:txt复制 <artifactId>spring-boot-starter-web</artifactId>代码语言:txt复制 </dependency>代码语言:txt复制 <!-- lombok -->代码语言:txt复制 <dependency>代码语言:txt复制 <groupId>org.projectlombok</groupId>代码语言:txt复制 <artifactId>lombok</artifactId>代码语言:txt复制 </dependency>代码语言:txt复制 </dependencies>代码语言:txt复制</project>二. 自定义消息channel与rocketMq配置
上面我们引入了spring cloud alibab rocketmq相关依赖,下面我们开始消息通道与yml关于rocketmq的配置undefined 由于阿里的spring-cloud-starter-stream-rocketmq 是依赖spring的stream binder实现的,所以rocketMq配置分为rocketMq的自定义配置与stream binder的公共配置,如下:
spring.cloud.stream.rocketmq为rocketmq自定义配置spring.cloud.stream.bindings为srping cloud stream binder公共配置,以此来达到对Apache KafkaRabbitMQ等消息中间件的扩展
- 1. 自定义普通消息
- 普通消息YML配置
spring:代码语言:txt复制 cloud:代码语言:txt复制 stream:代码语言:txt复制 # 阿里rocketMq配置 topic 与 group 均以 实例id% 为前缀配置 如实例id为 MQ_INST_XXXX_XXX 则group或topic 配置 MQ_INST_XXXX_XXX%grouID代码语言:txt复制 rocketmq:代码语言:txt复制 binder:代码语言:txt复制 # 【若为阿里云购买服务,则为控制台的对外或对内实例地址】【若自己搭建的服务,为自定义rocketmq服务地址127.0.0.1:9876】代码语言:txt复制 name-server: http://MQ_INST_XXXX_XXX.DD.FFF.aliyuncs.com:80代码语言:txt复制 # 阿里access-key 【购买阿里服务 控制台获取填写 若为自搭服务可不填】代码语言:txt复制 access-key: AAAAAAAAAAA代码语言:txt复制 # 阿里secret-key 【购买阿里服务 控制台获取填写 若为自搭服务可不填】代码语言:txt复制 secret-key: BBBBBBBBBBBBBB代码语言:txt复制 # rocketMq 自定义消息通道配置代码语言:txt复制 bindings:代码语言:txt复制 # 阿里rocketMq binder 生产者配置代码语言:txt复制 ### 普通生产消息通道代码语言:txt复制 customized_output_channel: {producer.group: MQ_INST_XXXX_XXX%GID_QIGUANBANG_DEV}代码语言:txt复制 # 阿里rocketMq binder 消费者配置代码语言:txt复制 ### 普通消息订阅通道代码语言:txt复制 customized_input_channel: {consumer.tags: test_consumer_tag}代码语言:txt复制 # stream binder 公共配置代码语言:txt复制 bindings:代码语言:txt复制 # spring cloud stream binder 生产者配置代码语言:txt复制 ### 普通消息通道代码语言:txt复制 customized_output_channel: {destination: MQ_INST_XXXX_XXX%mg_common_topic, content-type: application/json}代码语言:txt复制 # spring cloud stream binder 消费者配置代码语言:txt复制 ### 普通消息订阅通道代码语言:txt复制 customized_input_channel: {destination: MQ_INST_XXXX_XXX%mg_common_topic, group: MQ_INST_XXXX_XXX%GID_QIGUANBANG_DEV, content-type: application/json}关于rocketmq的group 与 topic在yml中的书写方式,官方文档是这么写的undefined
topic 和 group 请以 实例id% 为前缀进行配置。比如 topic 为 "test",需要配置成 "实例id%test"官方文档地址 滑到最后,但是我试过去掉后也能正常使用(可能出于兼容自搭RocketMq服务的目的),可能是购买阿里服务的需要这么填写,消息轨迹或者其他内容需要获取实例信息,这样书写方便快速获取?具体原因还需观察源码,暂时按照官方的来。
- 自定义channel接口
代码语言:txt复制spring cloud stream 提供了自定义的Mesage接口
Source和Sink供开发者使用,通过在程序启动类或者服务类添加注解来启用, 如下:
import lombok.extern.slf4j.Slf4j;代码语言:txt复制import org.springframework.boot.SpringApplication;代码语言:txt复制import org.springframework.boot.autoconfigure.SpringBootApplication;代码语言:txt复制import org.springframework.cloud.stream.annotation.EnableBinding;代码语言:txt复制import org.springframework.cloud.stream.messaging.Sink;代码语言:txt复制import org.springframework.cloud.stream.messaging.Source;代码语言:txt复制@Slf4j代码语言:txt复制@EnableBinding({Source.class, Sink.class})代码语言:txt复制@SpringBootApplication代码语言:txt复制public class RocketApplication {代码语言:txt复制 public static void main(String[] args) {代码语言:txt复制 SpringApplication.run(RocketApplication.class, args);代码语言:txt复制 log.debug("==========rocketMq服务启动成功!==========");代码语言:txt复制 }代码语言:txt复制}代码语言:txt复制@Component代码语言:txt复制@EnableBinding(Source.class)代码语言:txt复制public class RocketMqService {
Source提供了生产者的接口,而Sink提供了消费者的接口,通过观察源码,我们可以发现,接口类的内容十分简单。
- source
package org.springframework.cloud.stream.messaging;代码语言:txt复制import org.springframework.cloud.stream.annotation.Output;代码语言:txt复制import org.springframework.messaging.MessageChannel;代码语言:txt复制/**代码语言:txt复制 * Bindable interface with one output channel.
*
* @author Dave Syer
* @author Marius Bogoevici
* @see org.springframework.cloud.stream.annotation.EnableBinding
*/
public interface Source {代码语言:txt复制 /**代码语言:txt复制 * Name of the output channel.
*/
String OUTPUT = "output";代码语言:txt复制 /**代码语言:txt复制 * @return output channel
*/
@Output(Source.OUTPUT)
MessageChannel output();代码语言:txt复制}- sink
package org.springframework.cloud.stream.messaging;代码语言:txt复制import org.springframework.cloud.stream.annotation.Input;代码语言:txt复制import org.springframework.messaging.SubscribableChannel;代码语言:txt复制/**代码语言:txt复制 * Bindable interface with one input channel.
*
* @author Dave Syer
* @author Marius Bogoevici
* @see org.springframework.cloud.stream.annotation.EnableBinding
*/
public interface Sink {代码语言:txt复制 /**代码语言:txt复制 * Input channel name.
*/
String INPUT = "input";代码语言:txt复制 /**代码语言:txt复制 * @return input channel.
*/
@Input(Sink.INPUT)
SubscribableChannel input();代码语言:txt复制}而且spring cloud stream 也支持我们自定义message通道,所以我们可以通过根据自己的业务来制定不同的消息通道,以此来满足我们的业务需求,示列如下:
- 自定义消息生产通道接口
import org.springframework.cloud.stream.annotation.Output;代码语言:txt复制import org.springframework.messaging.MessageChannel;代码语言:txt复制public interface OutputChannel {代码语言:txt复制 // 普通消息生产通道 对应yml自定义节点名称代码语言:txt复制 String NORMAL_PRODUCER_CHANNEL = "customized_output_channel";代码语言:txt复制 @Output(NORMAL_PRODUCER_CHANNEL)代码语言:txt复制 MessageChannel NormalOutput();代码语言:txt复制}- 自定义消息订阅通道接口
import org.springframework.cloud.stream.annotation.Input;代码语言:txt复制import org.springframework.messaging.SubscribableChannel;代码语言:txt复制public interface InputChannel {代码语言:txt复制 // 普通消息订阅通道 对应yml自定义节点名称代码语言:txt复制 String NORMAL_CONSUMER_CHANNEL = "customized_input_channel";代码语言:txt复制 @Input(NORMAL_CONSUMER_CHANNEL)代码语言:txt复制 SubscribableChannel normalConsumerChannel();代码语言:txt复制}启动类启用
代码语言:txt复制import com.study.rocketmq.channel.InputChannel;代码语言:txt复制import com.study.rocketmq.channel.OutputChannel;代码语言:txt复制import lombok.extern.slf4j.Slf4j;代码语言:txt复制import org.springframework.boot.SpringApplication;代码语言:txt复制import org.springframework.boot.autoconfigure.SpringBootApplication;代码语言:txt复制import org.springframework.cloud.stream.annotation.EnableBinding;代码语言:txt复制@Slf4j代码语言:txt复制@EnableBinding({InputChannel.class, OutputChannel.class})代码语言:txt复制@SpringBootApplication代码语言:txt复制public class RocketApplication {代码语言:txt复制 public static void main(String[] args) {代码语言:txt复制 SpringApplication.run(RocketApplication.class, args);代码语言:txt复制 log.debug("==========rocketMq服务启动成功!==========");代码语言:txt复制 }代码语言:txt复制}环境配置和代码配置已经好了,下面后门开始写消息生产方法和消息订阅
- 普通消息发送
// controller代码语言:txt复制@RestController代码语言:txt复制@RequestMapping("/msg")代码语言:txt复制public class TestMsgController {代码语言:txt复制 @Autowired代码语言:txt复制 ProducerService producerService;代码语言:txt复制 @GetMapping("/sendMsg/{msg}")代码语言:txt复制 public String sendMsg(@PathVariable("msg")String msg){代码语言:txt复制 producerService.sendNormalMsg(msg, "test_consumer_tag", "testKey");代码语言:txt复制 return "SUCCESS";代码语言:txt复制 }代码语言:txt复制}代码语言:txt复制import com.study.rocketmq.channel.OutputChannel;代码语言:txt复制import lombok.extern.slf4j.Slf4j;代码语言:txt复制import org.apache.rocketmq.common.message.MessageConst;代码语言:txt复制import org.springframework.beans.factory.annotation.Autowired;代码语言:txt复制import org.springframework.messaging.Message;代码语言:txt复制import org.springframework.messaging.support.MessageBuilder;代码语言:txt复制import org.springframework.stereotype.Service;代码语言:txt复制@Slf4j代码语言:txt复制@Service代码语言:txt复制public class ProducerService {代码语言:txt复制 @Autowired代码语言:txt复制 private OutputChannel outputChannel;代码语言:txt复制 /**代码语言:txt复制 * 发送普通消息
* @param message 消息内容
* @param ConsumerTag 消费者group标识
* @param MsgKey 消息key
* @return
*/
public boolean sendNormalMsg(String message, String ConsumerTag, String MsgKey){
// 构建消息
Message<String> messageBuild = MessageBuilder.withPayload(message)
.setHeader(MessageConst.PROPERTY_TAGS, ConsumerTag)
.setHeader(MessageConst.PROPERTY_KEYS, MsgKey)
.build();
// 发送消息
boolean sendResult = outputChannel.NormalOutput().send(messageBuild);
if (sendResult){
log.info("普通消息发送成功-ConsumerTag:{}-MsgKey:{}", ConsumerTag, MsgKey);
}else {
log.error("普通消息发送失败!:{}", ConsumerTag, MsgKey);
}
return sendResult;
}
}代码语言:txt复制import com.study.rocketmq.channel.InputChannel;代码语言:txt复制import lombok.extern.slf4j.Slf4j;代码语言:txt复制import org.springframework.cloud.stream.annotation.StreamListener;代码语言:txt复制import org.springframework.messaging.handler.annotation.Payload;代码语言:txt复制import org.springframework.stereotype.Component;代码语言:txt复制/**代码语言:txt复制 * @ClassName MessageListener
* @author lgq
* @description //消息监听
* @Date 2021/6/9
* @Version V1.0
*/
@Slf4j
@Component
public class MessageListener {
代码语言:txt复制 // 通过StreamListener监听消息 只允许rocketmq_KEYS = testKey接收代码语言:txt复制 @StreamListener(value = InputChannel.NORMAL_CONSUMER_CHANNEL, condition = "headers['rocketmq_KEYS'] == 'testKey'")代码语言:txt复制 public void receivePayMsg(@Payload String payResult) {代码语言:txt复制 log.debug("接收到普通消息:{}", payResult);代码语言:txt复制 }代码语言:txt复制}通过ApiPost工具请求,默认打印SUCCESS字符,观察控制台发现没有发送成功。打开控制台也没看到我们本地的客户端注册上了。
ApiPost请求
错误日志
阿里云rocketMq控制台
后来通过查询资料得知,可能阿里的rocketMq服务版本比较高,ons客户端版本已经到了4.8而spring-cloud-starter-stream-
rocketmq所使用的版本才4.4.0,所以我们排除掉它自带的依赖,引入最新的。
image.png
代码语言:txt复制<!-- rocketmq 依赖-->代码语言:txt复制 <dependency>代码语言:txt复制 <groupId>com.alibaba.cloud</groupId>代码语言:txt复制 <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>代码语言:txt复制 <!-- 排除自带rocketMq-client依赖【低版本消息无法发送成功】-->代码语言:txt复制 <exclusions>代码语言:txt复制 <exclusion>代码语言:txt复制 <groupId>org.apache.rocketmq</groupId>代码语言:txt复制 <artifactId>rocketmq-client</artifactId>代码语言:txt复制 </exclusion>代码语言:txt复制 <exclusion>代码语言:txt复制 <groupId>org.apache.rocketmq</groupId>代码语言:txt复制 <artifactId>rocketmq-acl</artifactId>代码语言:txt复制 </exclusion>代码语言:txt复制 </exclusions>代码语言:txt复制 </dependency>代码语言:txt复制 <!-- rocketMq -->代码语言:txt复制 <dependency>代码语言:txt复制 <groupId>org.apache.rocketmq</groupId>代码语言:txt复制 <artifactId>rocketmq-client</artifactId>代码语言:txt复制 <version>4.8.0</version>代码语言:txt复制 </dependency>代码语言:txt复制 <dependency>代码语言:txt复制 <groupId>org.apache.rocketmq</groupId>代码语言:txt复制 <artifactId>rocketmq-acl</artifactId>代码语言:txt复制 <version>4.8.0</version>代码语言:txt复制 </dependency>然后重新启动,刷新阿里控制台,发现已经注册上了
阿里云rocketmq控制台
尝试重新发送消息
发送成功
- 2. 自定义延时/定时消息
* **YML** 添加如下配置代码语言:txt复制 bindings:代码语言:txt复制 # 阿里rocketMq binder 生产者配置代码语言:txt复制 ### 延时消息生产 producer.sync 属性需设置为true代码语言:txt复制 delay_output_channel: {producer.group: MQ_INST_XXXX_XXX%GID_QIGUANBANG_DEV, producer.sync: true}代码语言:txt复制 # 阿里rocketMq binder 消费者配置代码语言:txt复制 ### 延时消息订阅代码语言:txt复制 delay_input_channel: {consumer.tags: test_delay_tag}代码语言:txt复制 bindings:代码语言:txt复制 # spring cloud stream binder 生产者配置代码语言:txt复制 ### 延时消息代码语言:txt复制 delay_output_channel: {destination: MQ_INST_XXXX_XXX%common-delay-topic, content-type: application/json}代码语言:txt复制 # spring cloud stream binder 消费者配置代码语言:txt复制 ### 延时消息订阅代码语言:txt复制 delay_input_channel: {destination: MQ_INST_XXXX_XXX%common-delay-topic, group: MQ_INST_XXXX_XXX%GID_QIGUANBANG_DEV, content-type: application/json}代码语言:txt复制* **InputChannel** 接口添加如下方法代码语言:txt复制 // 延时/定时消息订阅通道 对应yml自定义节点名称代码语言:txt复制 String DELAY_CONSUMER_CHANNEL = "delay_input_channel";代码语言:txt复制 // 延时/定时消息订阅代码语言:txt复制 @Input(DELAY_CONSUMER_CHANNEL)代码语言:txt复制 SubscribableChannel delayConsumerChannel();代码语言:txt复制* **OutputChannel** 接口添加如下方法代码语言:txt复制 // 延时或定时消息生产通道代码语言:txt复制 String DELAY_PRODUCER_CHANNEL = "delay_output_channel";代码语言:txt复制 @Output(DELAY_PRODUCER_CHANNEL)代码语言:txt复制 MessageChannel delayOutput();代码语言:txt复制* **ProducerService** 添加如下方法代码语言:txt复制 /**代码语言:txt复制 * 延时消息发送
* @param message 延时消息体
* @param delayLevel 延时级别 1~18 (1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 【 1=1s 2=5s 3=10s】)
* @param ConsumerTag 消费者TAG标识 通过TAG区分消费对象
* @param MsgKey 消息key 可以通过该字段再次区分
* @return
*/
public boolean sendDelayMsg(String message, int delayLevel, String ConsumerTag, String MsgKey){
// 构建消息
Message<String> messageBuild = MessageBuilder.withPayload(message)
.setHeader(MessageConst.PROPERTY_TAGS, ConsumerTag)
.setHeader(MessageConst.PROPERTY_KEYS, MsgKey)
.setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, delayLevel)
.build();
// 发送消息
boolean sendResult = outputChannel.delayOutput().send(messageBuild);
if (sendResult){
log.info("延时消息发送成功-ConsumerTag:{}-MsgKey:{}", ConsumerTag, MsgKey);
}else {
log.error("延时消息发送失败!:{}", ConsumerTag, MsgKey);
}
return sendResult;
}代码语言:txt复制 /**代码语言:txt复制 * https://help.aliyun.com/document_detail/43349.html
* rocketMq 指定时间消息发送
* @param message 消息内容
* @param ConsumerTag 消费者group标识
* @param MsgKey 消息key
* @param fixedTime 指定时间戳 指定时间戳必须大于当前时间 否则立即消费 参数可设置40天内的任何时刻(单位毫秒),超过40天消息发送将失败
* @return
*/
public boolean sendFixedTimeMsg(String message, String ConsumerTag, String MsgKey, long fixedTime){
// 构建消息 __STARTDELIVERTIME 为发送定时任务需要的请求头
Message<String> messageBuild = MessageBuilder.withPayload(message)
.setHeader(MessageConst.PROPERTY_TAGS, ConsumerTag)
.setHeader(MessageConst.PROPERTY_KEYS, MsgKey)
.setHeader("__STARTDELIVERTIME", fixedTime)
.build();
// 发送消息
boolean sendResult = outputChannel.delayOutput().send(messageBuild);
if (sendResult){
log.info("定时消息发送成功-ConsumerTag:{}-MsgKey:{}", ConsumerTag, MsgKey);
}else {
log.error("定时消息发送失败!:{}", ConsumerTag, MsgKey);
}
return sendResult;
}代码语言:txt复制* **MessageListener** 监听器添加如下监听代码语言:txt复制 // 监听定时/延时消息通道,只允许key = delayMsg 通过代码语言:txt复制 @StreamListener(value = InputChannel.DELAY_CONSUMER_CHANNEL, condition = "headers['rocketmq_KEYS'] == 'delayMsg'")代码语言:txt复制 public void receiveDelayMsg(@Payload String payResult) {代码语言:txt复制 log.debug("接收到延时消息:{}", payResult);代码语言:txt复制 }代码语言:txt复制 // 监听定时/延时消息通道,只允许key = fixTimeMsg通过代码语言:txt复制 @StreamListener(value = InputChannel.DELAY_CONSUMER_CHANNEL, condition = "headers['rocketmq_KEYS'] == 'fixTimeMsg'")代码语言:txt复制 public void receivefixTimeMsg(@Payload String payResult) {代码语言:txt复制 log.debug("接收到定时消息:{}", payResult);代码语言:txt复制 }代码语言:txt复制发送延时任务 级别定义为2【对应5s】 消息tag:test_delay_tag; 消息key:delayMsg; 消息体:延时5s;
@GetMapping("/sendMsg/{msg}")代码语言:txt复制 public String sendMsg(@PathVariable("msg")String msg){代码语言:txt复制 producerService.sendDelayMsg(msg, 2,"test_delay_tag", "delayMsg");代码语言:txt复制 return "SUCCESS";代码语言:txt复制 }代码语言:txt复制发送结果:
发送成功
发送定时任务 消息tag:test_delay_tag; 消息key:fixTimeMsg; 消息体:延时5s;指定18:18:00消费消息
@GetMapping("/sendMsg/{msg}")代码语言:txt复制 public String sendMsg(@PathVariable("msg")String msg) throws ParseException {代码语言:txt复制 SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");代码语言:txt复制 Calendar calendar = Calendar.getInstance();代码语言:txt复制 calendar.setTime(simpleDateFormat.parse("2021-06-09 18:18:00"));代码语言:txt复制 long time = calendar.getTime().getTime();代码语言:txt复制 producerService.sendFixedTimeMsg(msg, "test_delay_tag", "delayMsg", time);代码语言:txt复制 return "SUCCESS";代码语言:txt复制 }我们看到消息 是在18:18:01的时候消费的,重复实验里几次,发现偶尔会有误差但是差距不大【1s以内】,这也是能接受的,需要注意的是,
rocketMq定时参数可设置40天内的任何时刻(单位毫秒),超过40天消息发送将失败image.png
image.png
三. application.yml 完整配置
代码语言:txt复制spring:代码语言:txt复制 application:代码语言:txt复制 name: rocketmq-server代码语言:txt复制 cloud:代码语言:txt复制 stream:代码语言:txt复制 # 阿里rocketMq配置 topic 与 group 均以 实例id% 为前缀配置 如实例id为 MQ_INST_XXXX_XXX 则group或topic 配置 MQ_INST_XXXX_XXX%grouID代码语言:txt复制 rocketmq:代码语言:txt复制 binder:代码语言:txt复制 # 【若为阿里云购买服务,则为控制台的对外或对内实例地址】【若自己搭建的服务,为自定义rocketmq服务地址127.0.0.1:9876】代码语言:txt复制 name-server: http://MQ_INST_XXXX_XXX.mq-internet-access.mq-internet.aliyuncs.com:80代码语言:txt复制 # 阿里access-key 【购买阿里服务 控制台获取填写】代码语言:txt复制 access-key: LTAI4FwRvzLckUQ2xuFE4q6N代码语言:txt复制 # 阿里secret-key 【购买阿里服务 控制台获取填写】代码语言:txt复制 secret-key: 2RmSqPLLdE1lSOqBtjIrd21kGw0O12代码语言:txt复制 # 自定义轨迹信息存储TOPIC 默认为 RMQ_SYS_TRACE_TOPIC代码语言:txt复制 customized-trace-topic: rmq_sys_TRACE_DATA_cn-qingdao-publictest代码语言:txt复制 # rocketMq 自定义消息通道配置代码语言:txt复制 bindings:代码语言:txt复制 # 阿里rocketMq binder 生产者配置代码语言:txt复制 ### 延时消息生产 producer.sync 属性需设置为true代码语言:txt复制 delay_output_channel: {producer.group: MQ_INST_XXXX_XXX%GID_QIGUANBANG_DEV, producer.sync: true}代码语言:txt复制 ### 普通生产消息代码语言:txt复制 customized_output_channel: {producer.group: MQ_INST_XXXX_XXX%GID_QIGUANBANG_DEV}代码语言:txt复制 # 阿里rocketMq binder 消费者配置代码语言:txt复制 ### 延时消息订阅代码语言:txt复制 delay_input_channel: {consumer.tags: test_delay_tag}代码语言:txt复制 ### 普通消息订阅代码语言:txt复制 customized_input_channel: {consumer.tags: test_consumer_tag}代码语言:txt复制 bindings:代码语言:txt复制 # spring cloud stream binder 生产者配置代码语言:txt复制 ### 延时消息代码语言:txt复制 delay_output_channel: {destination: MQ_INST_XXXX_XXX%common-delay-topic, content-type: application/json}代码语言:txt复制 ### 普通消息代码语言:txt复制 customized_output_channel: {destination: MQ_INST_XXXX_XXX%mg_common_topic, content-type: application/json}代码语言:txt复制 # spring cloud stream binder 消费者配置代码语言:txt复制 ### 延时消息订阅代码语言:txt复制 delay_input_channel: {destination: MQ_INST_XXXX_XXX%common-delay-topic, group: MQ_INST_XXXX_XXX%GID_AQUARIUS_DELAY, content-type: application/json}代码语言:txt复制 ### 普通消息订阅代码语言:txt复制 customized_input_channel: {destination: MQ_INST_XXXX_XXX%mg_common_topic, group: MQ_INST_XXXX_XXX%GID_QIGUANBANG_DEV, content-type: application/json}代码语言:txt复制# 服务端口号代码语言:txt复制server:代码语言:txt复制 port: 8083代码语言:txt复制# slf4j日志配置代码语言:txt复制logging:代码语言:txt复制 level:代码语言:txt复制 root: info代码语言:txt复制 com.study: debug四.spring-alibaba-cloud-rocketmq 详细配置选项
官方文档(https://links.jianshu.com/go?to=https://github.com/alibaba/spring-
cloud-alibaba/blob/master/spring-cloud-alibaba-
docs/src/main/asciidoc-zh/rocketmq.adoc#rocketmq-consumer-
properties)
五.MQ消费轨迹异常
代码语言:txt复制关于阿里云控制台,消费消息轨迹显示未消费(或者其他),但确实已经消费了,可以升级rocketMq- client版本解决。之前我的版本是4.8.0,升级4.9.1后问题解决。
image.png
<!-- rocketMq -->代码语言:txt复制 <dependency>代码语言:txt复制 <groupId>org.apache.rocketmq</groupId>代码语言:txt复制 <artifactId>rocketmq-client</artifactId>代码语言:txt复制 <version>4.9.1</version>代码语言:txt复制 </dependency>代码语言:txt复制 <dependency>代码语言:txt复制 <groupId>org.apache.rocketmq</groupId>代码语言:txt复制 <artifactId>rocketmq-acl</artifactId>代码语言:txt复制 <version>4.9.1</version>代码语言:txt复制 </dependency>代码语言:txt复制 <dependency>代码语言:txt复制 <groupId>com.alibaba.cloud</groupId>代码语言:txt复制 <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>代码语言:txt复制 <!-- 排除自带rocketMq-client依赖【低版本消息无法发送成功】-->代码语言:txt复制 <exclusions>代码语言:txt复制 <exclusion>代码语言:txt复制 <groupId>org.apache.rocketmq</groupId>代码语言:txt复制 <artifactId>rocketmq-client</artifactId>代码语言:txt复制 </exclusion>代码语言:txt复制 <exclusion>代码语言:txt复制 <groupId>org.apache.rocketmq</groupId>代码语言:txt复制 <artifactId>rocketmq-acl</artifactId>代码语言:txt复制 </exclusion>代码语言:txt复制 </exclusions>代码语言:txt复制 </dependency>


