如何从RocketMQ企业版迁移Apache RocketMQ (一)

2021-12-04 21:29:56 浏览数 (2)

近期很多客户在咨询如何从RocketMQ企业版迁移到标准的Apache RocketMQ。基于此,我做了一下的第一版的Java代码Demo,来尝试总结一些迁移的注意事项和两者在客户端的主要差别。后期再逐步整理其他语言的Demo案例,比如我自己很喜欢的Scala和常见语言 Python/Golang/Nodejs。

第一篇文章会针对最基础的代码做迁移对比,之后会逐步增加高阶功能的迁移。

迁移动机

  1. 黑盒 vs 白盒 对于小部分客户来说,一个Demo也许就足以解决问题了。但是不知道同学们有没有想过,我们当前学语言除了helloworld是在用demo程序以外,大部分场景还需要Java SDK Doc。这里的Doc对于API的说明更多的是一个“合同”,来承诺一个API能做啥,不能做啥。而黑盒的demo,最多只能告诉你在一个很狭窄的场景里这个jar包能做啥,超过一点儿你就会感受到一片漆黑。就像买房子,你不能看过样板房就住进去吧?你要签合同,并且很细致的定义每一个花钱的地方是什么样的质量,承诺了什么功能和保质期。
  2. 绑定 vs 开源 企业版封装的API也许有一些地方是为了方便用户,更好用一些。但是大家想没想过,如果只是API层面的改进,为啥不贡献进社区?妥妥的绑定模式。Kafka/Kubernetes/Istio各种开源社区的项目逛一逛,有哪个是企业版和社区版面目全非的?面目全非一种什么样的开源?
  3. 自主 vs 被动 开源的目的就是,使用者发现问题,讨论出解决方案,立刻贡献给社区。而闭源版本就是你只能等着,接收或者不接受。

开始迁移之旅

说明了动机,也许有些人觉得我说的有些主观,至少这是我真实的看法。我不会喜欢用只有demo的SDK。

使用社区的客户端

我们在项目里选择使用org.apache.rocketmq的客户端。当前比较新的版本是4.8.0,完全没有问题。

代码语言:javascript复制
    <dependency>
		<groupId>org.apache.rocketmq</groupId>
		<artifactId>rocketmq-client</artifactId>
		<version>4.8.0</version>
	</dependency>

基础层面的代码区别

1. Apache RocketMQ的默认消费是Batch模式,源代码如下:

代码语言:javascript复制
public interface MessageListenerConcurrently extends MessageListener {
    /**
     * It is not recommend to throw exception,rather than returning ConsumeConcurrentlyStatus.RECONSUME_LATER if
     * consumption failure
     *
     * @param msgs msgs.size() >= 1<br> DefaultMQPushConsumer.consumeMessageBatchMaxSize=1,you can modify here
     * @return The consume status
     */
    ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs,
        final ConsumeConcurrentlyContext context);
}

这个场景下如果想达到针对每个message 消费的回掉接口,需要确认下面这个配置:

代码语言:javascript复制
consumeMessageBatchMaxSize = 1

之后你会发现,在List<MessageExt> msgs里只会有1个message,这样就确保了回掉Listener每次只处理一个信息,并且返回的Status只针对这个message。

2. Apache RocketMQ的每个consumer只能对应一个MessageListener。所以在使用下面的代码的时候你会发现MessageListener会被覆盖。

代码语言:javascript复制
for(int i = 0 ; i < x ; i  ){
  // 这里topic会被累计叠加subscribe
  consumer.subscribe(topic, tagsString); // tagString in format "tagA||tagB||tagC"
  //Listener会被覆盖掉
  consumer.registerMessageListener(new UnifiedConcurrentlyMessageListener(listenerMap));
}

源代码如下:

代码语言:javascript复制
    public void subscribe(String topic, String subExpression) throws MQClientException {
        try {
            SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subExpression);
            // 针对每个topic 做一个subscriptionData,然后put到一个Map当中去,所以之后可以被轮巡
            this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
            if (this.mQClientFactory != null) {
                this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
            }
        } catch (Exception e) {
            throw new MQClientException("subscription exception", e);
        }
    }

而MessageListener就不一样了

代码语言:javascript复制
    public void registerMessageListener(MessageListener messageListener) {
        //直接覆盖了~ 好像也可以改名叫setMessageListener
        this.messageListenerInner = messageListener;
    }

所以这里如果需要监听多个topic和使用不同MessageListener的场景里,需要用类似如下的代码实现:

代码语言:javascript复制
    List<DefaultMQPushConsumer> consumers = new ArrayList<>();
    public void init() {
        try {

            for (String topic : topicList) {
                DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(namespace, groupId);
                consumer.setNamesrvAddr(nameSrvAddr);

                // Optional, default value is CONSUME_FROM_LAST_OFFSET
                consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
                consumer.setConsumeThreadMin(consumeThreadNums);
                // default max thread number is 20
                consumer.setConsumeThreadMax(consumeThreadNums * 2);

                // max retry times
                consumer.setMaxReconsumeTimes(3);
                consumer.setMessageModel(MessageModel.CLUSTERING);
                consumer.setConsumeMessageBatchMaxSize(1);
                // subscribe to a topic and append a topic message listener.
                consumer.subscribe(topic, tagsStr);
                consumer.registerMessageListener(new MyMessageListener());
              
                // store the consumer to the list
                consumers.add(consumer);
                // start consumer
                consumer.start();
            }

        } catch (Throwable e) {
            e.printStackTrace();
            throw new RuntimeException("canot initialize consumer");
        }
    }

3. 有一些命名上的差别:

  • 比如Apache RocketMQ的Consumer在注册Listener时决定是顺序消费还是并行消费,MessageListener分为MessageListenerConcurrentlyMessageListenerOrderly。在企业版里,创建Consumer的时候就定了,Consumer分为ConsumerBatchConsumerOrderConsumer
  • 比如Apache RocketMQ的ConsumeOrderlyStatusConsumeConcurrentlyStatus,分别对应企业版的ActionOrderAction

4. 生产者Async调用

Apache RocketMQ 的生产者函数send(msg, callback) vs 企业版生产者函数sendAsync(msg, callback)

源码如下:

代码语言:javascript复制
@Override
    public void send(Message msg, SendCallback sendCallback, long timeout)
        throws MQClientException, RemotingException, InterruptedException {
        msg.setTopic(withNamespace(msg.getTopic()));
        this.defaultMQProducerImpl.send(msg, sendCallback, timeout);
    }

    public void send(final Message msg, final SendCallback sendCallback, final long timeout)
        throws MQClientException, RemotingException, InterruptedException {
        final long beginStartTime = System.currentTimeMillis();
        ExecutorService executor = this.getAsyncSenderExecutor();
        try {
            // 这里用异步的线程调用 - 非阻塞
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    long costTime = System.currentTimeMillis() - beginStartTime;
                    if (timeout > costTime) {
                        try {
                            sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime);
                        } catch (Exception e) {
                            sendCallback.onException(e);
                        }
                    } else {
                        sendCallback.onException(
                            new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));
                    }
                }

            });
        } catch (RejectedExecutionException e) {
            throw new MQClientException("executor rejected ", e);
        }
    }

// 最终调用的是
// MQClientAPIImpl::sendMessageAsync

代码调用如下:

代码语言:javascript复制
//异步模式
producer.send(msg, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        System.out.printf("%s%n", sendResult);
      }
    @Override
    public void onException(Throwable e) {
      	e.printStackTrace();
    }
});
//或者同步模式
SendResult sendResult = producer.send(msg);

运维

当前腾讯云TDMQ的Pulsar已经支持了RocketMQ的协议兼容,并且贡献给了社区 - RoP

当前产品还在内测期,开白可以使用。

想要享受开源便利,又不希望自己运维的同学们可以开始试用了~

下期预告: Apache RocketMQ 在RoP上如何做延迟消息和事物消息。

0 人点赞