Spring Cloud Alibaba系列之分布式事务Seata
1、分布式事务
分布式事务不是在现在微服务分布式架构上才产生的问题,在单体应用同样存在分布式事务问题,典型的场景就是单体应用使用了多个数据源。所以分布式事务的场景就是分布式的多进程环境,或者多数据源的情况。然后为什么需要有分布式事务这些组件框架?Spring 框架的@Transactional
是我们使用比较多的,但是这个注解只能支持单数据源,而且不能支持分布式的场景,所以就需要一些分布式事务的框架或者解决方案出来。
2、什么是Seata?
Seata官网也提供了Seata比较齐全的文档
总而言之,Seata是Alibaba开源的一款分布式事务解决方案框架,致力于提供简单易用,高性能的服务。Seata支持多种分布式事务解决方法,比如常见的TCC、XA、SAGA,同时还有独创的AT模式
3、AT事务模式
图来自官网,图中显示了在一个分布式事务中RM于TC的交互过程:
分布式事务处理过程的三个组件:
Transaction Coordinator (TC)
:事务协调器,维护全局和分支事务的运行状态,协调全局事务提交或回滚Transaction Manager (TM)
:事务管理器,定义全局事务的边界,发起开始全局事务、提交或回滚全局事务。Resource Manager (RM)
:资源管理器, 控制分支事务,管理分支事务处理的资源,负责本地分支的注册和状态汇报,接收TC命令,驱动分支(本地)事务的提交和回滚。
一个分布式事务有全局唯一的xid,每个分支事务有全局唯一的branchid。AT模式的交互动作主要有:
- branchRegister:分支事务在 commit 之前与 TC 交互获取 全局锁 和返回 branchId。获取全局锁过程可以成功,也有可能失败,成功就获取全局锁并返回branchId,失败就按照规则重试,若重试到最大次数失败,则发起全局事务的回滚, 对已完成的分支事务执行回滚
- branchReport:在本地事务提交后,与TC交互,上传本地事务已完成标识。branchReport动作在1.0版本做了改造,本地事务 commit 不上 报,本地事务 rollback 上报。这个改造提高了性能。
- branchCommit:在形成 globalCommit 决议后执行。AT模式,是通过异步来执行的,通过定时任务 sql 批量合并的方式,主要用于删除一阶段的undo_log
- branchRollback:在形成 globalRol lback 决议后执行。RM收到branceRollback请求后,取取 undo_log 表中对应的 branchId 记录解析 rollback_info 字 段,对比现有数据和undo log镜像,对比不同分支事务回滚,对比成功,则根据镜像构造SQL反向执行,并删除undo log
4、Seata Server下载部署
Seata官网页提供了比较友好的入门教程,Seata新手部署指南(1.4.0版本),本博客按照官网和网上资料,自己摸索写出来
在GitHub下载seata server,链接:https://github.com/seata/seata/releases
创建一个seata服务端的数据库,命名为seata,然后去GitHub找对应的脚本,https://github.com/seata/seata/blob/1.4.2/script/server/db/mysql.sql,sql脚本执行:
/conf/file.conf配置文件,默认是文件file保存的,可以修改为数据库保存:
5、Seata集成Spring Cloud
例子使用官网的示例,这里稍作改动,图来自官网:
官网提供了客户端的file.conf配置,链接:https://github.com/seata/seata/blob/develop/script/client/conf/file.conf,这里将配置信息复制到原来的file.conf里:
按照GitHub官网给的sample建业务表,然后每个库里也要加个undo_log表:
代码语言:javascript复制CREATE TABLE `undo_log`(
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
PRIMARY KEY(`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8
建好的数据库:
开发环境
- JDK 1.8
- SpringBoot2.2.1
- MybatisPlus3.4.3.4
- Maven 3.2
- Mysql5.7.36
- 开发工具
- IntelliJ IDEA
- smartGit
使用阿里的脚手架,链接:https://start.aliyun.com
建立一个parent工程:
加上seata配置:
然后再建立多个子工程:
订单服务工程:
要涉及的服务都加上这个配置:
订单,下单业务:
代码语言:javascript复制import cn.hutool.core.bean.BeanUtil;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.example.seata.common.rest.ResultBean;
import com.example.seata.order.bean.Order;
import com.example.seata.order.bean.dto.OrderDto;
import com.example.seata.order.mapper.OrderMapper;
import com.example.seata.order.service.IOrderService;
import org.springframework.stereotype.Service;
@Service
public class OrderService extends ServiceImpl<OrderMapper , Order> implements IOrderService {
@Override
public ResultBean createOrder(OrderDto orderDto) {
Order order = BeanUtil.copyProperties(orderDto , Order.class);
int suc = baseMapper.insert(order);
if (suc > 0) {
return ResultBean.ok();
}
return ResultBean.badRequest("创建订单失败!");
}
}
账户,扣除金额:
代码语言:javascript复制import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.example.seata.account.bean.Account;
import com.example.seata.account.bean.dto.AccountDto;
import com.example.seata.account.mapper.AccountMapper;
import com.example.seata.account.service.IAccountService;
import com.example.seata.common.rest.ResultBean;
import org.springframework.stereotype.Service;
@Service
public class AccountServiceImpl extends ServiceImpl<AccountMapper, Account> implements IAccountService {
@Override
public ResultBean decreaseAccount(AccountDto accountDto) {
int account = baseMapper.decreaseAccount(accountDto.getUserId() , accountDto.getAmount());
if (account > 0) {
return ResultBean.ok();
}
return ResultBean.badRequest("金额扣除失败!");
}
}
商品库存,下单成功后,库存要做增减
代码语言:javascript复制import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.example.seata.common.rest.ResultBean;
import com.example.seata.storage.bean.Storage;
import com.example.seata.storage.bean.dto.StorageDto;
import com.example.seata.storage.mapper.StorageMapper;
import com.example.seata.storage.service.IStorageService;
import org.springframework.stereotype.Service;
@Service
public class StorageServiceImpl extends ServiceImpl<StorageMapper, Storage> implements IStorageService {
@Override
public ResultBean decreaseStock(StorageDto storageDto) {
int count = baseMapper.decreaseStock(storageDto.getCommodityCode() , storageDto.getCount());
if (count > 0) {
return ResultBean.ok();
}
return ResultBean.badRequest("商品库存扣除失败!");
}
}
这里例子,不仅分库了,同时也是跨服务,使用openFeign调用,例子里使用了dubbo调用:
代码语言:javascript复制package com.example.seata.feign;
import com.example.seata.common.bean.OrderDto;
import com.example.seata.common.rest.ResultBean;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
@FeignClient(name = "order-service" ,path = "/api/order" , url = "http://127.0.0.1:8082")
public interface IOrderFeignService {
@PostMapping(value = "/createOrder" , consumes = MediaType.APPLICATION_JSON_VALUE)
ResultBean createOrder(@RequestBody OrderDto orderDto);
}
业务的整合,一个完整的下单过程,创建订单表数据,然后会进行用户账户金额的扣除,然后仓库的商品库存也要进行扣除,这里注意加上@GlobalTransactional
和不加上效果的区别,可以模拟异常,或者直接停了仓库服务,然后几个服务的数据会不会都做更改,正常业务场景,不能出现订单数据生成成功,但是账号金额没做扣除,或者商品的库存不变,这些都是不允许的
import cn.hutool.core.bean.BeanUtil;
import com.example.seata.common.bean.AccountDto;
import com.example.seata.common.bean.BusinessDto;
import com.example.seata.common.bean.OrderDto;
import com.example.seata.common.bean.StorageDto;
import com.example.seata.feign.IAccountFeignService;
import com.example.seata.feign.IOrderFeignService;
import com.example.seata.feign.IStorageFeignService;
import io.seata.core.context.RootContext;
import io.seata.spring.annotation.GlobalTransactional;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class BusinessHandler {
@Autowired
private IAccountFeignService accountFeignService;
@Autowired
private IOrderFeignService orderFeignService;
@Autowired
private IStorageFeignService storageFeignService;
@GlobalTransactional(name = "my_test_tx_group",rollbackFor = Exception.class)
public void createOrder(BusinessDto businessDto) {
log.info("当前xid:{}" , RootContext.getXID());
log.info("开始创建订单...");
OrderDto orderDto = BeanUtil.copyProperties(businessDto , OrderDto.class);
orderFeignService.createOrder(orderDto);
log.info("创建订单完成...");
log.info("用户账号扣减金额开始...");
AccountDto accountDto = BeanUtil.copyProperties(businessDto , AccountDto.class);
accountFeignService.decreaseAccount(accountDto);
log.info("用户账号扣减金额完成...");
log.info("商品库存扣减开始...");
StorageDto storageDto = BeanUtil.copyProperties(businessDto , StorageDto.class);
storageFeignService.decreaseStock(storageDto);
log.info("商品库存扣减完成...");
}
}
不加Seata分布式事务的情况:
商品库存不变,但是订单已经创建成功
6、注册服务到nacos
下载nacos服务端:https://github.com/alibaba/nacos/releases
ps:GitHub下载太慢,可以去gitee,https://gitee.com/mirrors/Nacos#download,里面有给出一个百度网盘的链接,也可以在gitee下载代码,然后自己编译代码
window启动:在bin目录里,执行cmd,startup.cmd -m standalone
在seata的conf/registry.conf里,修改注册方式:
启动后,可以看到注册成功:
对应的服务,加上nacos配置:
代码语言:javascript复制spring:
application:
name: business-service
cloud:
nacos:
discovery:
username: nacos
password: nacos
server-addr: 127.0.0.1:8848
namespace: public
服务注册成功:
本博客代码例子可以在GitHub找到下载链接,本博客对seata涉及不多,只是作为入门教程参考
参考资料
- 分布式事务框架seata落地实践
- 分布式事务,这一篇就够了
- 使用Seata彻底解决Spring Cloud中的分布式事务问题!