聊聊rocketmq的pullBatchSize

2019-11-20 10:13:19 浏览数 (1)

本文只要研究一下rocketmq的pullBatchSize

pullBatchSize

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java

代码语言:javascript复制
public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {
​
    private final InternalLogger log = ClientLogger.getLog();
​
    //......
​
    /**
     * Batch pull size
     */
    private int pullBatchSize = 32;
​
    public int getPullBatchSize() {
        return pullBatchSize;
    }
​
    public void setPullBatchSize(int pullBatchSize) {
        this.pullBatchSize = pullBatchSize;
    }
​
    //......
}
  • DefaultMQPushConsumer定义了pullBatchSize属性,默认为32

checkConfig

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java

代码语言:javascript复制
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
​
    //......
​
    private void checkConfig() throws MQClientException {
        Validators.checkGroup(this.defaultMQPushConsumer.getConsumerGroup());
​
        //......
​
        // pullBatchSize
        if (this.defaultMQPushConsumer.getPullBatchSize() < 1 || this.defaultMQPushConsumer.getPullBatchSize() > 1024) {
            throw new MQClientException(
                "pullBatchSize Out of range [1, 1024]"
                      FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
                null);
        }
    }
​
    //......
}
  • checkConfig方法会校验pullBatchSize大小,必须大于等于1且小于1024

pullMessage

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java

代码语言:javascript复制
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
​
    //......
​
    private static final long BROKER_SUSPEND_MAX_TIME_MILLIS = 1000 * 15;
    private static final long CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND = 1000 * 30;
​
    //......    
​
    public void pullMessage(final PullRequest pullRequest) {
        final ProcessQueue processQueue = pullRequest.getProcessQueue();
        if (processQueue.isDropped()) {
            log.info("the pull request[{}] is dropped.", pullRequest.toString());
            return;
        }
​
        pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
​
        //......
​
        boolean commitOffsetEnable = false;
        long commitOffsetValue = 0L;
        if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
            commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
            if (commitOffsetValue > 0) {
                commitOffsetEnable = true;
            }
        }
​
        String subExpression = null;
        boolean classFilter = false;
        SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
        if (sd != null) {
            if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
                subExpression = sd.getSubString();
            }
​
            classFilter = sd.isClassFilterMode();
        }
​
        int sysFlag = PullSysFlag.buildSysFlag(
            commitOffsetEnable, // commitOffset
            true, // suspend
            subExpression != null, // subscription
            classFilter // class filter
        );
        try {
            this.pullAPIWrapper.pullKernelImpl(
                pullRequest.getMessageQueue(),
                subExpression,
                subscriptionData.getExpressionType(),
                subscriptionData.getSubVersion(),
                pullRequest.getNextOffset(),
                this.defaultMQPushConsumer.getPullBatchSize(),
                sysFlag,
                commitOffsetValue,
                BROKER_SUSPEND_MAX_TIME_MILLIS,
                CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
                CommunicationMode.ASYNC,
                pullCallback
            );
        } catch (Exception e) {
            log.error("pullKernelImpl exception", e);
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
        }
    }
​
    //......
}
  • pullMessage方法最后调用的是pullAPIWrapper.pullKernelImpl方法,它会传递defaultMQPushConsumer.getPullBatchSize()、BROKER_SUSPEND_MAX_TIME_MILLIS(1000 * 15)、CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND(1000 * 30)等参数

pullKernelImpl

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java

代码语言:javascript复制
public class PullAPIWrapper {
​
    //......
​
    public PullResult pullKernelImpl(
        final MessageQueue mq,
        final String subExpression,
        final String expressionType,
        final long subVersion,
        final long offset,
        final int maxNums,
        final int sysFlag,
        final long commitOffset,
        final long brokerSuspendMaxTimeMillis,
        final long timeoutMillis,
        final CommunicationMode communicationMode,
        final PullCallback pullCallback
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        FindBrokerResult findBrokerResult =
            this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                this.recalculatePullFromWhichNode(mq), false);
        if (null == findBrokerResult) {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
            findBrokerResult =
                this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                    this.recalculatePullFromWhichNode(mq), false);
        }
​
        if (findBrokerResult != null) {
            {
                // check version
                if (!ExpressionType.isTagType(expressionType)
                    && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
                    throw new MQClientException("The broker["   mq.getBrokerName()   ", "
                          findBrokerResult.getBrokerVersion()   "] does not upgrade to support for filter message by "   expressionType, null);
                }
            }
            int sysFlagInner = sysFlag;
​
            if (findBrokerResult.isSlave()) {
                sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
            }
​
            PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
            requestHeader.setConsumerGroup(this.consumerGroup);
            requestHeader.setTopic(mq.getTopic());
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setQueueOffset(offset);
            requestHeader.setMaxMsgNums(maxNums);
            requestHeader.setSysFlag(sysFlagInner);
            requestHeader.setCommitOffset(commitOffset);
            requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
            requestHeader.setSubscription(subExpression);
            requestHeader.setSubVersion(subVersion);
            requestHeader.setExpressionType(expressionType);
​
            String brokerAddr = findBrokerResult.getBrokerAddr();
            if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
                brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
            }
​
            PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
                brokerAddr,
                requestHeader,
                timeoutMillis,
                communicationMode,
                pullCallback);
​
            return pullResult;
        }
​
        throw new MQClientException("The broker["   mq.getBrokerName()   "] not exist", null);
    }
​
    //......
}
  • pullKernelImpl首先通过findBrokerAddressInSubscribe方法获取FindBrokerResult,然后构造PullMessageRequestHeader,最后通过mQClientFactory.getMQClientAPIImpl().pullMessage拉取消息;defaultMQPushConsumer.getPullBatchSize()传递进来的参数对应于maxNums值

小结

DefaultMQPushConsumer定义了pullBatchSize属性,默认为32;DefaultMQPushConsumerImpl的checkConfig方法会校验pullBatchSize大小,必须大于等于1且小于1024;DefaultMQPushConsumerImpl的pullMessage方法最后调用的是pullAPIWrapper.pullKernelImpl方法,它会传递defaultMQPushConsumer.getPullBatchSize()、BROKER_SUSPEND_MAX_TIME_MILLIS(1000 * 15)、CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND(1000 * 30)等参数

doc

  • DefaultMQPushConsumer

0 人点赞