Java 分布式解决方案

2022-09-02 09:54:04 浏览数 (1)

大家好,又见面了,我是你们的朋友全栈君。

文章目录

  • 一、基础知识
    • 1. CAP理论
    • 2. BASE理论
    • 3. 分布式共识算法
      • 3.1 Raft
        • leader election
        • Log Replication
  • 二、 分布式锁
    • 1. Redis 分布式锁
      • 1.1 加锁
      • 1.2 解锁
      • 1.3 Redisson
        • RLock
      • 1.4 总结
    • 2. ZooKeeper 分布式锁
      • 2.1 基本原理
      • 2.2 curator 实现
    • 3. 两者的对比
  • 三、 分布式事务
    • 1. 2PC 两阶段提交
    • 2. 3PC 三阶段提交
    • 3. Seata
    • 4. TCC 模式
    • 5. SAGA 模式 (最大努力通知)
    • 6. 可靠事件队列(可靠消息最终一致性)
    • 7. 总结
  • 四、接口幂等性
    • 1. 令牌机制
    • 2. 锁机制以及其他
  • 五、负载均衡

一、基础知识

1. CAP理论

CAP是 Consistency、Availability、Partition tolerance三个词语的缩写,分别表示一致性、可用性、分区容忍性

  • Consistency 一致性 代表数据在任何时刻、任何分布式节点中所看到的都是符合预期的。写操作后的读操作可以读取到最新的数据状态,当数据分布在多个节点上,从任意结点读取到的数据都是最新的状态。
  • Availability 可用性 可用性是指任何事务操作都可以得到响应结果,且不会出现响应超时或响应错误。
  • Partition tolerance 分区容忍性 通常分布式系统的各各结点部署在不同的子网,这就是网络分区,不可避免的会出现由于网络问题而导致结点之间通信失败,此时仍可对外提供服务,这叫分区容忍性。

总结:

一个分布式系统最多只能同时满足一致性(Consistency)、可用性(Availability)和分区容忍性(Partition tolerance)这三项中的两项。它可以作为我们进行架构设计、技术选型的考量标准。对于多数大型互联网应用的场景,结点众多、部署分散,而且现在的集群规模越来越大,所以节点故障、网络故障是常态,而且要保证服务可用性达到N个9(99.99…%),并要达到良好的响应性能来提高用户体验,因此一般都会做出如下选择:保证P和A,舍弃C强一致,保证最终一致性。

  • 如果放弃分区容忍性 CA(CA without P) 意味着我们将假设节点之间通信永远是可靠的。永远可靠的通信在分布式系统中必定不成立的,这不是你想不想的问题,而是只要用到网络来共享数据,分区现象就会始终存在。在现实中,最容易找到放弃分区容忍性的例子便是传统的关系数据库集群,这样的集群虽然依然采用由网络连接的多个节点来协同工作,但数据却不是通过网络来实现共享的。以 Oracle 的 RAC 集群为例,它的每一个节点均有自己独立的 SGA、重做日志、回滚日志等部件,但各个节点是通过共享存储中的同一份数据文件和控制文件来获取数据的,通过共享磁盘的方式来避免出现网络分区。因而 Oracle RAC 虽然也是由多个实例组成的数据库,但它并不能称作是分布式数据库。
  • 如果放弃可用性 CP(CP without A) 意味着我们将假设一旦网络发生分区,节点之间的信息同步时间可以无限制地延长,此时,问题相当于退化到前面“全局事务”中讨论的一个系统使用多个数据源的场景之中,我们可以通过 2PC/3PC 等手段,同时获得分区容忍性和一致性。在现实中,选择放弃可用性的 CP 系统情况一般用于对数据质量要求很高的场合中,除了 DTP 模型的分布式数据库事务外,著名的 HBase 也是属于 CP 系统,以 HBase 集群为例,假如某个 RegionServer 宕机了,这个 RegionServer 持有的所有键值范围都将离线,直到数据恢复过程完成为止,这个过程要消耗的时间是无法预先估计的。
  • 如果放弃一致性 AP(AP without C) 意味着我们将假设一旦发生分区,节点之间所提供的数据可能不一致。选择放弃一致性的 AP 系统目前是设计分布式系统的主流选择,因为 P 是分布式网络的天然属性,你再不想要也无法丢弃;而 A 通常是建设分布式的目的,如果可用性随着节点数量增加反而降低的话,很多分布式系统可能就失去了存在的价值,除非银行、证券这些涉及金钱交易的服务,宁可中断也不能出错,否则多数系统是不能容忍节点越多可用性反而越低的。目前大多数 NoSQL 库和支持分布式的缓存框架都是 AP 系统,以 Redis 集群为例,如果某个 Redis 节点出现网络分区,那仍不妨碍各个节点以自己本地存储的数据对外提供缓存服务,但这时有可能出现请求分配到不同节点时返回给客户端的是不一致的数据。

2. BASE理论

1. 理解强一致性和最终一致性

CAP理论告诉我们一个分布式系统最多只能同时满足一致性(Consistency)、可用性(Availability)和分区容忍性(Partition tolerance)这三项中的两项,其中AP在实际应用中较多,AP即舍弃一致性,保证可用性和分区容忍性,但是在实际生产中很多场景都要实现一致性,比如前边我们举的例子主数据库向从数据库同步数据,即使不要一致性,但是最终也要将数据同步成功来保证数据一致,这种一致性和CAP中的一致性不同,CAP中的一致性要求在任何时间查询每个结点数据都必须一致,它强调的是强一致性,但是最终一致性是允许可以在一段时间内每个结点的数据不一致,但是经过一段时间每个结点的数据必须一致,它强调的是最终数据的一致性。

2. BASE 理论介绍 BASE 是 Basically Available(基本可用)、Soft state(软状态)和 Eventually consistent (最终一致性)三个短语的缩写。BASE理论是对CAP中AP的一个扩展,通过牺牲强一致性来获得可用性,当出现故障允许部分不可用但要保证核心功能可用,允许数据在一段时间内是不一致的,但最终达到一致状态。满足BASE理论的事务,我们称之为“柔性事务”。

  • 基本可用: 分布式系统在出现故障时,允许损失部分可用功能,保证核心功能可用。如,电商网站交易付款出现问题了,商品依然可以正常浏览。
  • 软状态: 由于不要求强一致性,所以BASE允许系统中存在中间状态(也叫软状态),这个状态不影响系统可用性,如订单的”支付中”、“数据同步中”等状态,待数据最终一致后状态改为“成功”状态。
  • 最终一致: 最终一致是指经过一段时间后,所有节点数据都将会达到一致。如订单的”支付中”状态,最终会变为“支付成功”或者”支付失败”,使订单状态与实际交易结果达成一致,但需要一定时间的延迟、等待。

3. 分布式共识算法

3.1 Raft

Raft 是一种一致性协议,相对于Paxos 相对简单一些。

主要分为3个子问题解决:

  • leader election
  • log replication
leader election

Raft的所有节点分为三种状态,Leader、Follower 和 Candidate。

如何触发选举

  • Leader 周期性的发送心跳包(RPC请求)给所有 Follower 节点。
  • 如果 Follower 在周期内没有收到心跳包,则发起选举。

选举流程

  • Follower 发起重新选举,把 term 1 代表新的一轮,然后变成 Candidate 状态。
  • 首先给自己投票,然后像其他节点发送 RequestVote Rpc 收集投票。
  • 其他节点如果没有投票就会投出给他。
    • 如果发现日志比他更新,则拒绝投票。
  • 如果超过半数的节点都投票给该节点,则该节点就会变成新Leader。
  • 一个 Term 只会产生一个 Leader ,如果没有选举出Leader就会进入下一轮。
  • 老的Leader如果重连回集群,发现term比他的大,就会更新term并变成Follower。
Log Replication

Raft 的日志记录了操作内容,每一个模块的数据结构是一个 entry,包括三个部分。

  • Term:请求时 leader 的term
  • Index:索引,也就是当前日志在 logs 中的位置
  • Command:包含客户端的请求指令

复制过程

其实就类似于一个二阶段提交的过程。

  • leader 将客户端的请求指令组成一个新的log条目添加到本地的log中,然后发送包含最新log 的rpc 给其他的follower(通过AppendEntries rpc)
  • 然后如果超过一半的 Follower 的执行RPC成功,将 log 写入之后,则代表本次复制成功,完成 commit。
  • 出现日志不一致的情况则以 Leader 为准。

二、 分布式锁

1. Redis 分布式锁

分布式锁的基本原理,就是向同一个地方获取锁,如果能获取则可以继续访问。

使用 Redis 分布式锁的基本,就是将 Redis 中使用 SET 命令存放一个一个key,使用这个命令时,库中没有该键则插入成功,有的话则返回失败,意味着没有占到锁。

1.1 加锁

一般使用该命令进行操作,设置 SET 一个键值 NX 表示原库中没有则加入成功。并且可以原子性的设置过期时间。

代码语言:javascript复制
SET key value [EX seconds] [PX milliseconds] [NX|XX]

设置过期时间是因为,如果加锁成功之后服务器宕机,则无法删除锁造成死锁,所以要设置过期时间。

对于的 Java 描述如下:

  • 添加的 key 是事先在多个微服务节点统一的KEY。
  • value 值为 uuid 当前线程ID 是为了能够在删除锁的时候,检查是否是自己的锁。因为如果该进程执行业务耗费了很长的时间,超过了锁的过期时间,锁已经过期,别的进程抢占生成了新的锁,而之前的锁删除操作可能删除新的锁造成混乱。
代码语言:javascript复制
String value = UUIDUtil.uuid()  Thread.currentThread().getName();
redisTemplate.opsForValue().setIfAbsent(KEY, value , 10, TimeUnit.SECONDS);

1.2 解锁

解锁的时候,需要先检查是否是自己的锁,如果是则删除。

但是以下这种方式,显然是错误的,因为获取值,比较和删除,这三个操作不是原子操作,可能在获取和比较的时候是当前 value 但是删除的时候,已经改变了。

代码语言:javascript复制
        String lockValue = (String) redisTemplate.opsForValue().get(KEY);
        if (lockValue.equals(value)) { 
   
            redisTemplate.delete(KEY);
        }

所以要通过 Redis 和 LUA 脚本进行一个原子操作。Redis 官网也演示了该解锁脚本:可以添加两个参数,一个是 KEYS[1] 表示想要删除的键,ARGV[1] 表示如果该键对应的 value 是这个参数的值才进行删除。

正确的删除写法:

代码语言:javascript复制
        String script =
                "if redis.call('get',KEYS[1]) == ARGV[1]"  
                "then"  
                    "return redis.cal1 ('del',KEYS[1])"  
                "else"  
                    "return 0"  
                "end";
        /** * 传入的参数 1. RedisScript<T> script 构造一个 DefaultRedisScript 传入执行的脚本和返回值类型。 * 2. List<K> keys, 代表脚本中的 KEYS[1] 参数,是一个链表,表示删除的键 * 3. Object... args,代表 ARGV[1] 参数,是一个动态数组和 keys 一一对应,表示要删除的值 */
        redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class),
                Arrays.asList(KEY),
                value);

这种方法也存在问题,就是当执行业务时间很长的情况下,锁会过期,会导致多个进程进入,并且锁也不能重入。

1.3 Redisson

Redisson 相当于实现了分布式环境下的JUC。

使用可以参照官方文档:https://github.com/redisson/redisson/

RLock
代码语言:javascript复制
    public void test() { 
   
        RLock lock = redissonClient.getLock(KEY);
        try { 
   
            lock.lock();
            // ...
            
        }catch (Exception e) { 
   
            e.printStackTrace();
        }finally { 
   
            lock.lock();
        }
    }
代码语言:javascript复制
public interface RLock extends Lock
  • 可以使用 getLock 方法通过键名获取到对应的锁,如果键名一样,则在分布式系统中是同一把锁。
  • 并且获取到的 RLock 实现了 Lock 接口。可以很方便的使用 lock 和 unlock 进行加锁和解锁。

lock( ) 方法

  • 如果指定了超时时间:就直接通过 Redis 执行器执行一段LUA脚本,过期则删除对应的 key。
代码语言:javascript复制
    <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { 
   
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                "if (redis.call('exists', KEYS[1]) == 0) then "  
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); "  
                        "redis.call('pexpire', KEYS[1], ARGV[1]); "  
                        "return nil; "  
                        "end; "  
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then "  
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); "  
                        "redis.call('pexpire', KEYS[1], ARGV[1]); "  
                        "return nil; "  
                        "end; "  
                        "return redis.call('pttl', KEYS[1]);",
                Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }
  • 如果没有指定超时时间:
代码语言:javascript复制
    private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { 
   
        // 指定超时时间则走这个if,也就是直接设置一个超时时间,不会续期
        if (leaseTime != -1) { 
   
            return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
        // 没指定超时时间就通过 getLockWatchdogTimeout() 获取超时时间
        // 也就是 private long lockWatchdogTimeout = 30 * 1000; 【30s】
        // 然后通过 Redis LUA 脚本设置30s的过期时间
        RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,
                commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> { 
   
            if (e != null) { 
   
                return;
            }
            // 如果时间到期了,会执行该方法
            // 该方法中又主要有一个 renewExpiration();方法
            // 这个方法会创建一个 TimeTask 定时任务,每 internalLockLeaseTime / 3 【10s】进行一次,给该锁续期。
            if (ttlRemaining == null) { 
   
                scheduleExpirationRenewal(threadId);
            }
        });
        return ttlRemainingFuture;
    }

unlock( ) 方法

就是简单的运行一个异步任务,使用 LUA 脚本删除该键值对。

另外,Redisson 还实现了很多JUC包下的组件,例如 ReadWriteLock,CountDownLatch,Semaphore等,这些组件原本在 jdk 中采用AQS,在分布式环境中就用 Redis 的键值对代替了原本的 state 变量,另外,因为采用LUA脚本所以能保证操作的原子性。

1.4 总结

Redis 实现分布式锁,主要就是让所有进程都去同一个地方抢占锁,如果抢到就能继续执行程序。

  • 加锁操作,通过 SET NX 指令可以原子性的设置锁和超时时间,该指令在没有该键值对的时候才能插入成功,插入成功则代表获得锁,另外,设置超时时间是为了该进程加锁之后,服务器意外宕机导致锁无法删除而造成死锁,所以两个操作必须是原子操作。
  • 解锁操作,需要先检查该锁是否是该进程添加的,如果是,则删除该锁,同样这两个操作也要保证是原子操作,所以采用 LUA 脚本实现。

2. ZooKeeper 分布式锁

2.1 基本原理

利用 ZooKeeper 实现分布式锁的方式和 Redis 类似,在 Zookeeper 中加入相同前缀的临时顺序节点

如果是顺序最小的节点,则可以获取锁,如果不是,则注册Watcher,监听比自己序号小的节点,如果序号小的节点删除,则监听他的节点可以被唤醒获取锁。

2.2 curator 实现

  • 在容器中加入操作Zookeeper 客户端的集成框架 curator 。
代码语言:javascript复制
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorEventType;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.WatchedEvent;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;

/** * @Description * @Date 2021/9/10 20:18 * @author: A.iguodala */
@Configuration
public class CuratorFrameworkConfig { 
   


    /** * 创建操作 Zookeeper 客户端框架 * @return */
    @Bean
    public CuratorFramework curatorFramework() { 
   

        // ExponentialBackoffRetry是种重连策略,每次重连的间隔会越来越长,1000毫秒是初始化的间隔时间,3代表尝试重连次数。
        ExponentialBackoffRetry retry = new ExponentialBackoffRetry(1000, 3);
        // 创建client
        CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("121.196.166.231:2181", retry);
        // 添加watched 监听器
        curatorFramework.getCuratorListenable().addListener(new CuratorListener() { 
   
            @Override
            public void eventReceived(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception { 
   
                CuratorEventType type = curatorEvent.getType();
                if (type == CuratorEventType.WATCHED) { 
   
                    WatchedEvent watchedEvent = curatorEvent.getWatchedEvent();
                    String path = watchedEvent.getPath();
                    System.out.println(watchedEvent.getType()   " -- "   path);
                    // 重新设置改节点监听
                    if (null != path) { 
   
                        curatorFramework.checkExists().watched().forPath(path);
                    }
                }
            }
        });
        curatorFramework.start();
        return curatorFramework;
    }
}
  • 然后可以通过 InterProcessSemaphoreMutex 类进行加锁。
代码语言:javascript复制
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/** * @Description * @Date 2021/9/10 20:36 * @author: A.iguodala */
@RestController
@Slf4j
public class LockTestController { 
   

    /** * 加锁节点 */
    private final String lockPath = "/lock/test";

    /** * 操作 Zookeeper 客户端 */
    @Autowired
    private CuratorFramework curatorFramework;
    
    @GetMapping("/test01")
    public String test() { 
   
        // 创建锁
        InterProcessSemaphoreMutex lock = new InterProcessSemaphoreMutex(curatorFramework, lockPath);
        try { 
   
            // 获取锁
            lock.acquire();
        } catch (Exception e) { 
   
            e.printStackTrace();
        }finally { 
   
            try { 
   
                lock.release();
            } catch (Exception e) { 
   
                e.printStackTrace();
            }
        }
        return "OK";
    }
}

3. 两者的对比

  • Redis 分布式锁实现,例如Redisson,当没有获取锁时会一直自旋,直到获取锁,而Zookeeper 则没有获取锁就只监听上一个节点,不需要要一直占用 CPU。
  • Zookeeper 保证了通过单一Leader 节点以及分布式共识保证了强一致性,而Redis 不能。
  • 但是Zookeeper 由于增删节点都需要Leader节点完成并广播给其他节点,所以比较耗时,并发度不够。
  • 综上,在可靠性要求高的情况下使用Zookeeper,而并发量大的情况下使用Redis。

三、 分布式事务

在分布式系统中,各个节点之间在物理上相互独立,通过网络进行沟通和协调。由于存在事务机制,可以保证每个独立节点上的数据操作可以满足ACID。但是,相互独立的节点之间无法准确的知道其他节点中的事务执行情况。所以不知道该事务到底应该提交还是回滚。常规的解决办法就是引入一个事务协调器的组件来统一调度所有分布式节点的执行。

1. 2PC 两阶段提交

二阶段提交的算法思路可以概括为:执行事务程序将操作成败通知事务管理器,再由管理器根据所有参与事务者的反馈情况决定各参与者是否要提交操作还是混滚操作。

两阶段分为:

  • 准备阶段
    • 事务管理器向所有事务参与者(资源管理器)发送一个 prepare 的请求,询问是否可以提交操作。
    • 各个事务执行操作,将操作写入 undo log 和 redo log。
    • 之后向事务管理器发送应答响应,如果成功执行事务就返回提交信息,如果失败就返回回滚。
  • 提交阶段
    • 提交阶段事务管理器根据多个事务参与者返回的消息,进行提交操作或者回滚操作。

缺点:

  • 同步阻塞:执行过程中,所有参与节点都是事务阻塞型的。
  • 单点故障:由于事务管理器十分重要,如果在执行过程中,事务管理器宕机,那么每个节点的事务就会一直阻塞。
  • 数据不一致:如果在事务管理器发送提交请求之后,由于网络原因没有到达某个事务参与者,则该事务就没有提交数据而造成的数据的不一致。

2. 3PC 三阶段提交

三阶段提交主要就是对二阶段提交的改进,主要改动了两个方面:

  1. 引入超时机制。同时在协调者和参与者中都引入超时机制。
  2. 在第一阶段和第二阶段中插入一个准备阶段。保证了在最后提交阶段之前各参与节点的状态是一致的。

三阶段提交主要分为三个阶段:

  • CanCommit阶段
    • 和二阶段提交准备阶段一样,发出事务请求,每个节点开始执行任务。
  • PreCommit阶段
    • 事务执行完成之后,所有事务给事务管理器发送完成响应。
    • 管理器接收到之后,进行一次预提交。
    • 如果所有事务都提交成功,则返回对应的ACK。
  • doCommit阶段
    • 事务管理器只有接收到所有的ACK才会提交事务,不然就会回滚。

3. Seata

Seata 主要有三个组件

  • TC - 事务协调者
    • 维护全局和分支事务状态,驱动全局事务提交或者回滚,类似于二阶段提交的事务管理器。
  • TM - 事务管理器
    • 控制全局事务的范围,开始全局事务或者,结束的时候提交或者回滚事务。相当于剥离了原本的控制事务状态的功能交给TC,自己只执行全局事务的具体操作。
  • RM - 资源管理器
    • 管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。相当于管理本地事务以及和 TC 进行交流自己的事务状态。

大致工作流程:

  1. 事务管理器 TM 可以向事务协调器 TC 开启一个全局事务,然后在 TC 中生成一个唯一的事务 ID。
  2. 各个分支事务资源管理器 RM 可以向 TC 注册开启自己节点的分支事务,并向 TC 报告状态。
  3. TC 会接收到所有分支事务的状态,如果有一个回滚则通过 TM 对该事务ID下的所有分支事务进行回滚。
  4. 如果全部提交成功,则提交成功。另外,每个分支事务在自己提交之后就完成提交,并不会阻塞等待。

AT 模式使用

AT 即,auto, 自动事务提交回滚的模式。只需要在总方法上加上一个 @GlobalTransactional 注解就能完成需求。

  1. 首先需要给分布式事务中的分支事务加上一个数据库表,因为分支事务会自己提交,不能使用本事务的undo log 进行回滚。
代码语言:javascript复制
CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  `ext` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
  1. 下载 seata 和修改配置导入依赖。
    • seata 下载地址
    • 主要修改conf 目录下的 file.conf 和 register.conf
  2. 让 seata 代理自己的数据源
代码语言:javascript复制
import com.zaxxer.hikari.HikariDataSource;
import io.seata.rm.datasource.DataSourceProxy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StringUtils;
import javax.sql.DataSource;

/** * @Description * @Date 2021/9/2 13:54 * @author: A.iguodala */
@Configuration
public class SeataConfig { 
   

    /** * 首先获取到数据源的默认配置信息 */
    @Autowired
    private DataSourceProperties dataSourceProperties;

    @Bean
    public DataSource dataSource() { 
   
        // 构造对应数据源的DataSource
        HikariDataSource dataSource = dataSourceProperties.initializeDataSourceBuilder().type(HikariDataSource.class).build();
        if (StringUtils.hasText(dataSourceProperties.getName())) { 
   
            dataSource.setPoolName(dataSourceProperties.getName());
        }
        // 返回包装后的代理对象
        return new DataSourceProxy(dataSource);
    }
}

4. TCC 模式

TCC 是另一种常见的分布式事务机制,它是“Try-Confirm-Cancel”三个单词的缩写。

就是 3PC 三阶段提交的一种具体实现。

  • Try:尝试执行阶段,完成所有业务可执行性的检查(保障一致性),并且预留好全部需用到的业务资源(保障隔离性)。
  • Confirm:确认执行阶段,不进行任何业务检查,直接使用 Try 阶段准备的资源来完成业务处理。Confirm 阶段可能会重复执行,因此本阶段所执行的操作需要具备幂等性。
  • Cancel:如果发生异常或者需要回滚,则取消执行阶段,释放 Try 阶段预留的业务资源。Cancel 阶段可能会重复执行,也需要满足幂等性。

后两个阶段都是必须成功的阶段,所以在失败后会进行重试,所以要保证幂等性。

5. SAGA 模式 (最大努力通知)

SAGA 事务主要是为了解决 TCC 事务的业务侵入性很强的问题,例如在美团点了外卖想使用支付宝付款,但是支付宝不可能让美团对其代码进行侵入,所以 try 阶段可能就无法实施。

SAGA 模式将一个大事务差分成很多个小事务,并且通过补偿的机制来代替回滚:

  • 正向恢复(Forward Recovery):如果 Ti事务提交失败,则一直对 Ti进行重试,直至成功为止(最大努力交付)。这种恢复方式不需要补偿,适用于事务最终都要成功的场景,譬如在别人的银行账号中扣了款,就一定要给别人发货。正向恢复的执行模式为:T1,T2,…,Ti(失败),Ti(重试)…,Ti 1,…,Tn。
  • 反向恢复(Backward Recovery):如果 Ti事务提交失败,则一直执行 Ci对 Ti进行补偿,直至成功为止(最大努力交付)。这里要求 Ci 必须(在持续重试后)执行成功。反向恢复的执行模式为:T1,T2,…,Ti(失败),Ci(补偿),…,C2,C1。

6. 可靠事件队列(可靠消息最终一致性)

可靠消息最终一致性方案是指当事务发起方执行完成本地事务后并发出一条消息,事务参与方(消息消费者)一定能够接收消息并处理事务成功,此方案强调的是只要消息发给事务参与方最终事务要达到一致。一般采用消息中间件来完成。

例如,商品消费扣款的操作和生成订单的操作:(两个操作的运行顺序通常安排成最容易出错的最先进行,可以减少执行次数和占用资源。)

  • 在进行扣款成功之后,写入一张消息表,存储了事务的ID,事务的状态等信息(进行中)。
  • 让消息系统服务定时轮询该表,将进行中还没有完成的消息发送给订单服务,如果没有完成就一直重发。
  • 订单服务在处理完消息之后给消息系统发送消息表示事务完成,更改事务状态。
  • 为防止消息在网络中消失而造成消息系统重复发送信息,导致消费者重复消费,也就是为了保证幂等性,该消费者服务也需要维护一张消息表,表示处理过的消息,在消息消费之前,先检查消息表,如果处理过则直接返回成功消息。

7. 总结

分布式系统中,每个本地事务可以保证自己的ACID,但是对于其他事务的执行情况是不可知的,所以需要分布式事务的解决方案,一般会采用加入一个事务协调器来进行统一协调。

具体的解决方案主要包括:2PC3PCTCCSAGA可靠事件队列 等方式实现。

四、接口幂等性

保证接口幂等

接幂等性就是用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。比如说支付场景,用户购买了商品支付扣款成功,但是返回结果的时候网络异常,此时钱已经扣了,用户再次点击按钮,此时会进行第二次扣款,返回结果成功,用户查询余额返发现多扣钱了,流水记录也变成了两条,这就没有保证接口的幂等性。

1. 令牌机制

通过分析哪些业务是存在幂等问题的,就需要在执行业务之前获取令牌,服务器将令牌保存在 Redis 中,第一次调用时,会删除该令牌,之后的操作发现 Redis 中已经不存在该令牌则直接返回,典型的该机制实现就是验证码。

对于令牌的删除应该采用先删除令牌再执行逻辑的顺序,因为如果先执行业务,则可能造成多个请求都验证通过而执行业务,另外,令牌从 Redis 的取,比较,删除三个操作应该是原子操作。所以应该采用LUA脚本来实现。

代码语言:javascript复制
if redis.call('get',KEYS[1]) == ARGV[1]
then
    return redis.call('del', KEYS[1])
else 
    return 0
end

2. 锁机制以及其他

  • 数据库锁
    • 对于查询场景,可以采用悲观锁,通过select 加上 for update 来进行锁定,但是查询操作本身就是幂等的(删除操作和通过唯一主键进行插入的操作也是)
    • 对于修改场景,则可以使用乐观锁,就是带上version 版本号,每次对某个业务进行操作的时候,先获取版本号,进行一次操作就对版本号进行 1 操作,每次只处理版本号 1 的操作。
  • 分布式锁
    • 多台机器的操作就可以采用分布式锁,多次请求只有获得锁的操作可以继续执行,并且每次执行操作前先获取该操作是否已经完成处理。
  • 唯一约束
    • 数据库通过建立唯一索引来保证插入数据行的唯一性。
    • 可以通过 redis 的 set 来确保该操作是否已经进行。比如上传文件的幂等性,如果一个文件上传就会在Redis的Set中生成一个散列值,下一次上传就会先查询是否有相同的散列值,如果有就直接返回。
  • 防重表
    • 在消息队列消费者避免重复消费起了很大的作用,每次的操作往防重表中插入一条数据,每次执行业务之前先检查防重表中是否有该数据,有则直接返回。
  • 全局请求唯一ID
    • 调用接口时,生成一个唯一ID来判断是否重复。

具体使用哪种要根据具体的业务具体判断。

五、负载均衡

  • 轮循均衡(Round Robin):每一次来自网络的请求轮流分配给内部中的服务器,从 1 至 N 然后重新开始。此种均衡算法适合于集群中的所有服务器都有相同的软硬件配置并且平均服务请求相对均衡的情况。
  • 权重轮循均衡(Weighted Round Robin):根据服务器的不同处理能力,给每个服务器分配不同的权值,使其能够接受相应权值数的服务请求。譬如:服务器 A 的权值被设计成 1,B 的权值是 3,C 的权值是 6,则服务器 A、B、C 将分别接收到 10%、30%、60%的服务请求。此种均衡算法能确保高性能的服务器得到更多的使用率,避免低性能的服务器负载过重。 随机均衡(Random):把来自客户端的请求随机分配给内部中的多个服务器,在数据足够大的场景下能达到相对均衡的分布。
  • 权重随机均衡(Weighted Random):此种均衡算法类似于权重轮循算法,不过在分配处理请求时是个随机选择的过程。
  • 一致性哈希均衡(Consistency Hash):根据请求中某一些数据(可以是 MAC、IP 地址,也可以是更上层协议中的某些参数信息)作为特征值来计算需要落在的节点上,算法一般会保证同一个特征值每次都一定落在相同的服务器上。一致性的意思是保证当服务集群某个真实服务器出现故障,只影响该服务器的哈希,而不会导致整个服务集群的哈希键值重新分布。
  • 响应速度均衡(Response Time):负载均衡设备对内部各服务器发出一个探测请求(例如 Ping),然后根据内部中各服务器对探测请求的最快响应时间来决定哪一台服务器来响应客户端的服务请求。此种均衡算法能较好的反映服务器的当前运行状态,但这最快响应时间仅仅指的是负载均衡设备与服务器间的最快响应时间,而不是客户端与服务器间的最快响应时间。
  • 最少连接数均衡(Least Connection):客户端的每一次请求服务在服务器停留的时间可能会有较大的差异,随着工作时间加长,如果采用简单的轮循或随机均衡算法,每一台服务器上的连接进程可能会产生极大的不平衡,并没有达到真正的负载均衡。最少连接数均衡算法对内部中需负载的每一台服务器都有一个数据记录,记录当前该服务器正在处理的连接数量,当有新的服务连接请求时,将把当前请求分配给连接数最少的服务器,使均衡更加符合实际情况,负载更加均衡。此种均衡策略适合长时处理的请求服务,如 FTP 传输。

参考:

《凤凰架构 》| 周志明

https://www.hollischuang.com/archives/2591

https://seata.io/zh-cn/

发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/139999.html原文链接:https://javaforall.cn

0 人点赞