Spring boot集成RocketMQ

2022-11-02 16:42:34 浏览数 (1)

前言

之前安装好了RocketMQ,这一篇就简单记录一下Spring boot是怎么集成RocketMQ的,如果有需要安装RocketMQ的同学看这一篇,Linux在线安装RocketMQ,如果没有linux环境的同学也可以本地启动,只需要有java环境即可。

集成RocketMQ

如果没有项目先创建一个spring boot项目

引入依赖

引入RocketMQ的依赖

代码语言:javascript复制
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.1.1</version>
</dependency>

配置yml文件

导入依赖后需要在项目的yml配置文件中加入RocketMQ的相关配置,配置内容如下:

代码语言:javascript复制
server:
  port: 8181
spring:
  application:
    name: rocketmq-producer
rocketmq:
  name-server: 111.xxx.xxx.xxx:9876
  producer:
    group: test-grop

前两项都无需关注,后面rocketmq内的配置需要注意一下,主要是name-server配置rocketmq的外网服务的ip跟端口,分组就写个测试分组。

还有其他的配置都已经有了默认值,默认值可以去类中看,根据实际需要进行自定义配置。

创建生产者

生产者发送消息一般都是通过rocketMQTemplate来发送消息,原理是在引入starter包后会有自动配置类RocketMQAutoConfiguration,里面定义了几个bean

  • defaultMQProducer
  • defaultLitePullConsumer
  • rocketMQTemplate

如果不重写覆盖bean会默认使用这些bean的内容。如果没有建立topic可以手动执行命令创建topic

代码语言:javascript复制
 ./mqadmin updateTopic -n localhost:9876  -t test_topic

写一个controller类,来进行生产者发送消息的模拟。

代码语言:javascript复制
@RestController
public class RocketController {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 发送同步消息
     */
    @GetMapping("/rocket")
    public void rocket() {
        SendResult sendResult = rocketMQTemplate.syncSend("test_topic", "哈喽啊");
        System.out.println("同步内容" sendResult.toString());
    }

    /**
     * 发送过滤消息
     */
    @GetMapping("/rocket/tag2")
    public void rocketTag1() {
        rocketMQTemplate.convertAndSend("test_topic:tag1", "哈喽啊,只有Tag1的消费者可以接受");
    }

    /**
     * 异步消息
     */
    @GetMapping("/rocket/tag1")
    public void rocketTag2() {
        rocketMQTemplate.asyncSend("test-topic", "哈喽啊,这是一条异步消息", new
                SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        System.out.println(sendResult);
                    }

                    @Override
                    public void onException(Throwable throwable) {
                        System.out.println(throwable);
                    }
                });
    }


}

发送消息内容可以选择同步发送异步发送单向发送发送过滤消息等发送方式

创建消费者

消费者的消费策略可以在RocketMQMessageListener注解上可以进行配置,主要是 topicconsumerGroupselectorExpression 这三个参数。

代码语言:javascript复制
@RocketMQMessageListener(
        topic = "test_topic",
        consumerGroup = "consumer-group-test",
        selectorExpression = "tag1 || tag2")
@Component
public class TestConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String msg) {
        System.out.println("receive message: "   msg);
    }

}

分析一下参数内容

  • topic 这个是必须指定的,否则没有消息来源。
  • consumerGroup 是消费者组,这个必须制定。一条消息只能被同一个消费者组里的一个消费者消费。
  • selectorExpression 是用于消息过滤的,我们在生产的时候定义了tag内容,消费者可以指定消费某些tag的消息,具体策略如下:
    • 默认为 “*”,表示不过滤,消费此 topic 下所有消息
    • 配置为 “tagA”,表示只消费此 topic 下 TAG = tagA 的消息
    • 配置为 “tagA || tagB”,表示消费此 topic 下 TAG = tagA 或 TAG = tagB 的消息,以此类推

具体的其他配置项也可以去RocketMQMessageListener源码中查看。

验证

在生产者中定义了三种消息的发生方式,其中一个过滤发送方式,一个同步发送消息一个同步发送消息,都执行一下。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-IM7okXLX-1666347780897)(https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/73cc518423fc47b99356b418273839a7~tplv-k3u1fbpfcp-watermark.image?)]

日志的第一条是同步消息发送产生的确认回执消息,第二条是异步消息产生的,第三天则是消费者产生的,因为消费者筛选了tag,所以只有过滤消息发送的tag1或者tag2的消息消费者才消费。

0 人点赞