RocketMQ system busy

2022-11-11 09:38:45

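Our production MQ has recently been losing messages on a recurring basis. Because of a disk expansion problem, the MQ brokers were in the middle of an upgrade. Today we received a report that one order had not been settled and that no exception had been raised, so I started investigating.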

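I first looked for the consumption record and found none. I then checked the order's status, which was a normal workflow-node state. Next I checked the node at which the order's message is sent: the send condition was met, and the message is sent normally after the transaction commits. Finally I queried the send records and found the following error: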

org.apache.rocketmq.client.exception.MQBrokerException: CODE: 2  DESC: [REJECTREQUEST]system busy, start flow control for a while
For more information, please visit the url, http://rocketmq.apache.org/docs/faq/
  at org.apache.rocketmq.client.impl.MQClientAPIImpl.processSendResponse(MQClientAPIImpl.java:556) ~[rocketmq-client-4.5.0.jar!/:4.5.0]
  at org.apache.rocketmq.client.impl.MQClientAPIImpl.access$000(MQClientAPIImpl.java:155) ~[rocketmq-client-4.5.0.jar!/:4.5.0]
  at org.apache.rocketmq.client.impl.MQClientAPIImpl$1.operationComplete(MQClientAPIImpl.java:396) ~[rocketmq-client-4.5.0.jar!/:4.5.0]
  at org.apache.rocketmq.remoting.netty.ResponseFuture.executeInvokeCallback(ResponseFuture.java:54) ~[rocketmq-remoting-4.5.0.jar!/:4.5.0]
  at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract$2.run(NettyRemotingAbstract.java:309) ~[rocketmq-remoting-4.5.0.jar!/:4.5.0]
  at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
  at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]

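We first re-pushed the affected orders to resolve the business problem, then looked for the root cause. A global search for the exception message turned up the following: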

if (pair.getObject1().rejectRequest()) {
    final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
        "[REJECTREQUEST]system busy, start flow control for a while");
    response.setOpaque(opaque);
    ctx.writeAndFlush(response);
    return;
}

Its location in the source:

org.apache.rocketmq.remoting.netty.NettyRemotingAbstract
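
The early-return branch shown above can be modelled in isolation. The following is a minimal sketch using only the JDK: GateProcessor, RejectGateSketch, dispatch and sendProcessor are hypothetical names for illustration, not RocketMQ classes, and the "store is busy" probe is an assumption standing in for whatever condition the real processor checks. The value 2 for SYSTEM_BUSY matches the "CODE: 2" in the client-side exception.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical stand-in for a request processor; not a RocketMQ type. */
interface GateProcessor {
    boolean rejectRequest();          // true = refuse new work right now
    String process(String request);   // normal handling path
}

class RejectGateSketch {
    static final int SUCCESS = 0;
    static final int SYSTEM_BUSY = 2; // same value as "CODE: 2" in the exception

    /** Returns "code:remark", mimicking the early-return branch in the excerpt. */
    static String dispatch(GateProcessor p, String request) {
        if (p.rejectRequest()) {
            // The request is answered immediately and never reaches process().
            return SYSTEM_BUSY + ":[REJECTREQUEST]system busy, start flow control for a while";
        }
        return SUCCESS + ":" + p.process(request);
    }

    /** Builds a processor whose rejection depends on an injected (assumed) busy probe. */
    static GateProcessor sendProcessor(BooleanSupplier storeBusy) {
        return new GateProcessor() {
            public boolean rejectRequest() { return storeBusy.getAsBoolean(); }
            public String process(String request) { return "stored " + request; }
        };
    }

    public static void main(String[] args) {
        System.out.println(dispatch(sendProcessor(() -> false), "order-1"));
        System.out.println(dispatch(sendProcessor(() -> true), "order-2"));
    }
}
```

The point of the sketch is that the rejection happens before any work is done on the message, so from the sender's side the message was never stored.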

Tracing up the call chain:

public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
    final RemotingCommand cmd = msg;
    if (cmd != null) {
        switch (cmd.getType()) {
            case REQUEST_COMMAND:
                processRequestCommand(ctx, cmd);
                break;
            case RESPONSE_COMMAND:
                processResponseCommand(ctx, cmd);
                break;
            default:
                break;
        }
    }
}

class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        processMessageReceived(ctx, msg);
    }
}

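Given that the broker's memory parameters had recently been adjusted frequently and disks had been pulled, I did some further reading and found the following code in the broker: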

org.apache.rocketmq.broker.latency.BrokerFastFailure#cleanExpiredRequest

private void cleanExpiredRequest() {
        while (this.brokerController.getMessageStore().isOSPageCacheBusy()) {
            try {
                if (!this.brokerController.getSendThreadPoolQueue().isEmpty()) {
                    final Runnable runnable = this.brokerController.getSendThreadPoolQueue().poll(0, TimeUnit.SECONDS);
                    if (null == runnable) {
                        break;
                    }

                    final RequestTask rt = castRunnable(runnable);
                    rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", System.currentTimeMillis() - rt.getCreateTimestamp(), this.brokerController.getSendThreadPoolQueue().size()));
                } else {
                    break;
                }
            } catch (Throwable ignored) {
            }
        }

        cleanExpiredRequestInQueue(this.brokerController.getSendThreadPoolQueue(),
            this.brokerController.getBrokerConfig().getWaitTimeMillsInSendQueue());

        cleanExpiredRequestInQueue(this.brokerController.getPullThreadPoolQueue(),
            this.brokerController.getBrokerConfig().getWaitTimeMillsInPullQueue());

        cleanExpiredRequestInQueue(this.brokerController.getHeartbeatThreadPoolQueue(),
            this.brokerController.getBrokerConfig().getWaitTimeMillsInHeartbeatQueue());

        cleanExpiredRequestInQueue(this.brokerController.getEndTransactionThreadPoolQueue(), this
            .brokerController.getBrokerConfig().getWaitTimeMillsInTransactionQueue());
    }
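
The body of cleanExpiredRequestInQueue is not shown in the excerpt. As a rough model of what such a check does (an assumption based on its name and arguments, not a copy of the broker code), the sketch below drops requests from the head of a queue once they have waited at least a configured time. QueuedTask and QueueExpirySketch are hypothetical names, and the 200 ms limit in main is only an illustrative value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical queued request; stands in for the broker's RequestTask. */
class QueuedTask {
    final String id;
    final long createTimestamp;

    QueuedTask(String id, long createTimestamp) {
        this.id = id;
        this.createTimestamp = createTimestamp;
    }
}

class QueueExpirySketch {
    /**
     * Removes tasks from the head of the queue while they have waited at least
     * maxWaitMillis, and returns the ids that would be answered with SYSTEM_BUSY.
     * Stops at the first task that is still fresh, because a FIFO queue
     * guarantees that everything behind it is newer.
     */
    static List<String> cleanExpired(BlockingQueue<QueuedTask> queue, long maxWaitMillis, long now) {
        List<String> rejected = new ArrayList<>();
        while (true) {
            QueuedTask head = queue.peek();
            if (head == null || now - head.createTimestamp < maxWaitMillis) {
                break;
            }
            if (queue.remove(head)) {
                rejected.add(head.id);
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        BlockingQueue<QueuedTask> sendQueue = new LinkedBlockingQueue<>();
        sendQueue.add(new QueuedTask("a", 1_000));
        sendQueue.add(new QueuedTask("b", 1_150));
        sendQueue.add(new QueuedTask("c", 1_290));
        // At t=1360 with a 200 ms limit, "a" (360 ms) and "b" (210 ms) have expired; "c" (70 ms) stays.
        System.out.println(cleanExpired(sendQueue, 200, 1_360));
        System.out.println(sendQueue.size());
    }
}
```

Under this model, a slow store lets the queue back up, and every request that sits longer than the limit is rejected rather than written, which fits the symptom of sends failing with no record on the consumer side.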

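The client's request waited longer than the broker's default wait time (for sends, the waitTimeMillsInSendQueue setting seen in the code above). Possible mitigations are to raise that wait time or to increase the number of threads in the send-message thread pool; that thread-pool setting was introduced in RocketMQ 4.x and defaults to 1. The client should also be configured to retry failed sends. The main cause, however, was that a message backlog on the broker slowed memory writes until requests timed out. We hope this will be resolved as the cluster is expanded. Note that failures of this kind do lose messages.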
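
A minimal sketch of client-side handling follows, using only the JDK. MessageSender, BrokerBusyException and BusyRetrySketch are hypothetical names: in real code the sender would wrap the producer's send call, and the exception would be MQBrokerException with its response code. The RocketMQ producer has its own retry setting (DefaultMQProducer#setRetryTimesWhenSendFailed), but whether that built-in retry covers a SYSTEM_BUSY response depends on the client version, so this sketch retries at the application level and keeps anything that still fails for a later re-push.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for MQBrokerException; carries the broker response code. */
class BrokerBusyException extends Exception {
    final int responseCode;

    BrokerBusyException(int responseCode, String message) {
        super(message);
        this.responseCode = responseCode;
    }
}

/** Hypothetical send abstraction; real code would wrap the producer's send call. */
interface MessageSender {
    void send(String body) throws BrokerBusyException;
}

class BusyRetrySketch {
    static final int SYSTEM_BUSY = 2;

    /** Messages that still failed after all attempts, kept for a later re-push. */
    final List<String> pendingRepush = new ArrayList<>();

    /**
     * Tries to send up to maxAttempts times, sleeping between SYSTEM_BUSY failures.
     * Returns true on success; otherwise records the message for re-push and returns false.
     */
    boolean sendWithRetry(MessageSender sender, String body, int maxAttempts, long baseBackoffMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                sender.send(body);
                return true;
            } catch (BrokerBusyException e) {
                if (e.responseCode != SYSTEM_BUSY) {
                    break; // not a flow-control rejection; blind retries may not help
                }
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(baseBackoffMillis * attempt); // linear backoff
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt(); // preserve the interrupt and stop retrying
                        break;
                    }
                }
            }
        }
        pendingRepush.add(body);
        return false;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated broker: rejects the first two attempts, accepts the third.
        MessageSender flaky = body -> {
            if (++calls[0] < 3) {
                throw new BrokerBusyException(SYSTEM_BUSY, "[REJECTREQUEST]system busy");
            }
        };
        BusyRetrySketch sketch = new BusyRetrySketch();
        System.out.println(sketch.sendWithRetry(flaky, "order-42", 3, 10));
        System.out.println(sketch.pendingRepush);
    }
}
```

In a real system the pendingRepush list would need to be durable (for example a database table written in the same transaction as the order), since an in-memory list is lost on restart. That is the same re-push step we performed by hand for the affected orders.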
