RocketMQ源码
1、源码环境搭建
1、源码拉取
- RocketMQ的官方Git仓库地址:https://github.com/apache/rocketmq 可以用git把项目clone下来,或者直接下载代码包。也可以到RocketMQ的官方网站上下载指定版本的源码: http://rocketmq.apache.org/dowloading/releases/ 。个人也上传了相关源码到gitcode上,地址:https://gitcode.net/java_wxid/rocketmq-all-4.7.1-source-release
下载后解压导入到IDEA中就可以进行解读了,注意下载的是4.7.1版本即可。源码下有很多功能模块,很容易让人迷失方向,我们只关注几个最为重要的模块:
broker:Broker模块(Broker启动进程)
client:消息客户端,包含消息生产者、消息消费者相关类
example:RocketMQ示例代码
namesrv:NameServer实现相关类(NameServer启动进程)
store:消息存储实现相关类
各个模块的功能大都从名字上就能看懂,可以在有需要的时候再进去看源码。但这些模块里有些东西还是要关注的,例如docs文件夹下的文档,以及各个模块下都有非常丰富的junit测试代码,这些都是非常有用的。
2、注解版源码引入
RocketMQ的源码中有个非常让人头疼的事情,就是他的代码注释几乎没有。为了帮助大家解读源码,我给大家准备了一个添加了自己注释的源码版本。 在配套资料当中。大家可以把这个版本导入IDEA来进行解读。 源码中对最为重要的注解设定了一个标记K1,相对不那么重要的注解设定了一个标记K2,而普通的注释就没有添加标记。大家可以在IDEA的TODO标签中配置这两个注解标记。
3、源码调试
将源码导入IDEA后,需要先对源码进行编译。编译指令:mvn clean install -Dmaven.test.skip=true
编译完成后就可以开始调试代码了。调试前需要先做一些准备:在项目目录下创建一个conf目录,并从distribution模块中拷贝broker.conf、logback_broker.xml和logback_namesrv.xml到该目录。
注解版源码中已经复制好了。
3.1 启动nameServer
展开namesrv模块,运行NamesrvStartup类即可启动NameServer。首次启动时会报错,提示需要配置一个ROCKETMQ_HOME环境变量。这个环境变量我们可以在机器上配置,跟配置JAVA_HOME环境变量一样;也可以在IDEA的运行环境中配置,目录指向源码目录即可。
配置完成后,再次执行,看到以下日志内容,表示NameServer启动成功 The Name Server boot success. serializeType=JSON
3.2 启动Broker
启动Broker之前,我们需要先修改之前复制的broker.conf文件
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
# 自动创建Topic
autoCreateTopicEnable=true
# nameServ地址
namesrvAddr=127.0.0.1:9876
# 存储路径
storePathRootDir=E:\RocketMQ\data\rocketmq\dataDir
# commitLog路径
storePathCommitLog=E:\RocketMQ\data\rocketmq\dataDir\commitlog
# 消息队列存储路径
storePathConsumeQueue=E:\RocketMQ\data\rocketmq\dataDir\consumequeue
# 消息索引存储路径
storePathIndex=E:\RocketMQ\data\rocketmq\dataDir\index
# checkpoint文件路径
storeCheckpoint=E:\RocketMQ\data\rocketmq\dataDir\checkpoint
# abort文件存储路径
abortFile=E:\RocketMQ\data\rocketmq\dataDir\abort
然后Broker的启动类是broker模块下的BrokerStartup。启动Broker时,同样需要ROCKETMQ_HOME环境变量,并且还需要配置一个-c参数,指向broker.conf配置文件。
配置完成后重新启动,即可启动Broker。
3.3 发送消息
在源码的example模块下,提供了非常详细的测试代码。例如我们启动example模块下的org.apache.rocketmq.example.quickstart.Producer类即可发送消息。 但是在测试源码中,需要指定NameServer地址。这个NameServer地址有两种指定方式,一种是配置一个NAMESRV_ADDR的环境变量。另一种是在源码中指定。我们可以在源码中加一行代码指定NameServer producer.setNamesrvAddr("127.0.0.1:9876"); 然后就可以发送消息了。
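加上这一行之后,Producer示例的主体逻辑大致如下(以example模块中的实际代码为准,这里做了简化):
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class Producer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        //调试时手动指定NameServer地址,也可以改用NAMESRV_ADDR环境变量
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        for (int i = 0; i < 10; i++) {
            Message msg = new Message("TopicTest", "TagA",
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        producer.shutdown();
    }
}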
3.4 消费消息
我们可以使用同一模块下的org.apache.rocketmq.example.quickstart.Consumer类来消费消息。运行时同样需要指定NameServer地址 consumer.setNamesrvAddr("192.168.232.128:9876"); 这样整个调试环境就搭建好了。
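对应的Consumer示例主体逻辑大致如下(同样以example模块中的实际代码为准,做了简化,NameServer地址按自己的环境修改):
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class Consumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
        //调试时手动指定NameServer地址
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TopicTest", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}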
3.5 如何看源码
下面我们可以一边调试一边讲解源码了。源码中大部分关键的地方都已经添加了注释,文档中就不做过多记录了。我们在看源码的时候要注意,不要一上来就一行行地逐句看,更不要期望一遍就把代码看明白,这样会陷入代码的复杂细节中,很快受挫而放弃。看源码时,需要用层层深入的方法:每一次阅读源码时,先了解程序执行的流程性代码,略过服务实现的细节性代码,形成大概的概念框架;然后再回头按同样的方法,逐步深入到之前略过的代码。这样才能从源码中看出一点门道来。
2、NameServer启动
源码地址:https://gitcode.net/java_wxid/rocketmq-all-4.7.1-source-release/-/tree/master/namesrv
NameServer的启动入口为NamesrvStartup类的main方法,我们可以进行逐步调试。这次看源码,我们不要太过陷入其中的细节,我们的目的是先搞清楚NameServer的大体架构。
1、核心问题
从之前的介绍中,我们已经了解到,在RocketMQ中,实际进行消息存储、推送等核心功能的是Broker。那NameServer具体做什么用呢?NameServer的核心作用其实就只有两个:一是维护Broker的服务地址并进行及时更新;二是为Producer和Consumer提供获取Broker列表的服务。 整体的流程:
2、源码重点
整个NameServer的核心就是一个NamesrvController对象。这个controller对象跟Java Web开发中的Controller功能类似,都是用来响应客户端请求的。在创建NamesrvController对象时,有两个关键的配置:NamesrvConfig,是NameServer自身运行需要的配置信息;NettyServerConfig,包含Netty服务端的配置参数,默认监听9876端口。比较有意思的是,这个9876端口是在代码里直接硬编码设置的。然后在启动服务时,启动了一个RemotingServer,这个就是用来响应请求的。在关闭服务时,关闭了四个东西:remotingServer,响应请求的服务;remotingExecutor,Netty服务线程池;scheduledExecutorService,定时任务;fileWatchService,用来跟踪acl配置的(acl的配置文件是实时热加载的)。所以整个NameServer的结构是这样:
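为了便于对照,下面是NamesrvStartup.createNamesrvController中创建这两个核心配置对象的简化片段(只保留主干,细节以4.7.1源码为准):
//NamesrvStartup.createNamesrvController(简化)
final NamesrvConfig namesrvConfig = new NamesrvConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
//Netty服务端监听端口在这里被直接写成了9876
nettyServerConfig.setListenPort(9876);
//之后解析-c指定的配置文件、-p打印配置等,再用这两个配置对象构建NamesrvController
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);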
3、Broker启动
源码地址:https://gitcode.net/java_wxid/rocketmq-all-4.7.1-source-release/-/tree/master/broker
Broker启动的入口在BrokerStartup这个类,可以从他的main方法开始调试。 启动过程关键点: 重点也是围绕一个BrokerController对象,先创建,然后再启动。 在BrokerStartup.createBrokerController方法中可以看到Broker的几个核心配置:
BrokerConfig
NettyServerConfig:Netty服务端配置,占用了10911端口,又是一个神奇的端口
NettyClientConfig
MessageStoreConfig
然后在BrokerController.start方法可以看到启动了一大堆Broker的核心服务,我们挑一些重要的
this.messageStore.start():启动核心的消息存储组件
this.remotingServer.start(); this.fastRemotingServer.start():启动两个Netty服务
this.brokerOuterAPI.start():启动客户端,往外发请求
BrokerController.this.registerBrokerAll:向NameServer注册心跳
this.brokerStatsManager.start(); this.brokerFastFailure.start():这也是一些负责具体业务的功能组件
我们现在不需要了解这些核心组件的具体功能,只要有个大概,Broker中有一大堆的功能组件负责具体的业务。 我们需要抽取出Broker的一个整体结构:
4、Broker注册
BrokerController.this.registerBrokerAll方法会发起向NameServer注册心跳。启动时会立即注册,同时还会启动一个定时任务,以10秒的初始延迟、默认30秒的间隔持续向NameServer发送心跳。BrokerController.this.registerBrokerAll这个方法就是注册心跳的入口。
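BrokerController.start方法中注册心跳定时任务的代码大致如下(间隔由registerNameServerPeriod控制,默认30秒,并被限制在10秒到60秒之间,细节以源码为准):
//BrokerController.start中的注册心跳定时任务(简化)
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            //向所有NameServer注册Broker信息,相当于发送一次心跳
            BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
        } catch (Throwable e) {
            log.error("registerBrokerAll Exception", e);
        }
    }
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);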
5、Producer
源码地址:
https://gitcode.net/java_wxid/rocketmq-all-4.7.1-source-release/-/blob/master/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java
https://gitcode.net/java_wxid/rocketmq-all-4.7.1-source-release/-/blob/master/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
https://gitcode.net/java_wxid/rocketmq-all-4.7.1-source-release/-/blob/master/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
注意Producer有两种。一种是普通发送者DefaultMQProducer,这个只需要构建一个Netty客户端。另一种是事务消息发送者TransactionMQProducer,这个需要构建一个Netty客户端,同时也要构建Netty服务端。整个Producer的流程其实还是挺复杂的,大致分两个步骤:start()方法准备一大堆信息,send()方法发送消息。我们先抽取下主线。
首先,关于Broker路由信息的管理:Producer需要拉取Broker列表、跟Broker建立连接等很多核心流程,其实都是在发送消息时才建立的,因为在启动时还不知道要拉取哪个Topic的Broker列表。所以对于这个问题,我们关注的重点不应该是start方法,而是send方法。send方法中,首先需要获得Topic的路由信息,这会先从本地缓存中获取,如果本地缓存中没有,就从NameServer去申请。
路由信息大致的管理流程:
然后 获取路由信息后,会选出一个MessageQueue去发送消息。这个选MessageQueue的方法就是一个索引自增然后取模的方式。
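这个取模选择队列的逻辑在TopicPublishInfo.selectOneMessageQueue中,大致如下(未考虑规避故障Broker的分支,以源码为准):
//TopicPublishInfo中轮询选择MessageQueue(简化)
public MessageQueue selectOneMessageQueue() {
    //sendWhichQueue是一个ThreadLocal的自增索引
    int index = this.sendWhichQueue.getAndIncrement();
    int pos = Math.abs(index) % this.messageQueueList.size();
    if (pos < 0) {
        pos = 0;
    }
    return this.messageQueueList.get(pos);
}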
然后封装Netty请求发送消息。消息发送到Broker后,会由一个CommitLog类写入到CommitLog文件中。
6、消息存储
我们接着上面的流程,来关注下Broker是如何把消息进行存储的。 消息存储的入口在:DefaultMessageStore.putMessage 最终存储的文件有哪些?
commitLog:消息存储目录
config:运行期间的一些配置信息
consumequeue:消息消费队列存储目录
index:消息索引文件存储目录
abort:如果存在该文件,说明Broker是非正常关闭的
checkpoint:文件检查点,存储CommitLog文件最后一次刷盘时间戳、ConsumeQueue最后一次刷盘时间、Index索引文件最后一次刷盘时间戳
6.1-commitLog写入
代码见CommitLog.putMessage方法。
//K2 CommitLog写入的过程
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
// Set the storage time
msg.setStoreTimestamp(System.currentTimeMillis());
// Set the message body BODY CRC (consider the most appropriate setting
// on the client)
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
// Back to Results
AppendMessageResult result = null;
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
String topic = msg.getTopic();
int queueId = msg.getQueueId();
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// Delay Delivery
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();
if (bornSocketAddress.getAddress() instanceof Inet6Address) {
msg.setBornHostV6Flag();
}
InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();
if (storeSocketAddress.getAddress() instanceof Inet6Address) {
msg.setStoreHostAddressV6Flag();
}
long elapsedTimeInLock = 0;
MappedFile unlockMappedFile = null;
//mappedFile 零拷贝实现
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
//线程锁 注意使用锁的这种方式
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
try {
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
this.beginTimeInLock = beginLockTimestamp;
// Here settings are stored timestamp, in order to ensure an orderly
// global
msg.setStoreTimestamp(beginLockTimestamp);
if (null == mappedFile || mappedFile.isFull()) {
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
if (null == mappedFile) {
log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
}
//直接以Append的方式写入文件
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
//文件写入的结果
switch (result.getStatus()) {
case PUT_OK:
break;
//文件写满了,就创建一个新文件,重写消息
case END_OF_FILE:
unlockMappedFile = mappedFile;
// Create a new file, re-write the message
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
// XXX: warn and notify me
log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
}
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
break;
case MESSAGE_SIZE_EXCEEDED:
case PROPERTIES_SIZE_EXCEEDED:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
case UNKNOWN_ERROR:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
default:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
}
elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
beginTimeInLock = 0;
} finally {
putMessageLock.unlock();
}
if (elapsedTimeInLock > 500) {
log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
}
if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
}
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// Statistics
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
//文件刷盘
handleDiskFlush(result, putMessageResult, msg);
//主从同步
handleHA(result, putMessageResult, msg);
return putMessageResult;
}
CommitLog的doAppend方法就是Broker写入消息的实际入口。这个方法最终会把消息追加到MappedFile映射的一块内存里,并没有直接写入磁盘。写入消息的过程是串行的,一次只会允许一个线程写入。
6.2-消息转储到目标Topic
这个转储的核心服务是scheduleMessageService,他也是Broker启动过程中的一个功能组件。
然后ScheduleMessageService会每隔1秒钟执行一个executeOnTimeup任务,将消息从延迟队列中写入正常Topic中。 代码见ScheduleMessageService中的DeliverDelayedMessageTimerTask.executeOnTimeup方法。
//K2 延迟消息:处理任务的具体逻辑。
public void executeOnTimeup() {
//拿到延迟级别对应的队列
ConsumeQueue cq =
ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel));
long failScheduleOffset = offset;
if (cq != null) {
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
if (bufferCQ != null) {
try {
long nextOffset = offset;
int i = 0;
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
//遍历每个延迟队列的消息
for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
long offsetPy = bufferCQ.getByteBuffer().getLong();
int sizePy = bufferCQ.getByteBuffer().getInt();
long tagsCode = bufferCQ.getByteBuffer().getLong();
if (cq.isExtAddr(tagsCode)) {
if (cq.getExt(tagsCode, cqExtUnit)) {
tagsCode = cqExtUnit.getTagsCode();
} else {
//can't find ext content.So re compute tags code.
log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
tagsCode, offsetPy, sizePy);
long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
}
}
long now = System.currentTimeMillis();
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
long countdown = deliverTimestamp - now;
//把每个延迟消息封装成一个MessageExt
if (countdown <= 0) {
MessageExt msgExt =
ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
offsetPy, sizePy);
if (msgExt != null) {
try {
MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
msgInner.getTopic(), msgInner);
continue;
}
//将延迟消息写入正常消息队列,这样就能被消费者正常消费了。
PutMessageResult putMessageResult =
ScheduleMessageService.this.writeMessageStore
.putMessage(msgInner);
if (putMessageResult != null
&& putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
continue;
} else {
// XXX: warn and notify me
log.error(
"ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
msgExt.getTopic(), msgExt.getMsgId());
ScheduleMessageService.this.timer.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel,
nextOffset), DELAY_FOR_A_PERIOD);
ScheduleMessageService.this.updateOffset(this.delayLevel,
nextOffset);
return;
}
} catch (Exception e) {
/*
* XXX: warn and notify me
*/
log.error(
    "ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
        + msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
        + offsetPy + ",sizePy=" + sizePy, e);
}
}
} else {
ScheduleMessageService.this.timer.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
countdown);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
}
} // end of for
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
} finally {
bufferCQ.release();
}
} // end of if (bufferCQ != null)
else {
long cqMinOffset = cq.getMinOffsetInQueue();
if (offset < cqMinOffset) {
failScheduleOffset = cqMinOffset;
log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
    + cqMinOffset + ", queueId=" + cq.getQueueId());
}
}
} // end of if (cq != null)
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
failScheduleOffset), DELAY_FOR_A_WHILE);
}
这个其中有个需要注意的点就是在ScheduleMessageService的start方法中。有一个很关键的CAS操作:
if (started.compareAndSet(false, true)) {
这个CAS操作保证了同一时间只会有一个DeliverDelayedMessageTimerTask执行。保证了消息安全的同时也限制了消息进行回传的效率。所以,这也是很多互联网公司在使用RocketMQ时,对源码进行定制的一个重点。
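ScheduleMessageService.start方法的大致逻辑如下(简化片段,以源码为准)。可以看到所有延迟级别的任务共用同一个单线程Timer,这也是上面说的效率瓶颈所在:
//ScheduleMessageService.start(简化)
public void start() {
    if (started.compareAndSet(false, true)) {
        //单线程的Timer,所有延迟级别的任务都由它调度
        this.timer = new Timer("ScheduleMessageTimerThread", true);
        for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
            Integer level = entry.getKey();
            Long timeDelay = entry.getValue();
            Long offset = this.offsetTable.get(level);
            if (null == offset) {
                offset = 0L;
            }
            if (timeDelay != null) {
                //每个延迟级别对应一个DeliverDelayedMessageTimerTask
                this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
            }
        }
        //定时把各个延迟队列的消费进度持久化
        this.timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    if (started.get()) {
                        ScheduleMessageService.this.persist();
                    }
                } catch (Throwable e) {
                    log.error("scheduleAtFixedRate flush exception", e);
                }
            }
        }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
    }
}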
6.3-分发ConsumeQueue和IndexFile
当CommitLog写入一条消息后,会有一个后台线程reputMessageService每隔1毫秒就去拉取CommitLog中最新写入的一批消息,然后分别转发到ConsumeQueue和IndexFile里去,这就是它底层的实现逻辑。并且,如果服务异常宕机,会造成CommitLog和ConsumeQueue、IndexFile文件不一致:有消息写入了CommitLog,却没有分发到索引文件,这样消息就"丢失"了。DefaultMessageStore的load方法提供了恢复索引文件的入口。
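ReputMessageService的run方法就是这个1毫秒轮询的实现,大致如下(以源码为准):
//DefaultMessageStore.ReputMessageService.run(简化)
@Override
public void run() {
    DefaultMessageStore.log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        try {
            //每隔1毫秒把CommitLog中新写入的消息分发给ConsumeQueue和IndexFile
            Thread.sleep(1);
            this.doReput();
        } catch (Exception e) {
            DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }
    DefaultMessageStore.log.info(this.getServiceName() + " service end");
}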
6.4、文件同步刷盘与异步刷盘
入口:CommitLog.putMessage -> CommitLog.handleDiskFlush
//K2 处理数据刷盘
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
// Synchronization flush 同步刷盘
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
if (messageExt.isWaitStoreMsgOK()) {
//构建一个GroupCommitRequest,交给GroupCommitService处理。
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
CompletableFuture<PutMessageStatus> flushOkFuture = request.future();
PutMessageStatus flushStatus = null;
//同步等待文件刷新
try {
flushStatus = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
//flushOK=false;
}
if (flushStatus != PutMessageStatus.PUT_OK) {
log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
    + " client address: " + messageExt.getBornHostString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
} else {
service.wakeup();
}
}
// Asynchronous flush 异步刷盘
//异步刷盘是把消息映射到MappedFile后,单独唤醒一个服务来进行刷盘
else {
if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
flushCommitLogService.wakeup();
} else {
commitLogService.wakeup();
}
}
}
其中主要涉及到是否开启了堆外内存,即transientStorePoolEnable配置。如果开启了堆外内存,会在启动时预先申请跟CommitLog文件大小一致的堆外内存,这部分内存可以确保不会被交换到虚拟内存中。
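堆外内存的申请在TransientStorePool.init方法中,大致如下(默认poolSize为5,fileSize与单个CommitLog文件大小一致,细节以源码为准):
//TransientStorePool.init(简化)
public void init() {
    for (int i = 0; i < poolSize; i++) {
        ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);
        final long address = ((DirectBuffer) byteBuffer).address();
        Pointer pointer = new Pointer(address);
        //通过mlock锁定这块堆外内存,避免被操作系统换出到swap
        LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));
        availableBuffers.offer(byteBuffer);
    }
}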
6.5、过期文件删除
入口: DefaultMessageStore.addScheduleTask -> DefaultMessageStore.this.cleanFilesPeriodically()
private void addScheduleTask() {
//K1 定时删除过期消息的任务
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
DefaultMessageStore.this.cleanFilesPeriodically();
}
}, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
DefaultMessageStore.this.checkSelf();
}
}, 1, 10, TimeUnit.MINUTES);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
if (DefaultMessageStore.this.getMessageStoreConfig().isDebugLockEnable()) {
try {
if (DefaultMessageStore.this.commitLog.getBeginTimeInLock() != 0) {
long lockTime = System.currentTimeMillis() - DefaultMessageStore.this.commitLog.getBeginTimeInLock();
if (lockTime > 1000 && lockTime < 10000000) {
String stack = UtilAll.jstack();
final String fileName = System.getProperty("user.home") + File.separator + "debug/lock/stack-"
    + DefaultMessageStore.this.commitLog.getBeginTimeInLock() + "-" + lockTime;
MixAll.string2FileNotSafe(stack, fileName);
}
}
} catch (Exception e) {
}
}
}
}, 1, 1, TimeUnit.SECONDS);
// this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
// @Override
// public void run() {
// DefaultMessageStore.this.cleanExpiredConsumerQueue();
// }
// }, 1, 1, TimeUnit.HOURS);
this.diskCheckScheduledExecutorService.scheduleAtFixedRate(new Runnable() {
public void run() {
DefaultMessageStore.this.cleanCommitLogService.isSpaceFull();
}
}, 1000L, 10000L, TimeUnit.MILLISECONDS);
}
private void cleanFilesPeriodically() {
//定时删除过期commitlog
this.cleanCommitLogService.run();
//定时删除过期的consumequeue
this.cleanConsumeQueueService.run();
}
默认情况下,Broker会启动后台线程,每60秒检查一次CommitLog、ConsumeQueue文件,然后对超过72小时的数据进行删除。也就是说,默认情况下,RocketMQ只会保存3天内的数据。这个时间可以通过fileReservedTime来配置。注意删除时并不会检查消息是否已被消费。整个文件存储的核心入口在DefaultMessageStore的start方法中。
public void start() throws Exception {
lock = lockFile.getChannel().tryLock(0, 1, false);
if (lock == null || lock.isShared() || !lock.isValid()) {
throw new RuntimeException("Lock failed,MQ already started");
}
lockFile.getChannel().write(ByteBuffer.wrap("lock".getBytes()));
lockFile.getChannel().force(true);
{
/**
* 1. Make sure the fast-forward messages to be truncated during the recovering according to the max physical offset of the commitlog;
* 2. DLedger committedPos may be missing, so the maxPhysicalPosInLogicQueue maybe bigger that maxOffset returned by DLedgerCommitLog, just let it go;
* 3. Calculate the reput offset according to the consume queue;
* 4. Make sure the fall-behind messages to be dispatched before starting the commitlog, especially when the broker role are automatically changed.
*/
long maxPhysicalPosInLogicQueue = commitLog.getMinOffset();
for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
for (ConsumeQueue logic : maps.values()) {
if (logic.getMaxPhysicOffset() > maxPhysicalPosInLogicQueue) {
maxPhysicalPosInLogicQueue = logic.getMaxPhysicOffset();
}
}
}
if (maxPhysicalPosInLogicQueue < 0) {
maxPhysicalPosInLogicQueue = 0;
}
if (maxPhysicalPosInLogicQueue < this.commitLog.getMinOffset()) {
maxPhysicalPosInLogicQueue = this.commitLog.getMinOffset();
/**
* This happens in following conditions:
* 1. If someone removes all the consumequeue files or the disk get damaged.
* 2. Launch a new broker, and copy the commitlog from other brokers.
*
* All the conditions has the same in common that the maxPhysicalPosInLogicQueue should be 0.
* If the maxPhysicalPosInLogicQueue is gt 0, there maybe something wrong.
*/
log.warn("[TooSmallCqOffset] maxPhysicalPosInLogicQueue={} clMinOffset={}", maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset());
}
log.info("[SetReputOffset] maxPhysicalPosInLogicQueue={} clMinOffset={} clMaxOffset={} clConfirmedOffset={}",
maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset(), this.commitLog.getMaxOffset(), this.commitLog.getConfirmOffset());
//K2 Broker启动时会启动一个线程来更新ConsumerQueue索引文件。
this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
this.reputMessageService.start();
/**
* 1. Finish dispatching the messages fall behind, then to start other services.
* 2. DLedger committedPos may be missing, so here just require dispatchBehindBytes <= 0
*/
while (true) {
if (dispatchBehindBytes() <= 0) {
break;
}
Thread.sleep(1000);
log.info("Try to finish doing reput the messages fall behind during the starting, reputOffset={} maxOffset={} behind={}", this.reputMessageService.getReputFromOffset(), this.getMaxPhyOffset(), this.dispatchBehindBytes());
}
this.recoverTopicQueueTable();
}
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
this.haService.start();
this.handleScheduleMessageService(messageStoreConfig.getBrokerRole());
}
this.flushConsumeQueueService.start();
this.commitLog.start();
this.storeStatsService.start();
this.createTempFile();
//K2 Broker启动删除过期文件的定时任务
this.addScheduleTask();
this.shutdown = false;
}
6.6、文件存储部分的总结
RocketMQ的存储文件包括消息文件(CommitLog)、消息消费队列文件(ConsumeQueue)、Hash索引文件(IndexFile)、检测点文件(checkpoint)和abort文件(异常关闭标记文件)。
单个消息存储文件、消息消费队列文件、Hash索引文件长度固定,以便使用内存映射机制进行文件的读写操作。RocketMQ以文件的起始偏移量来命名文件,这样根据偏移量能快速定位到真实的物理文件。
RocketMQ基于内存映射文件机制提供了同步刷盘和异步刷盘两种机制,异步刷盘是指消息存储时先追加到内存映射文件,然后由专门的刷盘线程定时将内存中的数据刷写到磁盘。
CommitLog是消息存储文件。RocketMQ为了保证消息发送的高吞吐量,采用单一文件存储所有主题的消息,保证消息存储完全是顺序写,但这样给文件读取带来了不便。为此RocketMQ为了方便消息消费构建了消息消费队列文件,基于主题与队列进行组织;同时RocketMQ为消息实现了Hash索引,可以为消息设置索引键,根据索引键能够快速从CommitLog文件中检索消息。
当消息到达CommitLog后,会通过ReputMessageService线程接近实时地将消息转发给消息消费队列文件与索引文件。
为了安全起见,RocketMQ引入abort文件,记录Broker是正常关闭还是异常关闭。在重启Broker时,为了保证CommitLog文件、消息消费队列文件与Hash索引文件的正确性,会分别采用不同策略来恢复文件。
RocketMQ不会永久存储消息文件、消息消费队列文件,而是启用文件过期机制,在磁盘空间不足或者默认凌晨4点删除过期文件。文件默认保存72小时,并且删除文件时并不会判断该文件上的消息是否已被消费。
7、消费者
消费者以消费者组的模式开展。消费者组有集群模式和广播模式两种消费模式。然后消费方式有推模式和拉模式,推模式是由拉模式封装而成的。集群模式下,消费队列负载均衡的通用原理:一个消费队列同一时间只能被一个消费者消费,而一个消费者可以同时消费多个队列。消息顺序方面,RocketMQ只支持一个队列上的局部消息顺序,不保证全局消息顺序。要实现顺序消息,可以把需要保证顺序的一组消息发送到同一个MessageQueue,或者给Topic只配置一个Queue(后者不推荐,会失去并发能力)。
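把一组有序消息发到同一个MessageQueue,常见的做法是发送时传入MessageQueueSelector,示意如下(Topic名、组名和orderId均为示意):
import java.util.List;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

public class OrderedSendDemo {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("ordered_producer_group");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        long orderId = 100L;
        Message msg = new Message("TopicOrderTest", "TagA", ("order " + orderId).getBytes());
        //同一个orderId的消息始终落到同一个MessageQueue,从而保证该订单内的消息局部有序
        SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message m, Object arg) {
                long id = (Long) arg;
                int index = (int) (id % mqs.size());
                return mqs.get(index);
            }
        }, orderId);
        System.out.printf("%s%n", sendResult);
        producer.shutdown();
    }
}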
7.1、启动
DefaultMQPushConsumer.start方法
//K2 启动过程
@Override
public void start() throws MQClientException {
setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
this.defaultMQPushConsumerImpl.start();
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
}
defaultMQPushConsumerImpl.start()
//消费者端实际启动过程
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
this.serviceState = ServiceState.START_FAILED;
this.checkConfig();
this.copySubscription();
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPushConsumer.changeInstanceNameToPID();
}
//K2 客户端创建工厂,这个是核心对象
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
this.offsetStore.load();
//根据客户端配置实例化不同的consumeMessageService
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
this.consumeMessageService.start();
//注册本地的消费者组缓存。
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
mQClientFactory.start();
log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
    + this.serviceState
    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
this.mQClientFactory.checkClientInBroker();
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
this.mQClientFactory.rebalanceImmediately();
}
启动过程不用太过关注,有个概念就行,然后客户端启动的核心是mQClientFactory 主要是启动了一大堆的服务。 这些服务可以结合具体场景再进行深入。例如pullMessageService主要处理拉取消息服务,rebalanceService主要处理客户端的负载均衡。
7.2、消息拉取
拉模式:PullMessageService。PullRequest里有messageQueue和processQueue,其中messageQueue标识要拉取的队列,拉取到消息后,将消息存入processQueue交给消费线程处理;PullRequest本身随后被放回任务队列,继续下一次拉取。
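PullRequest的主要字段大致如下(省略了getter/setter,ProcessQueue与PullRequest同在client模块的impl.consumer包下,以源码为准):
import org.apache.rocketmq.common.message.MessageQueue;

//org.apache.rocketmq.client.impl.consumer.PullRequest(字段示意)
public class PullRequest {
    private String consumerGroup;      //消费者组
    private MessageQueue messageQueue; //要拉取的队列
    private ProcessQueue processQueue; //拉取到的消息在消费端的处理队列(快照)
    private long nextOffset;           //下一次拉取的起始位点
    private boolean lockedFirst = false;
}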
7.3 长轮询拉取机制
在Broker的配置中,有一个配置项longPollingEnable,可以配置为true开启长轮询模式。我们看下这个是干什么的。消息长轮询的处理入口在Broker端的PullMessageProcessor.processRequest方法,这是一个非常长的方法,在403行左右有这样一段。
case ResponseCode.PULL_NOT_FOUND:
if (brokerAllowSuspend && hasSuspendFlag) {
long pollingTimeMills = suspendTimeoutMillisLong;
//消息长轮询
if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
}
String topic = requestHeader.getTopic();
long offset = requestHeader.getQueueOffset();
int queueId = requestHeader.getQueueId();
PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
response = null;
break;
}
如果开启了长轮询机制,PullRequestHoldService会每隔5秒被唤醒,去尝试检查是否有新的消息到来并给客户端响应,或者直到超时才给客户端响应,这样消息的实时性比较差。为了避免这种情况,RocketMQ还有另外一个机制:当消息到达时唤醒挂起的请求再检查一次。这个机制的入口在DefaultMessageStore的start方法中,会启动一个reputMessageService。然后在CommitLog消息分发成功后,会检查如果开启了长轮询,就会唤醒NotifyMessageArrivingListener,进行一次请求线程的检查。
if (dispatchRequest.isSuccess()) {
if (size > 0) {
//分发CommitLog写入消息
DefaultMessageStore.this.doDispatch(dispatchRequest);
//K2 长轮询: 如果有消息到了主节点,并且开启了长轮询。
if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
&& DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
//唤醒NotifyMessageArrivingListener的arriving方法,进行一次请求线程的检查
DefaultMessageStore.this.messageArrivingListener
.arriving(dispatchRequest.getTopic(),
dispatchRequest.getQueueId(),
dispatchRequest.getConsumeQueueOffset() + 1,
dispatchRequest.getTagsCode(),
dispatchRequest.getStoreTimestamp(),
dispatchRequest.getBitMap(),
dispatchRequest.getPropertiesMap());
}
7.4 客户端负载均衡策略
1>在消费者实例的start方法中,会启动RebalanceService,这是客户端进行负载均衡策略的启动服务。他只负责根据负载均衡策略,获取当前客户端分配到的MessageQueue实例。 五种负载策略,可以由Consumer的allocateMessageQueueStrategy属性来选择。 最常用的是AllocateMessageQueueAveragely平均分配和AllocateMessageQueueAveragelyByCircle平均轮询分配。 平均分配是把MessageQueue按组内的消费者个数平均分配,而平均轮询分配就是把MessageQueue按组内的消费者一个一个轮询分配。
例如,六个队列q1,q2,q3,q4,q5,q6,分配给三个消费者c1,c2,c3 平均分配的结果就是: c1:{q1,q2},c2:{q3,q4},c3{q5,q6} 平均轮询分配的结果就是: c1:{q1,q4},c2:{q2,q5},c3:{q3,q6}
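负载策略可以在创建消费者时切换,例如下面把默认的平均分配换成平均轮询分配(组名为示意):
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragelyByCircle;

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");
//默认策略是AllocateMessageQueueAveragely,这里改为平均轮询分配
consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragelyByCircle());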
2>消费的过程 消费的过程依然是在DefaultMQPushConsumerImpl的 consumeMessageService中。他有两个子类ConsumeMessageConcurrentlyService和ConsumeMessageOrderlyService 消费过程的入口在DefaultMQPushConsumerImpl的pullMessage中定义的PullCallback中。
8、延迟消息
延迟消息的处理入口在scheduleMessageService这个组件中。 他会在broker启动时也一起加载。 整个延迟消息的实现方式是这样的:
消息写入时,会将延迟消息转而写入到SCHEDULE_TOPIC_XXXX这个Topic中。这个系统内置的Topic有18个队列,对应18个延迟级别。代码就是前面6.1节贴出的CommitLog.putMessage方法中处理DelayTimeLevel的那一段,这里不再重复。
然后ScheduleMessageService会每隔1秒钟执行一个executeOnTimeup任务,将到期的消息从延迟队列写回正常Topic。代码见前面6.2节贴出的ScheduleMessageService中的DeliverDelayedMessageTimerTask.executeOnTimeup方法,这里同样不再重复。
消费者部分小结:
RocketMQ消息消费方式分为集群模式和广播模式。
消息队列负载由RebalanceService线程默认每隔20s进行一次负载均衡,根据当前消费者组内消费者个数与主题队列数量,按照某一种负载算法进行队列分配。分配原则为:同一个消费者可以分配多个消息消费队列,同一个消息消费队列同一时间只会分配给一个消费者。
消息拉取由PullMessageService线程根据RebalanceService线程创建的拉取任务进行拉取,默认每次拉取32条消息,提交给消费者消费线程后继续下一次消息拉取。如果消息消费过慢产生消息堆积,会触发消息拉取流控。
并发消息消费指消费线程池中的线程可以并发地对同一个消息队列的消息进行消费。消费成功后,取出消息队列中最小的消息偏移量作为消息消费进度偏移量,存储在消息消费进度存储文件中;集群模式下消息消费进度存储在Broker(消息服务器),广播模式下消息消费进度存储在消费者端。
RocketMQ不支持任意精度的定时消息,只支持自定义的消息延迟级别,例如1s、2s、5s等,可通过在broker配置文件中设置messageDelayLevel。
顺序消息一般使用集群模式,是指消费者内线程池中的线程对消息消费队列只能串行消费。它与并发消息消费最本质的区别是:消费时必须成功锁定消息消费队列,在Broker端会存储消息消费队列的锁占用情况。