在RocketMQ中要实现重平衡Rebalance,此时会ConsumerManager中会调用consumerIdsChangeListener的handle方法,来执行通知调用操作handle,改变、注册、不注册,而改变的时候,会通知消费者ids改变,此时的reqeustCode会放入:NOTIFY_CONSUMER_IDS_CHANGED
远程客户端处理器会根据case判断匹配到NOTIFY_CONSUMER_IDS_CHANGED,执行notifyConsumerIdsChanged方法,根据请求头的信息,会执行立即重平衡操作rebalanceImmediately。由于rebalanceImmediately实现了服务香肠,因此wakeUp的使用,会执行run方法,而在run方法的前面是waitForRunning,这个方法会等待线程运行,其是一个倒计时线程计数器同步并发,从而执行重平衡操作。而此时我们会看到两个方法:
代码语言:javascript复制DefaultMQPullConsumerImpl#doRebalance
DefaultMQPushConsumerImpl#doRebalance
那么两者有什么区别吗?
我们可以看到两者本质都是基于pull模式,虽然两者一个是拉模式,一个是推模式。可以看到DefaultMQPullConsumerImpl的doRebalance操作是一个空实现。
重平衡服务的启动每隔20s执行一次
代码语言:javascript复制//执行重平衡操作
public void doRebalance(final boolean isOrder) {
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
//进行重平衡操作 通过主题执行重平衡操作
this.rebalanceByTopic(topic, isOrder);
} catch (Throwable e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalanceByTopic Exception", e);
}
}
}
}
// 截断消息队列不是自己的队列
this.truncateMessageQueueNotMyTopic();
}
可以看到执行重平衡会通过主题执行,同时还会截断不属于自己的队列。而在重平衡中更新处理队列表updateProcessQueueTableInRebalance,如果队列的主题与需要重平衡的主题一样,同时不包含,则设置丢弃,否者移除不必要的队列。否则 查看当前的处理队列是否拉取过期,如果true,则查看是否是消费激活,如果是,则直接跳过,否者不是,则设置丢弃,同时查看移除不必要的队列。同时还会执行一个重要的操作转发到拉取操作:
代码语言:javascript复制this.dispatchPullRequest(pullRequestList) //执行转发拉取请求操作
而push的拉取中是将其放入到pullRequestQueue拉取请求队列中:
代码语言:javascript复制//执行拉取消息
public void executePullRequestImmediately(final PullRequest pullRequest) {
try {
this.pullRequestQueue.put(pullRequest);
} catch (InterruptedException e) {
log.error("executePullRequestImmediately pullRequestQueue.put", e);
}
}
此时可以看到:
代码语言:javascript复制PullRequest pullRequest = this.pullRequestQueue.take();
可以可到器会获取请求队列的拉取请求,同时请求到pullMessageProcessor,然后拿到消息。这个过程是主动的,而不是被动的。
代码语言:javascript复制public PullResult pullMessage(
final String addr,
final PullMessageRequestHeader requestHeader,
final long timeoutMillis,
final CommunicationMode communicationMode,
final PullCallback pullCallback
) throws RemotingException, MQBrokerException, InterruptedException {
//创建请求数据包
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
//交流的模式:oneway、异步、同步
switch (communicationMode) {
case ONEWAY:
assert false;
return null;
case ASYNC:
this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);
return null;
case SYNC:
return this.pullMessageSync(addr, request, timeoutMillis);
default:
assert false;
break;
}
return null;
}
可以看到拉取请求时,放入请求的code,从而进行拉取操作执行回调。
代码语言:javascript复制//处理拉取响应
PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response);
assert pullResult != null;
//拉取回调 onSucess
pullCallback.onSuccess(pullResult);
而执行重平衡的操作的过程中使用了锁,而锁的操作是值得我们去学习的。使用锁的过程中,参考了AQS的方式,也即使用队列对队列进行存储,然后执行操作,而这个体现则是RebalanceLockManager的tryLockBatch和unlockBatch:
代码语言:javascript复制创建锁定mqs和未锁定mqs, 对消息队列进行遍历,首先判断是否锁定,如果需要锁定,则将其添加到锁定队列,否者放入不锁定队列,可以看到lockEntry中会放入上次更新的时间戳,同时放入锁定的mq到lockedMqs,这个lockMqs是一个HashSet的队列。
如果lockEntry中如果锁定clientId,则设置上次更新时间戳,同时将其添加到锁定队列,如果lockEntry如果过期,则设置客户端id,同时设置上次更新时间戳,添加锁定队列。
不锁定的过程是一个遍历移除队列的过程。
那在生产端是怎样实现高可用的呢?
在生产端会延迟上,也即latencyFaultTolerance机制,其实现的延迟时间是有13个等级的:
代码语言:javascript复制protected static final int[] DELAY_LEVEL = {
1, 5, 10, 30, 1 * 60, 5 * 60, 10 * 60,
30 * 60, 1 * 3600, 2 * 3600, 6 * 3600, 12 * 3600, 1 * 24 * 3600};