SpringBoot优雅整合RocketMQ

2022-11-03 14:12:06 浏览数 (1)

SpringBoot优雅整合RocketMQ

本篇文章默认你已经有RocketMQ的基础:

  • Producer启动过程,消息发送过程
  • Consumer启动过程,消息拉取消息消费过程
  • NameServer,Broker,Topic,Queue等相关概念

本篇内容默认你已经有SpringBoot的基础:

  • @Component ,@Service
  • @PostConstruct
  • @PreDestory
  • ApplicationEventPublisher

具备模板方法设计模式的概念

你只有简单具备上述知识,就可以继续往下阅读文章

Step1 : Maven引入相关依赖

代码语言:javascript复制
<dependency>
  <groupId>com.alibaba</groupId>
  <artifactId>fastjson</artifactId>
  <version>1.2.47</version>
</dependency>

<dependency>
   <groupId>org.apache.rocketmq</groupId>
   <artifactId>rocketmq-client</artifactId>
   <version>4.1.0-incubating</version>
</dependency>

引入fastjson及rocketmq-client依赖,这两个都是必须的。版本号根据自己实际需求可更改

Step2:生产者

思想:利用@Compoent注解让生产者实例受Spring容器管理,并且利用@PostConstruct实现生产者启动以及@PreDestory实现生产者关闭 注意事项:

  1. 下面的生产者,会伴随SpringBoot启动时调用start()启动生产者,在开发者需要使用的时候利用SpringIoc依赖注入即可。在项目运行过程中,可以随时调用shutdown()方法以关闭生产者。
  2. 如果你认为,生产者长时间闲置不好,亦可以根据自己的需求,变更逻辑。可以是每次由Spring容器返回一个新的实例,但是要记得,你这样做,每次使用完都要手动shutdown()
  3. 实际开发中,应该用Log日志方式代替System.out.println()输出
代码语言:javascript复制
/**
 * 长连接producer抽象
 */
public abstract class AbstractMqProducer {
    protected DefaultMQProducer producer;

	@PostConstruct
    public abstract void start() throws MQClientException;

    @PreDestroy
    public void shutdown() {
        System.err.println("AbstractMqProducer @PreDestroy调用");
        producer.shutdown();
    }
}

生产者的抽象类,定义了start()和shutdown()方法。其中start()方法需要开发者进行重写。开发者需要在start()方法中,为producer进行初始化和启动工作。

代码语言:javascript复制
/**
 * 生产者示例1
 *
 * 利用SpringBoot的特性,首先将其注解Component,让Spring容器接管这个实例
 * 利用PostConstruct来让实例化后的Bean进行后置处理
 */
@Component
public class TestProducer1 extends AbstractMqProducer{

    @Value("${dy.rocketmq.producer.producerGroup}")
    private String producerGroup;

    @Value("${dy.rocketmq.namesrvAddr}")
    private String namesrvAddr;

    @Value("${dy.rocketmq.producer.instanceName}")
    private String instanceName;

   
    @Override
    public void start() throws MQClientException {
        if (null == producer) {
            producer = new DefaultMQProducer(producerGroup);
            producer.setNamesrvAddr(namesrvAddr);
            producer.setInstanceName(instanceName);
        }
        producer.start();
        System.out.println(namesrvAddr);
        System.err.println("rocketmq producer is starting...");
    }

    public boolean send(String topic, String tag, String key, TestMqMessageDto msg) {
        try {
            Message message = new Message(
                    topic, tag, key,
                    JSONObject.toJSONString(msg).getBytes("utf-8")
            );

            SendResult sendResult = producer.send(message);
            System.err.println("消息生产结果:"   sendResult);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }
}
重写start()方法,@Value注解大家应该不陌生,他会从application.yml中获取对应的属性值,此处你也可以直接手动设置值。
为你的producer定义send()方法,可根据你的需求写多个不同的send方法以达到生产消息的目的
如果你想每一次消息发送的时候才启动producer,并且在发送成功后shutdown(),那么你可以修改AbstractProducer类的逻辑,并且在子类send()中,每次都调用start(),然后在发送结束后shutdown()
代码语言:javascript复制
// 使用方式:在Spring接管的Bean中,直接使用@Autowired来获取producer实例

@Autowired
private TestProducer1 testProducer1;

{
	// 直接使用 发送消息
	testProducer1.send("topic", "tag", "key", 内容);
}

Step3: 消费者

代码语言:javascript复制
/**
 * MQ消费者抽象类
 *
 * 定义消费消息的逻辑
 */
public abstract class AbstractMqConsumer {

    protected DefaultMQPushConsumer consumer;
    // 是否允许顺序消费
    protected boolean isOrderConsumer = false;

    @Autowired
    private ApplicationEventPublisher publisher;

    /**
     * 初始化consumer,由开发者控制
     *
     * 例如:
     * try {
     *      consumer = new DefaultMQPushConsumer(consumerGroup);
     *      consumer.setNamesrvAddr(namesrvAddr);
     *      consumer.setMessageModel(MessageModel.CLUSTERING);
     *      consumer.setConsumeMessageBatchMaxSize(1);
     *      consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
     *      consumer.subscribe("TopicTest", "*");
     *  } catch (MQClientException e) {
     *      e.printStackTrace();
     *  }
     */
    abstract void start0();

    @PostConstruct
    private void start() {
        if (null == consumer) {
            start0();
        }

        if (isOrderConsumer) {
            consumer.registerMessageListener(new MessageListenerOrderly() {
                @Override
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext consumeOrderlyContext) {
                    try {
                        consumeOrderlyContext.setAutoCommit(true);
                        if (null == msgs || msgs.size() == 0) {
                            return ConsumeOrderlyStatus.SUCCESS;
                        }
                        publisher.publishEvent(new MqMessageEvent(consumer, msgs));
                    } catch (Exception e) {
                        e.printStackTrace();
                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    }
                    return ConsumeOrderlyStatus.SUCCESS;
                }
            });
        }
        else {
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    try {
                        if (null == msgs || msgs.size() == 0) {
                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                        }
                        publisher.publishEvent(new MqMessageEvent(consumer, msgs));
                    } catch (Exception e) {
                        e.printStackTrace();
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }

                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
        }

        new Thread(new Runnable() {
            @Override
            public void run() {

                try {
                    Thread.sleep(5000L);

                    consumer.start();
                    System.err.println("rocketmq consumer server is starting...");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }

    @PreDestroy
    public void shutdown() {
        consumer.shutdown();
    }
}

抽象了消费者,所有消费者继承这个类,然后实现start0()方法。

start0方法:由开发者去编写,目的是初始化consumer,但无需调用consumer.start();

start()方法: 该方法定义了ApplicationEventPublisher发布消息事件的逻辑,他会根据你的consumer类型(顺序消费,并发消费)来注册不同的MessageListener

PS:

  1. 如果你不懂ApplicationEventPublisher,请自行百度。大概来说,它是Spring实现的一种“观察者模式”。由ApplicationEventPublisher.publish()来通知对应的订阅者处理事件。
代码语言:javascript复制
/**
 * 消费者1示例
 */
@Component
public class TestConsumer1 extends AbstractMqConsumer {

    @Value("${dy.rocketmq.consumer.consumerGroup}")
    private String consumerGroup;

    @Value("${dy.rocketmq.namesrvAddr}")
    private String namesrvAddr;

    @Override
    void start0() {
        try {
            consumer = new DefaultMQPushConsumer(consumerGroup);
            // 设置namesrv地址
            consumer.setNamesrvAddr(namesrvAddr);
            // 设置集群消费
            consumer.setMessageModel(MessageModel.CLUSTERING);
            // 设置每次消费消息的数量,官方一般建议1条,除非你有批量处理的需求
            consumer.setConsumeMessageBatchMaxSize(1);
            // 设置消费策略
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
            // 设置订阅的topic和tags
            consumer.subscribe("TopicTest", "*");
            // ... 根据自己的需求设置consumer其他参数
            
        } catch (MQClientException e) {
            e.printStackTrace();
        }

    }
}
代码语言:javascript复制
/**
 * 监听rocketMQ消费消息的spring event
 */
public class MqMessageEvent extends ApplicationEvent {
    private DefaultMQPushConsumer consumer;
    private List<MessageExt> msgs;

    public MqMessageEvent(DefaultMQPushConsumer consumer, List<MessageExt> msgs) {
        super(msgs);
        this.consumer = consumer;
        this.msgs = msgs;
    }

    public DefaultMQPushConsumer getConsumer() {
        return consumer;
    }

    public void setConsumer(DefaultMQPushConsumer consumer) {
        this.consumer = consumer;
    }

    public List<MessageExt> getMsgs() {
        return msgs;
    }

    public void setMsgs(List<MessageExt> msgs) {
        this.msgs = msgs;
    }
}

最后定义消费消息的Service:

代码语言:javascript复制
/**
 * 用于监听MqMessageEvent的服务
 * 消费MQ消息
 * 
 * 一般两种方式:
 * (1)第一种:这个类的作用就是监听SpringEvent事件,然后再根据消息分发给其他Service进行处理,所以这里一般不会包含业务逻辑代码
 * (2)第二种:这个类的作用就是具体的消费消息类
 */
@Service
public class TestConsumerService {

    /**
     * 消费TopicTest下的TagA
     * @param event
     */
    @EventListener(condition = "#event.msgs[0].topic=='TopicTest' && #event.msgs[0].tags=='TagA'")
    public void testConsumer(MqMessageEvent event) {
        // 由于mq消费者设置了batch=1,所以每次都只会消费一条
        MessageExt msg = event.getMsgs().get(0);
        if (null != msg) {
            // 具体的消费MessageExt的逻辑
        }
    }
    
    /**
     * 消费TopicTestB
     * @param event
     */
    @EventListener(condition = "#event.msgs[0].topic=='TopicTestB' ")
    public void testConsumer(MqMessageEvent event) {
        // 由于mq消费者设置了batch=1,所以每次都只会消费一条
        MessageExt msg = event.getMsgs().get(0);
        if (null != msg) {
            // 具体的消费MessageExt的逻辑
            if (msg.getTags() == "TagA") {
            	//消费TagA的消息
            }
            else if (msg.getTags() == 'TagB'){
				//消费TagB的消息
			}
        }
    }
}
综上,你已经整合好了RocketMQ的生产者和消费者。
可能你的项目会需要多个不同生产者,多个不同的消费者,你只需要按上面的方式,新建多个不同的producer,consumer继承AbstractProducer, AbstractConsumer即可。
GitHub参考项目地址,找到src/main目录下的rocketmq包即可查看相关源码:

GitHub示例项目

最后简单脚注一些rocketmq的注意点

这些注意点都是在学习源码的过程中总结的,希望对很多还未深入源码了解rocketmq的程序员,在使用mq的过程中有所注意:

  1. 在集群消费模式下,同一个消费组(consumerGroup名相同)的所有消费者。他们所订阅的Topic,Tags都务必需要一致。 具体分析可以参考 参考 。此处涉及两个源码知识点,消息过滤和消息拉取。
  2. 生产消息过程中,消息是org.apache.rocketmq.common.message.Message . 该消息的构造方法有个参数虽然叫tags,但是它并不支持多个tag标签。一条Message仅能对应有且只有一个tag。所以不要被tags这个复数被误导了。如果你有去关注,你会发现consumer启动前配置的subscribe()订阅topic,tags时,它的参数是叫subExpression,所以在这里是支持表达式配置多个Tags。
  3. consumer属性consumeMessageBatchMaxSize默认为1,不建议去改动这个参数。这个参数表示你每次获得的List<MessageExt> msgs的消息个数。如果这里设置为1,表示每次你都只获得一个消息,也就是msgs.get(0)就可以取出这条消息。 它的工作原理大概是,配合pullBatchSize=32 。首先consumer会从它负责的queue中每隔一段时间一次拉取最多32(pullBatchSize)条消息(如果有这么多的话),然后再将这32条消息再根据tags进行一个消息过滤(因为“表达式过滤"模式,有可能会拉到其他非订阅的消息),最后将符合当前consumer订阅的消息内容,一次传递1(consumeMessageBatchMaxSize)条给开发者进行消息消费。 因此出于几点考虑: (1)一次拉一条进行消费,消费成功就返回SUCCESS,出问题就按照逻辑是记录下载,还是直接稍后重试。一般来说,都是一条消息对应一次业务处理。如果你拉N条消息,那么其中某一条失败了,你需要稍后重试。那么就会导致N-1条本来应该返回SUCESS消费成功的消息,而被迫全部失败。这样一来是不同的业务放在一起处理相互影响,另一方面如果失败了是很大的成本开销。 (2)暂时想不到
  4. 重复消费问题的产生原因: (1)consumeMessage()方法里没有用try-catch包住消费逻辑,导致一些意外的抛出异常而导致消费重试。 (2)消费消息超时时间默认15min,也就是说如果你消费消息过程中,超过15min没有返回CONSUME_SUCCESS或者RECONSUME_LATER ,即使之后返回了,也属于TIME_OUT。默认会重发这条消息给你并且无限重试,因此需要注意消息消费的时间不能超过consumeTimeout属性设置的值。

0 人点赞