消息队列已然成为当下非常火热的中间件,而rocketmq作为阿里开源的中间件产品,历经数次超大并发的考验,已然成为中间件产品的首选。而有时候我们在使用消息队列的时候,往往需要能够保证消息的顺序消费,而rocketmq是可以支持消息的顺序消费的。rocketmq在发送消息的时候,是将消息发送到不同的队列(queue,也有人称之为分区)中,然后消费端从多个队列中读取消息进行消费,很明显,在这种全局模式下,是无法实现顺序消费的。为了实现顺序消费,我们需要把有顺序的消息按照他的顺序,将他们发送到同一个queue中,这样消费端在消费的时候,就保证了其顺序。但是顺序消费的性能肯定也相对差一些,因为只能使用一个队列。
好了,接下来我们使用springboot来看一下顺序消费是如何实现的。 官网上给出了一个顺序消费的案例,但是都是通过main方法的形式演示的(http://rocketmq.apache.org/docs/order-example/)。
一. 添加依赖:
代码语言:javascript复制<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>
二. 配置rocketmq地址(需要先自己搭建好rocketmq服务)
在application.yml中配置:
代码语言:javascript复制rocketmq:
name-server: 192.168.1.11:9876;192.168.1.12:9876;192.168.1.13:9876
producer:
group: my-group1
sendMessageTimeout: 300000
这里我使用的rocketmq的集群,如果是单机版,name-server只写一个地址即可
三. 一个简单的生产消费案例:
我们使用controller 来下一个生产者,这样当我通过浏览器发起请求是,就调用生产者来生产一条消息,同时写一个消费者,来监听对应的消息,实现消费
生产者代码:
代码语言:javascript复制@RestController
@RequestMapping("/mq")
@Slf4j
public class ProducerController {
@Resource
private RocketMQTemplate rocketMQTemplate;
@RequestMapping("/sync/send1")
public String syncSendString(){
//发送一个同步 消息,会返回值 ---发送到 stringTopic主题
SendResult sendResult = rocketMQTemplate.syncSend("topicTest", "Hello, World!");
System.out.printf("syncSend1 to topic %s sendResult=%s %n", stringTopic, sendResult);
//consumer result:------- StringConsumerNewNS received: Hello, World!
return sendResult.toString();
}
}
上面的案例,就是我向"topicTest" 的主题中发送一个 Hello,World 的字符串
消费者代码:
代码语言:javascript复制/**
* RocketMQMessageListener
*/
@Service
@RocketMQMessageListener(nameServer = "${rocketmq.nameserver}", topic = "topicTest", consumerGroup = "string_consumer")
public class StringConsumerNewNS implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.printf("------- StringConsumerNewNS received: %s n", message);
}
}
这个消费者需要绑定nameserver, 然后监听 topicTest 这个主题。要注意,要把这个消费者放到能够被spring容器扫描到的地方。
接下来启动服务: 在浏览器访问: localhost:{your port}/mq/sync/send1 此时就会生产出一条消息,观察控制台,就会看到Hello, World打印出来,代表消息消费成功
四. 实现顺序消费
生产者: 此时要生产多条消息,方便观察顺序,我们依然写一个controller
代码语言:javascript复制/**************验证rocketmq顺序消费***************/
@RequestMapping("/send/ordered")
public String sendOrderedMsg(){
/**
* hashKey: 为了保证报到同一个队列中,将消息发送到orderTopic主题上
*/
rocketMQTemplate.syncSendOrderly("orderTopic","no1","order");
rocketMQTemplate.syncSendOrderly("orderTopic","no2","order");
rocketMQTemplate.syncSendOrderly("orderTopic","no3","order");
rocketMQTemplate.syncSendOrderly("orderTopic","no4","order");
return "success";
}
这里要注意,我是向 orderTopic主题发送4条消息,内容分别是 no1 no2 no3 no4. 第三个参数是order ,他的作用是会根据他的hash值计算发送到哪一个队列,我用的是同一个值order,那么他们的hash一样就可以保证发送到同一个队列里
消费者。要注意,消费者在消费的时候,默认是异步多线程消费的,所以无法保证顺序,我们要指定同步消费才行;先看代码:
代码语言:javascript复制/**
* 监听顺序消息,保证顺序缴费
*/
@Component
@Slf4j
@RocketMQMessageListener(topic = "orderTopic", consumerGroup = "ordered-consumer",consumeMode = ConsumeMode.ORDERLY)
public class OrderedMsqConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info("consumer 顺序消费,收到消息{}",message);
}
}
这里边指定了 consumeMode = ConsumeMode.ORDERLY, 默认值是 consumeMode = ConsumeMode.CONCURRENT
修改完毕后,启动项目;
浏览器访问:http://localhost:8888/mq/send/ordered
观察控制台日志,顺序打印: no1 no2 no3 no4
好了实现了顺序消费;相关源码已上传至github: https://github.com/lsqingfeng/action/ (springboot分支)欢迎大家关注交流