RocketMQ事务消息使用与原理

2022-09-27 08:04:17 浏览数 (1)

内容目录

一、背景&概述

二、应用场景

三、使用方式

四、原理介绍

五、源码分析

六、总结与思考

一、背景&概述

最近在找工作,面试过程中被多次问到事务消息的实现原理,另外在分布式事务解决方案中,事务消息也是一个不错的解决方案,本篇文章将围绕RocketMQ的事务消息实现展开描述。

二、应用场景

所谓事务消息,其实是为了解决上下游写一致性,以及强依赖解耦,也即是完成当前操作的同时给下游发送指令,并且保证上下游要么同时成功或者同时失败,并且考虑上游的性能和RT问题做出的强调用解耦妥协。常见的应用场景有:

1.订单履约指令下发

用户下单成功后,给履约系统发送指令进行履约操作,下单失败不发送指令,采购缺货或者其他履约异常,反向触发订单取消或者其他兜底操作。

2.用户转账

用户发起转账后,交易状态短暂挂起,发送指令给银行,如果发起失败则不发送指令,发送成功后等待结果更新交易状态。

3.订单支付

支付发起后,当笔订单处于中间状态,给支付网关发起指令,如果发起失败则不发送指令,发送成功后等待支付网关反馈更新支付状态。

三、使用方式

1.事务消息监听器
代码语言:javascript复制
@Component
@Slf4j
public class OrderTransactionalListener implements TransactionListener {
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        log.info("开始执行本地事务....");
        LocalTransactionState state;
        try{
            String body = new String(message.getBody());
            OrderDTO order = JSONObject.parseObject(body, OrderDTO.class);
            orderService.createOrder(order,message.getTransactionId());
            state = LocalTransactionState.COMMIT_MESSAGE;
            log.info("本地事务已提交。{}",message.getTransactionId());
        }catch (Exception e){
            log.error("执行本地事务失败。{}",e);
            state = LocalTransactionState.ROLLBACK_MESSAGE;
        }
        return state;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        log.info("开始回查本地事务状态。{}",messageExt.getTransactionId());
        LocalTransactionState state;
        String transactionId = messageExt.getTransactionId();
        if (transactionLogService.get(transactionId)>0){
            state = LocalTransactionState.COMMIT_MESSAGE;
        }else {
            state = LocalTransactionState.UNKNOW;
        }
        log.info("结束本地事务状态查询:{}",state);
        return state;
    }
}
2.编写事务消息生产者
代码语言:javascript复制
@Component
@Slf4j
public class TransactionalMsgProducer implements InitializingBean, DisposableBean {
    private String GROUP = "order_transactional";
    private TransactionMQProducer msgProducer;
    //用于执行本地事务和事务状态回查的监听器
    @Autowired
    private OrderTransactionalListener orderTransactionListener;
    //执行任务的线程池
    private ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 60,
            TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(50));
    private void start(){
        try {
            this.msgProducer.start();
        } catch (MQClientException e) {
            log.error("msg producer starter occur error;",e);
        }
    }
    private void shutdown() {
        if(null != msgProducer) {
            try {
                msgProducer.shutdown();
            } catch (Exception e) {
                log.error("producer shutdown occur error;",e);
            }
        }
    }
    public TransactionSendResult send(String data, String topic) throws MQClientException {
        Message message = new Message(topic,data.getBytes());
        return this.msgProducer.sendMessageInTransaction(message, null);
    }
    @Override
    public void afterPropertiesSet() throws Exception {
        msgProducer = new TransactionMQProducer(GROUP);
        msgProducer.setNamesrvAddr("namesrvHost:ip");
        msgProducer.setSendMsgTimeout(Integer.MAX_VALUE);
        msgProducer.setExecutorService(executor);
        msgProducer.setTransactionListener(orderTransactionListener);
        this.start();
    }
    @Override
    public void destroy() throws Exception {
        this.shutdown();
    }
}
3.业务实现
代码语言:javascript复制
@Service
@Slf4j
public class OrderService {
    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    private  TransactionLogMapper transactionLogMapper;
    @Autowired
    private TransactionalMsgProducer producer;
    //执行本地事务时调用,将订单数据和事务日志写入本地数据库
    @Transactional
    @Override
    public void createOrder(OrderDTO orderDTO,String transactionId){
        //1.创建订单
        Order order = new Order();
        BeanUtils.copyProperties(orderDTO,order);
        orderMapper.createOrder(order);
        //2.写入事务日志
        TransactionLog log = new TransactionLog();
        log.setId(transactionId);
        log.setBusiness("order");
        log.setForeignKey(String.valueOf(order.getId()));
        transactionLogMapper.insert(log);
        log.info("create order success,order={}",orderDTO);
    }
    //前端调用,只用于向RocketMQ发送事务消息
    @Override
    public void createOrder(OrderDTO order) throws MQClientException {
        order.setId(snowflake.nextId());
        order.setOrderNo(snowflake.nextIdStr());
        producer.send(JSON.toJSONString(order),"order");
    }
}
4.入口调用
代码语言:javascript复制
@RestController
@Slf4j
public class OrderController {

    @Autowired
    private OrderService orderService;

    @PostMapping("/create_order")
    public void createOrder(@RequestBody OrderDTO order) {
        log.info("receive order data,order={}",order.getCommodityCode());
        orderService.createOrder(order);
    }
}

这样我们就实现了rocketmq事务消息的使用。

四、原理介绍

1.概念模型
  • 半消息(half message):半消息是一种特殊的消息类型,该状态的消息暂时不能被Consumer消费(消费端不可见)。当一条事务消息被成功投递到Broker上,但是Broker并没有接收到Producer发出的二次确认时,该事务消息就处于"暂时不可被消费"状态,该状态的事务消息被称为半消息。
  • 消息状态回查(Message status check):由于网络抖动闪断、Producer重启等原因,可能导致Producer向Broker发送的二次确认消息没有成功送达。如果Broker检测到某条事务消息长时间处于半消息状态,则会主动向Producer端发起回查操作,查询该事务消息在Producer端的事务状态(Commit 或 Rollback)。可以看出,Message Status Check主要用来解决分布式事务中的超时问题。
2.执行流程

1):Producer向Broker端发送Half Message;

2):Broker ACK,Half Message发送成功;

3):Producer执行本地事务;

4):本地事务完毕,根据事务的状态,Producer向Broker发送二次确认消息,确认该Half Message的Commit或者Rollback状态。Broker收到二次确认消息后,对于Commit状态,则直接发送到Consumer端执行消费逻辑,而对于Rollback则直接标记为失败,一段时间后清除,并不会发给Consumer。正常情况下,到此分布式事务已经完成,剩下要处理的就是超时问题,即一段时间后Broker仍没有收到Producer的二次确认消息;

5):针对超时状态,Broker主动向Producer发起消息回查;

6):Producer处理回查消息,返回对应的本地事务的执行结果;

7):Broker针对回查消息的结果,执行Commit或Rollback操作,同4。

3.事务消息设计

在RocketMQ事务消息的主要流程中,一阶段的消息如何对用户不可见。其中事务消息相对普通消息最大的特点就是一阶段发送的消息对用户是不可见的。那么如何做到写入消息但是对用户不可见呢?RocketMQ事务消息的做法是:如果消息是half消息,将备份原消息的主题与消息消费队列,然后改变主题为RMQ_SYS_TRANS_HALF_TOPIC。由于消费组未订阅该主题,故消费端无法消费half类型的消息,然后RocketMQ会开启一个定时任务,从Topic为RMQ_SYS_TRANS_HALF_TOPIC中拉取消息进行消费,根据生产者组获取一个服务提供者发送回查事务状态请求,根据事务状态来决定是提交或回滚消息。

在RocketMQ中,消息在服务端的存储结构如下,每条消息都会有对应的索引信息,Consumer通过ConsumeQueue这个二级索引来读取消息实体内容,其流程如下:

RocketMQ的具体实现策略是:写入的如果事务消息,对消息的Topic和Queue等属性进行替换,同时将原来的Topic和Queue信息存储到消息的属性中,正因为消息主题被替换,故消息并不会转发到该原主题的消息消费队列,消费者无法感知消息的存在,不会消费。

在完成一阶段写入一条对用户不可见的消息后,二阶段如果是Commit操作,则需要让消息对用户可见;如果是Rollback则需要撤销一阶段的消息。先说Rollback的情况。对于Rollback,本身一阶段的消息对用户是不可见的,其实不需要真正撤销消息(实际上RocketMQ也无法去真正的删除一条消息,因为是顺序写文件的)。但是区别于这条消息没有确定状态(Pending状态,事务悬而未决),需要一个操作来标识这条消息的最终状态。RocketMQ事务消息方案中引入了Op消息的概念,用Op消息标识事务消息已经确定的状态(Commit或者Rollback)。如果一条事务消息没有对应的Op消息,说明这个事务的状态还无法确定(可能是二阶段失败了)。引入Op消息后,事务消息无论是Commit或者Rollback都会记录一个Op操作。Commit相对于Rollback只是在写入Op消息前创建Half消息的索引。

一阶段的Half消息由于是写到一个特殊的Topic,所以二阶段构建索引时需要读取出Half消息,并将Topic和Queue替换成真正的目标的Topic和Queue,之后通过一次普通消息的写入操作来生成一条对用户可见的消息。所以RocketMQ事务消息二阶段其实是利用了一阶段存储的消息的内容,在二阶段时恢复出一条完整的普通消息。

如果在RocketMQ事务消息的二阶段过程中失败了,例如在做Commit操作时,出现网络问题导致Commit失败,那么需要通过一定的策略使这条消息最终被Commit。RocketMQ采用了一种补偿机制,称为“回查”。Broker端对未确定状态的消息发起回查,将消息发送到对应的Producer端(同一个Group的Producer),由Producer根据消息来检查本地事务的状态,进而执行Commit或者Rollback。Broker端通过对比Half消息和Op消息进行事务消息的回查并且推进CheckPoint(记录那些事务消息的状态是确定的)。

需要注意的是,rocketmq并不会无休止的的信息事务状态回查,默认回查15次,如果15次回查还是无法得知事务状态,rocketmq默认回滚该消息。

五、源码分析

1.客服端发送事务消息

RocketMQ事务消息由TransactionMQProducer实现,继承DefaultMQProducer实现了发送事务消息的能力。

发送事务消息会调用TransactionMQProducer的sendMessageInTransaction方法:

代码语言:javascript复制
public TransactionSendResult sendMessageInTransaction(final Message msg,
    final Object arg) throws MQClientException {
    if (null == this.transactionListener) {
        throw new MQClientException("TransactionListener is null", null);
    }

    msg.setTopic(NamespaceUtil.wrapNamespace(this.getNamespace(), msg.getTopic()));
    return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
}

检查有没有配置事务监听器,监听器提供了两个方法:

  • executeLocalTransaction:执行本地事务
  • checkLocalTransaction:回查本地事务

然后调用DefaultMQProducerImpl执行发送:

代码语言:javascript复制
public TransactionSendResult sendMessageInTransaction(final Message msg,
                                                      final LocalTransactionExecuter localTransactionExecuter, final Object arg)
    throws MQClientException {
    //...省略
    SendResult sendResult = null;
    //msg设置参数TRAN_MSG,表示为事务消息
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
    try {
        //发送消息
        sendResult = this.send(msg);
    } catch (Exception e) {
        throw new MQClientException("send message Exception", e);
    }
    LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
    Throwable localException = null;
    switch (sendResult.getSendStatus()) {
        case SEND_OK: {
            try {
                if (sendResult.getTransactionId() != null) {
                    msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                }
                String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                if (null != transactionId && !"".equals(transactionId)) {
                    msg.setTransactionId(transactionId);
                }
                //通过LocalTransactionExecutor执行,已经废弃
                if (null != localTransactionExecuter) {
                    localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                } else if (transactionListener != null) {
                    //消息发送成功,执行本地事务
                    localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                }
                if (null == localTransactionState) {
                    localTransactionState = LocalTransactionState.UNKNOW;
                }
            } catch (Throwable e) {
                localException = e;
            }
        }
        break;
        case FLUSH_DISK_TIMEOUT:
        case FLUSH_SLAVE_TIMEOUT:
        case SLAVE_NOT_AVAILABLE:
            localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
            break;
        default:
            break;
    }
    try {
        //执行endTransaction方法,如果半消息发送失败或本地事务执行失败告诉服务端是删除半消息,半消息发送成功且本地事务执行成功则告诉服务端生效半消息
        this.endTransaction(sendResult, localTransactionState, localException);
    } catch (Exception e) {
        log.warn("local transaction execute "   localTransactionState   ", but end broker transaction failed", e);
    }
    //省略...
    return transactionSendResult;
}

该方法做了以下几件事情:

  • 给消息打上事务属性,用于broker区分普通消息和事务消息
  • 发送半消息(half message)
  • 发送成功则由transactionListener执行本地事务
  • 执行endTransaction方法,通知broker 执行 commit/rollback

发送消息会正常调用DefaultMQProducerImpl的发送消息逻辑,执行本地事务通过transactionListener调用本地的事务逻辑,我们看一下结束事务endTransaction方法实现:

代码语言:javascript复制
public void endTransaction(
    final SendResult sendResult,
    final LocalTransactionState localTransactionState,
    final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
    final MessageId id;
    if (sendResult.getOffsetMsgId() != null) {
        id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
    } else {
        id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
    }
    String transactionId = sendResult.getTransactionId();
    final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
    EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
    requestHeader.setTransactionId(transactionId);
    requestHeader.setCommitLogOffset(id.getOffset());
    switch (localTransactionState) {
        case COMMIT_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
            break;
        case ROLLBACK_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
            break;
        case UNKNOW:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
            break;
        default:
            break;
    }
    requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
    requestHeader.setMsgId(sendResult.getMsgId());
    String remark = localException != null ? ("executeLocalTransactionBranch exception: "   localException.toString()) : null;
    this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
        this.defaultMQProducer.getSendMsgTimeout());
}

本地事务执行后,则调用this.endTransaction()方法,根据本地事务执行状态,去提交事务或者回滚事务。

如果半消息发送失败或本地事务执行失败告诉服务端是删除半消息,半消息发送成功且本地事务执行成功则告诉服务端生效半消息。

2.Broker处理事务消息

RocketMQ服务端有个NettyRequestProcessor接口,类似于spring的BeanPostProcessor,broker启动的时候会把对应的实现注册到NettyRemotingServer的本地缓存processorTable中,在收到producer发送的消息会调用NettyServerHandler的channelRead0方法,然后会调用对应的NettyRequestProcessor实现处理接收到的消息请求。看一下SendMessageProcessor实现:

代码语言:javascript复制
public RemotingCommand processRequest(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    SendMessageContext traceContext;
    switch (request.getCode()) {
        case RequestCode.CONSUMER_SEND_MSG_BACK:
            return this.consumerSendMsgBack(ctx, request);
        default:
            SendMessageRequestHeader requestHeader = parseRequestHeader(request);
            if (requestHeader == null) {
                return null;
            }
            TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, true);
            RemotingCommand rewriteResult = this.brokerController.getTopicQueueMappingManager().rewriteRequestForStaticTopic(requestHeader, mappingContext);
            if (rewriteResult != null) {
                return rewriteResult;
            }
            traceContext = buildMsgContext(ctx, requestHeader);
            String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
            traceContext.setCommercialOwner(owner);
            try {
                this.executeSendMessageHookBefore(ctx, request, traceContext);
            } catch (AbortProcessException e) {
                final RemotingCommand errorResponse = RemotingCommand.createResponseCommand(e.getResponseCode(), e.getErrorMessage());
                errorResponse.setOpaque(request.getOpaque());
                return errorResponse;
            }
            RemotingCommand response;
            if (requestHeader.isBatch()) {
                response = this.sendBatchMessage(ctx, request, traceContext, requestHeader, mappingContext,
                    (ctx1, response1) -> executeSendMessageHookAfter(response1, ctx1));
            } else {
                response = this.sendMessage(ctx, request, traceContext, requestHeader, mappingContext,
                    (ctx12, response12) -> executeSendMessageHookAfter(response12, ctx12));
            }
            return response;
    }
}

会调用到SendMessageProcessor.sendMessage(),判断消息类型,进行半消息存储:

代码语言:javascript复制
public RemotingCommand sendMessage(final ChannelHandlerContext ctx,
    final RemotingCommand request,
    final SendMessageContext sendMessageContext,
    final SendMessageRequestHeader requestHeader,
    final TopicQueueMappingContext mappingContext,
    final SendMessageCallback sendMessageCallback) throws RemotingCommandException {
    //...省略
    String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    boolean sendTransactionPrepareMessage = false;
    if (Boolean.parseBoolean(traFlag)
        && !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) { //For client under version 4.6.1
        if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark(
                "the broker["   this.brokerController.getBrokerConfig().getBrokerIP1()
                      "] sending transaction message is forbidden");
            return response;
        }
        sendTransactionPrepareMessage = true;
    }
    long beginTimeMillis = this.brokerController.getMessageStore().now();
    if (brokerController.getBrokerConfig().isAsyncSendEnable()) {
      //...异步发送
        return null;
    } else {
        PutMessageResult putMessageResult = null;
        if (sendTransactionPrepareMessage) {
          //存储事务消息
            putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
        } else {
          //存储普通消息
            putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
        }
        return response;
    }
}

继续看事务半消息存储实现prepareMessage:

代码语言:javascript复制
public PutMessageResult prepareMessage(MessageExtBrokerInner messageInner) {
    return transactionalMessageBridge.putHalfMessage(messageInner);
}
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
        String.valueOf(msgInner.getQueueId()));
    msgInner.setSysFlag(
        MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
    msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
    msgInner.setQueueId(0);
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
    return msgInner;
}

备份消息的原主题名称与原队列ID,然后取消事务消息的消息标签,重新设置消息的主题为:RMQ_SYS_TRANS_HALF_TOPIC,队列ID固定为0。与其他普通消息区分开,然后完成消息持久化。

到这里,broker就初步处理完了 Producer 发送的事务半消息。

当客户端TransactionMQProducer执行endTransaction动作时,触发broker事务消息的二阶段提交,broker会执行EndTransactionProcessor的processRequest方法:

代码语言:javascript复制
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
    RemotingCommandException {
    //...省略
    OperationResult result = new OperationResult();
    if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
        result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
        if (result.getResponseCode() == ResponseCode.SUCCESS) {
            RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
            if (res.getCode() == ResponseCode.SUCCESS) {
                MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
                msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
                msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
                msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
                msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
                MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
                RemotingCommand sendResult = sendFinalMessage(msgInner);
                if (sendResult.getCode() == ResponseCode.SUCCESS) {
                    this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
                }
                return sendResult;
            }
            return res;
        }
    } else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
        result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
        if (result.getResponseCode() == ResponseCode.SUCCESS) {
            RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
            if (res.getCode() == ResponseCode.SUCCESS) {
                this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
            }
            return res;
        }
    }
    response.setCode(result.getResponseCode());
    response.setRemark(result.getResponseRemark());
    return response;
}

逻辑很清晰,其核心实现如下:

  • 根据commitlogOffset找到消息
  • 如果是提交动作,就恢复原消息的主题与队列,再次存入commitlog文件进而转到消息消费队列,供消费者消费,然后将原预处理消息存入一个新的主题RMQ_SYS_TRANS_OP_HALF_TOPIC,代表该消息已被处理
  • 回滚消息,则直接将原预处理消息存入一个新的主题RMQ_SYS_TRANS_OP_HALF_TOPIC,代表该消息已被处理

还有一种情况,如果本地事务执行结果是UNKNOW或者由于网络问题没有提交,那么存储的broker的事务消息处于漂浮状态,无法主动转换成可消费或者删除状态,那么就需要broker有一种兜底机制来处理这种场景,当然RocketMQ提供了一种补偿机制,定时回查此类消息,由TransactionalMessageCheckService实现:

代码语言:javascript复制
@Override
public void run() {
    log.info("Start transaction check service thread!");
    while (!this.isStopped()) {
        long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
        this.waitForRunning(checkInterval);
    }
    log.info("End transaction check service thread!");
}
@Override
protected void onWaitEnd() {
    long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
    int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
    long begin = System.currentTimeMillis();
    log.info("Begin to check prepare message, begin time:{}", begin);
    this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
    log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}

整体流程如下图:

六、总结与思考

异常情况覆盖
  • 客户端producer发送半消息失败

可能由于网络或者mq故障,导致 Producer 发送半消息(prepare)失败。客户端服务可以执行回滚操作,比如“订单关闭”等。

  • 半消息发送成功,本地事务执行失败

如果producer发送的半消息成功了,但是执行本地事务失败了,如更新订单状态为“已完成”。这种情况下,执行本地事务失败后,会返回rollback给 MQ,MQ会删除之前发送的半消息。不会下发指令给下游依赖。

  • 半消息投递成功,没收到MQ返回的ack

如果客户端发送半消息成功后,没有收到MQ返回的响应。可能是因为网络问题,或者其他未知异常,客户端以为发送MQ半消息失败,执行了逆向回滚流程。这个时候其实mq已经保存半消息成功了,那这个消息怎么处理?

这个时候broker的补偿逻辑上场,消息回查定时任务TransactionalMessageCheckService会每隔1分钟扫描一次半消息队列,判断是否需要消息回查,然后回查订单系统的本地事务,这时MQ就会发现订单已经变成“已关闭”,此时就要发送rollback请求给mq,删除之前的半消息。

  • commit/rollback失败

这个也是通过定时任务TransactionalMessageCheckService来做补偿,它发现这个消息超过一定时间还没有进行二阶段处理,就会回查本地事务。

缺点和替代方案

事务消息很好了解决了分布式事务场景的业务解耦,但是也存在一些问题,比如引入新的组件依赖,并且事务消息是强依赖,那么还有没有其他比较可行的替代方案,ebay提出的本地消息表是一种解决方案,消息生产方新增消息表,并记录消息发送状态。消息表和业务数据要在一个事务里提交,也就是说他们要在一个数据库里面。然后消息会经过MQ发送到消息的消费方。如果消息发送失败,会进行重试发送。消息消费方,需要处理这个消息,并完成自己的业务逻辑。此时如果本地事务处理成功,表明已经处理成功了,如果处理失败,那么就会重试执行。如果是业务上面的失败,可以给生产方发送一个业务补偿消息,通知生产方进行回滚等操作。

本地消息表的优点是避免了分布式事务,实现了最终一致性,缺点也明显,消息表会耦合到业务系统中,如果没有封装好的解决方案,会有很多支撑逻辑要处理。

0 人点赞