前言
之前安装好了RocketMQ
,这一篇就简单记录一下Spring boot
是怎么集成RocketMQ
的,如果有需要安装RocketMQ
的同学看这一篇,Linux在线安装RocketMQ,如果没有linux环境的同学也可以本地启动,只需要有java
环境即可。
集成RocketMQ
如果没有项目先创建一个spring boot项目
引入依赖
引入RocketMQ的依赖
代码语言:javascript复制<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
配置yml文件
导入依赖后需要在项目的yml配置文件中加入RocketMQ的相关配置,配置内容如下:
代码语言:javascript复制server:
port: 8181
spring:
application:
name: rocketmq-producer
rocketmq:
name-server: 111.xxx.xxx.xxx:9876
producer:
group: test-grop
前两项都无需关注,后面rocketmq
内的配置需要注意一下,主要是name-server
配置rocketmq
的外网服务的ip跟端口,分组就写个测试分组。
还有其他的配置都已经有了默认值,默认值可以去类中看,根据实际需要进行自定义配置。
创建生产者
生产者发送消息一般都是通过rocketMQTemplate
来发送消息,原理是在引入starter
包后会有自动配置类RocketMQAutoConfiguration
,里面定义了几个bean
- defaultMQProducer
- defaultLitePullConsumer
- rocketMQTemplate
如果不重写覆盖bean会默认使用这些bean的内容。如果没有建立topic
可以手动执行命令创建topic
。
./mqadmin updateTopic -n localhost:9876 -t test_topic
写一个controller类,来进行生产者发送消息的模拟。
代码语言:javascript复制@RestController
public class RocketController {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 发送同步消息
*/
@GetMapping("/rocket")
public void rocket() {
SendResult sendResult = rocketMQTemplate.syncSend("test_topic", "哈喽啊");
System.out.println("同步内容" sendResult.toString());
}
/**
* 发送过滤消息
*/
@GetMapping("/rocket/tag2")
public void rocketTag1() {
rocketMQTemplate.convertAndSend("test_topic:tag1", "哈喽啊,只有Tag1的消费者可以接受");
}
/**
* 异步消息
*/
@GetMapping("/rocket/tag1")
public void rocketTag2() {
rocketMQTemplate.asyncSend("test-topic", "哈喽啊,这是一条异步消息", new
SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}
@Override
public void onException(Throwable throwable) {
System.out.println(throwable);
}
});
}
}
发送消息内容可以选择同步发送,异步发送,单向发送,发送过滤消息等发送方式
创建消费者
消费者的消费策略可以在RocketMQMessageListener
注解上可以进行配置,主要是 topic、consumerGroup、selectorExpression 这三个参数。
@RocketMQMessageListener(
topic = "test_topic",
consumerGroup = "consumer-group-test",
selectorExpression = "tag1 || tag2")
@Component
public class TestConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String msg) {
System.out.println("receive message: " msg);
}
}
分析一下参数内容
- topic 这个是必须指定的,否则没有消息来源。
- consumerGroup 是消费者组,这个必须制定。一条消息只能被同一个消费者组里的一个消费者消费。
- selectorExpression 是用于消息过滤的,我们在生产的时候定义了tag内容,消费者可以指定消费某些tag的消息,具体策略如下:
- 默认为 “*”,表示不过滤,消费此 topic 下所有消息
- 配置为 “tagA”,表示只消费此 topic 下 TAG = tagA 的消息
- 配置为 “tagA || tagB”,表示消费此 topic 下 TAG = tagA 或 TAG = tagB 的消息,以此类推
具体的其他配置项也可以去RocketMQMessageListener源码中查看。
验证
在生产者中定义了三种消息的发生方式,其中一个过滤发送方式,一个同步发送消息一个同步发送消息,都执行一下。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-IM7okXLX-1666347780897)(https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/73cc518423fc47b99356b418273839a7~tplv-k3u1fbpfcp-watermark.image?)]
日志的第一条是同步消息发送产生的确认回执消息,第二条是异步消息产生的,第三天则是消费者产生的,因为消费者筛选了tag,所以只有过滤消息发送的tag1或者tag2的消息消费者才消费。