SpringCloud2023中使用Seata解决分布式事务

2024-08-10 16:37:56 浏览数 (1)

你好,这里是codetrend专栏“SpringCloud2023实战”。

可以点击合集查看往期文章,这是第10篇更新。

本文简单介绍SpringCloud2023中集成Seata来使用分布式事务。

前言

对于分布式系统而言,需要保证分布式系统中的数据一致性,保证数据在子系统中始终保持一致,避免业务出现问题。分布式系统中对数据的操作要么一起成功,要么一起失败,必须是一个整体性的事务。

分布式事务指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上。

简单的说,在分布式系统中一次大的操作由不同的小操作组成,这些小的操作分布在不同的服务节点上,且属于不同的应用,分布式事务需要保证这些小操作要么全部成功,要么全部失败。

举个例子:在电商网站中,用户对商品进行下单,需要在订单表中创建一条订单数据,同时需要在库存表中修改当前商品的剩余库存数量,两步操作一个添加,一个修改,一定要保证这两步操作一定同时操作成功或失败,否则业务就会出现问题。

任何事务机制在实现时,都应该考虑事务的 ACID 特性,包括:本地事务、分布式事务。对于分布式事务而言,即使不能都很好的满足,也要考虑支持到什么程度。

典型的分布式事务场景:跨库事务、分库分表、微服务化。

Seata 是一款开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务。在 Seata 开源之前,其内部版本在阿里系内部一直扮演着应用架构层数据一致性的中间件角色,帮助经济体平稳的度过历年的双11,对上层业务进行了有力的技术支撑。

启动 Seata Server

具体的详细教程可以查看seata文档,搜索seata即可。

  • 导入相关脚本。undo_log 、global_table、branch_table、lock_table、distributed_lock。
  • 下载seata server应用,启动 Seata server。

相关文件可以在seata的源码仓库获取到。

代码语言:shell复制
# linux系统
sh seata-server.sh -p $LISTEN_PORT -m $MODE(file or db) -h $HOST -e $ENV
# windows 系统
seata-server.bat -p $LISTEN_PORT -m $MODE(file or db) -h $HOST -e $ENV
  • Seata 1.5.1支持Seata控制台本地访问控制台地址:http://127.0.0.1:7091
  • 通过Seata控制台可以观察到正在执行的事务信息和全局锁信息,并且在事务完成时删除相关信息。
  • 修改seata server的启动配置,端口为7091,数据库用mysql,不适用注册中心。配置文件路径为 seataconfapplication.yml (可以参考配置 seataconfapplication.example.yml)。
代码语言:yaml复制
server:
  port: 7091
spring:
  application:
    name: seata-server

logging:
  config: classpath:logback-spring.xml
  file:
    path: ${log.home:${user.home}/logs/seata}
  extend:
    logstash-appender:
      destination: 127.0.0.1:4560
    kafka-appender:
      bootstrap-servers: 127.0.0.1:9092
      topic: logback_to_logstash

console:
  user:
    username: seata
    password: seata
seata:
  config:
    # support: nacos, consul, apollo, zk, etcd3
    type: file
  registry:
    # support: nacos, eureka, redis, zk, consul, etcd3, sofa
    type: file
  store:
    # support: file 、 db 、 redis
    mode: db
    db:
      datasource: druid
      db-type: mysql
      driver-class-name: com.mysql.cj.jdbc.Driver
      url: jdbc:mysql://localhost:20020/seata?rewriteBatchedStatements=true&useSSL=true
      user: root
      password: 123456
      min-conn: 10
      max-conn: 100
      global-table: global_table
      branch-table: branch_table
      lock-table: lock_table
      distributed-lock-table: distributed_lock
      query-limit: 1000
      max-wait: 5000    
#  server:
#    service-port: 8091 #If not configured, the default is '${server.port}   1000'
  security:
    secretKey: SeataSecretKey0c382ef121d778043159209298fd40bf3850a017
    tokenValidityInMilliseconds: 1800000
    ignore:
      urls: /,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*.png,/**/*.jpeg,/**/*.ico,/api/v1/auth/login

Seata 集成之客户端

  • 需要三个应用以上才能进行演示,根据seata文档搭建account-server、order-service、storage-service、business-service这四个应用。
  • 数据库采用mysql,微服务组件选型还是使用之前确定的组件。
  • 更新: 由于seata支持zookeeper依赖的zkClient不支持jdk17(springboot3的最低要求),目前seata无法在zookeeper注册中心使用。通过如下配置直连seata server。
代码语言:yaml复制
## 分布式事务相关配置
seata:
  enabled: true
  application-id: ${spring.application.name}
  tx-service-group: ${spring.application.name}-tx-group
  service:
    grouplist:
      default: 127.0.0.1:8091 # 需要是服务端端口,对应 seata.server.service-port
#    vgroup-mapping:
#      my_test_tx_group: seata-server # 此处配置对应Server端配置registry.eureka.application的值
  config:
    type: file
    file:
      name: "seata.properties"
  registry:
    type: file

引入pom.xml

  • 引入Seata 主要是引入 spring-cloud-starter-alibaba-seata
代码语言:xml复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>banana</artifactId>
        <groupId>io.rainforest</groupId>
        <version>1.0</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <packaging>pom</packaging>

    <artifactId>banana-seata-example</artifactId>
    <description>banana-seata-example 父工程</description>

    <modules>
        <module>account-server</module>
        <module>business-service</module>
        <module>order-service</module>
        <module>storage-service</module>
    </modules>

    <!-- 以下依赖 全局所有的模块都会引入  -->
    <dependencies>
        <!-- 引入分布式事务seata -->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
        </dependency>
    </dependencies>
</project>

修改配置

  • 新增配置文件 application.yml,配置主要是 seata 下面的配置。
代码语言:yaml复制
base:
  config:
    mdb:
      hostname: 127.0.0.1 #your mysql server ip address
      dbname: seata #your database name for test
      port: 20020 #your mysql server listening port
      username: 'root' #your mysql server username
      password: '123456' #your mysql server password

spring.application.name: account-service
spring:
  cloud:
    zookeeper:
      connect-string: localhost:2181
  datasource:
    name: storageDataSource
    #    druid don't support GraalVM now because of there is CGlib proxy
    #    type: com.alibaba.druid.pool.DruidDataSource
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://${base.config.mdb.hostname}:${base.config.mdb.port}/${base.config.mdb.dbname}?useSSL=false&serverTimezone=UTC
    username: ${base.config.mdb.username}
    password: ${base.config.mdb.password}
server:
  port: 10301
  servlet:
    context-path: /
# springdoc-openapi项目配置
springdoc:
  swagger-ui:
    path: /swagger-ui.html
    tags-sorter: alpha
    operations-sorter: alpha
  api-docs:
    path: /v3/api-docs
  group-configs:
    - group: 'default'
      paths-to-match: '/**'
      packages-to-scan: io.rainforest.banana.app.web
# knife4j的增强配置,不需要增强可以不配
knife4j:
  enable: true
  setting:
    language: zh_cn
  basic:
    enable: true
    # Basic认证用户名
    username: yulin
    # Basic认证密码
    password: 123yl.

## 分布式事务相关配置
seata:
  enabled: true
  application-id: ${spring.application.name}
  tx-service-group: ${spring.application.name}-tx-group
  service:
    grouplist:
      default: 127.0.0.1:8091 # 需要是服务端端口,对应 seata.server.service-port
  config:
    type: file
    file:
      name: "seata.properties"
  registry:
    type: file

seata.properties 的相关配置:

代码语言:shell复制
service.vgroupMapping.order-service-tx-group=default
service.vgroupMapping.account-service-tx-group=default
service.vgroupMapping.business-service-tx-group=default
service.vgroupMapping.storage-service-tx-group=default

修改启动类

  • 启动类不需要特殊修改。
代码语言:java复制
package io.rainforest.banana.client1;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;

@SpringBootApplication
@EnableDiscoveryClient
@EnableFeignClients
public class Application {
	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}
}

搭建演示应用

配置和pom依赖说明
  • application.yml 只有 server.port 有差异,其他配置相同,参考上述 修改配置 一节。
    • account-service 10301
    • business-service 10302
    • order-service 10303
    • storage-service 10304
  • pom.xml 全都相同,只有 artifactId 不一样,参考上述 引入pom.xml 一节。
  • 以下例子代码均来自 git/spring-cloud-alibaba/tree/2022.x/spring-cloud-alibaba-examples/seata-example ,部分例子代码单独编写。
account-server 搭建
代码语言:java复制
@RestController
public class AccountControllerDemo {

	private static final Logger LOGGER = LoggerFactory.getLogger(AccountControllerDemo.class);

	private static final String SUCCESS = "SUCCESS";

	private static final String FAIL = "FAIL";

	private final JdbcTemplate jdbcTemplate;

	private Random random;

	public AccountControllerDemo(JdbcTemplate jdbcTemplate) {
		this.jdbcTemplate = jdbcTemplate;
		this.random = new Random();
	}

	@PostMapping(value = "/account2", produces = "application/json")
	@GlobalTransactional(timeoutMills = 300000, name = "spring-cloud-demo-tx")
	public String account(String userId, int money) {
		LOGGER.info("Account Service ... xid: "   RootContext.getXID());

		int result = jdbcTemplate.update(
				"update account_tbl set money = money - ? where user_id = ?",
				new Object[] { money, userId });

		if (random.nextBoolean()) {
			throw new RuntimeException("this is a mock Exception");
		}

		LOGGER.info("Account Service End ... ");
		if (result == 1) {
			return SUCCESS;
		}
		return FAIL;
	}

}


@RestController
public class AccountControllerDemo {

	private static final Logger LOGGER = LoggerFactory.getLogger(AccountControllerDemo.class);

	private static final String SUCCESS = "SUCCESS";

	private static final String FAIL = "FAIL";

	private final JdbcTemplate jdbcTemplate;

	private Random random;

	public AccountControllerDemo(JdbcTemplate jdbcTemplate) {
		this.jdbcTemplate = jdbcTemplate;
		this.random = new Random();
	}

	@PostMapping(value = "/account2", produces = "application/json")
	@GlobalTransactional(timeoutMills = 300000, name = "spring-cloud-demo-tx")
	public String account(String userId, int money) {
		LOGGER.info("Account Service ... xid: "   RootContext.getXID());

		int result = jdbcTemplate.update(
				"update account_tbl set money = money - ? where user_id = ?",
				new Object[] { money, userId });

		if (random.nextBoolean()) {
			throw new RuntimeException("this is a mock Exception");
		}

		LOGGER.info("Account Service End ... ");
		if (result == 1) {
			return SUCCESS;
		}
		return FAIL;
	}

}


package io.rainforest.banana.app.web.demo;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;

import javax.sql.DataSource;

/**
 * 初始化账户数据
 */
@Configuration
public class DatabaseConfiguration {
	@Bean
	public JdbcTemplate jdbcTemplate(DataSource dataSource) {
		JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);

		jdbcTemplate.update("delete from account_tbl where user_id = 'U100001'");
		jdbcTemplate.update(
				"insert into account_tbl(user_id, money) values ('U100001', 10000)");

		return jdbcTemplate;
	}

}
  • 通过注解@GlobalTransactional开启事务控制。
  • 访问 http://localhost:10301/account2?userId=33&money=333 可以验证事务已开启成功。
  • 通过 seata server界面 http://localhost:7091/#/transaction/list 能够看到事务记录,账号密码 seata/seata 。
order-service 搭建
代码语言:java复制
@RestController
public class OrderController {

	private static final Logger LOGGER = LoggerFactory.getLogger(OrderController.class);

	private static final String SUCCESS = "SUCCESS";

	private static final String FAIL = "FAIL";

	private static final String USER_ID = "U100001";

	private static final String COMMODITY_CODE = "C00321";

	private final JdbcTemplate jdbcTemplate;

	private final RestTemplate restTemplate;

	private Random random;

	public OrderController(JdbcTemplate jdbcTemplate, RestTemplate restTemplate) {
		this.jdbcTemplate = jdbcTemplate;
		this.restTemplate = restTemplate;
		this.random = new Random();
	}

	@PostMapping(value = "/order", produces = "application/json")
	public String order(String userId, String commodityCode, int orderCount) {
		LOGGER.info("Order Service Begin ... xid: "   RootContext.getXID());

		int orderMoney = calculate(commodityCode, orderCount);

		invokerAccountService(orderMoney);

		final Order order = new Order();
		order.userId = userId;
		order.commodityCode = commodityCode;
		order.count = orderCount;
		order.money = orderMoney;

		KeyHolder keyHolder = new GeneratedKeyHolder();

		int result = jdbcTemplate.update(new PreparedStatementCreator() {

			@Override
			public PreparedStatement createPreparedStatement(Connection con)
					throws SQLException {
				PreparedStatement pst = con.prepareStatement(
						"insert into order_tbl (user_id, commodity_code, count, money) values (?, ?, ?, ?)",
						PreparedStatement.RETURN_GENERATED_KEYS);
				pst.setObject(1, order.userId);
				pst.setObject(2, order.commodityCode);
				pst.setObject(3, order.count);
				pst.setObject(4, order.money);
				return pst;
			}
		}, keyHolder);

		order.id = keyHolder.getKey().longValue();

		if (random.nextBoolean()) {
			throw new RuntimeException("this is a mock Exception");
		}

		LOGGER.info("Order Service End ... Created "   order);

		if (result == 1) {
			return SUCCESS;
		}
		return FAIL;
	}

	private int calculate(String commodityId, int orderCount) {
		return 2 * orderCount;
	}

	private void invokerAccountService(int orderMoney) {
		String url = "http://127.0.0.1:10301/account";
		HttpHeaders headers = new HttpHeaders();
		headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);

		MultiValueMap<String, String> map = new LinkedMultiValueMap<String, String>();

		map.add("userId", USER_ID);
		map.add("money", orderMoney   "");

		HttpEntity<MultiValueMap<String, String>> request = new HttpEntity<MultiValueMap<String, String>>(
				map, headers);

		ResponseEntity<String> response = restTemplate.postForEntity(url, request,
				String.class);
	}

}


@Configuration
public class DatabaseConfiguration {
	@Bean
	public RestTemplate restTemplate() {
		return new RestTemplate();
	}
	@Bean
	public JdbcTemplate jdbcTemplate(DataSource dataSource) {
		JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);

		jdbcTemplate.execute("TRUNCATE TABLE order_tbl");

		return jdbcTemplate;
	}

}

package io.rainforest.banana.app.web.demo;

import java.io.Serializable;

public class Order implements Serializable {

	/**
	 * id.
	 */
	public long id;

	/**
	 * user id.
	 */
	public String userId;

	/**
	 * commodity code.
	 */
	public String commodityCode;

	/**
	 * count.
	 */
	public int count;

	/**
	 * money.
	 */
	public int money;

	@Override
	public String toString() {
		return "Order{"   "id="   id   ", userId='"   userId   '''   ", commodityCode='"
				  commodityCode   '''   ", count="   count   ", money="   money   '}';
	}

}
storage-service 搭建
代码语言:java复制
@Configuration
public class DatabaseConfiguration {


//  druid don't support GraalVM now because of there is CGlib proxy
	/*@Bean
	@Primary
	@ConfigurationProperties("spring.datasource")
	public DataSource storageDataSource() {
		return new DruidDataSource();
	}*/

	@Bean
	public JdbcTemplate jdbcTemplate(DataSource dataSource) {

		JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);

		jdbcTemplate.update("delete from storage_tbl where commodity_code = 'C00321'");
		jdbcTemplate.update(
				"insert into storage_tbl(commodity_code, count) values ('C00321', 100)");

		return jdbcTemplate;

	}

}

@RestController
public class StorageController {

	private static final Logger LOGGER = LoggerFactory.getLogger(StorageController.class);

	private static final String SUCCESS = "SUCCESS";

	private static final String FAIL = "FAIL";

	private final JdbcTemplate jdbcTemplate;

	public StorageController(JdbcTemplate jdbcTemplate) {
		this.jdbcTemplate = jdbcTemplate;
	}

	@GetMapping(value = "/storage/{commodityCode}/{count}", produces = "application/json")
	public String echo(@PathVariable String commodityCode, @PathVariable int count) {
		LOGGER.info("Storage Service Begin ... xid: "   RootContext.getXID());
		int result = jdbcTemplate.update(
				"update storage_tbl set count = count - ? where commodity_code = ?",
				new Object[] { count, commodityCode });
		LOGGER.info("Storage Service End ... ");
		if (result == 1) {
			return SUCCESS;
		}
		return FAIL;
	}

}
business-service 搭建
代码语言:java复制
@RestController
public class HomeController {

	private static final Logger LOGGER = LoggerFactory.getLogger(HomeController.class);

	private static final String SUCCESS = "SUCCESS";

	private static final String FAIL = "FAIL";

	private static final String USER_ID = "U100001";

	private static final String COMMODITY_CODE = "C00321";

	private static final int ORDER_COUNT = 2;

	private final RestTemplate restTemplate;

	private final OrderService orderService;

	private final StorageService storageService;

	public HomeController(RestTemplate restTemplate, OrderService orderService,
			StorageService storageService) {
		this.restTemplate = restTemplate;
		this.orderService = orderService;
		this.storageService = storageService;
	}

	@GlobalTransactional(timeoutMills = 300000, name = "spring-cloud-demo-tx")
	@GetMapping(value = "/seata/rest", produces = "application/json")
	public String rest() {

		String result = restTemplate.getForObject(
				"http://127.0.0.1:10304/storage/"   COMMODITY_CODE   "/"   ORDER_COUNT,
				String.class);

		if (!SUCCESS.equals(result)) {
			throw new RuntimeException();
		}

		String url = "http://127.0.0.1:10303/order";
		HttpHeaders headers = new HttpHeaders();
		headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);

		MultiValueMap<String, String> map = new LinkedMultiValueMap<String, String>();
		map.add("userId", USER_ID);
		map.add("commodityCode", COMMODITY_CODE);
		map.add("orderCount", ORDER_COUNT   "");

		HttpEntity<MultiValueMap<String, String>> request = new HttpEntity<MultiValueMap<String, String>>(
				map, headers);

		ResponseEntity<String> response;
		try {
			response = restTemplate.postForEntity(url, request, String.class);
		}
		catch (Exception exx) {
			throw new RuntimeException("mock error");
		}
		result = response.getBody();
		if (!SUCCESS.equals(result)) {
			throw new RuntimeException();
		}

		return SUCCESS;
	}

	@GlobalTransactional(timeoutMills = 300000, name = "spring-cloud-demo-tx")
	@GetMapping(value = "/seata/feign", produces = "application/json")
	public String feign() {

		String result = storageService.storage(COMMODITY_CODE, ORDER_COUNT);

		if (!SUCCESS.equals(result)) {
			throw new RuntimeException();
		}

		result = orderService.order(USER_ID, COMMODITY_CODE, ORDER_COUNT);

		if (!SUCCESS.equals(result)) {
			throw new RuntimeException();
		}

		return SUCCESS;

	}

}

业务测试验证

场景说明

business-service 主业务服务提供一个场景,主要是如下两个接口,业务场景一致,分别使用openfeign和restTemplate调用。

  • http://localhost:10302/seata/feign
  • http://localhost:10302/seata/rest
  1. openfeign调用
  • 调用storage-service接口,扣减库存
  • 调用order-service接口,保存订单
  • order-service调用account-server接口,扣减账户余额
  1. restTemplate调用
  • 调用storage-service接口,扣减库存
  • 调用order-service接口,保存订单
  • order-service调用account-server接口,扣减账户余额

事务场景:

  • 库存服务扣减库存,如果库存不足或者保存异常,则抛出异常,回滚
  • 订单服务保存订单,如果订单保存失败,则抛出异常,回滚
  • 账户服务扣减账户余额,如果账户余额不足或者保存异常,则抛出异常,回滚
启动
  • 启动seata-server,启动zookeeper注册中心。
  • 启动account-server、order-servicer、storage-service、business-service
  • 访问http://localhost:10302/seata/feign,验证分布式事务
  • 访问http://localhost:10302/seata/rest,验证分布式事务

通过控制台打印的 RootContext.getXID() 可以看出多个接口处于同一个事务。

关于作者

来自一线全栈程序员nine的探索与实践,持续迭代中。

欢迎关注、评论、点赞。

0 人点赞