聊聊rocketmq的SERVICE_NOT_AVAILABLE

2019-12-19 10:08:53 浏览数 (1)

本文主要研究一下rocketmq的SERVICE_NOT_AVAILABLE

SERVICE_NOT_AVAILABLE

rocketmq-all-4.6.0-source-release/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java

代码语言:javascript复制
public class ResponseCode extends RemotingSysResponseCode {
​
    public static final int FLUSH_DISK_TIMEOUT = 10;
​
    public static final int SLAVE_NOT_AVAILABLE = 11;
​
    public static final int FLUSH_SLAVE_TIMEOUT = 12;
​
    public static final int MESSAGE_ILLEGAL = 13;
​
    public static final int SERVICE_NOT_AVAILABLE = 14;
​
    //......
​
}
  • ResponseCode定义了SERVICE_NOT_AVAILABLE

PutMessageStatus

rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/PutMessageStatus.java

代码语言:javascript复制
public enum PutMessageStatus {
    PUT_OK,
    FLUSH_DISK_TIMEOUT,
    FLUSH_SLAVE_TIMEOUT,
    SLAVE_NOT_AVAILABLE,
    SERVICE_NOT_AVAILABLE,
    CREATE_MAPEDFILE_FAILED,
    MESSAGE_ILLEGAL,
    PROPERTIES_SIZE_EXCEEDED,
    OS_PAGECACHE_BUSY,
    UNKNOWN_ERROR,
}
  • PutMessageStatus枚举中有一个是SERVICE_NOT_AVAILABLE

DefaultMessageStore

rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java

代码语言:javascript复制
public class DefaultMessageStore implements MessageStore {
​
    //......
​
    public PutMessageResult putMessage(MessageExtBrokerInner msg) {
        if (this.shutdown) {
            log.warn("message store has shutdown, so putMessage is forbidden");
            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
        }
​
        if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
            long value = this.printTimes.getAndIncrement();
            if ((value % 50000) == 0) {
                log.warn("message store is slave mode, so putMessage is forbidden ");
            }
​
            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
        }
​
        if (!this.runningFlags.isWriteable()) {
            long value = this.printTimes.getAndIncrement();
            if ((value % 50000) == 0) {
                log.warn("message store is not writeable, so putMessage is forbidden "   this.runningFlags.getFlagBits());
            }
​
            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
        } else {
            this.printTimes.set(0);
        }
​
        if (msg.getTopic().length() > Byte.MAX_VALUE) {
            log.warn("putMessage message topic length too long "   msg.getTopic().length());
            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
        }
​
        if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
            log.warn("putMessage message properties length too long "   msg.getPropertiesString().length());
            return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
        }
​
        if (this.isOSPageCacheBusy()) {
            return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
        }
​
        long beginTime = this.getSystemClock().now();
        PutMessageResult result = this.commitLog.putMessage(msg);
​
        long elapsedTime = this.getSystemClock().now() - beginTime;
        if (elapsedTime > 500) {
            log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
        }
        this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
​
        if (null == result || !result.isOk()) {
            this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
        }
​
        return result;
    }
​
    //......
}
  • putMessage方法在shutdown为true、当前broker的role变为SLAVE、runningFlags.isWriteable()为false时都会返回PutMessageStatus.SERVICE_NOT_AVAILABLE的PutMessageResult

RunningFlags

rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java

代码语言:javascript复制
public class RunningFlags {
​
    private static final int NOT_READABLE_BIT = 1;
​
    private static final int NOT_WRITEABLE_BIT = 1 << 1;
​
    private static final int WRITE_LOGICS_QUEUE_ERROR_BIT = 1 << 2;
​
    private static final int WRITE_INDEX_FILE_ERROR_BIT = 1 << 3;
​
    private static final int DISK_FULL_BIT = 1 << 4;
​
    private volatile int flagBits = 0;
​
    public RunningFlags() {
    }
​
    //......
​
    public boolean isWriteable() {
        if ((this.flagBits & (NOT_WRITEABLE_BIT | WRITE_LOGICS_QUEUE_ERROR_BIT | DISK_FULL_BIT | WRITE_INDEX_FILE_ERROR_BIT)) == 0) {
            return true;
        }
​
        return false;
    }
​
    public boolean getAndMakeReadable() {
        boolean result = this.isReadable();
        if (!result) {
            this.flagBits &= ~NOT_READABLE_BIT;
        }
        return result;
    }
​
    public boolean getAndMakeNotReadable() {
        boolean result = this.isReadable();
        if (result) {
            this.flagBits |= NOT_READABLE_BIT;
        }
        return result;
    }
​
    public boolean getAndMakeWriteable() {
        boolean result = this.isWriteable();
        if (!result) {
            this.flagBits &= ~NOT_WRITEABLE_BIT;
        }
        return result;
    }
​
    public boolean getAndMakeNotWriteable() {
        boolean result = this.isWriteable();
        if (result) {
            this.flagBits |= NOT_WRITEABLE_BIT;
        }
        return result;
    }
​
    public void makeLogicsQueueError() {
        this.flagBits |= WRITE_LOGICS_QUEUE_ERROR_BIT;
    }
​
    public void makeIndexFileError() {
        this.flagBits |= WRITE_INDEX_FILE_ERROR_BIT;
    }
​
    public boolean getAndMakeDiskFull() {
        boolean result = !((this.flagBits & DISK_FULL_BIT) == DISK_FULL_BIT);
        this.flagBits |= DISK_FULL_BIT;
        return result;
    }
​
    public boolean getAndMakeDiskOK() {
        boolean result = !((this.flagBits & DISK_FULL_BIT) == DISK_FULL_BIT);
        this.flagBits &= ~DISK_FULL_BIT;
        return result;
    }
​
    //......
}
  • RunningFlags提供了isWriteable方法,在NOT_WRITEABLE_BIT、WRITE_LOGICS_QUEUE_ERROR_BIT、DISK_FULL_BIT、WRITE_INDEX_FILE_ERROR_BIT的flag下都会返回false;而getAndMakeReadable、getAndMakeNotReadable、getAndMakeWriteable、getAndMakeNotWriteable、makeLogicsQueueError、makeIndexFileError、getAndMakeDiskFull、getAndMakeDiskOK均可能改变flagBits的值

SendMessageProcessor

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java

代码语言:javascript复制
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
​
    //......
​
    private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult, RemotingCommand response,
                                                   RemotingCommand request, MessageExt msg,
                                                   SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext ctx,
                                                   int queueIdInt) {
        if (putMessageResult == null) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("store putMessage return null");
            return response;
        }
        boolean sendOK = false;
​
        switch (putMessageResult.getPutMessageStatus()) {
            // Success
            case PUT_OK:
                sendOK = true;
                response.setCode(ResponseCode.SUCCESS);
                break;
            case FLUSH_DISK_TIMEOUT:
                response.setCode(ResponseCode.FLUSH_DISK_TIMEOUT);
                sendOK = true;
                break;
            case FLUSH_SLAVE_TIMEOUT:
                response.setCode(ResponseCode.FLUSH_SLAVE_TIMEOUT);
                sendOK = true;
                break;
            case SLAVE_NOT_AVAILABLE:
                response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
                sendOK = true;
                break;
​
            // Failed
            case CREATE_MAPEDFILE_FAILED:
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("create mapped file failed, server is busy or broken.");
                break;
            case MESSAGE_ILLEGAL:
            case PROPERTIES_SIZE_EXCEEDED:
                response.setCode(ResponseCode.MESSAGE_ILLEGAL);
                response.setRemark(
                    "the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");
                break;
            case SERVICE_NOT_AVAILABLE:
                response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);
                response.setRemark(
                    "service not available now, maybe disk full, "   diskUtil()   ", maybe your broker machine memory too small.");
                break;
            case OS_PAGECACHE_BUSY:
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while");
                break;
            case UNKNOWN_ERROR:
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("UNKNOWN_ERROR");
                break;
            default:
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("UNKNOWN_ERROR DEFAULT");
                break;
        }
​
        String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
        if (sendOK) {
​
            this.brokerController.getBrokerStatsManager().incTopicPutNums(msg.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
            this.brokerController.getBrokerStatsManager().incTopicPutSize(msg.getTopic(),
                putMessageResult.getAppendMessageResult().getWroteBytes());
            this.brokerController.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());
​
            response.setRemark(null);
​
            responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
            responseHeader.setQueueId(queueIdInt);
            responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
​
            doResponse(ctx, request, response);
​
            if (hasSendMessageHook()) {
                sendMessageContext.setMsgId(responseHeader.getMsgId());
                sendMessageContext.setQueueId(responseHeader.getQueueId());
                sendMessageContext.setQueueOffset(responseHeader.getQueueOffset());
​
                int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
                int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();
                int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;
​
                sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);
                sendMessageContext.setCommercialSendTimes(incValue);
                sendMessageContext.setCommercialSendSize(wroteSize);
                sendMessageContext.setCommercialOwner(owner);
            }
            return null;
        } else {
            if (hasSendMessageHook()) {
                int wroteSize = request.getBody().length;
                int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);
​
                sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);
                sendMessageContext.setCommercialSendTimes(incValue);
                sendMessageContext.setCommercialSendSize(wroteSize);
                sendMessageContext.setCommercialOwner(owner);
            }
        }
        return response;
    }
​
    //......
}
  • handlePutMessageResult方法会将SERVICE_NOT_AVAILABLE转化为ResponseCode.SERVICE_NOT_AVAILABLE

小结

  • ResponseCode定义了SERVICE_NOT_AVAILABLE;PutMessageStatus枚举中有一个是SERVICE_NOT_AVAILABLE;handlePutMessageResult方法会将SERVICE_NOT_AVAILABLE转化为ResponseCode.SERVICE_NOT_AVAILABLE
  • DefaultMessageStore的putMessage方法在shutdown为true、当前broker的role变为SLAVE、runningFlags.isWriteable()为false时都会返回PutMessageStatus.SERVICE_NOT_AVAILABLE的PutMessageResult
  • RunningFlags提供了isWriteable方法,在NOT_WRITEABLE_BIT、WRITE_LOGICS_QUEUE_ERROR_BIT、DISK_FULL_BIT、WRITE_INDEX_FILE_ERROR_BIT的flag下都会返回false;而getAndMakeReadable、getAndMakeNotReadable、getAndMakeWriteable、getAndMakeNotWriteable、makeLogicsQueueError、makeIndexFileError、getAndMakeDiskFull、getAndMakeDiskOK均可能改变flagBits的值

doc

  • ResponseCode

0 人点赞