RocketMQ事务消息原理简析

2023-01-13 17:15:09 浏览数 (2)

零、业务场景

代码语言:txt复制
在项目中,经常遇到这样一个场景,需要保证数据持久化和消息发送要么同时成功,要么同时失败。比如当用户在交易系统下了一个订单,购物车需要消费订单消息清除加购数据、积分系统需要变更用户积分、短信平台需要给买家发送提醒等,交易系统要将订单落入DB和发送订单消息保证一致,不能本地事务回滚,订单没有生成但是发送了创建订单消息,下游系统产生脏数据,也不能订单已经创建,但是下游系统没有感知继而无法履约,影响用户体验。
代码语言:txt复制
如果让我们自己实现的话,当然也是有办法的。比如在业务数据库中建立一张消息表用于存储消息,将业务数据和消息数据放在同一个事务中进行存储,就可以利用数据库事务保证同时原子性。后续可以定时扫描消息表,将消息数据再发送出去。
代码语言:txt复制
当然也可以用现成的解决方案,RocketMQ从4.3.0版本开始,支持事务消息。我们只需要编写对应的本地事务执行方法executeLocalTransaction和本地事务执行结果检查方法checkLocalTransaction,RocketMQ会自动调用本地事务执行。如果本地事务执行成功,下游才能消费到消息,如果本地事务执行失败,下游是无法感知到这条消息的

一、使用方法

代码语言:txt复制
使用RocketMQ发送事务消息,只有消息发送和普通消息发送有所区别。参见官方示例:
代码语言:java复制
// TransactionProducer.java
// 需要自定义一个TransactionListener用于执行事务executeLocalTransaction和事务执行结果回查checkLocalTransaction 代码在下面
TransactionListener transactionListener = new TransactionListenerImpl();
// 事务消息发送producer
TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP);
producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
// 创建一个线程池 用于Broker回查本地事务执行状态 如果这里没有创建,RocketMQ会自动创建一个线程池
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), r -> {
  Thread thread = new Thread(r);
  thread.setName("client-transaction-msg-check-thread");
  return thread;
});

producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();

String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < MESSAGE_COUNT; i  ) {
  try {
    Message msg =
      new Message(TOPIC, tags[i % tags.length], "KEY"   i,
                  ("Hello RocketMQ "   i).getBytes(RemotingHelper.DEFAULT_CHARSET));
    // 发送事务消息
    SendResult sendResult = producer.sendMessageInTransaction(msg, null);
    System.out.printf("%s%n", sendResult);

    Thread.sleep(10);
  } catch (MQClientException | UnsupportedEncodingException e) {
    e.printStackTrace();
  }
}
代码语言:java复制
public class TransactionListenerImpl implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);

    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    /**
     * 执行本地事务
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        return LocalTransactionState.UNKNOW;
    }

    /**
     * 本地事务执行状态回查
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = localTrans.get(msg.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                default:
                    return LocalTransactionState.COMMIT_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}
代码语言:txt复制
本地事务的执行状态,有三种结果:
  • LocalTransactionState.COMMIT_MESSAGE:事务执行成功,Broker会处理消息供下游消费
  • LocalTransactionState.ROLLBACK_MESSAGE:事务被回滚,Broker会删除消息,下游感知不到消息
  • LocalTransactionState.UNKNOW:事务的执行结果未知,比如事务还在执行中,稍后Broker会回重复回查,直到超过最大时间或者最大次数二、原理解析0、整体流程
image.pngimage.png
  1. Producer发送事务消息// 编写TransactionListener实现类用于执行本地事务和本地事务回查 TransactionListener transactionListener = new TransactionListenerImpl(); // 发送事务消息专用的TransactionMQProducer TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP); producer.setNamesrvAddr(DEFAULT_NAMESRVADDR); // 新建一个线程池用于异步执行从Broker过来的事务回查 如果这里不新建 Broker也会自动创建一个 ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), r -> { Thread thread = new Thread(r); thread.setName("client-transaction-msg-check-thread"); return thread; }); producer.setExecutorService(executorService); producer.setTransactionListener(transactionListener); producer.start(); String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"}; for (int i = 0; i < MESSAGE_COUNT; i ) { try { Message msg = new Message(TOPIC, tags[i % tags.length], "KEY" i, ("Hello RocketMQ " i).getBytes(RemotingHelper.DEFAULT_CHARSET)); // 发送事务消息 SendResult sendResult = producer.sendMessageInTransaction(msg, null); System.out.printf("%s%n", sendResult); Thread.sleep(10); } catch (MQClientException | UnsupportedEncodingException e) { e.printStackTrace(); } }// 发送事务消息 public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter localTransactionExecuter, final Object arg) throws MQClientException { // 校验是否设置TransactionListener 发送事务消息必须要有TransactionListener TransactionListener transactionListener = getCheckListener(); if (null == localTransactionExecuter && null == transactionListener) { throw new MQClientException("tranExecutor is null", null); } // ignore DelayTimeLevel parameter if (msg.getDelayTimeLevel() != 0) { // 事务消息不支持延迟发送 MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL); } Validators.checkMessage(msg, this.defaultMQProducer); SendResult sendResult = null; // 标记prepare消息 Broker根据这个判断是否是一条事务消息 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); // 设置消息生产者组 为了查询事务消息本地事务状态时 从该生产者组中随机选择一个消息生产者即可 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup()); try { sendResult = this.send(msg); } catch (Exception e) { throw new MQClientException("send message Exception", e); } // ...... // 对发送结果的处理稍后解析 }2、Broker接收事务消息// asyncSendMessage方法 CompletableFuture<PutMessageResult> putMessageResult = null; // 依据消息是否有MessageConst.PROPERTY_TRANSACTION_PREPARED判断是否事务消息 String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (Boolean.parseBoolean(transFlag)) { // 处理事务消息 if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) { response.setCode(ResponseCode.NO_PERMISSION); response.setRemark( "the broker[" this.brokerController.getBrokerConfig().getBrokerIP1() "] sending transaction message is forbidden"); return CompletableFuture.completedFuture(response); } // 保存事务消息 putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner); } else { // 保存消息 putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner); }private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) { // 备份原本的topic和队列 MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic()); MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msgInner.getQueueId())); msgInner.setSysFlag( MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE)); // 设置新的topic为RMQ_SYS_TRANS_HALF_TOPIC msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic()); // 队列是写死的 只有一个 也就是说是顺序处理的 msgInner.setQueueId(0); msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); return msgInner; }3、Producer处理发送消息结果// sendMessageInTransaction方法 try { // 事务消息发送结果 sendResult = this.send(msg); } catch (Exception e) { throw new MQClientException("send message Exception", e); } LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; Throwable localException = null; // 处理事务消息发送结果 switch (sendResult.getSendStatus()) { // 发送成功的情况 case SEND_OK: { try { if (sendResult.getTransactionId() != null) { msg.putUserProperty("__transactionId__", sendResult.getTransactionId()); } String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); if (null != transactionId && !"".equals(transactionId)) { msg.setTransactionId(transactionId); } if (null != localTransactionExecuter) { // 这个已经废弃了 不会进入这里 localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg); } else if (transactionListener != null) { // 这里执行本地事务 log.debug("Used new transaction API"); localTransactionState = transactionListener.executeLocalTransaction(msg, arg); } if (null == localTransactionState) { localTransactionState = LocalTransactionState.UNKNOW; } if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) { log.info("executeLocalTransactionBranch return {}", localTransactionState); log.info(msg.toString()); } } catch (Throwable e) { // 有catch逻辑 是考虑到事务执行异常的场景 log.info("executeLocalTransactionBranch exception", e); log.info(msg.toString()); localException = e; } } break; case FLUSH_DISK_TIMEOUT: case FLUSH_SLAVE_TIMEOUT: case SLAVE_NOT_AVAILABLE: localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE; break; default: break; }4、Producer通知Broker事务执行状态try { // 事务消息收尾工作 通知Broker干活 this.endTransaction(msg, sendResult, localTransactionState, localException); } catch (Exception e) { log.warn("local transaction execute " localTransactionState ", but end broker transaction failed", e); }public void endTransaction( final Message msg, final SendResult sendResult, final LocalTransactionState localTransactionState, final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException { final MessageId id; if (sendResult.getOffsetMsgId() != null) { id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId()); } else { id = MessageDecoder.decodeMessageId(sendResult.getMsgId()); } String transactionId = sendResult.getTransactionId(); final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName()); EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader(); requestHeader.setTransactionId(transactionId); requestHeader.setCommitLogOffset(id.getOffset()); // 设置本地事务执行状态 switch (localTransactionState) { case COMMIT_MESSAGE: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE); break; case ROLLBACK_MESSAGE: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE); break; case UNKNOW: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE); break; default: break; } doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false); requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); requestHeader.setTranStateTableOffset(sendResult.getQueueOffset()); requestHeader.setMsgId(sendResult.getMsgId()); String remark = localException != null ? ("executeLocalTransactionBranch exception: " localException.toString()) : null; // 发送消息给Broker this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark, this.defaultMQProducer.getSendMsgTimeout()); }public void endTransactionOneway( final String addr, final EndTransactionRequestHeader requestHeader, final String remark, final long timeoutMillis ) throws RemotingException, InterruptedException { // 指定Broker使用EndTransactionProcessor处理 RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader); request.setRemark(remark); // 单向消息 不考虑发送结果 // 也就是说 是可能发送失败的 发送失败之后Broker会回查 this.remotingClient.invokeOneway(addr, request, timeoutMillis); }5、Broker处理事务执行状态通知// processRequest方法 // 上面的代理是Broker向Producer回查事务后的处理 稍后解析 else { // 发送半消息之后产生的调用 switch (requestHeader.getCommitOrRollback()) { // 如果事务执行不是commit或者rollback 直接返回 不再进行下面的逻辑 case MessageSysFlag.TRANSACTION_NOT_TYPE: { LOGGER.warn("The producer[{}] end transaction in sending message, and it's pending status." "RequestHeader: {} Remark: {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.toString(), request.getRemark()); return null; } // 如果事务执行是commit 接着下面的处理 case MessageSysFlag.TRANSACTION_COMMIT_TYPE: { break; } // 如果事务执行是rollback 打印异常日志 接着下面的处理 case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: { LOGGER.warn("The producer[{}] end transaction in sending message, rollback the message." "RequestHeader: {} Remark: {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.toString(), request.getRemark()); break; } default: return null; } } OperationResult result = new OperationResult(); if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) { // 根据偏移量 取出topic是RMQ_SYS_TRANS_HALF_TOPIC的消息 // 第2步Broker保存消息之后 会把偏移量通知Producer Producer再传到这里 result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader); // 事务执行状态是commit if (result.getResponseCode() == ResponseCode.SUCCESS) { RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader); if (res.getCode() == ResponseCode.SUCCESS) { // 从消息中取出原来的topic和队列等信息 构建真实消息 MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage()); msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback())); msgInner.setQueueOffset(requestHeader.getTranStateTableOffset()); msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset()); msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp()); // 清除事务消息相关标记 防止循环处理 MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED); // 保存真实消息 RemotingCommand sendResult = sendFinalMessage(msgInner); if (sendResult.getCode() == ResponseCode.SUCCESS) { // 删除消息RMQ_SYS_TRANS_HALF_TOPIC // 实际上是投递到RMQ_SYS_TRANS_OP_HALF_TOPIC中 并标记删除 // 这里为什么还要投递到RMQ_SYS_TRANS_OP_HALF_TOPIC中 不直接删除呢 后面还需要根据这个判断是否是重复处理等 this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage()); } return sendResult; } return res; } } else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) { // 如果事务状态是rollback 删除消息RMQ_SYS_TRANS_HALF_TOPIC 投递到RMQ_SYS_TRANS_OP_HALF_TOPIC中并标记删除 // 比commit少了一个投递真实主题的步骤 result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader); if (result.getResponseCode() == ResponseCode.SUCCESS) { RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader); if (res.getCode() == ResponseCode.SUCCESS) { this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage()); } return res; } }6、Broker主动捞取消息TransactionalMessageCheckService类实现Runnable接口,在Broker启动的时候,回调用BrokerController的start方法,在start方法中,会调用TransactionalMessageCheckService的start方法启动线程,run方法是一个死循环,默认每6秒执行一次public void run() { log.info("Start transaction check service thread!"); long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval(); while (!this.isStopped()) { this.waitForRunning(checkInterval); } log.info("End transaction check service thread!"); }循环中实际执行的是这个方法 protected void onWaitEnd() { // 事务过期时间 只有当消息存储时间加上这个过期时间大于系统当前时间 才对消息执行事务回查 否则在下一次周期中执行事务回查操作 long timeout = brokerController.getBrokerConfig().getTransactionTimeOut(); // 事务回查最大检测次数 如果超过最大检测次数还是无法获知消息的事务状态 不会再会回查 直接丢弃相当于回滚事务 int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax(); long begin = System.currentTimeMillis(); log.info("Begin to check prepare message, begin time:{}", begin); this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener()); log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin); }public void check(long transactionTimeout, int transactionCheckMax, AbstractTransactionalMessageCheckListener listener) { try { // 获取事务半消息主题下的全部队列 然后依次处理 String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC; Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic); if (msgQueues == null || msgQueues.size() == 0) { log.warn("The queue of topic is empty :" topic); return; } log.debug("Check topic={}, queues={}", topic, msgQueues); for (MessageQueue messageQueue : msgQueues) { long startTime = System.currentTimeMillis(); MessageQueue opQueue = getOpQueue(messageQueue); // RMQ_SYS_TRANS_HALF_TOPIC消息消费进度 long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue); // 收到事务消息提交或者回滚请求后的MQ_SYS_TRANS_OP_HALF_TOPIC中消息消费进度 long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue); log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset); if (halfOffset < 0 || opOffset < 0) { log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue, halfOffset, opOffset); continue; } List<Long> doneOpOffset = new ArrayList<>(); HashMap<Long, Long> removeMap = new HashMap<>(); // 根据当前的处理进度 依次从已处理队列MQ_SYS_TRANS_OP_HALF_TOPIC拉取32条消息 PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset); if (null == pullResult) { log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null", messageQueue, halfOffset, opOffset); continue; } // single thread // 获取空消息的次数 int getMessageNullCount = 1; // 当前处理半消息的进度 long newOffset = halfOffset; // 当前处理消息的队列偏移量 long i = halfOffset; while (true) { // 超过时常等待下次调度 if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) { log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT); break; } // 如果该消息已被处理 继续处理下一条消息 if (removeMap.containsKey(i)) { log.debug("Half offset {} has been committed/rolled back", i); Long removedOpOffset = removeMap.remove(i); doneOpOffset.add(removedOpOffset); } else { // 获取消息 GetResult getResult = getHalfMsg(messageQueue, i); MessageExt msgExt = getResult.getMsg(); if (msgExt == null) { // 超过重试次数 直接跳出 结束该消息队列的事务回查状态 if (getMessageNullCount > MAX_RETRY_COUNT_WHEN_HALF_NULL) { break; } // 没有新的消息而返回 结束该消息队列的事务回查 if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) { log.debug("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i, messageQueue, getMessageNullCount, getResult.getPullResult()); break; } else { log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}", i, messageQueue, getMessageNullCount, getResult.getPullResult()); // 其它原因 重新拉取 i = getResult.getPullResult().getNextBeginOffset(); newOffset = i; continue; } } // needDiscard 如果该消息回查的次数超过允许回查的最大次数 该消息将被丢弃 事务消息提交失败 每回查一次 在消息属性中 1 默认回查最大次数为5 // needSkip 如果事务消息超过文件的过期时间 默认72小时 则跳过该消息 if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) { listener.resolveDiscardMsg(msgExt); newOffset = i 1; i ; continue; } if (msgExt.getStoreTimestamp() >= startTime) { log.debug("Fresh stored. the miss offset={}, check it later, store={}", i, new Date(msgExt.getStoreTimestamp())); break; } // 消息已存储时间 当前系统时间减去消息存储时间 long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp(); // checkImmunityTime 立即检测事务消息的时间 设计的意义是 应用程序在发送事务消息后 事务不会马上提交 该时间就是假设事务消息发送成功后 应用程序事务提交时间 在这段时间内 RocketMQ任务事务未提交 不应该在这个时间段内向应用程序发送回查请求 // transactionTimeout 事务消息的超时时间 这个时间是从OP拉取消息的最后一条消息存储时间与check方法开始的时间 如果时间差超过了transactionTimeout 就算时间小于checkImmunityTime 也发送事务回查指令 long checkImmunityTime = transactionTimeout; // PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS 消息事务消息回查的最晚时间 单位为秒 指的是程序发送事务消息 可以指定该事务消息的有效时间 只有在这个时间内收到回查消息才有效 默认为null String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS); if (null != checkImmunityTimeStr) { checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout); if (valueOfCurrentMinusBorn < checkImmunityTime) { if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) { newOffset = i 1; i ; continue; } } } else { // 如果当前时间没过应用程序事务结束时间 跳出本次处理 if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) { log.debug("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i, checkImmunityTime, new Date(msgExt.getBornTimestamp())); break; } } List<MessageExt> opMsg = pullResult.getMsgFoundList(); // 如果OP队列中没有已处理消息并且已经超过应用程序事务结束时间transactionTimeout // 或者 // 操作队列不为空并且最后一条消息的存储时间已经超过transactionTimeout boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime) || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout)) || (valueOfCurrentMinusBorn <= -1); if (isNeedCheck) { // 这里回查是异步处理的 所以在回查之前 需要把消息重新投递到队列中 以便下次check if (!putBackHalfMsgQueue(msgExt, i)) { continue; } // 执行回查逻辑 listener.resolveHalfMsg(msgExt); } else { // 如果无法判断是否发送回查消息 则加载更多的已处理消息进行筛选 pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset); log.debug("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i, messageQueue, pullResult); continue; } } newOffset = i 1; i ; } if (newOffset != halfOffset) { // 保存半消息的回查进度 transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset); } long newOpOffset = calculateOpOffset(doneOpOffset, opOffset); if (newOpOffset != opOffset) { // 保存OP进度 transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset); } } } catch (Throwable e) { log.error("Check error", e); } }7、Broker主动回查事务状态public void resolveHalfMsg(final MessageExt msgExt) { executorService.execute(new Runnable() { @Override public void run() { try { sendCheckMessage(msgExt); } catch (Exception e) { LOGGER.error("Send check message error!", e); } } }); } public void sendCheckMessage(MessageExt msgExt) throws Exception { CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader(); checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset()); checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId()); checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX)); checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId()); checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset()); msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC)); msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID))); msgExt.setStoreSize(0); String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP); // 从同一个生产者组中选择一个Producer进行回查 // 所以同一个生产者组中如果部分机器出现宕机、发布重启等问题 也不会影响回查 Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId); if (channel != null) { brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt); } else { LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId); } } public void checkProducerTransactionState( final String group, final Channel channel, final CheckTransactionStateRequestHeader requestHeader, final MessageExt messageExt) throws Exception { // 给Producer发送消息时 指定类型是CHECK_TRANSACTION_STATE RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CHECK_TRANSACTION_STATE, requestHeader); request.setBody(MessageDecoder.encode(messageExt, false)); try { this.brokerController.getRemotingServer().invokeOneway(channel, request, 10); } catch (Exception e) { log.error("Check transaction failed because invoke producer exception. group={}, msgId={}, error={}", group, messageExt.getMsgId(), e.toString()); } }9、Producer本地事务回查public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { switch (request.getCode()) { case RequestCode.CHECK_TRANSACTION_STATE: // 判断是Broker事务回查 检查本地事务执行状态 return this.checkTransactionState(ctx, request); // ......省略部分代码 } return null; }public void checkTransactionState(final String addr, final MessageExt msg, final CheckTransactionStateRequestHeader header) { Runnable request = new Runnable() { //...省略部分代码 下文解析 }; // 这里正是用新建TransactionMQProducer时创建的线程池异步执行提高效率 this.checkExecutor.submit(request); }public void run() { TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener(); TransactionListener transactionListener = getCheckListener(); if (transactionCheckListener != null || transactionListener != null) { LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; Throwable exception = null; try { if (transactionCheckListener != null) { localTransactionState = transactionCheckListener.checkLocalTransactionState(message); } else if (transactionListener != null) { // 检查本地事务 log.debug("Used new check API in transaction message"); localTransactionState = transactionListener.checkLocalTransaction(message); } else { log.warn("CheckTransactionState, pick transactionListener by group[{}] failed", group); } } catch (Throwable e) { log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e); exception = e; } // 将本地事务状态通知Broker // 和第四步Producer第一次尝试通知Broker一样 也是单向发送 可能发送失败 this.processTransactionState( localTransactionState, group, exception); } else { log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group); } }
  2. Broker端SendMessageProcessor收到消息后,判断如果是一条事务消息,会将消息原来的topic和队列id存储到消息拓展中,设置新的topic为RMQ_SYS_TRANS_HALF_TOPIC然后 进行存储,然后通知Producer
  3. Producer收到Broker消息发送成功后,开始执行本地事务
  4. 本地事务执行完毕,Producer将事务执行状态通知Broker
  5. Broker端EndTransactionProcessor收到事务执行状态,从RMQ_SYS_TRANS_HALF_TOPIC中取出消息。如果事务执行成功,则从消息拓展中取出原本的topic和队列id,存储到真实的topic和队列id中,存储到RMQ_SYS_TRANS_OP_HALF_TOPIC主题中;如果是事务回滚,只把消息存储到RMQ_SYS_TRANS_OP_HALF_TOPIC主题中
  6. 如果Broker没有收到Producer事务执行状态的通知,Broker端TransactionalMessageCheckService会主动定时从RMQ_SYS_TRANS_HALF_TOPIC中捞取消息,判断是否有需要回查的消息
  7. 如果有需要回查的消息,Broker端TransactionalMessageCheckService会向Producer回查事务状态
  8. Producer执行TransactionListener的checkLocalTransaction方法,查询事务执行状态
  9. Producer查询本地事务状态之后再执行上述第4步和第5步1、Producer发送消息

0 人点赞