序
本文主要研究一下rocketmq的SERVICE_NOT_AVAILABLE
SERVICE_NOT_AVAILABLE
rocketmq-all-4.6.0-source-release/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
代码语言:javascript复制public class ResponseCode extends RemotingSysResponseCode {
public static final int FLUSH_DISK_TIMEOUT = 10;
public static final int SLAVE_NOT_AVAILABLE = 11;
public static final int FLUSH_SLAVE_TIMEOUT = 12;
public static final int MESSAGE_ILLEGAL = 13;
public static final int SERVICE_NOT_AVAILABLE = 14;
//......
}
- ResponseCode定义了SERVICE_NOT_AVAILABLE
PutMessageStatus
rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/PutMessageStatus.java
/**
 * Outcome of a store-level put (append) of a message.
 *
 * <p>The declaration order is kept exactly as in the original excerpt.
 */
public enum PutMessageStatus {
    PUT_OK,
    FLUSH_DISK_TIMEOUT,
    FLUSH_SLAVE_TIMEOUT,
    SLAVE_NOT_AVAILABLE,
    /** Returned by DefaultMessageStore.putMessage when the store is shut down, is a SLAVE, or is not writeable. */
    SERVICE_NOT_AVAILABLE,
    CREATE_MAPEDFILE_FAILED,
    /** Returned by DefaultMessageStore.putMessage when the topic length exceeds Byte.MAX_VALUE. */
    MESSAGE_ILLEGAL,
    /** Returned by DefaultMessageStore.putMessage when the properties string length exceeds Short.MAX_VALUE. */
    PROPERTIES_SIZE_EXCEEDED,
    /** Returned by DefaultMessageStore.putMessage when isOSPageCacheBusy() reports true. */
    OS_PAGECACHE_BUSY,
    UNKNOWN_ERROR,
}
- PutMessageStatus枚举中有一个是SERVICE_NOT_AVAILABLE
DefaultMessageStore
rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
代码语言:javascript复制public class DefaultMessageStore implements MessageStore {
//......
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
if (this.shutdown) {
log.warn("message store has shutdown, so putMessage is forbidden");
return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
}
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
long value = this.printTimes.getAndIncrement();
if ((value % 50000) == 0) {
log.warn("message store is slave mode, so putMessage is forbidden ");
}
return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
}
if (!this.runningFlags.isWriteable()) {
long value = this.printTimes.getAndIncrement();
if ((value % 50000) == 0) {
log.warn("message store is not writeable, so putMessage is forbidden " this.runningFlags.getFlagBits());
}
return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
} else {
this.printTimes.set(0);
}
if (msg.getTopic().length() > Byte.MAX_VALUE) {
log.warn("putMessage message topic length too long " msg.getTopic().length());
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
}
if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
log.warn("putMessage message properties length too long " msg.getPropertiesString().length());
return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
}
if (this.isOSPageCacheBusy()) {
return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
}
long beginTime = this.getSystemClock().now();
PutMessageResult result = this.commitLog.putMessage(msg);
long elapsedTime = this.getSystemClock().now() - beginTime;
if (elapsedTime > 500) {
log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
}
this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
if (null == result || !result.isOk()) {
this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
}
return result;
}
//......
}
- putMessage方法在shutdown为true、当前broker的role为SLAVE、或者runningFlags.isWriteable()为false这三种情况下,都会返回状态为PutMessageStatus.SERVICE_NOT_AVAILABLE的PutMessageResult
RunningFlags
rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java
代码语言:javascript复制public class RunningFlags {
private static final int NOT_READABLE_BIT = 1;
private static final int NOT_WRITEABLE_BIT = 1 << 1;
private static final int WRITE_LOGICS_QUEUE_ERROR_BIT = 1 << 2;
private static final int WRITE_INDEX_FILE_ERROR_BIT = 1 << 3;
private static final int DISK_FULL_BIT = 1 << 4;
private volatile int flagBits = 0;
public RunningFlags() {
}
//......
public boolean isWriteable() {
if ((this.flagBits & (NOT_WRITEABLE_BIT | WRITE_LOGICS_QUEUE_ERROR_BIT | DISK_FULL_BIT | WRITE_INDEX_FILE_ERROR_BIT)) == 0) {
return true;
}
return false;
}
public boolean getAndMakeReadable() {
boolean result = this.isReadable();
if (!result) {
this.flagBits &= ~NOT_READABLE_BIT;
}
return result;
}
public boolean getAndMakeNotReadable() {
boolean result = this.isReadable();
if (result) {
this.flagBits |= NOT_READABLE_BIT;
}
return result;
}
public boolean getAndMakeWriteable() {
boolean result = this.isWriteable();
if (!result) {
this.flagBits &= ~NOT_WRITEABLE_BIT;
}
return result;
}
public boolean getAndMakeNotWriteable() {
boolean result = this.isWriteable();
if (result) {
this.flagBits |= NOT_WRITEABLE_BIT;
}
return result;
}
public void makeLogicsQueueError() {
this.flagBits |= WRITE_LOGICS_QUEUE_ERROR_BIT;
}
public void makeIndexFileError() {
this.flagBits |= WRITE_INDEX_FILE_ERROR_BIT;
}
public boolean getAndMakeDiskFull() {
boolean result = !((this.flagBits & DISK_FULL_BIT) == DISK_FULL_BIT);
this.flagBits |= DISK_FULL_BIT;
return result;
}
public boolean getAndMakeDiskOK() {
boolean result = !((this.flagBits & DISK_FULL_BIT) == DISK_FULL_BIT);
this.flagBits &= ~DISK_FULL_BIT;
return result;
}
//......
}
- RunningFlags提供了isWriteable方法,只要flagBits中NOT_WRITEABLE_BIT、WRITE_LOGICS_QUEUE_ERROR_BIT、DISK_FULL_BIT、WRITE_INDEX_FILE_ERROR_BIT任意一位被置位,它就会返回false;而getAndMakeReadable、getAndMakeNotReadable、getAndMakeWriteable、getAndMakeNotWriteable、makeLogicsQueueError、makeIndexFileError、getAndMakeDiskFull、getAndMakeDiskOK均可能改变flagBits的值
SendMessageProcessor
rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
代码语言:javascript复制public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
//......
private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult, RemotingCommand response,
RemotingCommand request, MessageExt msg,
SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext ctx,
int queueIdInt) {
if (putMessageResult == null) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("store putMessage return null");
return response;
}
boolean sendOK = false;
switch (putMessageResult.getPutMessageStatus()) {
// Success
case PUT_OK:
sendOK = true;
response.setCode(ResponseCode.SUCCESS);
break;
case FLUSH_DISK_TIMEOUT:
response.setCode(ResponseCode.FLUSH_DISK_TIMEOUT);
sendOK = true;
break;
case FLUSH_SLAVE_TIMEOUT:
response.setCode(ResponseCode.FLUSH_SLAVE_TIMEOUT);
sendOK = true;
break;
case SLAVE_NOT_AVAILABLE:
response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
sendOK = true;
break;
// Failed
case CREATE_MAPEDFILE_FAILED:
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("create mapped file failed, server is busy or broken.");
break;
case MESSAGE_ILLEGAL:
case PROPERTIES_SIZE_EXCEEDED:
response.setCode(ResponseCode.MESSAGE_ILLEGAL);
response.setRemark(
"the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");
break;
case SERVICE_NOT_AVAILABLE:
response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);
response.setRemark(
"service not available now, maybe disk full, " diskUtil() ", maybe your broker machine memory too small.");
break;
case OS_PAGECACHE_BUSY:
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while");
break;
case UNKNOWN_ERROR:
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("UNKNOWN_ERROR");
break;
default:
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("UNKNOWN_ERROR DEFAULT");
break;
}
String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
if (sendOK) {
this.brokerController.getBrokerStatsManager().incTopicPutNums(msg.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
this.brokerController.getBrokerStatsManager().incTopicPutSize(msg.getTopic(),
putMessageResult.getAppendMessageResult().getWroteBytes());
this.brokerController.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());
response.setRemark(null);
responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
responseHeader.setQueueId(queueIdInt);
responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
doResponse(ctx, request, response);
if (hasSendMessageHook()) {
sendMessageContext.setMsgId(responseHeader.getMsgId());
sendMessageContext.setQueueId(responseHeader.getQueueId());
sendMessageContext.setQueueOffset(responseHeader.getQueueOffset());
int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();
int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;
sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);
sendMessageContext.setCommercialSendTimes(incValue);
sendMessageContext.setCommercialSendSize(wroteSize);
sendMessageContext.setCommercialOwner(owner);
}
return null;
} else {
if (hasSendMessageHook()) {
int wroteSize = request.getBody().length;
int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);
sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);
sendMessageContext.setCommercialSendTimes(incValue);
sendMessageContext.setCommercialSendSize(wroteSize);
sendMessageContext.setCommercialOwner(owner);
}
}
return response;
}
//......
}
- handlePutMessageResult方法会将PutMessageStatus.SERVICE_NOT_AVAILABLE转化为ResponseCode.SERVICE_NOT_AVAILABLE,并在remark中附带diskUtil()返回的磁盘使用信息
小结
- ResponseCode定义了SERVICE_NOT_AVAILABLE;PutMessageStatus枚举中有一个是SERVICE_NOT_AVAILABLE;handlePutMessageResult方法会将SERVICE_NOT_AVAILABLE转化为ResponseCode.SERVICE_NOT_AVAILABLE
- DefaultMessageStore的putMessage方法在shutdown为true、当前broker的role变为SLAVE、runningFlags.isWriteable()为false时都会返回PutMessageStatus.SERVICE_NOT_AVAILABLE的PutMessageResult
- RunningFlags提供了isWriteable方法,在NOT_WRITEABLE_BIT、WRITE_LOGICS_QUEUE_ERROR_BIT、DISK_FULL_BIT、WRITE_INDEX_FILE_ERROR_BIT的flag下都会返回false;而getAndMakeReadable、getAndMakeNotReadable、getAndMakeWriteable、getAndMakeNotWriteable、makeLogicsQueueError、makeIndexFileError、getAndMakeDiskFull、getAndMakeDiskOK均可能改变flagBits的值
doc
- ResponseCode