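Our production MQ has recently been losing messages on a recurring basis. Because of a disk-expansion issue, the MQ brokers are currently being upgraded. Today we received a report that an order had not been settled and that no exception had surfaced, so I started investigating.
I first looked for a consumption record and found none. I then checked the order's status, which was at a normal node. Next I checked the node at which the order sends its message: the send conditions were met, and the message is sent normally after the transaction commits. Finally I checked the send records and found the following error: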
org.apache.rocketmq.client.exception.MQBrokerException: CODE: 2 DESC: [REJECTREQUEST]system busy, start flow control for a while
For more information, please visit the url, http://rocketmq.apache.org/docs/faq/
at org.apache.rocketmq.client.impl.MQClientAPIImpl.processSendResponse(MQClientAPIImpl.java:556) ~[rocketmq-client-4.5.0.jar!/:4.5.0]
at org.apache.rocketmq.client.impl.MQClientAPIImpl.access$000(MQClientAPIImpl.java:155) ~[rocketmq-client-4.5.0.jar!/:4.5.0]
at org.apache.rocketmq.client.impl.MQClientAPIImpl$1.operationComplete(MQClientAPIImpl.java:396) ~[rocketmq-client-4.5.0.jar!/:4.5.0]
at org.apache.rocketmq.remoting.netty.ResponseFuture.executeInvokeCallback(ResponseFuture.java:54) ~[rocketmq-remoting-4.5.0.jar!/:4.5.0]
at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract$2.run(NettyRemotingAbstract.java:309) ~[rocketmq-remoting-4.5.0.jar!/:4.5.0]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
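I first re-pushed the affected orders to resolve the business problem, then went looking for the cause. A global search for the exception message turns up the following: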
if (pair.getObject1().rejectRequest()) {
    final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
        "[REJECTREQUEST]system busy, start flow control for a while");
    response.setOpaque(opaque);
    ctx.writeAndFlush(response);
    return;
}
Its location in the source:
org.apache.rocketmq.remoting.netty.NettyRemotingAbstract
Tracing the callers upward:
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
    final RemotingCommand cmd = msg;
    if (cmd != null) {
        switch (cmd.getType()) {
            case REQUEST_COMMAND:
                processRequestCommand(ctx, cmd);
                break;
            case RESPONSE_COMMAND:
                processResponseCommand(ctx, cmd);
                break;
            default:
                break;
        }
    }
}

class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        processMessageReceived(ctx, msg);
    }
}
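The brokers have recently gone through frequent memory-parameter adjustments and disk removals. With that in mind, I looked into the broker side and found the following code: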
org.apache.rocketmq.broker.latency.BrokerFastFailure#cleanExpiredRequest
private void cleanExpiredRequest() {
    while (this.brokerController.getMessageStore().isOSPageCacheBusy()) {
        try {
            if (!this.brokerController.getSendThreadPoolQueue().isEmpty()) {
                final Runnable runnable = this.brokerController.getSendThreadPoolQueue().poll(0, TimeUnit.SECONDS);
                if (null == runnable) {
                    break;
                }
                final RequestTask rt = castRunnable(runnable);
                rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", System.currentTimeMillis() - rt.getCreateTimestamp(), this.brokerController.getSendThreadPoolQueue().size()));
            } else {
                break;
            }
        } catch (Throwable ignored) {
        }
    }
    cleanExpiredRequestInQueue(this.brokerController.getSendThreadPoolQueue(),
        this.brokerController.getBrokerConfig().getWaitTimeMillsInSendQueue());
    cleanExpiredRequestInQueue(this.brokerController.getPullThreadPoolQueue(),
        this.brokerController.getBrokerConfig().getWaitTimeMillsInPullQueue());
    cleanExpiredRequestInQueue(this.brokerController.getHeartbeatThreadPoolQueue(),
        this.brokerController.getBrokerConfig().getWaitTimeMillsInHeartbeatQueue());
    cleanExpiredRequestInQueue(this.brokerController.getEndTransactionThreadPoolQueue(), this
        .brokerController.getBrokerConfig().getWaitTimeMillsInTransactionQueue());
}
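To make the queue-expiry idea concrete, here is a minimal standalone sketch of what a call like cleanExpiredRequestInQueue is doing conceptually: requests that have waited in the queue longer than a threshold are dropped from the head and answered with "system busy". This is not RocketMQ's code. The names QueueExpirySketch, Task, and cleanExpired are hypothetical, and the 200 ms threshold is only an illustrative value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Simplified model (not RocketMQ code) of expiring requests that waited too long in a queue. */
final class QueueExpirySketch {

    /** Hypothetical stand-in for a queued request: an id plus the time it was enqueued. */
    static final class Task {
        final String id;
        final long createTimestamp;

        Task(String id, long createTimestamp) {
            this.id = id;
            this.createTimestamp = createTimestamp;
        }
    }

    /**
     * Removes tasks from the head of the queue while they have waited at least
     * maxWaitMillis, and returns the ids that would be answered with "system busy".
     */
    static List<String> cleanExpired(BlockingQueue<Task> queue, long maxWaitMillis, long nowMillis) {
        List<String> rejected = new ArrayList<>();
        while (true) {
            Task head = queue.peek();
            if (head == null) {
                break; // queue is empty
            }
            long waited = nowMillis - head.createTimestamp;
            if (waited < maxWaitMillis) {
                break; // FIFO order: everything behind the head is younger
            }
            if (queue.remove(head)) {
                rejected.add(head.id);
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        BlockingQueue<Task> queue = new LinkedBlockingQueue<>();
        queue.add(new Task("order-1", 0));
        queue.add(new Task("order-2", 150));
        queue.add(new Task("order-3", 290));
        // At time 400 with a 200 ms limit, the first two tasks have expired.
        System.out.println(cleanExpired(queue, 200, 400)); // prints [order-1, order-2]
    }
}
```

The point of the sketch is that the rejected requests are never executed: the producer only learns about it through the error response, so whatever happens to the message next is up to the client.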
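In other words, the client's request waited in the broker's send queue longer than the default wait time (waitTimeMillsInSendQueue). Possible mitigations are to raise that wait time or to enlarge the send-message thread pool (sendMessageThreadPoolNums), which defaults to 1; these settings were introduced in the 4.x line of MQ. In addition, the client should configure retries for failed sends. The main cause, however, is that the message backlog slowed down in-memory writes on the broker until requests timed out. I hope this gets resolved as the cluster is expanded. Note also that a send rejected in this way does lose the message if the producer takes no further action.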
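As a rough illustration of the client-side retry mentioned above, here is a minimal sketch that retries a failed send a few times and, if it still fails, records the message id in a local outbox so it can be re-pushed later instead of silently disappearing. Everything in it is an assumption for illustration: SendRetrySketch, sendWithRetry, and the outbox list are hypothetical names, and the Callable stands in for the real producer call. RocketMQ's DefaultMQProducer also has its own retry settings (setRetryTimesWhenSendFailed and setRetryTimesWhenSendAsyncFailed), which are worth checking before adding a wrapper like this.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

/** Hypothetical client-side wrapper: retry a send, then fall back to a local outbox. */
final class SendRetrySketch {

    /**
     * Calls sendOnce up to maxAttempts times. Returns true on the first success.
     * If every attempt fails, the message id is added to outbox (a stand-in for a
     * durable table or file) and false is returned.
     */
    static boolean sendWithRetry(String messageId, Callable<Boolean> sendOnce,
                                 int maxAttempts, long backoffMillis, List<String> outbox) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                if (Boolean.TRUE.equals(sendOnce.call())) {
                    return true;
                }
            } catch (Exception e) {
                // For example a "system busy" rejection from the broker: try again.
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backoffMillis * attempt); // linear backoff between attempts
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // keep the interrupt flag and stop retrying
                    break;
                }
            }
        }
        outbox.add(messageId); // remember the message so it can be re-pushed later
        return false;
    }

    public static void main(String[] args) {
        List<String> outbox = new ArrayList<>();
        int[] calls = {0};
        // Simulated send: rejected twice with "system busy", accepted on the third call.
        boolean ok = sendWithRetry("order-1", () -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new IllegalStateException("system busy");
            }
            return true;
        }, 3, 10L, outbox);
        System.out.println(ok + " " + outbox); // prints true []
    }
}
```

The outbox is the part that addresses the message loss described here: retries alone only help while the broker recovers quickly, whereas a recorded failure can be re-pushed once the backlog clears.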