1. Create a Spring Boot project (omitted).
2. pom.xml configuration
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.7.1</version>
</dependency>
3. Java code: concurrent consumption
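import java.nio.charset.StandardCharsets;
import java.util.List;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

/**
 * @Author: Liu Yue
 * @Description:
 * @Date: Create in 2020/10/30 17:08
 **/
@Component
@Configuration
@Slf4j
@Data
public class BaseMsgConsumer {
    private DefaultMQPushConsumer consumer;
    private static final String CONSUMER_GROUP = "base_group";
    private static final String NAMESRV_ADDR = "192.168.27.16:9876";
    private static final String TOPIC = "base_topic";
    private static final String TAGS = "base_tags";

    // Note: nothing in this class calls consumer_3(); invoke it at application startup.
    public void consumer_3() throws MQClientException {
        consumer = new DefaultMQPushConsumer(CONSUMER_GROUP + "_syncMsg");
        consumer.setNamesrvAddr(NAMESRV_ADDR);
        // Start from the earliest available offset when the group has no stored offset.
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe(TOPIC, TAGS);
        // Concurrent mode: batches are handled by multiple threads of the consumer's pool.
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(
                    List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                log.info(Thread.currentThread().getName()
                        + " Receive New Messages: " + msgs.size());
                try {
                    for (MessageExt msg : msgs) {
                        // Business logic goes here.
                        log.info(Thread.currentThread().getName() + "\t" + "group 1 consumed: "
                                + msg.getKeys() + " " + new String(msg.getBody(), StandardCharsets.UTF_8));
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } catch (Exception e) {
                    log.info("==========RECONSUME_LATER===========");
                    log.error(e.getMessage(), e);
                    // Ask the broker to redeliver this batch later.
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
        });
        consumer.start();
        System.out.println("Consumer1 Started.");
    }

    // The consumer() method from step 4 also belongs in this class.
}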
4. Ordered consumption
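// Add this method to BaseMsgConsumer. Additional imports needed:
//   java.nio.charset.StandardCharsets
//   org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus
//   org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly
//   org.apache.rocketmq.common.protocol.heartbeat.MessageModel
public void consumer() throws MQClientException {
    consumer = new DefaultMQPushConsumer(CONSUMER_GROUP + "_syncMsg");
    consumer.setNamesrvAddr(NAMESRV_ADDR);
    consumer.subscribe(TOPIC, TAGS);
    // Clustering (MessageModel.CLUSTERING): messages are spread across all
    // consumers in the same consumer group, so each message is consumed once per group.
    // Broadcasting (MessageModel.BROADCASTING): every consumer in the same
    // consumer group consumes each message once.
    consumer.setMessageModel(MessageModel.BROADCASTING);
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    // Concurrent consumption mode: MessageListenerConcurrently
    // Ordered consumption mode: MessageListenerOrderly
    consumer.registerMessageListener((MessageListenerOrderly) (list, context) -> {
        System.out.println(Thread.currentThread().getName()
                + " Receive New Messages: " + list.size());
        // hkEquipmentAnalyselist is a shared object from the original project; its
        // declaration is not shown here. Locking on it serializes handling across queues.
        synchronized (hkEquipmentAnalyselist) {
            list.forEach(msg -> {
                // Business logic goes here.
                log.info(Thread.currentThread().getName() + "\t" + msg.getKeys()
                        + " group 1 consumed: " + new String(msg.getBody(), StandardCharsets.UTF_8));
            });
        }
        return ConsumeOrderlyStatus.SUCCESS;
    });
    consumer.start();
    log.info("Initialization succeeded");
}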
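MessageListenerOrderly preserves order only within a single message queue, so messages that must be processed in sequence need to be sent to the same queue by the producer. The following is a minimal plain-Java sketch of that routing idea, not RocketMQ code: `OrderedRoutingSketch`, `selectQueueIndex`, the queue count of 4, and the order IDs are all hypothetical names and values chosen for illustration. In a real producer, logic like this would typically live inside a `MessageQueueSelector`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class OrderedRoutingSketch {

    // Hypothetical helper: map a sharding key (for example an order ID) to a queue index.
    // floorMod keeps the result non-negative even when hashCode() is negative.
    static int selectQueueIndex(Object shardingKey, int queueCount) {
        return Math.floorMod(shardingKey.hashCode(), queueCount);
    }

    public static void main(String[] args) {
        int queueCount = 4; // assumed number of queues for the topic
        List<String> orderIds =
                Arrays.asList("order-1", "order-2", "order-1", "order-3", "order-2");

        // Group the messages by the queue they would be routed to.
        Map<Integer, List<String>> byQueue = new TreeMap<>();
        for (String orderId : orderIds) {
            byQueue.computeIfAbsent(selectQueueIndex(orderId, queueCount),
                    k -> new ArrayList<>()).add(orderId);
        }

        // Every occurrence of the same order ID lands in the same queue.
        byQueue.forEach((queue, ids) -> System.out.println("queue " + queue + " <- " + ids));
    }
}
```

Because the index depends only on the key, all messages for one order share a queue, and the ordered listener then sees them in the order they were stored in that queue.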
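RocketMQ offers two ways to process messages: a multithreaded concurrent mode, which uses MessageListenerConcurrently, and an ordered consumption mode, which uses MessageListenerOrderly. Choose between them according to the characteristics of your system.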
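The difference between the two modes can be illustrated with a small plain-Java model. This is a simplified sketch, not RocketMQ's actual implementation: `ConsumeModeSketch`, its methods, and the in-memory lists standing in for message queues are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ConsumeModeSketch {

    // Concurrent style: each message is its own task, so several threads may handle
    // messages from the same queue at once and completion order is not guaranteed.
    static List<String> consumeConcurrently(List<List<String>> queues, int threads) {
        List<String> handled = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (List<String> queue : queues) {
            for (String msg : queue) {
                pool.execute(() -> handled.add(msg));
            }
        }
        shutdownAndWait(pool);
        return handled;
    }

    // Ordered style: each queue is one task, drained front to back by a single thread,
    // so order is kept within a queue but not across queues.
    static List<String> consumeOrderly(List<List<String>> queues, int threads) {
        List<String> handled = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (List<String> queue : queues) {
            pool.execute(() -> {
                for (String msg : queue) {
                    handled.add(msg);
                }
            });
        }
        shutdownAndWait(pool);
        return handled;
    }

    // Wait for all submitted tasks to finish; rethrow interruption as unchecked.
    private static void shutdownAndWait(ExecutorService pool) {
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        List<List<String>> queues = Arrays.asList(
                Arrays.asList("q0-1", "q0-2", "q0-3"),
                Arrays.asList("q1-1", "q1-2", "q1-3"));
        System.out.println("concurrent: " + consumeConcurrently(queues, 4));
        System.out.println("orderly:    " + consumeOrderly(queues, 4));
    }
}
```

In this model the concurrent variant gives higher parallelism but may interleave messages from one queue, while the ordered variant keeps each queue's messages in sequence at the cost of processing a queue on only one thread at a time.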
Improve a little every day!