SpringBoot优雅整合RocketMQ
本篇文章默认你已经有RocketMQ的基础:
- Producer启动过程,消息发送过程
- Consumer启动过程,消息拉取消息消费过程
- NameServer,Broker,Topic,Queue等相关概念
本篇内容默认你已经有SpringBoot的基础:
- @Component ,@Service
- @PostConstruct
- @PreDestory
- ApplicationEventPublisher
具备模板方法设计模式的概念
你只有简单具备上述知识,就可以继续往下阅读文章
Step1 : Maven引入相关依赖
代码语言:javascript复制<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.1.0-incubating</version>
</dependency>
引入fastjson及rocketmq-client依赖,这两个都是必须的。版本号根据自己实际需求可更改
Step2:生产者
思想:利用@Compoent注解让生产者实例受Spring容器管理,并且利用@PostConstruct实现生产者启动以及@PreDestory实现生产者关闭 注意事项:
- 下面的生产者,会伴随SpringBoot启动时调用start()启动生产者,在开发者需要使用的时候利用SpringIoc依赖注入即可。在项目运行过程中,可以随时调用shutdown()方法以关闭生产者。
- 如果你认为,生产者长时间闲置不好,亦可以根据自己的需求,变更逻辑。可以是每次由Spring容器返回一个新的实例,但是要记得,你这样做,每次使用完都要手动shutdown()
- 实际开发中,应该用Log日志方式代替System.out.println()输出
/**
* 长连接producer抽象
*/
public abstract class AbstractMqProducer {
protected DefaultMQProducer producer;
@PostConstruct
public abstract void start() throws MQClientException;
@PreDestroy
public void shutdown() {
System.err.println("AbstractMqProducer @PreDestroy调用");
producer.shutdown();
}
}
生产者的抽象类,定义了start()和shutdown()方法。其中start()方法需要开发者进行重写。开发者需要在start()方法中,为producer进行初始化和启动工作。
代码语言:javascript复制/**
* 生产者示例1
*
* 利用SpringBoot的特性,首先将其注解Component,让Spring容器接管这个实例
* 利用PostConstruct来让实例化后的Bean进行后置处理
*/
@Component
public class TestProducer1 extends AbstractMqProducer{
@Value("${dy.rocketmq.producer.producerGroup}")
private String producerGroup;
@Value("${dy.rocketmq.namesrvAddr}")
private String namesrvAddr;
@Value("${dy.rocketmq.producer.instanceName}")
private String instanceName;
@Override
public void start() throws MQClientException {
if (null == producer) {
producer = new DefaultMQProducer(producerGroup);
producer.setNamesrvAddr(namesrvAddr);
producer.setInstanceName(instanceName);
}
producer.start();
System.out.println(namesrvAddr);
System.err.println("rocketmq producer is starting...");
}
public boolean send(String topic, String tag, String key, TestMqMessageDto msg) {
try {
Message message = new Message(
topic, tag, key,
JSONObject.toJSONString(msg).getBytes("utf-8")
);
SendResult sendResult = producer.send(message);
System.err.println("消息生产结果:" sendResult);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
}
重写start()方法,@Value注解大家应该不陌生,他会从application.yml中获取对应的属性值,此处你也可以直接手动设置值。
为你的producer定义send()方法,可根据你的需求写多个不同的send方法以达到生产消息的目的
如果你想每一次消息发送的时候才启动producer,并且在发送成功后shutdown(),那么你可以修改AbstractProducer类的逻辑,并且在子类send()中,每次都调用start(),然后在发送结束后shutdown()
代码语言:javascript复制// 使用方式:在Spring接管的Bean中,直接使用@Autowired来获取producer实例
@Autowired
private TestProducer1 testProducer1;
{
// 直接使用 发送消息
testProducer1.send("topic", "tag", "key", 内容);
}
Step3: 消费者
代码语言:javascript复制/**
* MQ消费者抽象类
*
* 定义消费消息的逻辑
*/
public abstract class AbstractMqConsumer {
protected DefaultMQPushConsumer consumer;
// 是否允许顺序消费
protected boolean isOrderConsumer = false;
@Autowired
private ApplicationEventPublisher publisher;
/**
* 初始化consumer,由开发者控制
*
* 例如:
* try {
* consumer = new DefaultMQPushConsumer(consumerGroup);
* consumer.setNamesrvAddr(namesrvAddr);
* consumer.setMessageModel(MessageModel.CLUSTERING);
* consumer.setConsumeMessageBatchMaxSize(1);
* consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
* consumer.subscribe("TopicTest", "*");
* } catch (MQClientException e) {
* e.printStackTrace();
* }
*/
abstract void start0();
@PostConstruct
private void start() {
if (null == consumer) {
start0();
}
if (isOrderConsumer) {
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext consumeOrderlyContext) {
try {
consumeOrderlyContext.setAutoCommit(true);
if (null == msgs || msgs.size() == 0) {
return ConsumeOrderlyStatus.SUCCESS;
}
publisher.publishEvent(new MqMessageEvent(consumer, msgs));
} catch (Exception e) {
e.printStackTrace();
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
}
else {
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
try {
if (null == msgs || msgs.size() == 0) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
publisher.publishEvent(new MqMessageEvent(consumer, msgs));
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
}
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(5000L);
consumer.start();
System.err.println("rocketmq consumer server is starting...");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
@PreDestroy
public void shutdown() {
consumer.shutdown();
}
}
抽象了消费者,所有消费者继承这个类,然后实现start0()方法。
start0方法:由开发者去编写,目的是初始化consumer,但无需调用consumer.start();
start()方法: 该方法定义了ApplicationEventPublisher发布消息事件的逻辑,他会根据你的consumer类型(顺序消费,并发消费)来注册不同的MessageListener
PS:
- 如果你不懂ApplicationEventPublisher,请自行百度。大概来说,它是Spring实现的一种“观察者模式”。由ApplicationEventPublisher.publish()来通知对应的订阅者处理事件。
/**
* 消费者1示例
*/
@Component
public class TestConsumer1 extends AbstractMqConsumer {
@Value("${dy.rocketmq.consumer.consumerGroup}")
private String consumerGroup;
@Value("${dy.rocketmq.namesrvAddr}")
private String namesrvAddr;
@Override
void start0() {
try {
consumer = new DefaultMQPushConsumer(consumerGroup);
// 设置namesrv地址
consumer.setNamesrvAddr(namesrvAddr);
// 设置集群消费
consumer.setMessageModel(MessageModel.CLUSTERING);
// 设置每次消费消息的数量,官方一般建议1条,除非你有批量处理的需求
consumer.setConsumeMessageBatchMaxSize(1);
// 设置消费策略
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
// 设置订阅的topic和tags
consumer.subscribe("TopicTest", "*");
// ... 根据自己的需求设置consumer其他参数
} catch (MQClientException e) {
e.printStackTrace();
}
}
}
代码语言:javascript复制/**
* 监听rocketMQ消费消息的spring event
*/
public class MqMessageEvent extends ApplicationEvent {
private DefaultMQPushConsumer consumer;
private List<MessageExt> msgs;
public MqMessageEvent(DefaultMQPushConsumer consumer, List<MessageExt> msgs) {
super(msgs);
this.consumer = consumer;
this.msgs = msgs;
}
public DefaultMQPushConsumer getConsumer() {
return consumer;
}
public void setConsumer(DefaultMQPushConsumer consumer) {
this.consumer = consumer;
}
public List<MessageExt> getMsgs() {
return msgs;
}
public void setMsgs(List<MessageExt> msgs) {
this.msgs = msgs;
}
}
最后定义消费消息的Service:
代码语言:javascript复制/**
* 用于监听MqMessageEvent的服务
* 消费MQ消息
*
* 一般两种方式:
* (1)第一种:这个类的作用就是监听SpringEvent事件,然后再根据消息分发给其他Service进行处理,所以这里一般不会包含业务逻辑代码
* (2)第二种:这个类的作用就是具体的消费消息类
*/
@Service
public class TestConsumerService {
/**
* 消费TopicTest下的TagA
* @param event
*/
@EventListener(condition = "#event.msgs[0].topic=='TopicTest' && #event.msgs[0].tags=='TagA'")
public void testConsumer(MqMessageEvent event) {
// 由于mq消费者设置了batch=1,所以每次都只会消费一条
MessageExt msg = event.getMsgs().get(0);
if (null != msg) {
// 具体的消费MessageExt的逻辑
}
}
/**
* 消费TopicTestB
* @param event
*/
@EventListener(condition = "#event.msgs[0].topic=='TopicTestB' ")
public void testConsumer(MqMessageEvent event) {
// 由于mq消费者设置了batch=1,所以每次都只会消费一条
MessageExt msg = event.getMsgs().get(0);
if (null != msg) {
// 具体的消费MessageExt的逻辑
if (msg.getTags() == "TagA") {
//消费TagA的消息
}
else if (msg.getTags() == 'TagB'){
//消费TagB的消息
}
}
}
}
综上,你已经整合好了RocketMQ的生产者和消费者。
可能你的项目会需要多个不同生产者,多个不同的消费者,你只需要按上面的方式,新建多个不同的producer,consumer继承AbstractProducer, AbstractConsumer即可。
GitHub参考项目地址,找到src/main目录下的rocketmq包即可查看相关源码:
GitHub示例项目
最后简单脚注一些rocketmq的注意点
这些注意点都是在学习源码的过程中总结的,希望对很多还未深入源码了解rocketmq的程序员,在使用mq的过程中有所注意:
- 在集群消费模式下,同一个消费组(consumerGroup名相同)的所有消费者。他们所订阅的Topic,Tags都务必需要一致。 具体分析可以参考 参考 。此处涉及两个源码知识点,消息过滤和消息拉取。
- 生产消息过程中,消息是org.apache.rocketmq.common.message.Message . 该消息的构造方法有个参数虽然叫tags,但是它并不支持多个tag标签。一条Message仅能对应有且只有一个tag。所以不要被tags这个复数被误导了。如果你有去关注,你会发现consumer启动前配置的subscribe()订阅topic,tags时,它的参数是叫subExpression,所以在这里是支持表达式配置多个Tags。
- consumer属性consumeMessageBatchMaxSize默认为1,不建议去改动这个参数。这个参数表示你每次获得的List<MessageExt> msgs的消息个数。如果这里设置为1,表示每次你都只获得一个消息,也就是msgs.get(0)就可以取出这条消息。 它的工作原理大概是,配合pullBatchSize=32 。首先consumer会从它负责的queue中每隔一段时间一次拉取最多32(pullBatchSize)条消息(如果有这么多的话),然后再将这32条消息再根据tags进行一个消息过滤(因为“表达式过滤"模式,有可能会拉到其他非订阅的消息),最后将符合当前consumer订阅的消息内容,一次传递1(consumeMessageBatchMaxSize)条给开发者进行消息消费。 因此出于几点考虑: (1)一次拉一条进行消费,消费成功就返回SUCCESS,出问题就按照逻辑是记录下载,还是直接稍后重试。一般来说,都是一条消息对应一次业务处理。如果你拉N条消息,那么其中某一条失败了,你需要稍后重试。那么就会导致N-1条本来应该返回SUCESS消费成功的消息,而被迫全部失败。这样一来是不同的业务放在一起处理相互影响,另一方面如果失败了是很大的成本开销。 (2)暂时想不到
- 重复消费问题的产生原因: (1)consumeMessage()方法里没有用try-catch包住消费逻辑,导致一些意外的抛出异常而导致消费重试。 (2)消费消息超时时间默认15min,也就是说如果你消费消息过程中,超过15min没有返回CONSUME_SUCCESS或者RECONSUME_LATER ,即使之后返回了,也属于TIME_OUT。默认会重发这条消息给你并且无限重试,因此需要注意消息消费的时间不能超过consumeTimeout属性设置的值。