springboot整合rocketmq实现顺序消费

2021-12-06 10:02:41 浏览数 (1)

消息队列已然成为当下非常火热的中间件,而rocketmq作为阿里开源的中间件产品,历经数次超大并发的考验,已然成为中间件产品的首选。而有时候我们在使用消息队列的时候,往往需要能够保证消息的顺序消费,而rocketmq是可以支持消息的顺序消费的。rocketmq在发送消息的时候,是将消息发送到不同的队列(queue,也有人称之为分区)中,然后消费端从多个队列中读取消息进行消费,很明显,在这种全局模式下,是无法实现顺序消费的。为了实现顺序消费,我们需要把有顺序的消息按照他的顺序,将他们发送到同一个queue中,这样消费端在消费的时候,就保证了其顺序。但是顺序消费的性能肯定也相对差一些,因为只能使用一个队列。

好了,接下来我们使用springboot来看一下顺序消费是如何实现的。 官网上给出了一个顺序消费的案例,但是都是通过main方法的形式演示的(http://rocketmq.apache.org/docs/order-example/)。

一. 添加依赖:

代码语言:javascript复制
<dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-spring-boot-starter</artifactId>
      <version>2.1.0</version>
</dependency>

二. 配置rocketmq地址(需要先自己搭建好rocketmq服务)

在application.yml中配置:

代码语言:javascript复制
rocketmq:
  name-server: 192.168.1.11:9876;192.168.1.12:9876;192.168.1.13:9876
  producer:
    group: my-group1
    sendMessageTimeout: 300000

这里我使用的rocketmq的集群,如果是单机版,name-server只写一个地址即可

三. 一个简单的生产消费案例:

我们使用controller 来下一个生产者,这样当我通过浏览器发起请求是,就调用生产者来生产一条消息,同时写一个消费者,来监听对应的消息,实现消费

生产者代码:

代码语言:javascript复制
@RestController
@RequestMapping("/mq")
@Slf4j
public class ProducerController {

    @Resource
    private RocketMQTemplate rocketMQTemplate;


    @RequestMapping("/sync/send1")
    public String syncSendString(){
        //发送一个同步 消息,会返回值 ---发送到 stringTopic主题
        SendResult sendResult = rocketMQTemplate.syncSend("topicTest", "Hello, World!");
        System.out.printf("syncSend1 to topic %s sendResult=%s %n", stringTopic, sendResult);
        //consumer result:------- StringConsumerNewNS received: Hello, World!
        return sendResult.toString();
    }


}

上面的案例,就是我向"topicTest" 的主题中发送一个 Hello,World 的字符串

消费者代码:

代码语言:javascript复制
/**
 * RocketMQMessageListener
 */
@Service
@RocketMQMessageListener(nameServer = "${rocketmq.nameserver}", topic = "topicTest", consumerGroup = "string_consumer")
public class StringConsumerNewNS implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.printf("------- StringConsumerNewNS received: %s n", message);
    }
}

这个消费者需要绑定nameserver, 然后监听 topicTest 这个主题。要注意,要把这个消费者放到能够被spring容器扫描到的地方。

接下来启动服务: 在浏览器访问: localhost:{your port}/mq/sync/send1 此时就会生产出一条消息,观察控制台,就会看到Hello, World打印出来,代表消息消费成功

四. 实现顺序消费

生产者: 此时要生产多条消息,方便观察顺序,我们依然写一个controller

代码语言:javascript复制
/**************验证rocketmq顺序消费***************/
    @RequestMapping("/send/ordered")
    public String sendOrderedMsg(){
        /**
         * hashKey: 为了保证报到同一个队列中,将消息发送到orderTopic主题上
         */
        rocketMQTemplate.syncSendOrderly("orderTopic","no1","order");
        rocketMQTemplate.syncSendOrderly("orderTopic","no2","order");
        rocketMQTemplate.syncSendOrderly("orderTopic","no3","order");
        rocketMQTemplate.syncSendOrderly("orderTopic","no4","order");
        return "success";
    }

这里要注意,我是向 orderTopic主题发送4条消息,内容分别是 no1 no2 no3 no4. 第三个参数是order ,他的作用是会根据他的hash值计算发送到哪一个队列,我用的是同一个值order,那么他们的hash一样就可以保证发送到同一个队列里

消费者。要注意,消费者在消费的时候,默认是异步多线程消费的,所以无法保证顺序,我们要指定同步消费才行;先看代码:

代码语言:javascript复制
/**
 * 监听顺序消息,保证顺序缴费
 */
@Component
@Slf4j
@RocketMQMessageListener(topic = "orderTopic", consumerGroup = "ordered-consumer",consumeMode = ConsumeMode.ORDERLY)

public class OrderedMsqConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        log.info("consumer 顺序消费,收到消息{}",message);
    }
}

这里边指定了 consumeMode = ConsumeMode.ORDERLY, 默认值是 consumeMode = ConsumeMode.CONCURRENT

修改完毕后,启动项目;

浏览器访问:http://localhost:8888/mq/send/ordered

观察控制台日志,顺序打印: no1 no2 no3 no4

好了实现了顺序消费;相关源码已上传至github: https://github.com/lsqingfeng/action/ (springboot分支)欢迎大家关注交流

0 人点赞