【RocketMQ】发送事务消息

2022-12-16 18:55:10 浏览数 (1)

概念介绍

事务消息:提供类似XA或Open XA的分布式事务功能,通过事务消息能达到分布式事务的最终一致。

半事务消息:暂不能投递的消息,生产者已经成功地将消息发送到了RocketMQ服务端,但是RocketMQ服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半事务消息。

消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,RocketMQ服务端通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit或是Rollback),该询问过程即消息回查。

分布式事务消息的优势

RocketMQ分布式事务消息不仅可以实现应用之间的解耦,又能保证数据的最终一致性。同时,传统的大事务可以被拆分为小事务,不仅能提升效率,还不会因为某一个关联应用的不可用导致整体回滚,从而最大限度保证核心系统的可用性。在极端情况下,如果关联的某一个应用始终无法处理成功,也只需对当前应用进行补偿或数据订正处理,而无需对整体业务进行回滚。

典型场景

在淘宝购物车下单时,涉及到购物车系统和交易系统,这两个系统之间的数据最终一致性可以通过分布式事务消息的异步处理实现。在这种场景下,交易系统是最为核心的系统,需要最大限度地保证下单成功。而购物车系统只需要订阅交易订单消息,做相应的业务处理,即可保证最终的数据一致性。

交互流程

事务消息交互流程如下图所示。

事务消息发送步骤如下:

代码语言:javascript复制
1.生产者将半事务消息发送至RocketMQ服务端。

2.RocketMQ服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息为半事务消息。

3.生产者开始执行本地事务逻辑。

4.生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:
二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。

5.在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。

事务消息回查步骤如下:

代码语言:javascript复制
1.生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。

2.生产者根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。

示例代码

事务消息生产者

代码语言:javascript复制
public enum LocalTransactionState {
    COMMIT_MESSAGE,
    ROLLBACK_MESSAGE,
    UNKNOW,
}

事务消息发送完成本地事务后,可在execute方法中返回以下三种状态:

代码语言:javascript复制
COMMIT_MESSAGE:提交事务,允许消费者消费该消息。
ROLLBACK_MESSAGE:回滚事务,消息将被丢弃不允许消费。
UNKNOW:暂时无法判断状态,等待固定时间以后消息队列RocketMQ版服务端根据回查规则向生产者进行消息回查。

创建事务消息的Producer时必须指定TransactionListener的实现类,处理异常情况下事务消息的回查。

回查规则:本地事务执行完成后,若服务端收到的本地事务返回状态为TransactionStatus.Unknow,或生产者应用退出导致本地事务未提交任何状态。则服务端会向消息生产者发起事务回查,第一次回查后仍未获取到事务状态,则之后每隔一段时间会再次回查。

回查间隔时间:系统默认每隔30秒发起一次定时任务,对未提交的半事务消息进行回查,共持续12小时。

代码语言:javascript复制
package com.kaigejava.rocketmq.transaction;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

import static com.morris.rocketmq.util.Contant.NAME_SERVER_ADDRESS;

/**
 * 事务消息生产者
 */
public class TransactionProducer {

    public static void main(String[] args) throws Exception {

        TransactionMQProducer producer = new TransactionMQProducer("transaction-producer-demo");
        producer.setNamesrvAddr(NAME_SERVER_ADDRESS);

        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), r -> {
            Thread thread = new Thread(r);
            thread.setName("client-transaction-msg-check-thread");
            return thread;
        });
        producer.setExecutorService(executorService);

        // 指定事务会查的实现类
        producer.setTransactionListener(new TransactionListener() {
            private final AtomicInteger transactionIndex = new AtomicInteger(0);

            private final ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                int value = transactionIndex.getAndIncrement();
                System.out.println(Thread.currentThread().getName()   "-executeLocalTransaction:"   new String(msg.getBody())   ",value="   value);
                int status = value % 3;
                localTrans.put(msg.getTransactionId(), status);
                return LocalTransactionState.UNKNOW;
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                System.out.println(Thread.currentThread().getName()   "-checkLocalTransaction:"   new String(msg.getBody()));
                Integer status = localTrans.get(msg.getTransactionId());
                if (null != status) {
                    switch (status) {
                        case 0:
                            return LocalTransactionState.COMMIT_MESSAGE;
                        case 1:
                            return LocalTransactionState.UNKNOW;
                        case 2:
                            return LocalTransactionState.ROLLBACK_MESSAGE;
                    }
                }
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });

        producer.start();

        for(int i = 0; i < 10; i  ) {
            Message message = new Message("TransactionTopic", ("transactionDemo"   i).getBytes());
            // 发送事务消息
            producer.sendMessageInTransaction(message, i);
            System.out.println(message);
        }
    }

}

第一次消息回查最快时间:该参数支持自定义设置。若指定消息未达到设置的最快回查时间前,系统默认每隔30秒一次的回查任务不会检查该消息。

设置方式如下:

代码语言:javascript复制
Message message = new Message();
message.putUserProperties(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS, "60");

说明:因为系统默认的回查间隔,第一次消息回查的实际时间会向后有0秒~30秒的浮动。

例如:指定消息的第一次消息最快回查时间设置为60秒,系统在第58秒时达到定时的回查时间,但设置的60秒未到,所以该消息不在本次回查范围内。等待间隔30秒后,下一次的系统回查时间在第88秒,该消息才符合条件进行第一次回查,距设置的最快回查时间延后了28秒。

事务消息消费者

事务消息的Group ID不能与其他类型消息的Group ID共用。与其他类型的消息不同,事务消息有回查机制,回查时服务端会根据Group ID去查询生产者客户端。

代码语言:javascript复制
package com.kaigejava.rocketmq.transaction;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import static com.kaigejava.rocketmq.util.Contant.NAME_SERVER_ADDRESS;

/**
 * 事务消息消费者
 */
public class TranscationConsumer {
    public static void main(String[] args) throws Exception {
        // 实例化消息生产者,指定组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction-consumer-group");
        // 指定Namesrv地址信息.
        consumer.setNamesrvAddr(NAME_SERVER_ADDRESS);
        // 订阅Topic
        consumer.subscribe("TransactionTopic", "*");
        //负载均衡模式消费
        consumer.setMessageModel(MessageModel.CLUSTERING);
        // 注册回调函数,处理消息
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            try {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            } catch (Exception e) {
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        //启动消息者
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

使用说明

代码语言:javascript复制
1:事务消息不支持延时消息和批量消息。

2:事务回查的间隔时间:BrokerConfig.transactionCheckInterval,通过Broker的配置文件设置好。

3:为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为15次,但是用户可以通过Broker配置文件的transactionCheckMax参数来修改此限制。如果已经检查某条消息超过N次的话(N=transactionCheckMax)则Broker将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写AbstractTransactionCheckListener类来修改这个行为。

4:事务消息将在Broker配置文件中的参数transactionMsgTimeout这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS来改变这个限制,该参数优先于transactionMsgTimeout参数。

5:事务性消息可能不止一次被检查或消费。

6:事务性消息中用到了生产者群组,这种就是一种高可用机制,用来确保事务消息的可靠性。

7:提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过RocketMQ本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。

8:事务消息的生产者ID不能与其他类型消息的生产者ID共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者ID查询到消费者。

0 人点赞