近期很多客户在咨询如何从RocketMQ企业版迁移到标准的Apache RocketMQ。基于此,我做了一下的第一版的Java代码Demo,来尝试总结一些迁移的注意事项和两者在客户端的主要差别。后期再逐步整理其他语言的Demo案例,比如我自己很喜欢的Scala和常见语言 Python/Golang/Nodejs。
第一篇文章会针对最基础的代码做迁移对比,之后会逐步增加高阶功能的迁移。
迁移动机
- 黑盒 vs 白盒 对于小部分客户来说,一个Demo也许就足以解决问题了。但是不知道同学们有没有想过,我们当前学语言除了helloworld是在用demo程序以外,大部分场景还需要Java SDK Doc。这里的Doc对于API的说明更多的是一个“合同”,来承诺一个API能做啥,不能做啥。而黑盒的demo,最多只能告诉你在一个很狭窄的场景里这个jar包能做啥,超过一点儿你就会感受到一片漆黑。就像买房子,你不能看过样板房就住进去吧?你要签合同,并且很细致的定义每一个花钱的地方是什么样的质量,承诺了什么功能和保质期。
- 绑定 vs 开源 企业版封装的API也许有一些地方是为了方便用户,更好用一些。但是大家想没想过,如果只是API层面的改进,为啥不贡献进社区?妥妥的绑定模式。Kafka/Kubernetes/Istio各种开源社区的项目逛一逛,有哪个是企业版和社区版面目全非的?面目全非一种什么样的开源?
- 自主 vs 被动 开源的目的就是,使用者发现问题,讨论出解决方案,立刻贡献给社区。而闭源版本就是你只能等着,接收或者不接受。
开始迁移之旅
说明了动机,也许有些人觉得我说的有些主观,至少这是我真实的看法。我不会喜欢用只有demo的SDK。
使用社区的客户端
我们在项目里选择使用org.apache.rocketmq的客户端。当前比较新的版本是4.8.0,完全没有问题。
代码语言:javascript复制 <dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.8.0</version>
</dependency>
基础层面的代码区别
1. Apache RocketMQ的默认消费是Batch模式,源代码如下:
代码语言:javascript复制public interface MessageListenerConcurrently extends MessageListener {
/**
* It is not recommend to throw exception,rather than returning ConsumeConcurrentlyStatus.RECONSUME_LATER if
* consumption failure
*
* @param msgs msgs.size() >= 1<br> DefaultMQPushConsumer.consumeMessageBatchMaxSize=1,you can modify here
* @return The consume status
*/
ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs,
final ConsumeConcurrentlyContext context);
}
这个场景下如果想达到针对每个message 消费的回掉接口,需要确认下面这个配置:
代码语言:javascript复制consumeMessageBatchMaxSize = 1
之后你会发现,在List<MessageExt> msgs
里只会有1个message,这样就确保了回掉Listener每次只处理一个信息,并且返回的Status只针对这个message。
2. Apache RocketMQ的每个consumer只能对应一个MessageListener。所以在使用下面的代码的时候你会发现MessageListener会被覆盖。
代码语言:javascript复制for(int i = 0 ; i < x ; i ){
// 这里topic会被累计叠加subscribe
consumer.subscribe(topic, tagsString); // tagString in format "tagA||tagB||tagC"
//Listener会被覆盖掉
consumer.registerMessageListener(new UnifiedConcurrentlyMessageListener(listenerMap));
}
源代码如下:
代码语言:javascript复制 public void subscribe(String topic, String subExpression) throws MQClientException {
try {
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subExpression);
// 针对每个topic 做一个subscriptionData,然后put到一个Map当中去,所以之后可以被轮巡
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
if (this.mQClientFactory != null) {
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
} catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}
而MessageListener就不一样了
代码语言:javascript复制 public void registerMessageListener(MessageListener messageListener) {
//直接覆盖了~ 好像也可以改名叫setMessageListener
this.messageListenerInner = messageListener;
}
所以这里如果需要监听多个topic和使用不同MessageListener的场景里,需要用类似如下的代码实现:
代码语言:javascript复制 List<DefaultMQPushConsumer> consumers = new ArrayList<>();
public void init() {
try {
for (String topic : topicList) {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(namespace, groupId);
consumer.setNamesrvAddr(nameSrvAddr);
// Optional, default value is CONSUME_FROM_LAST_OFFSET
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setConsumeThreadMin(consumeThreadNums);
// default max thread number is 20
consumer.setConsumeThreadMax(consumeThreadNums * 2);
// max retry times
consumer.setMaxReconsumeTimes(3);
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.setConsumeMessageBatchMaxSize(1);
// subscribe to a topic and append a topic message listener.
consumer.subscribe(topic, tagsStr);
consumer.registerMessageListener(new MyMessageListener());
// store the consumer to the list
consumers.add(consumer);
// start consumer
consumer.start();
}
} catch (Throwable e) {
e.printStackTrace();
throw new RuntimeException("canot initialize consumer");
}
}
3. 有一些命名上的差别:
- 比如Apache RocketMQ的Consumer在注册Listener时决定是顺序消费还是并行消费,MessageListener分为
MessageListenerConcurrently
和MessageListenerOrderly
。在企业版里,创建Consumer的时候就定了,Consumer分为Consumer
,BatchConsumer
和OrderConsumer
。 - 比如Apache RocketMQ的
ConsumeOrderlyStatus
和ConsumeConcurrentlyStatus
,分别对应企业版的Action
和OrderAction
。
4. 生产者Async调用
Apache RocketMQ 的生产者函数send(msg, callback)
vs 企业版生产者函数sendAsync(msg, callback)
源码如下:
代码语言:javascript复制@Override
public void send(Message msg, SendCallback sendCallback, long timeout)
throws MQClientException, RemotingException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
this.defaultMQProducerImpl.send(msg, sendCallback, timeout);
}
public void send(final Message msg, final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException {
final long beginStartTime = System.currentTimeMillis();
ExecutorService executor = this.getAsyncSenderExecutor();
try {
// 这里用异步的线程调用 - 非阻塞
executor.submit(new Runnable() {
@Override
public void run() {
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeout > costTime) {
try {
sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime);
} catch (Exception e) {
sendCallback.onException(e);
}
} else {
sendCallback.onException(
new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));
}
}
});
} catch (RejectedExecutionException e) {
throw new MQClientException("executor rejected ", e);
}
}
// 最终调用的是
// MQClientAPIImpl::sendMessageAsync
代码调用如下:
代码语言:javascript复制//异步模式
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%s%n", sendResult);
}
@Override
public void onException(Throwable e) {
e.printStackTrace();
}
});
//或者同步模式
SendResult sendResult = producer.send(msg);
运维
当前腾讯云TDMQ的Pulsar已经支持了RocketMQ的协议兼容,并且贡献给了社区 - RoP
当前产品还在内测期,开白可以使用。
想要享受开源便利,又不希望自己运维的同学们可以开始试用了~