Spring Cloud Alibaba系列之分布式事务Seata

2022-03-07 15:12:46 浏览数 (1)

Spring Cloud Alibaba系列之分布式事务Seata

1、分布式事务

分布式事务不是在现在微服务分布式架构上才产生的问题,在单体应用同样存在分布式事务问题,典型的场景就是单体应用使用了多个数据源。所以分布式事务的场景就是分布式的多进程环境,或者多数据源的情况。然后为什么需要有分布式事务这些组件框架?Spring 框架的@Transactional是我们使用比较多的,但是这个注解只能支持单数据源,而且不能支持分布式的场景,所以就需要一些分布式事务的框架或者解决方案出来。

2、什么是Seata?

Seata官网也提供了Seata比较齐全的文档

总而言之,Seata是Alibaba开源的一款分布式事务解决方案框架,致力于提供简单易用,高性能的服务。Seata支持多种分布式事务解决方法,比如常见的TCC、XA、SAGA,同时还有独创的AT模式

3、AT事务模式

图来自官网,图中显示了在一个分布式事务中RM于TC的交互过程:

分布式事务处理过程的三个组件:

  • Transaction Coordinator (TC):事务协调器,维护全局和分支事务的运行状态,协调全局事务提交或回滚
  • Transaction Manager (TM):事务管理器,定义全局事务的边界,发起开始全局事务、提交或回滚全局事务。
  • Resource Manager (RM):资源管理器, 控制分支事务,管理分支事务处理的资源,负责本地分支的注册和状态汇报,接收TC命令,驱动分支(本地)事务的提交和回滚。

一个分布式事务有全局唯一的xid,每个分支事务有全局唯一的branchid。AT模式的交互动作主要有:

  • branchRegister:分支事务在 commit 之前与 TC 交互获取 全局锁 和返回 branchId。获取全局锁过程可以成功,也有可能失败,成功就获取全局锁并返回branchId,失败就按照规则重试,若重试到最大次数失败,则发起全局事务的回滚, 对已完成的分支事务执行回滚
  • branchReport:在本地事务提交后,与TC交互,上传本地事务已完成标识。branchReport动作在1.0版本做了改造,本地事务 commit 不上 报,本地事务 rollback 上报。这个改造提高了性能。
  • branchCommit:在形成 globalCommit 决议后执行。AT模式,是通过异步来执行的,通过定时任务 sql 批量合并的方式,主要用于删除一阶段的undo_log
  • branchRollback:在形成 globalRol lback 决议后执行。RM收到branceRollback请求后,取取 undo_log 表中对应的 branchId 记录解析 rollback_info 字 段,对比现有数据和undo log镜像,对比不同分支事务回滚,对比成功,则根据镜像构造SQL反向执行,并删除undo log

4、Seata Server下载部署

Seata官网页提供了比较友好的入门教程,Seata新手部署指南(1.4.0版本),本博客按照官网和网上资料,自己摸索写出来

在GitHub下载seata server,链接:https://github.com/seata/seata/releases

创建一个seata服务端的数据库,命名为seata,然后去GitHub找对应的脚本,https://github.com/seata/seata/blob/1.4.2/script/server/db/mysql.sql,sql脚本执行:

/conf/file.conf配置文件,默认是文件file保存的,可以修改为数据库保存:

5、Seata集成Spring Cloud

例子使用官网的示例,这里稍作改动,图来自官网:

官网提供了客户端的file.conf配置,链接:https://github.com/seata/seata/blob/develop/script/client/conf/file.conf,这里将配置信息复制到原来的file.conf里:

按照GitHub官网给的sample建业务表,然后每个库里也要加个undo_log表:

代码语言:javascript复制
CREATE TABLE `undo_log`(
    `id` bigint(20) NOT NULL AUTO_INCREMENT,
    `branch_id` bigint(20) NOT NULL,
    `xid` varchar(100) NOT NULL,
    `context` varchar(128) NOT NULL,
    `rollback_info` longblob NOT NULL,
    `log_status` int(11) NOT NULL,
    `log_created` datetime NOT NULL,
    `log_modified` datetime NOT NULL,
    PRIMARY KEY(`id`),
    UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8

建好的数据库:

开发环境

  • JDK 1.8
  • SpringBoot2.2.1
  • MybatisPlus3.4.3.4
  • Maven 3.2
  • Mysql5.7.36
  • 开发工具
    • IntelliJ IDEA
    • smartGit

使用阿里的脚手架,链接:https://start.aliyun.com

建立一个parent工程:

加上seata配置:

然后再建立多个子工程:

订单服务工程:

要涉及的服务都加上这个配置:

订单,下单业务:

代码语言:javascript复制
import cn.hutool.core.bean.BeanUtil;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.example.seata.common.rest.ResultBean;
import com.example.seata.order.bean.Order;
import com.example.seata.order.bean.dto.OrderDto;
import com.example.seata.order.mapper.OrderMapper;
import com.example.seata.order.service.IOrderService;
import org.springframework.stereotype.Service;

@Service
public class OrderService extends ServiceImpl<OrderMapper , Order> implements IOrderService {

    @Override
    public ResultBean createOrder(OrderDto orderDto) {
        Order order = BeanUtil.copyProperties(orderDto , Order.class);
        int suc = baseMapper.insert(order);
        if (suc > 0) {
            return ResultBean.ok();
        }
        return ResultBean.badRequest("创建订单失败!");
    }
}

账户,扣除金额:

代码语言:javascript复制
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.example.seata.account.bean.Account;
import com.example.seata.account.bean.dto.AccountDto;
import com.example.seata.account.mapper.AccountMapper;
import com.example.seata.account.service.IAccountService;
import com.example.seata.common.rest.ResultBean;
import org.springframework.stereotype.Service;

@Service
public class AccountServiceImpl extends ServiceImpl<AccountMapper, Account> implements IAccountService {

    @Override
    public ResultBean decreaseAccount(AccountDto accountDto) {
        int account = baseMapper.decreaseAccount(accountDto.getUserId() , accountDto.getAmount());
        if (account > 0) {
            return ResultBean.ok();
        }
        return ResultBean.badRequest("金额扣除失败!");
    }
}

商品库存,下单成功后,库存要做增减

代码语言:javascript复制
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.example.seata.common.rest.ResultBean;
import com.example.seata.storage.bean.Storage;
import com.example.seata.storage.bean.dto.StorageDto;
import com.example.seata.storage.mapper.StorageMapper;
import com.example.seata.storage.service.IStorageService;
import org.springframework.stereotype.Service;

@Service
public class StorageServiceImpl extends ServiceImpl<StorageMapper, Storage> implements IStorageService {


    @Override
    public ResultBean decreaseStock(StorageDto storageDto) {
        int count = baseMapper.decreaseStock(storageDto.getCommodityCode() , storageDto.getCount());
        if (count > 0) {
            return ResultBean.ok();
        }
        return ResultBean.badRequest("商品库存扣除失败!");
    }
}

这里例子,不仅分库了,同时也是跨服务,使用openFeign调用,例子里使用了dubbo调用:

代码语言:javascript复制
package com.example.seata.feign;

import com.example.seata.common.bean.OrderDto;
import com.example.seata.common.rest.ResultBean;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;

@FeignClient(name = "order-service" ,path = "/api/order" , url = "http://127.0.0.1:8082")
public interface IOrderFeignService {

    @PostMapping(value = "/createOrder" , consumes = MediaType.APPLICATION_JSON_VALUE)
    ResultBean createOrder(@RequestBody OrderDto orderDto);

}

业务的整合,一个完整的下单过程,创建订单表数据,然后会进行用户账户金额的扣除,然后仓库的商品库存也要进行扣除,这里注意加上@GlobalTransactional和不加上效果的区别,可以模拟异常,或者直接停了仓库服务,然后几个服务的数据会不会都做更改,正常业务场景,不能出现订单数据生成成功,但是账号金额没做扣除,或者商品的库存不变,这些都是不允许的

代码语言:javascript复制
import cn.hutool.core.bean.BeanUtil;
import com.example.seata.common.bean.AccountDto;
import com.example.seata.common.bean.BusinessDto;
import com.example.seata.common.bean.OrderDto;
import com.example.seata.common.bean.StorageDto;
import com.example.seata.feign.IAccountFeignService;
import com.example.seata.feign.IOrderFeignService;
import com.example.seata.feign.IStorageFeignService;
import io.seata.core.context.RootContext;
import io.seata.spring.annotation.GlobalTransactional;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class BusinessHandler {

    @Autowired
    private IAccountFeignService accountFeignService;
    @Autowired
    private IOrderFeignService orderFeignService;
    @Autowired
    private IStorageFeignService storageFeignService;


    @GlobalTransactional(name = "my_test_tx_group",rollbackFor = Exception.class)
    public void createOrder(BusinessDto businessDto) {
        log.info("当前xid:{}" , RootContext.getXID());

        log.info("开始创建订单...");
        OrderDto orderDto = BeanUtil.copyProperties(businessDto , OrderDto.class);
        orderFeignService.createOrder(orderDto);
        log.info("创建订单完成...");

        log.info("用户账号扣减金额开始...");
        AccountDto accountDto = BeanUtil.copyProperties(businessDto , AccountDto.class);
        accountFeignService.decreaseAccount(accountDto);
        log.info("用户账号扣减金额完成...");

        log.info("商品库存扣减开始...");
        StorageDto storageDto = BeanUtil.copyProperties(businessDto , StorageDto.class);
        storageFeignService.decreaseStock(storageDto);
        log.info("商品库存扣减完成...");

    }

}

不加Seata分布式事务的情况:

商品库存不变,但是订单已经创建成功

6、注册服务到nacos

下载nacos服务端:https://github.com/alibaba/nacos/releases

ps:GitHub下载太慢,可以去gitee,https://gitee.com/mirrors/Nacos#download,里面有给出一个百度网盘的链接,也可以在gitee下载代码,然后自己编译代码

window启动:在bin目录里,执行cmd,startup.cmd -m standalone

在seata的conf/registry.conf里,修改注册方式:

启动后,可以看到注册成功:

对应的服务,加上nacos配置:

代码语言:javascript复制
spring:
  application:
    name: business-service
  cloud:
    nacos:
      discovery:
        username: nacos
        password: nacos
        server-addr: 127.0.0.1:8848
        namespace: public

服务注册成功:

本博客代码例子可以在GitHub找到下载链接,本博客对seata涉及不多,只是作为入门教程参考

参考资料

  • 分布式事务框架seata落地实践
  • 分布式事务,这一篇就够了
  • 使用Seata彻底解决Spring Cloud中的分布式事务问题!

0 人点赞