Spring Boot 2 with RocketMQ: Basic Consumer Development

2020-11-25 11:55:07

1. Create a Spring Boot project (omitted).

2. pom.xml configuration

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.7.1</version>
        </dependency>

3. Java code: concurrent consumption

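import java.nio.charset.StandardCharsets;
import java.util.List;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

/**
 * @Author: Liu Yue
 * @Description:
 * @Date: Created on 2020/10/30 17:08
 **/
@Component
@Configuration
@Slf4j
@Data
public class BaseMsgConsumer {

    private DefaultMQPushConsumer consumer;
    private static final String CONSUMER_GROUP = "base_group";
    private static final String NAMESRV_ADDR = "192.168.27.16:9876";
    private static final String TOPIC = "base_topic";
    private static final String TAGS = "base_tags";

    // Nothing in this class calls this method; invoke it once at startup to begin consuming.
    public void consumer_3() throws MQClientException {
        consumer = new DefaultMQPushConsumer(CONSUMER_GROUP + "_syncMsg");
        consumer.setNamesrvAddr(NAMESRV_ADDR);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe(TOPIC, TAGS);

        // Concurrent mode: batches are handed to a pool of consumer threads
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(
                    List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

                log.info(Thread.currentThread().getName()
                        + " Receive New Messages: " + msgs.size());
                try {
                    for (MessageExt msg : msgs) {
                        // Business logic goes here
                        log.info(Thread.currentThread().getName() + "\t" + "group 1 consumed: "
                                + msg.getKeys() + " " + new String(msg.getBody(), StandardCharsets.UTF_8));
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } catch (Exception e) {
                    // Any failure asks the broker to redeliver the batch later
                    log.info("==========RECONSUME_LATER===========");
                    log.error(e.getMessage(), e);
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
        });

        consumer.start();

        System.out.println("Consumer1 Started.");
    }
}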
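
Returning RECONSUME_LATER asks the broker to deliver the messages again, so the business logic may see the same message more than once. One common way to cope is to make the handler idempotent. The following is a minimal sketch under stated assumptions, not part of the original code: `IdempotentHandler`, its methods, and the in-memory key set are all hypothetical names, and it uses only the JDK so it runs without RocketMQ. A real system would more likely record processed keys in a database or cache.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper: runs the business logic at most once per message key. */
class IdempotentHandler {
    // Keys already handled; kept in memory only (an assumption made for this sketch)
    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();
    // Counts how often the business logic actually ran
    private final AtomicInteger businessCalls = new AtomicInteger();

    /** Returns true if the business logic ran, false if the key was a duplicate. */
    boolean handle(String key, String body) {
        // add() is atomic and returns false when the key is already present
        if (!processedKeys.add(key)) {
            return false;
        }
        try {
            process(body);
            return true;
        } catch (RuntimeException e) {
            // Forget the key so that a later redelivery can retry the work
            processedKeys.remove(key);
            throw e;
        }
    }

    // Stand-in for the real business logic
    private void process(String body) {
        businessCalls.incrementAndGet();
    }

    int businessCalls() {
        return businessCalls.get();
    }

    public static void main(String[] args) {
        IdempotentHandler handler = new IdempotentHandler();
        System.out.println(handler.handle("order-1", "payload")); // first delivery
        System.out.println(handler.handle("order-1", "payload")); // redelivery of the same key
        System.out.println(handler.businessCalls());
    }
}
```

Inside the listener, such a helper could be called with `msg.getKeys()` as the key, assuming the producer sets a unique key on every message.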

4. Ordered consumption

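// Add this method to BaseMsgConsumer. Imports it needs:
// import java.nio.charset.StandardCharsets;
// import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
// import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
// import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
public void consumer() throws MQClientException {
    log.info("Initialization succeeded");
    consumer = new DefaultMQPushConsumer(CONSUMER_GROUP + "_syncMsg");
    consumer.setNamesrvAddr(NAMESRV_ADDR);
    consumer.subscribe(TOPIC, TAGS);
    // Clustering (MessageModel.CLUSTERING): the consumers identified by the same Consumer ID share the messages among them.
    // Broadcasting (MessageModel.BROADCASTING): every consumer identified by the same Consumer ID consumes each message once.
    consumer.setMessageModel(MessageModel.BROADCASTING);
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

    // Concurrent consumption mode (MessageListenerConcurrently)
    // Ordered consumption mode (MessageListenerOrderly)
    consumer.registerMessageListener((MessageListenerOrderly) (list, context) -> {
        System.out.println(Thread.currentThread().getName()
                + " Receive New Messages: " + list.size());
        // hkEquipmentAnalyselist is a field from the author's project that is not shown
        // in this article; it serves as the lock object here.
        synchronized (hkEquipmentAnalyselist) {
            list.forEach(msg -> {
                // Business logic goes here
                log.info(Thread.currentThread().getName() + "\t" + msg.getKeys()
                        + " group 1 consumed: " + new String(msg.getBody(), StandardCharsets.UTF_8));
            });
        }
        return ConsumeOrderlyStatus.SUCCESS;
    });
    consumer.start();
}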
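
The comments in the method above describe two message models. Their difference can be illustrated with a small JDK-only sketch. Everything in it is an assumption made for illustration: `MessageModelSketch` and its methods are hypothetical, and per-message round-robin is a simplification standing in for how RocketMQ actually assigns queues to the consumers of a group.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of clustering vs. broadcasting delivery. */
class MessageModelSketch {

    /** Clustering: each message is handled by exactly one consumer of the group. */
    static List<List<String>> clustering(List<String> messages, int consumers) {
        List<List<String>> inboxes = emptyInboxes(consumers);
        for (int i = 0; i < messages.size(); i++) {
            // Round-robin is a simplification of the real allocation strategy
            inboxes.get(i % consumers).add(messages.get(i));
        }
        return inboxes;
    }

    /** Broadcasting: every consumer of the group handles every message once. */
    static List<List<String>> broadcasting(List<String> messages, int consumers) {
        List<List<String>> inboxes = emptyInboxes(consumers);
        for (List<String> inbox : inboxes) {
            inbox.addAll(messages);
        }
        return inboxes;
    }

    // One empty inbox per consumer
    private static List<List<String>> emptyInboxes(int consumers) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < consumers; i++) {
            inboxes.add(new ArrayList<>());
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> messages = List.of("m1", "m2", "m3", "m4");
        System.out.println(clustering(messages, 2));   // prints [[m1, m3], [m2, m4]]
        System.out.println(broadcasting(messages, 2)); // prints [[m1, m2, m3, m4], [m1, m2, m3, m4]]
    }
}
```

In clustering mode adding consumers spreads the load, while in broadcasting mode every instance does all of the work, which suits cases such as refreshing a local cache on each node.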

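RocketMQ offers two ways to process messages: a multithreaded concurrent mode, which uses MessageListenerConcurrently, and an ordered mode, which uses MessageListenerOrderly. Concurrent mode gives higher throughput but no ordering guarantee, while ordered mode processes the messages of each queue one at a time, in order. Choose between them according to the characteristics of your system.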
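
The contrast between the two modes can be sketched with plain JDK thread pools. This is a simplified model under stated assumptions, not RocketMQ's implementation: `ConsumeModesSketch` and its methods are hypothetical names, and the per-queue lock only imitates the idea that in ordered mode a given queue is consumed by one thread at a time.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of concurrent vs. ordered consumption. */
class ConsumeModesSketch {

    /** Concurrent style: each message is an independent task, so completion order may vary. */
    static List<String> consumeConcurrently(List<String> messages, int threads) {
        List<String> processed = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (String msg : messages) {
            pool.execute(() -> processed.add(msg));
        }
        awaitDone(pool);
        return processed;
    }

    /** Ordered style: a per-queue lock lets only one worker drain a given queue at a time. */
    static Map<Integer, List<String>> consumeOrderly(Map<Integer, List<String>> queues, int threads) {
        Map<Integer, Queue<String>> pending = new ConcurrentHashMap<>();
        Map<Integer, List<String>> processed = new ConcurrentHashMap<>();
        Map<Integer, Object> queueLocks = new ConcurrentHashMap<>();
        queues.forEach((id, msgs) -> {
            pending.put(id, new ConcurrentLinkedQueue<>(msgs));
            processed.put(id, new ArrayList<>());
            queueLocks.put(id, new Object());
        });
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (Integer id : queues.keySet()) {
            // Several workers compete for the same queue; the lock serializes them
            for (int worker = 0; worker < threads; worker++) {
                pool.execute(() -> {
                    synchronized (queueLocks.get(id)) {
                        String msg;
                        while ((msg = pending.get(id).poll()) != null) {
                            processed.get(id).add(msg); // FIFO order within the queue is kept
                        }
                    }
                });
            }
        }
        awaitDone(pool);
        return processed;
    }

    // Waits for all submitted tasks to finish
    private static void awaitDone(ExecutorService pool) {
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // The order of the concurrent result may differ from run to run
        System.out.println(consumeConcurrently(List.of("a1", "a2", "a3", "b1", "b2"), 4));
        // Each queue keeps its own order in the ordered result
        System.out.println(consumeOrderly(
                Map.of(0, List.of("a1", "a2", "a3"), 1, List.of("b1", "b2")), 4));
    }
}
```

The sketch also shows the trade-off: the ordered variant keeps the order inside each queue but gains parallelism only across queues, not within one.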

Improve a little every day!
