分布式事务处理常用手段及生产实践

2023-05-05 20:02:07 浏览数 (1)

分布式事务处理常用手段及生产实践

引言

在分布式系统架构设计中,如何保证数据的一致性是一个非常重要的问题。而分布式事务处理就是解决这个问题的常见手段之一。本篇将介绍常见的分布式事务处理手段,并结合生产实践案例进行详细阐述。

两阶段提交

两阶段提交(Two-Phase Commit)是一种经典的分布式事务处理方法,它通过一个事务协调器(Transaction Coordinator)来协调所有参与者(Participant)的事务操作。具体流程如下:

  1. 事务协调器向所有参与者询问是否可以执行该操作。
  2. 如果所有参与者都可以执行该操作,则事务协调器发送提交请求。
  3. 如果有任何一个参与者无法执行该操作,则事务协调器发送回滚请求。

该方法的优点是比较直接,实现简单;缺点是可能存在单点故障。下面以一个在线购物场景为例,介绍如何使用两阶段提交实现分布式事务处理。

假设用户在购买商品时需要扣减库存和账户余额,场景如下:

  1. 用户下单并支付成功。
  2. 订单服务扣减库存。
  3. 账户服务扣减余额。

下面是使用两阶段提交实现分布式事务处理的步骤:

  1. 用户下单并支付成功后,订单服务向负责管理库存的服务发送扣减库存的请求,同时向负责管理账户余额的服务发送扣减余额的请求。
  2. 负责管理库存的服务收到请求后,将要扣减的库存量记录在本地事务中,并向负责管理账户余额的服务发送“预提交”请求。
  3. 负责管理账户余额的服务收到“预提交”请求后,将要扣减的余额量记录在本地事务中,并向订单服务发送“同意提交”或“拒绝提交”的响应。如果账户余额不足,服务将拒绝提交。
  4. 订单服务收到所有参与者的响应后,如果所有参与者都同意提交,则向所有参与者发送“正式提交”请求;否则,向所有参与者发送“回滚”请求。
  5. 负责管理库存和账户余额的服务在收到“正式提交”请求后,执行本地事务并释放资源。如果收到“回滚”请求,则撤销之前记录的本地事务操作,同时释放资源。

在Java的Spring Boot框架中,可以使用JTA来模拟实现两阶段提交协议。具体操作方法如下:

  1. 在pom.xml文件中添加JTA依赖。例如,可以添加Atomikos依赖:
代码语言:javascript复制
<dependency>
  <groupId>com.atomikos</groupId>
  <artifactId>atomikos-jta</artifactId>
  <version>4.0.6</version>
</dependency>
  1. 对涉及到的数据库配置进行修改,将事务管理器设置为JTA事务管理器。例如,可以在application.properties文件中添加以下配置:
代码语言:javascript复制
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/db_example
spring.datasource.username=username
spring.datasource.password=password
spring.datasource.jndi-name=jdbc/MyDataSource

spring.jta.atomikos.connectionfactory.bean-name=myXAConnectionFactory
spring.jta.atomikos.datasource.max-idle=3
spring.jta.atomikos.datasource.min-idle=1
spring.jta.atomikos.datasource.max-active=5
spring.jta.atomikos.datasource.pool-size=5
spring.jta.atomikos.datasource.test-query=SELECT 1
  1. 在处理两阶段提交的代码中,需要使用@EnableTransactionManagement注解启用事务管理,同时使用@Transactional注解标记涉及到的方法。例如,可以在订单服务和账户服务中的扣减方法上添加@Transactional注解:
代码语言:javascript复制
@Service
public class OrderService {
    
    @Autowired
    private StockService stockService;

    @Autowired
    private AccountService accountService;

    @Transactional
    public void deductStockAndAccount(String orderId, String userId, double totalPrice) {
        // 扣减库存
        stockService.deductStock(orderId, totalPrice);

        // 扣减账户余额
        accountService.deductAccount(userId, totalPrice);
    }
}

@Service
public class AccountService {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Transactional
    public void deductAccount(String userId, double totalPrice) throws Exception {
        int result = jdbcTemplate.update("UPDATE account SET balance=balance-? WHERE user_id=?", totalPrice, userId);
        if (result == 0) {
            throw new Exception("余额不足,扣减失败");
        }
    }
}

@Service
public class StockService {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Transactional
    public void deductStock(String orderId, double totalPrice) throws Exception {
        int result = jdbcTemplate.update("UPDATE stock SET count=count-? WHERE order_id=?", totalPrice, orderId);
        if (result == 0) {
            throw new Exception("库存不足,扣减失败");
        }
    }
}

补偿事务

补偿事务(Compensating Transaction)是另一种常用的分布式事务处理方法。它假定在执行业务操作时,如果其中一个分支出问题,后续分支可以对之前分支已经执行过的操作进行回滚或者撤销,以此来解决分布式事务中的部分失败场景。具体流程如下:

  1. 执行第一个业务操作。
  2. 如果第二个业务操作发生异常,则执行回滚操作,撤销第一个业务操作。

这种方法的优点是相比于两阶段提交,能够更好地适应分布式系统中的不确定性和故障;缺点是实现略微复杂。下面以一个转账场景为例,介绍如何使用补偿事务实现分布式事务处理。

假设用户在进行账户之间的转账时需要拆分为两个服务:转出服务和转入服务,场景如下:

  1. 用户发起转账请求。
  2. 转出服务扣减余额并向转入服务发起转账请求。
  3. 如果转入服务成功,则完成转账操作;否则,转出服务通过补偿操作撤销转账操作。

以上三个操作需要保证原子性,即要么全部执行成功,要么全部回滚。这时可以使用补偿事务来解决该问题:

  1. 转出服务执行扣款操作,调用转入服务的转账接口。
  2. 转入服务收到转账请求并执行扣款操作。
  3. 如果转账成功,则事务结束;否则,转出服务发送一条撤销转账请求给转入服务。
  4. 转入服务接收到撤销请求后执行撤销操作,将已经扣款的金额返还给转出服务,然后事务结束。

针对这个场景,可以使用补偿事务来实现分布式事务处理。下面是步骤:

  1. 用户发起转账请求,转出服务接收到请求后,记录转账操作信息并将要扣减的余额量记录在本地事务中。
  2. 转出服务向转入服务发送转账请求,并等待转入服务响应。转入服务收到请求后,判断账户是否存在、余额是否充足等条件,如果满足条件,则将要增加的余额量记录在本地事务中,并向转出服务发送“同意提交”响应;否则,向转出服务发送“拒绝提交”响应。
  3. 转出服务接收到转入服务的响应后,如果为“拒绝提交”,则进入补偿阶段:撤销之前记录的余额扣减操作,并释放资源;否则,向转入服务发送“正式提交”请求。
  4. 转入服务在收到“正式提交”请求后,执行本地事务并释放资源。
  5. 补偿阶段结束,事务完成。

下面是使用Java的Spring Boot框架 JTA模拟实现补偿事务的过程:

  1. 在pom.xml文件中添加JTA依赖,例如,可以添加Atomikos依赖。
代码语言:javascript复制
<dependency>
  <groupId>com.atomikos</groupId>
  <artifactId>atomikos-jta</artifactId>
  <version>4.0.6</version>
</dependency>
  1. 对涉及到的数据库配置进行修改,将事务管理器设置为JTA事务管理器。例如,可以在application.properties文件中添加以下配置:
代码语言:javascript复制
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/db_example
spring.datasource.username=username
spring.datasource.password=password
spring.datasource.jndi-name=jdbc/MyDataSource

spring.jta.atomikos.connectionfactory.bean-name=myXAConnectionFactory
spring.jta.atomikos.datasource.max-idle=3
spring.jta.atomikos.datasource.min-idle=1
spring.jta.atomikos.datasource.max-active=5
spring.jta.atomikos.datasource.pool-size=5
spring.jta.atomikos.datasource.test-query=SELECT 1
  1. 在处理补偿事务的代码中,需要使用@EnableTransactionManagement注解启用事务管理,同时使用@Transactional注解标记涉及到的方法。例如,可以在转出服务和转入服务中的扣减余额方法上添加@Transactional注解:
代码语言:javascript复制
@Service
public class TransferOutService {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Autowired
    private TransferInService transferInService;

    @Transactional
    public void transferOut(String fromUser, String toUser, double amount) throws Exception {
        int result = jdbcTemplate.update("UPDATE account SET balance=balance-? WHERE username=?", amount, fromUser);
        if (result == 0) {
            throw new Exception("余额不足,扣减失败");
        }
        // 发起转账请求
        transferInService.transferIn(fromUser, toUser, amount);
    }

    @Transactional(rollbackFor = Exception.class)
    public void compensateTransferOut(String fromUser, double amount) {
        jdbcTemplate.update("UPDATE account SET balance=balance ? WHERE username=?", amount, fromUser);
    }

}

@Service
public class TransferInService {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Transactional
    public void transferIn(String fromUser, String toUser, double amount) throws Exception {
        int result = jdbcTemplate.update("UPDATE account SET balance=balance ? WHERE username=?", amount, toUser);
        if (result == 0) {
            throw new Exception("账户不存在,转入失败");
        }
        // 向转出服务发送“同意提交”响应
        transferOutClient.agreeTransfer();
    }

    @Transactional(rollbackFor = Exception.class)
    public void compensateTransferIn(String toUser, double amount) {
        jdbcTemplate.update("UPDATE account SET balance=balance-? WHERE username=?", amount, toUser);
    }

}

消息队列

消息队列(Message Queue)是一种常用的异步通信方式,在分布式系统中也可以用来解决部分分布式事务处理问题。具体流程如下:

  1. 将分布式操作拆分为多个步骤。
  2. 将每个步骤的执行结果通过消息队列异步发送。
  3. 最终由消息消费者进行汇总处理。

这种方式可以避免分布式事务中的资源锁争用和阻塞等问题,提高系统的并发性和扩展性。下面以一个秒杀场景为例,介绍如何使用消息队列实现分布式事务处理。

假设在商家进行秒杀活动时需要保证商品库存数量和订单数量的一致性,场景如下:

  1. 用户提交秒杀订单请求。
  2. 库存服务将秒杀商品数量减1,并通过消息队列向订单服务发送创建订单消息。
  3. 订单服务接收到消息后创建订单并返回成功状态给库存服务。
  4. 如果订单服务未能成功创建订单,则库存服务通过消息队列向之前发送的消息发送回滚消息。

以上操作需要保证原子性,即要么全部执行成功,要么全部回滚。这时可以使用消息队列来解决该问题:

  1. 库存服务将秒杀商品数量减1。
  2. 库存服务将创建订单的消息发送到消息队列。
  3. 订单服务从消息队列中获取消息并创建订单。
  4. 订单服务通过另一个消息队列发送成功状态给库存服务。
  5. 如果订单服务未能成功创建订单,则库存服务从消息队列中获取撤销消息并执行撤销操作。

要基于maven的Spring Boot程序设计使用RabbitMQ模拟实现分布式事务处理,需要按如下步骤进行:

  1. 添加RabbitMQ依赖。在pom.xml文件中添加以下依赖:
代码语言:javascript复制
<!-- RabbitMQ依赖 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  1. 创建RabbitMQ配置文件。在application.yml文件中添加以下配置:
代码语言:javascript复制
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
  1. 创建秒杀商品服务,该服务将秒杀商品数量减一并通过消息队列向订单服务发送创建订单消息。代码如下:
代码语言:javascript复制
@Service
public class SecKillService {

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Transactional
    public void doSecKill(String itemId, String userId) throws Exception {
        // 减少商品数量
        int result = jdbcTemplate.update("UPDATE item SET stock=stock-1 WHERE id=?", itemId);
        if (result == 0) {
            throw new Exception("库存不足,秒杀失败");
        }
        // 发送创建订单消息
        SeckillOrder order = new SeckillOrder();
        order.setItemId(itemId);
        order.setUserId(userId);
        amqpTemplate.convertAndSend("create-order-exchange", "create-order-key", JsonUtils.toJson(order));
    }

}
  1. 创建订单服务,该服务接收到创建订单消息后创建订单,并通过消息队列向库存服务发送响应,若未能成功创建订单,则发送回滚消息。代码如下:
代码语言:javascript复制
@Service
public class OrderService {

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Transactional
    @RabbitListener(queues = "create-order-queue")
    public void createOrder(String message) throws Exception {
        SeckillOrder order = JsonUtils.fromJson(message, SeckillOrder.class);
        // 执行本地事务
        int result = jdbcTemplate.update("INSERT INTO order (item_id, user_id) VALUES (?, ?)", order.getItemId(), order.getUserId());
        if (result == 0) {
            throw new Exception("创建订单失败");
        }
        // 发送响应消息
        amqpTemplate.convertAndSend("create-order-exchange", "order-created-key", message);
    }

    @RabbitListener(queues = "rollback-order-queue")
    public void rollbackOrder(String message) {
        // 执行本地事务回滚
        SeckillOrder order = JsonUtils.fromJson(message, SeckillOrder.class);
        jdbcTemplate.update("UPDATE item SET stock=stock 1 WHERE id=?", order.getItemId());
    }

}
  1. 配置RabbitMQ相关信息。在Spring Boot的Application类中添加以下代码:
代码语言:javascript复制
@SpringBootApplication
@EnableTransactionManagement
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public DirectExchange createOrderExchange() {
        return new DirectExchange("create-order-exchange");
    }

    @Bean
    public Queue createOrderQueue() {
        return new Queue("create-order-queue");
    }

    @Bean
    public Binding createOrderBinding() {
        return BindingBuilder.bind(createOrderQueue()).to(createOrderExchange()).with("create-order-key");
    }

    @Bean
    public Queue orderCreatedQueue() {
        return new Queue("order-created-queue");
    }

    @Bean
    public Binding orderCreatedBinding() {
        return BindingBuilder.bind(orderCreatedQueue()).to(createOrderExchange()).with("order-created-key");
    }

    @Bean
    public Queue rollbackOrderQueue() {
        return new Queue("rollback-order-queue");
    }

    @Bean
    public Binding rollbackOrderBinding() {
        return BindingBuilder.bind(rollbackOrderQueue()).to(createOrderExchange()).with("rollback-order-key");
    }

}

可靠消息最终一致性

可靠消息最终一致性(Reliable Message-Based Consistency)是一种通过消息中间件来确保分布式操作的可靠性和一致性的方法。具体流程如下:

  1. 分布式操作的执行结果转化为消息,并确保其可靠性。
  2. 消息消费者对消息进行汇总处理。

这种方式适用于数据一致性要求不高的场景,可以提高系统的并发性和扩展性。下面以一个用户注册送积分场景为例,介绍如何使用可靠消息最终一致性实现分布式事务处理。

假设在用户注册成功后需要给用户送积分,场景如下:

  1. 用户注册成功并向积分服务发送请求。
  2. 积分服务将用户的积分信息转化为消息并发送到消息队列。
  3. 另一个消息消费者接收到消息后执行送积分操作。

以上操作需要保证原子性,即要么全部执行成功,要么全部回滚。这时可以使用可靠消息最终一致性来解决该问题:

  1. 用户注册成功并向积分服务发送请求。
  2. 积分服务将用户的积分信息转化为消息,并通过消息中间件确保其可靠性。
  3. 另一个消息消费者从消息队列中获取消息并执行送积分操作。

结论

本篇介绍了常见的分布式事务处理手段,并结合生产实践案例进行详细阐述。实际生产需要根据具体业务场景来选择适合的分布式事务处理方法,以保证系统的可靠性和一致性。

0 人点赞