分布式系统技术全栈

2021-07-22 10:09:57 浏览数 (1)

0x01: 分布式系统

根据分布式的CAP理论我们了解“任何一个分布式系统都无法同时满足一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance),最多只能同时满足两项。”

所以我们在系统设计之初就分析调研,根据分析调研结果对这三者做取舍。

借鉴在主流互联网的经验,都牺牲强一致性来换取系统的高可用性,系统的“一致性”只保证“最终一致性”,这个最终时间需要在可控可接受的范围内。很多场景下,我们为了保证最终一致性,都会做很多技术方案来支持,比如分布式事务、分布式锁。

0x02: SSO单点登录

Single Sign On 在多个应用系统中,只需要登录一次,就可以访问其他相互信任的应用系统。一处登录,处处登录;一处注销,处处注销。

父域和子域:总公司名为1xuetang.cn就是父域,sz.1xuetang.cn sh.1xuetang.cn bj.1xuetang.cn这三个就是子域。

顶级域名 www.imooc.com 和 *.imooc.com 的cookie值是可以共享的,可以被携带

比如设置为 .imooc.com , .t.mukewang.com ,如此是OK的。

二级域名自己的独立cookie是不能共享的,不能被其他二级域名获取,比如:music.imooc.com 的cookie是不能被 mtv.imooc.com 共

互不影响,要共享必须设置为 .imooc.com 。

  • JWT方案

同一公司,同父域下的单点登录解决方案

JWT,可以储存在 Cookie 里面,也可以储存在 localStorage。

此后,客户端每次与服务器通信,都要带上这个 JWT。你可以把它放在 Cookie 里面自动发送,但是这样不能跨域,所以更好的做法是放在 HTTP 请求的头信息Authorization字段里面。

// Authorization: Bearer <token>

  • CAS方案

同一公司,不同域下的单点登录解决方案.

  • Oauth2方案

不同公司之间,不同域下的 第三方登录功能实现。

OAuth2并不是一个SSO框架,但可以实现SSO功能。

OAuth2服务端负责令牌的发放等操作,令牌可以采用JWT,也就是说JWT是用来承载用户的Access_Token的

用Oauth2方案来实现SSO的时候是不需要资源服务器这个角色的,有授权服务器和客户端就够了。

  • SSO服务端和SSO客户端直接是通过授权以后发放Token的形式来访问受保护的资源
  • 相对于浏览器来说,业务系统是服务端,相对于SSO服务端来说,业务系统是客户端
  • 浏览器和业务系统之间通过会话正常访问
  • 不是每次浏览器请求都要去SSO服务端去验证,只要浏览器和它所访问的服务端的会话有效它就可以正常访问

OAuth2中的几个重要概念:

  • resource owner: 拥有被访问资源的用户
  • user-agent: 一般来说就是浏览器
  • client: 第三方应用
  • Authorization server: 认证服务器,用来进行用户认证并颁发token
  • Resource server:资源服务器,拥有被访问资源的服务器,需要通过token来确定是否有权限访问

授权模式

1 简化模式(Implicit)

2 授权码模式(Authorization Code)

3 密码模式(Resource Owner Password Credentials Grant)

4 客户端模式(Client Credentials)

通过将用户信息这个资源设置为被保护资源,可以使用OAuth2技术实现单点登陆(SSO),而Spring Security OAuth2就是这种OAuth2 SSO方案的一个实现。

OAuth 2.0:Bearer Token、MAC Token区别

token_type 是访问令牌生成对授权服务器的调用中的参数,其实质上表示如何为资源访问调用生成和呈现access_token . 您在访问令牌生成调用中向授权服务器提供token_type .

如果您给出 Bearer (大多数实现时为默认值),则会生成 access_token 并将其发送给您 . 持票人可以简单地理解为"give access to the bearer of this token."一个有效的令牌并且没有问题 . 另一方面,如果选择Mac和 sign_type (在大多数实现中默认为 hmac-sha-1 ),则生成访问令牌并将其作为属性保存在密钥管理器中,并将加密密钥作为 access_token 发回 .

区别项

Bearer Token

MAC Token

1

(优点) 调用简单,不需要对请求进行签名。

(优点) 不依赖https协议,无协议加密带来的性能开销。

2

(缺点) 请求API需要使用https协议保证信息传输安全。

3

(缺点) Access Token有效期一个月,过期后需要使用Refresh Token进行刷新。

(优点) Access Token长期有效,无需使用Refresh Token刷新。

4

(缺点)需要进行MAC计算。

0x03: 分库分表

  • 数据切分

数据切分,简单的说,就是通过某种条件,将我们之前存储在一台数据库上的数据,分散到多台数据库中,从而达到降低单台数据库负载的效果。数据切分,根据其切分的规则,大致分为两种类型,垂直切分和水平切分。

  • 垂直切分

垂直切分就是按照不同的表或者Schema切分到不同的数据库中,比如:在我们的课程中,订单表(order)和商品表(product)在同一个数据库中,而我们现在要对其切分,使得订单表(order)和商品表(product)分别落到不同的物理机中的不同的数据库中,使其完全隔离,从而达到降低数据库负载的效果。

优点:

拆分后业务清晰,拆分规则明确;

系统之间容易扩展和整合;

数据维护简单

缺点:

部分业务表无法join,只能通过接口调用,提升了系统的复杂度;

跨库事务难以处理;

垂直切分后,某些业务数据过于庞大,仍然存在单体性能瓶颈;

  • 水平切分

水平切分相比垂直切分,更为复杂。它需要将一个表中的数据,根据某种规则拆分到不同的数据库中,例如:订单尾号为奇数的订单放在了订单数据库1中,而订单尾号为偶数的订单放在了订单数据库2中。这样,原本存在一个数据库中的订单数据,被水平的切分成了两个数据库。在查询订单数据时,我们还要根据订单的尾号,判断这个订单在数据库1中,还是在数据库2中,然后将这条SQL语句发送到正确的数据库中,查出订单。

几种水平拆分的典型的分片规则:

用户id求模,我们前面已经提到过;

按照日期去拆分数据;

按照其他字段求模,去拆分数据;

优点:

解决了单库大数据、高并发的性能瓶颈;

拆分规则封装好,对应用端几乎透明,开发人员无需关心拆分细节;

提高了系统的稳定性和负载能力;

缺点:

拆分规则很难抽象;

分片事务一致性难以解决;

二次扩展时,数据迁移、维护难度大。比如:开始我们按照用户id对2求模,但是随着业务的增长,2台数据库难以支撑,还是继续拆分成4个数据库,那么这时就需要做数据迁移了。

多数据源管理问题

针对多数据源的管理问题,主要有两种思路:

客户端模式,在每个应用模块内,配置自己需要的数据源,直接访问数据库,在各模块内完成数据的整合;

中间代理模式,中间代理统一管理所有的数据源,数据库层对开发人员完全透明,开发人员无需关注拆分的细节。

基于这两种模式,目前都有成熟的第三方软件,接下来在我们的视频中,会分别给大家介绍这两种模式的代表作:

  • 中间代理模式:MyCat

https://www.jianshu.com/p/f81422b1c915 数据库中间件Mycat SpringBoot完成分库分表

https://www.jianshu.com/p/c6e29d724fca MyCat 看了这篇什么都会了

用户配置文件:server.xml

最重要的配置文件mycat/conf/schema.xml:

代码语言:javascript复制
<?xml version="1.0"?>
<!DOCTYPE mycat:schema SYSTEM "schema.dtd">
<mycat:schema xmlns:mycat="http://io.mycat/">

    <schema name="TESTDB" checkSQLschema="false" sqlMaxLimit="100">
            <table name="user" primaryKey="id" dataNode="dn01,dn02" rule="rule1" />  
    </schema>

    <!-- 设置dataNode 对应的数据库,及 mycat 连接的地址dataHost -->  
    <dataNode name="dn01" dataHost="dh01" database="db01" />  
    <dataNode name="dn02" dataHost="dh01" database="db02" />   

    <!-- mycat 逻辑主机dataHost对应的物理主机.其中也设置对应的mysql登陆信息 -->  
    <dataHost name="dh01" maxCon="1000" minCon="10" balance="0" writeType="0" dbType="mysql" dbDriver="native">  
            <heartbeat>select user()</heartbeat>  
            <writeHost host="server1" url="127.0.0.1:3306" user="root" password="WolfCode_2017"/>  
    </dataHost> 
</mycat:schema>

Balance参数设置:

1. balance=“0”, 所有读操作都发送到当前可用的writeHost上。

2. balance=“1”,所有读操作都随机的发送到readHost。

3. balance=“2”,所有读操作都随机的在writeHost、readhost上分发

WriteType参数设置:

1. writeType=“0”, 所有写操作都发送到可用的writeHost上。

2. writeType=“1”,所有写操作都随机的发送到readHost。

3. writeType=“2”,所有写操作都随机的在writeHost、readhost分上发。

需要注意:配置balance为读写分离的前提是配置了Mysql的主从复制。

Mysql主从复制 https://www.jianshu.com/p/faf0127f1cb2

详解Mysql主从同步配置实战 https://www.imooc.com/article/44470

切分规则 mycat/conf/rule.xml

代码语言:javascript复制
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mycat:rule SYSTEM "rule.dtd">
<mycat:rule xmlns:mycat="http://io.mycat/">
    <tableRule name="rule1">
        <rule>
            <columns>id</columns>
            <algorithm>mod-long</algorithm>
        </rule>
    </tableRule>
    <function name="mod-long" class="io.mycat.route.function.PartitionByMod">
        <!-- how many data nodes -->
        <property name="count">2</property>
    </function>
</mycat:rule>

这里定义的是切分规则,是按照id列进行切分,切分规则是采取取模的方式

springboot配置

代码语言:javascript复制
#配置数据源
spring.datasource.druid.driver-class-name=com.mysql.jdbc.Driver
#这里配置的是Mycat中server.xml中配置账号密码,不是数据库的密码。
spring.datasource.druid.username=root
spring.datasource.druid.password=123456
#mycat的逻辑库 端口也是mycat的
spring.datasource.druid.url=jdbc:mysql://192.168.142.129:8066/TESTDB

注意mycat的端口是8066

mycat的全局表

类似于字典表这种数据量小,而且不怎么变化的表。不希望进行分片。

MyCAT 定义了一种特殊的表,称之为“全局表”,全局表具有以下特性:

• 全局表的插入、更新操作会实时在所有节点上执行,保持各个分片的数据一致性

• 全局表的查询操作,只从一个节点获取

• 全局表可以跟任何一个表进行 JOIN 操作

全局表配置比较简单,不用写 Rule 规则,修改schema.xml,如下配置即可:

代码语言:javascript复制
<table name="company" primaryKey="id" type="global" dataNode="dn1,dn2" />

需要注意的是,全局表每个分片节点上都会运行创建表的 DDL 语句。

type不配置默认是分片表。

mycat的子表

订单表和订单明细表都进行分片时,订单id对应的订单明细如果不在一个分片,进行关联查询就会跨库。所以希望订单id对应的订单明细也能落在相同的分片上。在table标签内增加<childTable>标签。

代码语言:javascript复制
<table name="customer" primaryKey="ID" dataNode="dn1,dn2"
               rule="sharding-by-intfile">
            <childTable name="orders" primaryKey="ID" joinKey="customer_id"
                        parentKey="id">
                <childTable name="order_items" joinKey="order_id"
                            parentKey="id" />
            </childTable>
            <childTable name="customer_addr" primaryKey="ID" joinKey="customer_id"
                        parentKey="id" />
</table>
代码语言:javascript复制
name属性
定义子表的表名。
joinKey属性
插入子表的时候会使用这个列的值查找父表存储的数据节点。
parentKey属性
属性指定的值一般为与父表建立关联关系的列名。程序首先获取joinkey的值,再通过**parentKey**属性指定的列名产生查询语句,通过执行该语句得到父表存储在哪个分片上。从而确定子表存储的位置。
primaryKey属性
同table标签所描述的。
needAddLimit属性
同table标签所描述的。
mycatdeHA(高可用)

https://segmentfault.com/a/1190000022478277 centos下搭建mycat高可用(keepalived haproxy)集群

不使用Nginx进行负载均衡是因为Ng是Http协议,mycat是TCP层的请求。所以需要用haproxy。keepavlived保证haproxy是双机热备的。

  • 客户端模式:sharding-jdbc

sharding-jdbc是以jar包的形式提供服务,不需要再单独启动一个服务。

Sharding-JDBC的实践 https://blog.csdn.net/u014534808/article/details/90138993

Spring Boot整合Sharding-JDBC实现读写分离和分库分表https://www.fangzhipeng.com/mysql.html

1. 数据分片

分库 & 分表

读写分离

分片策略定制化

无中心化分布式主键

2. 分布式事务

标准化事务接口

XA强一致事务

柔性事务

3. 数据库治理

配置动态化

编排 & 治理

数据脱敏

可视化链路追踪

弹性伸缩(规划中)

代码语言:javascript复制
<!-- Sharding-jdbc 依赖 -->
<dependency>
      <groupId>org.apache.shardingsphere</groupId>
      <artifactId>sharding-jdbc-spring-boot-starter</artifactId>
      <version>4.0.0</version>
</dependency>
mycat和sharding-jdbc的比较

mycat不支持同一库内的水平切分,sharding-jdbc支持

分布式事务

1. 分布式事务概念

讨论分布式事务之前我们分清两个概念:本地事务、分布式事务;

本地事务是解决单个数据源上的数据操作的一致性问题的话,而分布式事务则是为了解决跨越多个数据源上数据操作的一致性问题。

百度官方对分布式事务的定义是指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上。

也就是说我们在操作一个业务逻辑过程中,涉及两个数据源(A、B),且很多时候A、B这两个数据源属于两个不同的物理环境。当我们操作A数据源过程中出现异常情况,那么必须让针对B数据源的操作回滚,同时A数据源的操作也回滚。

JAVA领域中针对分布式事务的解决方案就是JTA(即Java Transaction API);本章节我们只针对SpringBoot官方提供的Atomikos 和 Bitronix的两种做描述解决思路;

2. 产生原因
2.1. 数据库分库分表

当数据库单表数据量超过2000W,就要考虑分库分表,这时候,如果一个操作既访问01库,又访问02库,而且要保证数据的一致性,那么就要用到分布式事务。

2.2. 应用服务化

业务的服务化。比如原来单机支撑的应用服务,拆解为一块一块独立的服务,例如用户中心、订单中心、账户中心、库存中心。对于订单中心,有专门的数据库存储订单信息,用户中心也有专门的数据库存储用户信息,库存中心也会有专门的数据库存储库存信息。这时候如果要同时对订单进行操作,那么就会涉及到订单数据库和账户数据库,为了保证数据一致性,就需要用到分布式事务。

3. 事务ACID特性

类别

描述

原子性

整个事务中的所有操作,要么全部完成,要么全部不做,没有中间状态。对于事务在执行中发生错误,所有的操作都会被回滚,整个事务就像从没被执行过一样

一致性

事务的执行必须保证系统的一致性,就拿转账为例,A有500元,B有300元,如果在一个事务里A成功给B转账50元,那么不管并发多少,不管发生什么,只要事务执行成功了,那么最后A账户一定是450元,B账户一定是350元。

隔离性

事务与事务之间不会互相影响,一个事务的中间状态不会被其他事务感知。

持久性

单事务完成,那么事务对数据所做的变更就完全保存在数据库中,即使发生停电,系统宕机也是如此。

4. 应用场景
4.1. 支付

最经典场景就是支付,一笔支付,是对买家账户进行扣款,同时对卖家账户进行加钱,这些操作必须在一个事务里执行,要么全部成功,要么全部失败。而对于买家账户属于买家中心,对应的是买家数据库,而卖家账户属于卖家中心,对应的是卖家数据库,对不同数据库的操作必然需要引入分布式事务。

4.2. 在线订单

买家在电商平台下单,往往会涉及到两个动作,一个是扣库存,第二个是更新订单状态,库存和订单一般属于不同的数据库,需要使用分布式事务保证数据一致性。

5. 行业中常见解决方案
5.1. 本地消息表(异步确保)——可靠消息最终一致性

可靠消息最终一致性方案要解决以下几个问题:

1.本地事务与消息发送的原子性问题本地事务与消息发送的原子性问题即:事务发起方在本地事务执行成功后消息必须发出去,否则就丢弃消息。即实现本地事务和消息发送的原子性,要么都成功,要么都失败。本地事务与消息发送的原子性问题是实现可靠消息最终一致性方案的关键问题。下面这种操作,先发送消息,在操作数据库:

begin transaction; //1.发送MQ //2.数据库操作 commit transation;

  1. 这种情况下无法保证数据库操作与发送消息的一致性,因为可能发送消息成功,数据库操作失败。那么第二种方案,先进行数据库操作,再发送消息:

begin transaction; //1.数据库操作 //2.发送MQ commit transation;

这种情况下貌似没有问题,如果发送 MQ 消息失败,就会抛出异常,导致数据库事务回滚。但如果是超时异常,数据库回滚,但 MQ 其实已经正常发送了,同样会导致不一致。

2.事务参与方接收消息的可靠性事务参与方必须能够从消息队列接收到消息,如果接收消息失败可以重复接收消息。

3.消息重复消费的问题由于网络2的存在,若某一个消费节点超时但是消费成功,此时消息中间件会重复投递此消息,就导致了消息的重复消费。要解决消息重复消费的问题就要实现事务参与方的方法幂等性。

本地消息表(异步确保)可解决上述问题

本地消息表这种实现方式应该是业界使用最多的,其核心思想是将分布式事务拆分成本地事务进行处理,这种思路是来源于ebay。

基本思路:

消息生产方,需要额外建一个消息表,并记录消息发送状态。消息表和业务数据要在一个事务里提交,也就是说他们要在一个数据库里面。然后消息会经过MQ发送到消息的消费方。如果消息发送失败,会进行重试发送。

消息消费方,需要处理这个消息,并完成自己的业务逻辑。此时如果本地事务处理成功,表明已经处理成功了,如果处理失败,那么就会重试执行。如果是业务上面的失败,可以给生产方发送一个业务补偿消息,通知生产方进行回滚等操作。

  1. 用户注册用户服务在本地事务新增用户和增加 "积分消息日志"。(用户表和消息表通过本地事务保证一致)

begin transaction //1.新增用户 //2.存储积分消息日志 commit transation

  1. 这种情况下,本地数据库操作与存储积分消息日志处于同一个事务中,本地数据库操作与记录消息日志操作具备原子性。
  2. 定时任务扫描日志如何保证将消息发送给消息队列呢?经过第一步消息已经写到消息日志表中,可以启动独立的线程,定时对消息日志表中的消息进行扫描并发送至消息中间件,在消息中间件反馈发送成功后删除该消息日志,否则等待定时任务下一周期重试。
  3. 消费消息如何保证消费者一定能消费到消息呢?这里可以使用 MQ 的 ack(即消息确认)机制,消费者监听 MQ,如果消费者接收到消息并且业务处理完成后向 MQ 发送 ack(即消息确认),此时说明消费者正常消费消息完成,MQ 将不再向消费者推送消息,否则消费者会不断重试向消费者来发送消息。积分服务接收到"增加积分"消息,开始增加积分,积分增加成功后向消息中间件回应 ack,否则消息中间件将重复投递此消息。由于消息会重复投递,积分服务的"增加积分"功能需要实现幂等性。

特点:生产方和消费方定时扫描本地消息表,把还没处理完成的消息或者失败的消息再发送一遍。如果有靠谱的自动对账补账逻辑,这种方案还是非常实用的。

不使用MQ直接调用业务方接口进行通知也是可以的。这个对于不同公司,由于不是一个内网比较适合。

代码语言:javascript复制
@Service
public class OrderSchedule {

    @Resource
    private PaymentMsgMapper paymentMsgMapper;

    //给订单处理接口发送通知
    @Scheduled(cron = "0/10 * * * * ?")
    public void orderNotify() throws IOException {

        List<PaymentMsg> list = paymentMsgMapper.selectUnSendMsgList();
        if (list == null || list.size() == 0) {
            return;
        }

        for (PaymentMsg paymentMsg : list) {
            int orderId = paymentMsg.getOrderId();
            CloseableHttpClient httpClient = HttpClientBuilder.create().build();
            HttpPost httpPost = new HttpPost("http://localhost:8080/handlerOrder");
            NameValuePair orderIdPair = new BasicNameValuePair("orderId", orderId   "");
            List<NameValuePair> nvlist = new ArrayList<>();
            nvlist.add(orderIdPair);
            HttpEntity httpEntity = new UrlEncodedFormEntity(nvlist);
            httpPost.setEntity(httpEntity);
            CloseableHttpResponse response =    httpClient.execute(httpPost);
            String s = EntityUtils.toString(response.getEntity());
            if("success".equals(s)){
                paymentMsg.setStatus(1); //发送成功
                paymentMsg.setUpdateTime(new Date());
                paymentMsg.setUpdateUser(0); //系统更新
                paymentMsgMapper.updateByPrimaryKey(paymentMsg);
            }else {
                int failCnt = paymentMsg.getFailCnt();
                failCnt   ;
                paymentMsg.setFailCnt(failCnt);
                if(failCnt > 5){

                    paymentMsg.setStatus(2); //超过5次,改成失败
                }
                paymentMsg.setUpdateUser(0); //系统更新
                paymentMsg.setUpdateTime(new Date());
                paymentMsgMapper.updateByPrimaryKey(paymentMsg);
            }

        }
    }

}
5.2. 两阶段提交

2PC 即两阶段提交协议,是将整个事务流程分为两个阶段,准备阶段(Prepare phase)、提交阶段(commit phase),2 是指两个阶段,P 是指准备阶段,C 是指提交阶段。

XA是X/Open CAE Specification (Distributed Transaction Processing)模型中定义的TM(Transaction Manager)与RM(Resource Manager)之间进行通信的接口。

两阶段提交是XA的标准实现。它将分布式事务的提交拆分为2个阶段:prepare和commit/rollback。

在XA规范中,数据库充当RM角色,应用需要充当TM的角色,即生成全局的txId,调用XAResource接口,把多个本地事务协调为全局统一的分布式事务。

XA中有两个重要的概念:事务管理器和本地资源管理器。其中本地资源管理器往往由数据库实现,比如Oracle、DB2这些商业数据库都实现了XA接口,而事务管理器作为全局的调度者,负责各个本地资源的提交和回滚。XA实现分布式事务的原理如下:

特点:XA协议比较简单,目前很多商业数据库实现XA协议,使用分布式事务的成本也比较低。但是,XA也有致命的缺点,那就是性能不理想,特别是在交易下单链路,往往并发量很高,XA无法满足高并发场景。XA目前在商业数据库支持的比较理想,在mysql数据库中支持的不太理想,mysql的XA实现,没有记录prepare阶段日志,主备切换回导致主库与备库数据不一致。许多nosql也没有支持XA,这让XA的应用场景变得非常狭隘。在prepare阶段需要等待所有参与子事务的反馈,因此可能造成数据库资源锁定时间过长,不适合并发高以及子事务生命周长较长的业务场景。两阶段提交这种解决方案属于牺牲了一部分可用性来换取的一致性。

springboot jta atomikos实现两阶段提交分布式事务的管理(数据库相关——JTA atomikos_传统的分布式事务解决方案)

Seata ——实现2PC

官方文档 http://seata.io/zh-cn/docs/user/quickstart.html

微服务应用整合SEATA实现分布式事务 https://www.cnblogs.com/yg_zhang/p/13061286.html

Seata 定义了 3 个组件来协议分布式事务的处理过程:

Transaction Coordinator(TC):事务协调器,它是独立的中间件,需要独立部署运行,它维护全局事务的运行状态,接收 TM 指令发起全局事务的提交与回滚,负责与 RM 通信协调各各分支事务的提交或回滚。

Transaction Manager(TM):事务管理器,TM 需要嵌入应用程序中工作,它负责开启一个全局事务,并最终向 TC 发起全局提交或全局回滚的指令。

Resource Manager(RM):控制分支事务,负责分支注册、状态汇报,并接收事务协调器 TC 的指令,驱动分支(本地)事务的提交和回滚。

新用户注册送积分举例Seata的分布式事务过程:

具体的执行流程如下:

  1. 用户服务的 TM 向 TC 申请开启一个全局事务,全局事务创建成功并生成一个全局唯一的 XID。
  2. 用户服务的 RM 向 TC 注册分支事务,该分支事务在用户服务执行新增用户逻辑,并将其纳入 XID 对应全局事务的管辖。
  3. 用户服务执行分支事务,向用户表插入一条记录。
  4. 逻辑执行到远程调用积分服务时(XID 在微服务调用链路的上下文中传播)。积分服务的 RM 向 TC 注册分支事务,该分支事务执行增加积分的逻辑,并将其纳入 XID 对应全局事务的管辖。
  5. 积分服务执行分支事务,向积分记录表插入一条记录,执行完毕后,返回用户服务。
  6. 用户服务分支事务执行完毕。
  7. TM 向 TC 发起针对 XID 的全局提交或回滚决议。
  8. TC 调度 XID 下管辖的全部分支事务完成提交或回滚请求。

Seata实现2PC与传统2PC的差别

架构层次方面:传统 2PC 方案的 RM 实际上是在数据库层,RM 本质上就是数据库自身,通过 XA 协议实现,而 Seata 的 RM 是以 jar 包的形式作为中间件层部署在应用程序这一侧的。

两阶段提交方面:传统 2PC无论第二阶段的决议是 commit 还是 rollback ,事务性资源的锁都要保持到 Phase2 完成才释放。而 Seata 的做法是在 Phase1 就将本地事务提交,这样就可以省去 Phase2 持锁的时间,整体提高效率。

5.3. MQ事务消息 最终一致性

https://www.jianshu.com/p/84d830bee587 RocketMQ事务消息

https://www.cnblogs.com/xuwc/p/9034029.html

5.3 MQ事务消息和 5.1. 本地消息表(异步确保)的区别:

RocketMQ最大的改变,其实就是把“扫描消息表”这个事情,不让业务方做,而是消息中间件帮着做了。

至于消息表,其实还是没有省掉。因为消息中间件要询问发送方,事物是否执行成功,还是需要一个“变相的本地消息表”,记录事物执行状态。

事务消息作为一种异步确保型事务, 将两个事务分支通过MQ进行异步解耦,事务消息的设计流程同样借鉴了两阶段提交理论。

  • 事务发起方首先发送prepare消息到MQ。
  • 在发送prepare消息成功后执行本地事务。
  • 根据本地事务执行结果返回commit或者是rollback。
  • 如果消息是rollback,MQ将删除该prepare消息不进行下发,如果是commit消息,MQ将会把这个消息发送给consumer端。
  • 如果执行本地事务过程中,执行端挂掉,或者超时,MQ将会不停的询问其同组的其它producer来获取状态。
  • Consumer端的消费成功机制有MQ保证。

基于消息中间件的两阶段提交往往用在高并发场景下,将一个分布式事务拆成一个消息事务(A系统的本地操作 发消息) B系统的本地操作,其中B系统的操作由消息驱动,只要消息事务成功,那么A操作一定成功,消息也一定发出来了,这时候B会收到消息去执行本地操作,如果本地操作失败,消息会重投,直到B操作成功,这样就变相地实现了A与B的分布式事务

特点:第三方的MQ是支持事务消息的,比如RocketMQ,但是市面上一些主流的MQ都是不支持事务消息的,比如 RabbitMQ 和 Kafka 都不支持。

5.4. 补偿事务(TCC)

TCC 其实就是采用的补偿机制,其核心思想是:针对每个操作,都要注册一个与其对应的确认和补偿(撤销)操作。TCC模型是把锁的粒度完全交给业务处理。它分为三个阶段:

  • Try 阶段主要是对业务系统做检测及资源预留
  • Confirm 阶段主要是对业务系统做确认提交,Try阶段执行成功并开始执行 Confirm阶段时,默认 Confirm阶段是不会出错的。即:只要Try成功,Confirm一定成功。
  • Cancel 阶段主要是在业务执行错误,需要回滚的状态下执行的业务取消,预留资源释放。

特点:TCC模型对业务的侵入强,改造的难度大。

一致性较差,补偿操作本身也有可能出错。当涉及多个系统时更难保证一致性。

代码语言:javascript复制
使用TCC事务时,伪代码如下所示:

@Compensable(confirmMethod = "transferConfirm", cancelMethod = "transferCancel")
@Transactional
public void transferTry(long fromAccountId, long toAccountId, int amount) {
    //检查Tom账户
    //锁定Tom账户
    //锁定Tracy账户
}

@Transactional
public void transferConfirm(long fromAccountId, long toAccountId, int amount) {
    //tom账户-10元
    //tracy账户 10元
}

@Transactional
public void transferCancel(long fromAccountId, long toAccountId, int amount) {
    //解除Tom账户锁定
    //接触Tracy账户锁定
}
6.分布式事务对比分析

2PC 最大的诟病是一个阻塞协议。RM 在执行分支事务后需要等待 TM 的决定,此时服务会阻塞并锁定资源。由于其阻塞机制和最差时间复杂度高,因此,这种设计不能适应随着事务涉及的服务数量增加而扩展的需要,很难用于并发较高以及子事务生命周期较长(long-running transactions) 的分布式服务中。

如果拿TCC事务的处理流程与2PC两阶段提交做比较,2PC 通常都是在跨库的 DB 层面,而 TCC 则在应用层面的处理,需要通过业务逻辑来实现。这种分布式事务的实现方式的优势在于,可以让应用自己定义数据操作的粒度,使得降低锁冲突、提高吞吐量成为可能。而不足之处则在于对应用的侵入性非常强,业务逻辑的每个分支都需要实现 Try、Confirm、Cancel 三个操作。此外,其实现难度也比较大,需要按照网络状态、系统故障等不同的失败原因实 现不同的回滚策略。典型的使用场景:满减,登录送优惠券等。

可靠消息最终一致性事务适合执行周期长且实时性要求不高的场景。引入消息机制后,同步的事务操作变为基于消息执行的异步操作, 避免了分布式事务中的同步阻塞操作的影响,并实现了两个服务的解耦。典型的使用场景:注册送积分,登录送优惠券等。

最大努力通知是分布式事务中要求最低的一种,适用于一些最终一致性时间敏感度低的业务;允许发起通知方处理业务失败,在接收通知方收到通知后积极进行失败处理,无论发起通知方如何处理结果都会不影响到接收通知方的后续处理;发起通知方需提供查询执行情况接口,用于接收通知方校对结果。典型的使用场景:银行通知、支付结果通知等。

2PC

TCC

可靠消息

最大努力通知

一致性

强一致性

最终一致性

最终一致性

最终一致性

吞吐量

实现复杂度

分布式锁

在分布式锁的技术实现上,主流认可有三种实现方式,从复杂度来看,由难至易依次增加:

基于数据库实现分布式锁;

基于缓存(Redis/Memcached等)实现分布式锁;

基于Zookeeper实现分布式锁;

数据库实现(乐观锁、悲观锁)
乐观锁

每次去取数据的时候都会认为不会有其他线程对数据进行修改,因此不会上锁,但是在更新时会判断其他线程在这之前有没有对数据进行修改,一般会使用版本号机制或CAS操作实现;乐观锁适用于多读的应用类型,这样可以提高吞吐量,像数据库如果提供类似于write_condition机制的其实都是提供的乐观锁。

乐观锁关键点:

  • 锁服务要有递增的版本号version
  • 每次更新数据都要先判断版本号对不对,然后再写入新的版本号
悲观锁

每次取数据时都认为其他线程会修改,所以都会加锁(读锁、写锁、行锁等),当其他线程想要访问数据时,都需要block阻塞挂起。可以依靠数据库实现,如行锁、读锁和写锁等,都是在操作之前加锁,它对数据被外界(包括本系统当前的其他事务,以及来自外部系统的事务处理)修改持保守态度,因此,在整个数据处理过程中,将数据处于锁定状态。悲观锁的实现,往往依靠数据库提供的锁机制(也只有数据库层提供的锁机制才能真正保证数据访问的排他性,否则,即使在本系统中实现了加锁机制,也无法保证外部系统不会修改数据)。在 Java中,synchronized的思想也是悲观锁。

乐观锁、悲观锁两种锁各有优缺点,不可认为一种好于另一种,像乐观锁适用于写比较少的情况下,即冲突真的很少发生的时候,这样可以省去了锁的开销,加大了系统的整个吞吐量。但如果经常产生冲突,上层应用会不断的进行retry,这样反倒是降低了性能,所以这种情况下用悲观锁就比较合适。

代码语言:javascript复制
DROP TABLE IF EXISTS method_lock;

CREATE TABLE method_lock (
lck_id INT(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
method_name VARCHAR(64) NOT NULL COMMENT '锁定的方法名',
state TINYINT NOT NULL COMMENT '1:未分配;2:已分配',
update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
ver INT NOT NULL COMMENT '版本号',
PRIMARY KEY (lck_id),
UNIQUE KEY uidx_method_name (method_name) USING BTREE
) ENGINE=INNODB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 COMMENT='锁定中的方法';
方案1_for update

Oracle、Mysql中是基于for update来实现加锁的。在MYSQL中需要注意的是,在InnoDB中只有字段加了索引的,才会是行级锁,否者是表级锁,所以一定要对where的条件字段加索引。

当这条记录加上排它锁之后,其它线程是无法操作这条记录的。

除了可以通过增删操作数据表中的记录以外,其实还可以借助数据中自带的锁来实现分布式的锁。我们还用刚刚创建的那张数据库表。可以通过数据库的排他锁来实现分布式锁。

代码语言:javascript复制
connection.setAutoCommit(false)
int count = 0;
while(count < 4){
try{
select * from lock where lock_name=xxx for update;
if(结果不为空){
//代表获取到锁
return;
}
}catch(Exception e){
}
//为空或者抛异常的话都表示没有获取到锁
sleep(1000);
count  ;
}
throw new LockException();
}
通过connection.commit()操作来释放锁。
方案2_唯一性约束

基于数据库表数据记录做唯一约束(表中记录方法名称)。

要实现分布式锁,最简单的方式可能就是直接创建一张锁表,然后通过操作该表中的数据来实现了。

对method_name做了唯一性约束,这里如果有多个请求同时提交到数据库的话,数据库会保证只有一个操作可以成功。

代码语言:javascript复制
--当我们想要锁住某个方法时,执行以下SQL
INSERT INTO method_lock (method_name, desc) VALUES ('methodName', 'methodName');
--当我们想释放某个方法时:
DELETE FROM method_lock WHERE method_name='methodName';
方案3_查询并占有

先获取锁的信息:

代码语言:javascript复制
select id, method_name, state,version from method_lock where state=1 and method_name='methodName';

再占有

代码语言:javascript复制
update t_resoure set state=2, version=2, update_time=now() where method_name='methodName' and state=1 and version=2;

如果没有更新影响到一行数据,则说明这个资源已经被别人占位了。

缓存_Redis实现

基于Redis实现的锁机制,主要是依赖redis自身的原子操作,因为redis是单线程。要求redis版本大于2.6.12。

redis单实例
  • 引入pom

版本号大家以实际使用中的为准,我这里仅供参考,因为spring-boot的自动注册功能会为我们提供StringRedisTemplate,直接使用即可。

代码语言:javascript复制
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <version>2.1.3.RELEASE</version>
</dependency>
  • yml文件
代码语言:javascript复制
server:
  port: 9090
spring:
  datasource:
    name: mysql
    type: com.alibaba.druid.pool.DruidDataSource
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/springboot?characterEncoding=utf-8&useSSL=true
    username: root
    password: 123456
    druid:
      initial-size: 5
      min-idle: 5
      max-active: 20
      max-wait: 30000
      time-between-eviction-runs-millis: 60000
      min-evictable-idle-time-millis: 300000
      validation-query: select 1
      test-while-idle: true
      test-on-borrow: false
      test-on-return: false
      pool-prepared-statements: false
      max-pool-prepared-statement-per-connection-size: 20
      connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=6000
  redis:
    database: 0
    host: 127.0.0.1
    port: 6379
mybatis:
  mapperLocations: classpath:mapper/**/*.xml
  • 核心实现
代码语言:javascript复制
package xyz.wongs.weathertop.comp;
import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
@Component
@Slf4j
public class RedisLockComponent {
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    /**
     * @Description 获取锁,默认:失效时间为5,失效时间的单位为秒,重试次数为3,休眠4秒
     * @param key
     * @param value
     * @param expire    redis过期时间
     * @return boolean
     * @throws
     * @date 2019/11/16 21:37
     */
    public boolean getLock(String key,String value){
        return getLock(key,value,5, TimeUnit.SECONDS,3,5000);
    }
    /**
     * @Description 获取锁,默认:失效时间的单位为秒,重试次数为3,休眠4秒
     * @param key
     * @param value
     * @param expire    redis过期时间
     * @return boolean
     * @throws
     * @date 2019/11/16 21:37
     */
    public boolean getLock(String key,String value,long expire){
        return getLock(key,value,expire, TimeUnit.SECONDS,3,5000);
    }
    /**
     * @Description 获取锁,默认重试次数为3,休眠5秒
     * @param key
     * @param value
     * @param expire    redis过期时间
     * @param unit
     * @return boolean
     * @throws
     * @date 2019/11/16 21:37
     */
    public boolean getLock(String key,String value,long expire, TimeUnit unit){
        return getLock(key,value,expire, unit,3,5000);
    }
    /**
     * @Description
     * @param key
     * @param value
     * @param expire    redis过期时间
     * @param unit
     * @param tryCount  重试次数
     * @param waitMillis 每次重试要等待时间
     * @return boolean
     * @throws
     * @date 2019/11/16 21:37
     */
    public boolean getLock(String key,String value,long expire, TimeUnit unit,int tryCount,int waitMillis){
        //setIfAbsent如果键不存在则新增,存在则不改变已经有的值。
        boolean success = stringRedisTemplate.opsForValue().setIfAbsent(key,value,expire,unit);
        if(success) {
            return true;
        }
        //判断锁超时 - 防止原来的操作异常,没有运行解锁操作  防止死锁
        String val = stringRedisTemplate.opsForValue().get(key);
        if(!Strings.isNullOrEmpty(val)){
            if(System.currentTimeMillis() - Long.parseLong(val) > unit.toMillis(expire)){
                // 超时移除
                stringRedisTemplate.delete(key);
            }
        }
        // 重试、等待
        if (tryCount > 0 && waitMillis > 0) {
            try {
                Thread.sleep(waitMillis);
            } catch (InterruptedException e) {
                log.error("getLock exception{}",e.getMessage());
            }
            return getLock(key,value,expire,unit,tryCount - 1,waitMillis);
        }
        return false;
    }
    /**
     * @Description 获取等待时间
     * @param key
     * @param expire
     * @param unit
     * @return long 秒
     * @throws
     * @date 2019/11/16 21:44
     */
    public long getWaitSecond(String key,long expire,TimeUnit unit) {
        long currentTime = System.currentTimeMillis();
        long preTime = Long.parseLong(stringRedisTemplate.opsForValue().get(key));
        return (preTime   unit.toMillis(expire) - currentTime) / 1000;
    }
    /**
     * @Description 设置锁的过期时间,默认单位为毫秒
     * @param key
     * @param expTime
     * @return Boolean
     * @throws
     * @date 2019/11/16 21:18
     */
    public Boolean renewal(String key,int expTime){
        return renewal(key, expTime, TimeUnit.MILLISECONDS);
    }
    /**
     * @Description 设置锁的过期时间
     * @param key
     * @param expTime
     * @param unit
     * @return Boolean
     * @throws
     * @date 2019/11/16 21:18
     */
    public Boolean renewal(String key,int expTime,TimeUnit unit){
        return stringRedisTemplate.expire(key, expTime, unit);
    }
    /**
     * @Description 解锁
     * @param key
     * @param val
     * @return void
     * @throws
     * @date 2019/11/16 21:13
     */
    public void unlock(String key,String val){
        try {
            String value = stringRedisTemplate.opsForValue().get(key);
            if(!Strings.isNullOrEmpty(value) && val.equals(value) ){
                // 删除锁状态
                stringRedisTemplate.opsForValue().getOperations().delete(key);
            }
        } catch (Exception e) {
            log.error("unlock exception{}",e);
        }
    }
}
  • 模拟样例
代码语言:javascript复制
package xyz.wongs.weathertop.web;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import xyz.wongs.weathertop.base.message.enums.ResponseCode;
import xyz.wongs.weathertop.base.message.response.ResponseResult;
import xyz.wongs.weathertop.comp.RedisLockComponent;
import xyz.wongs.weathertop.deno.entity.RedisLock;
import xyz.wongs.weathertop.deno.mapper.RedisLockMapper;
import java.util.concurrent.TimeUnit;
@RestController
@Slf4j
public class RedisController {
    @Autowired
    private RedisLockMapper redisLockMapper;
    @Autowired
    private RedisLockComponent redisLockComponent;
    /**
     * 超时时间 5s
     */
    private static final int TIMEOUT = 3;
    @RequestMapping(value = "/seckilling/{key}")
    public ResponseResult Seckilling(@PathVariable("key") String key){
        ResponseResult responseResult = new ResponseResult();
        //1、加锁
        String value = System.currentTimeMillis()   "";
        if(!redisLockComponent.getLock(key,value,6)){
            responseResult.setStatus(false);
            responseResult.setCode(ResponseCode.DICT_LOCK_FAIL.getCode());
            responseResult.setMsg("排队人数太多,请稍后再试.");
            return responseResult;
        }
        RedisLock redisLock = redisLockMapper.selectByPrimaryKey(1);
        // 2、查询库存,为0则活动结束
        if(redisLock.getCounts()==0){
            responseResult.setStatus(false);
            responseResult.setCode(ResponseCode.RESOURCE_NOT_EXIST.getCode());
            responseResult.setMsg("库存不够.");
            return responseResult;
        }
        //3、减库存
        redisLock.setCounts(redisLock.getCounts()-1);
        redisLockMapper.updateByPrimaryKeySelective(redisLock);
        try{
            Thread.sleep(5000);//模拟减库存的处理时间
        }catch (InterruptedException e){
            e.printStackTrace();
        }
        //4、释放锁
        redisLockComponent.unlock(key,value);
        responseResult.setMsg("恭喜您,秒杀成功。");
        return responseResult;
    }
}

需要注意的问题 1.锁超时是什么意思呢?如果一个得到锁的线程在执行任务的过程中挂掉,来不及显式地释放锁,这块资源将会永远被锁住,别的线程再也别想进来。

所以,setnx的key必须设置一个超时时间,以保证即使没有被显式释放,这把锁也要在一定时间后自动释放。setnx不支持超时参数,所以需要额外的指令,伪代码如下:

expire(key, 30)

2.setnx和expire的非原子性

设想一个极端场景,当某线程执行setnx,成功得到了锁:

setnx刚执行成功,还未来得及执行expire指令,节点1 Duang的一声挂掉了。

这把锁就没有设置过期时间,变得“长生不老”,别的线程再也无法获得锁了。

怎么解决呢?setnx指令本身是不支持传入超时时间的,Redis 2.6.12以上版本为set指令增加了可选参数,伪代码如下:set(key,1,30,NX),这样就可以取代setnx指令

3.超时后使用del 导致误删其他线程的锁

又是一个极端场景,假如某线程成功得到了锁,并且设置的超时时间是30秒。

如果某些原因导致线程A执行的很慢很慢,过了30秒都没执行完,这时候锁过期自动释放,线程B得到了锁。

随后,线程A执行完了任务,线程A接着执行del指令来释放锁。但这时候线程B还没执行完,线程A实际上删除的是线程B加的锁

怎么办呢?我们可以让获得锁的线程开启一个守护线程,用来给快要过期的锁“续航”。

当过去了29秒,线程A还没执行完,这时候守护线程会执行expire指令,为这把锁“续命”20秒。守护线程从第29秒开始执行,每20秒执行一次。

当线程A执行完任务,会显式关掉守护线程。

另一种情况,如果节点1 忽然断电,由于线程A和守护线程在同一个进程,守护线程也会停下。这把锁到了超时的时候,没人给它续命,也就自动释放了。

代码语言:javascript复制
Thread daemonTread = new Thread();  

  // 设定 daemonThread 为 守护线程,default false(非守护线程)  
 daemonThread.setDaemon(true);  

 // 验证当前线程是否为守护线程,返回 true 则为守护线程  
 daemonThread.isDaemon();
zookeeper实现

如果有客户端C、客户端D等N个客户端争抢一个zk分布式锁,原理都是类似的。

  • 大家都是上来直接创建一个锁节点下的一个接一个的临时顺序节点
  • 如果自己不是第一个节点,就对自己上一个节点加监听器
  • 只要上一个节点释放锁,自己就排到前面去了,相当于是一个排队机制

而且用临时顺序节点的另外一个用意就是,如果某个客户端创建临时顺序节点之后,不小心自己宕机了也没关系,zk感知到那个客户端宕机,会自动删除对应的临时顺序节点,相当于自动释放锁,或者是自动取消自己的排队。

Curator是基于zk的分布式锁的实现的框架,它就是Netflix开源的一套ZooKeeper客户端框架,它提供了zk场景的绝大部分实现,使用Curator就不必关心其内部算法,Curator提供了来实现分布式锁,用方法获取锁,以及用方法释放锁,同其他锁一样,方法需要放在finally代码块中,确保锁能正确释放。

Curator提供了四种分布式锁:

  • InterProcessMutex:分布式可重入排它锁
  • InterProcessSemaphoreMutex:分布式排它锁
  • InterProcessReadWriteLock:分布式读写锁
  • InterProcessMultiLock:将多个锁作为单个实体管理的容器

说了这么多,Zookeeper如何实现分布式锁,接下来代码奉上,我们以Springboot载体,写一个案例。

1. 引入POM依赖
代码语言:javascript复制
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.10</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </exclusion>
        <exclusion>
            <artifactId>log4j</artifactId>
            <groupId>log4j</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.12.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.12.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </exclusion>
    </exclusions>
</dependency>
2. application文件
代码语言:javascript复制
curator:
  retryCount: 5
  elapsedTimeMs: 5000
  connectString: 192.168.147.132:2181
  # session超时时间
  sessionTimeoutMs: 60000
  # 连接超时时间
  connectionTimeoutMs: 5000
3. 核心实现
代码语言:javascript复制
/**
    * @Description 获取分布式锁
    * @param path 提供可供写入的路径
    * @return void
    * @throws
    * @date 2019/11/4 9:36
    */
public boolean acquireDistributedLock(String path) {
    boolean lock = true;
    String keyPath = "/"   ROOT_PATH_LOCK   "/"   path;
    while (true) {
        try {
            curatorFramework
                    .create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.EPHEMERAL)
                    .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                    .forPath(keyPath);
            log.info("success to acquire lock for path:{}", keyPath);
            break;
        } catch (Exception e) {
            log.info("failed to acquire lock for path:{}", keyPath);
            log.info("while try again .......");
            try {
                if (countDownLatch.getCount() <= 0) {
                    countDownLatch = new CountDownLatch(1);
                }
                countDownLatch.await();
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
            lock = false;
        }
    }
    return lock;
}
/**
    * @Description
    * @param path  释放分布式锁
    * @return boolean
    * @throws
    * @date 2019/11/4 9:36
    */
public boolean releaseDistributedLock(String path) {
    boolean release = true;
    String keyPath = "/"   ROOT_PATH_LOCK   "/"   path;;
    try {
        if (curatorFramework.checkExists().forPath(keyPath) != null) {
            curatorFramework.delete().forPath(keyPath);
        }
    } catch (Exception e) {
        log.error("failed to release lock");
        release = false;
    }
    return release;
}
4. 演示结果

我用webcontroller实现一个restfull接口,同时打开两个URL获取同一把锁,获取的为成功,否则失败。

代码语言:javascript复制
@GetMapping("/lock10")
public ResponseResult getLock1() {
    ResponseResult responseResult = new ResponseResult();
    Boolean acquire = distributedLockByZookeeper.acquireDistributedLock(PATH);
    try {
        if(acquire) {
            log.error("I am lock1,i am updating resource……!!!");
            Thread.sleep(2000);
        } else{
            responseResult.setCode(ResponseCode.SYSNC_LOCK.getCode());
            responseResult.setMsg(ResponseCode.SYSNC_LOCK.getMsg());
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        distributedLockByZookeeper.releaseDistributedLock(PATH);
    }
    return responseResult;
}

分布式文件系统(fastDFS & 阿里云OSS)

使用分布式文件系统可以解决如下几点问题:

1. 海量文件数据存储

2. 文件数据高可用(冗余备份)

3. 读写性能和负载均衡

以上3点都是我们之前使用tomcat或nginx所不能够实现的,这也是我们为什么要使用分布式文件系统的原因。

FastDFS 、 HDFS、MINIO

说到分布式文件存储,肯定会有人想到HDFS,他们两者主要定位和应用场景是不一样的。

1. Hadoop中的文件系统HDFS主要解决并行计算中分布式存储数据的问题。其单个数据文件通常很大,采用了分块(切分)存储的方式

数据大文件存储来使用的场景。

2. FastDFS主要用于互联网网站,为文件上传和下载提供在线服务。所以在负载均衡、动态扩容等方面都支持得比较好,FastDFS不会对

快存储。FastDFS用于存储中小文件都是不错的,比如用户头像啊,一些较小的音视频文件啊等等都行。

MinIO很强让我放弃FastDFS拥抱MinIO的8个理由http://slack.minio.org.cn/question/23

MinIO官网地址:http://docs.minio.org.cn/docs

FastDFS

1.常见术语

tracker:追踪者服务器,主要用于协调调度,可以起到负载均衡的作用,记录storage的相关状态信息。

storage:存储服务器,用于保存文件以及文件的元数据信息。

group:组,同组节点提供冗余备份,不同组用于扩容。

mata data:文件的元数据信息,比如长宽信息,图片后缀,视频的帧数等。

主要是上传下载数据的服务器,也就是我们自己的项目所部署在的服务器。每个客户端服务器都需要安装Nginx。

代码语言:javascript复制
https://blog.csdn.net/zhangcongyi420/article/details/82958495  springboot整合fastdfs做文件上传
代码语言:javascript复制
https://www.yuque.com/molizhuzhu/thrgrk/on4kly#L4QKK

0 人点赞