System|分布式|从Dubbo+Zookeeper开始实现分布式存储系统

2021-11-22 10:24:52 浏览数 (1)

源码已开源。分布式键值存储是很常见的中间件,例如Redis。专业的收口课要求我们以实现一个分布式键值对存储系统,并且除了Zookeeper和RPC框架之外不允许使用其他分布式中间件。

为什么不直接用zookeeper来做键值对存储?

zookeepeer的协议zab是Paxos变体,众所周知,Paxos是一个强一致性协议,需要经过多轮提议才能确定最终的共识。如果使用Zookeeper来进行存储,性能会惨不忍睹。

因此,现在的分布式架构多以Zookeeper作为注册中心存储metadata,涉及性能的data自己处理。在这里,我用Zookeeper Dubbo RPC框架作为基础平台。

需求规约

基本架构

我们的分布式存储系统具有三个原语,实现最简单的API

  • READ(key)
  • PUT(key,value)
  • DELETE(key,value)

客户端对于Master请求应该根据key转发给对应的数据节点,类似于分库分表。这里的难点在于,如何实现scalability?


Zookeepr集群搭建

基本功能实现(单Client 单Master 单Data)

zookeeper就默认安装,改个zoo.cfg,维持2181端口即可。

首先建立一个Maven项目,在其中添加api模块,其中放置我们RPC的接口文件,需要注意,这里的参数和返回值不能是基类,否则在后面的RPC marshal/unmarshal时似乎会出现问题(例如不是Map而是ConcurrentHashMap)。

然后创建三个spring模块client、master、slave(这里的Slave类指的就是Data节点),pom如下。这里的API模块是为了支持依赖注入,服务通过注入的API完成RPC通信。

代码语言:javascript复制
        <dependency>
            <groupId>sjtu</groupId>
            <artifactId>api</artifactId>
            <version>0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.dubbo</groupId>
            <artifactId>dubbo-spring-boot-starter</artifactId>
            <version>${dubbo.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.dubbo</groupId>
            <artifactId>dubbo-dependencies-zookeeper</artifactId>
            <version>${dubbo.version}</version>
            <type>pom</type>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

property设置为

代码语言:javascript复制
spring.application.name= dubbo-master
#依赖注入的service在哪个包扫描,这里包名是sjtu.master,其他类比即可
dubboo.scan.base-packages=sjtu.master
# Dubbo Protocol
dubbo.protocol.name=dubbo
## Random port
dubbo.protocol.port=-1
## Dubbo Registry
dubbo.registry.address=zookeeper://127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
dubbo.registry.file = ${user.home}/dubbo-cache/${spring.application.name}/dubbo.cache
## DemoService version
demo.service.version=1.0.0

Application的注解为@EnableAutoConfiguration, 服务类的实现为(SlaveService的方法可以用ConcurrentHashMap简易实现)

代码语言:javascript复制
@DubboService(version = "${demo.service.version}")
public class SlaveService implements SlaveAPI {
    @Override
    public Boolean PUT(String key, String val) {}
    @Override
    public String READ(String key) {}
    @Override
    public Boolean DELETE(String key) {}
}
@DubboService(version =  "${demo.service.version}")
public class MasterService implements MasterAPI {
    @DubboReference(version = "${demo.service.version}")
    private SlaveAPI slave;
    @Override
    public Boolean PUT(String key, String val){
        Boolean ret =  slave.PUT(key,val);
        return ret;
    }
    @Override
    public String READ(String key){
        String ret =   slave.READ(key);
        return ret;
    }
    @Override
    public Boolean DELETE(String key){
        Boolean ret =  slave.DELETE(key);
        return ret;
    }
}
@EnableAutoConfiguration
public class ClientApplication {
    private final Logger logger = LoggerFactory.getLogger(getClass());
    @DubboReference(version = "${demo.service.version}")
    private MasterAPI master;
    public static void main(String[] args) {
        SpringApplication.run(ClientApplication.class, args);
    }
    @Bean
    public ApplicationRunner runner() {
        return args -> {
            logger.info(master.sayHello("mercyblitz"));
            for(Integer i = 0;i<10;i  ){
                logger.info("PUT "  i   " with "  master.PUT(i.toString(),i.toString()).toString());
            }
        };
    }
}

通过@DubboReference和@DubboService,Dubbo框架帮助我们从Zookeeper进行服务发现,在执行API对象时,我们实际在对注册的节点进行RPC操作。

以上,一个简单的RPC键值对存储系统就实现了,Master会进行RPC从实际存放数据的Slave节点中获取数据并返回。


负载均衡(单Client 单Master 多Data)

问题在于,我们需要实现均衡,让不同的key请求不同的Data节点, 如果注册多个同版本Data实例,在RPC通信时转发给不同的实例,是否就实现了呢?

在没有设置负载均衡时,默认访问第一个实例,而我们希望我们的访问能够根据key访问对应的实例,所以,这里我们需要进行负载均衡拓展,实现我们自定义的转发策略。需要实现它的doSelect接口,从一组服务列表(List<Invoker>)中选择转发的服务。

代码语言:javascript复制
@DubboReference(version = "${demo.service.version}", loadbalance = "TOWLoadBalance")
    private SlaveAPI slave;

新增模块loadbalance,这里我们利用Dubbo开放的SPI拓展,在如下文件中写入

负载均衡SPI拓展

代码语言:javascript复制
TOWLoadBalance=sjtu.loadbalance.TOWLoadBalance

Dubbo将会将我们实现的TOWLoadBalance类注入slave中。

一致性Hash

我们希望在节点数目增加后,请求能最大概率地还访问原本的节点,这就涉及到经典的一致性hash算法了。

利用隐式参数传递在RPC上下文中存储调用的key,送给负载均衡算法供计算。

代码语言:javascript复制
     @Override
    public Boolean PUT(String key, String val){
        RpcCon @ text.getContext().setAttachment("key", key);
        Boolean ret =  slave.PUT(key,val);
        return ret;
    }

Dubbo已经实现了一致性哈希负载均衡,不过作为作业肯定不能用轮子(还有别的原因)。这里我们用实际节点的地址添加后缀,作为虚拟节点,并计算虚拟节点的hash,然后找到第一个hash大于key的虚拟节点(or环形),在找到虚拟节点后去除后缀就能获得对应的实际节点。

我的实现暂时无法公开,思路和下面的博客类似。

数据迁移

问题在于,一致性hash只能使得请求划分到对应的节点上,然而我们的数据可没法凭空过去。新增的节点没有数据,根本没法处理转移的GET操作。

怎么办?

正常的思路是,加入新的节点后,这个时候的数据划分已经不具备正确性了。我们停机吧。在Data之间迁移数据,等数据被正确划分之后再提供服务。

问题是,停机的时候进行数据迁移,那么随着节点数目的增加,停机的时间会越来越长,既不具备scalability,也不具备availability。

Transfer on Write!

这个算法是我水群的时候灵光一闪想出来的,估计很多实现细节没有考虑,专门用于这种数据分库分表的场景,所以没有什么泛用性。

思路和操作系统的Copy on Write一脉相承,COW指的是,当复制某个对象时,我们没有必要真的复制,读的时候我们完全可以读原本的对象,只有写的时候才需要进行复制。

同理,在迁移数据时,我们没有必要真的迁移,读的时候我们完全可以从之前的节点读取,只有写的时候才需要迁移到新的节点。这本质上是惰性思想。

我们维护一张路由表,存储key->Address的历史条目。

  • Read时,从路由表获取节点,沿用之前的节点。(记忆)
  • Write时,转发给一致性hash计算的节点,并且更新路由表。(迁移)
代码语言:javascript复制
      if(method.equals("READ")){
            //有数据,沿用之前的服务器
            if(routeMap.containsKey(key))
                return findInvoker(invokers, routeMap.get(key));
                //无数据,直接使用Hash
            else {
                routeMap.put(key,invoke.getUrl().getAddress());
                return invoke;
            }
        }
        else if(method.equals("PUT")||method.equals("DELETE")){
            //有数据
            if(routeMap.containsKey(key)) {
                //节点增加,更新至新的服务器,这个时候delete是个假的delete,但是新服务器没数据所以等效,但是返回false
                if (!routeMap.get(key).equals( expectAddr))
                    routeMap.put(key,  expectAddr);
                return invoke;
            }
            //无数据,直接使用hash
            else {
                routeMap.put(key,   expectAddr);
                return invoke;
            }
        }

这样有什么好处呢?

  1. Cache - GET时直接使用路由表的节点,而不需要通过一致性hash计算,免去了一致性hash的开销,相当于一个cache。
  2. Lazy - 试想一下这种情况,某个数据在新增节点时进行了转移,结果还没有被读取,数据就被更新或者删除了,那么这个转移还有必要么?按照我们的算法,这种情况根本就不需要进行数据转移就可以直接PUT/DELETE!TOW用惰性的方式有效识别了这种额外开销并予以避免。
  3. Availability:均摊开销,我们只需要每次负载均衡时维护路由表即可,对于用户来说,增加节点是无感知的,数据迁移也通过惰性的方式在写的时候进行,并且没有带来额外开销。相比于停机而言,我们的分布式系统增加Data节点几乎毫无代价,可以平稳连续进行

当然也有缺陷

  1. 弱负载均衡性 - 初始时,原先节点的GET负载没有变化,随着写操作的增加,数据才逐渐转移到新增的节点中,最终达到负载均衡。
  2. 有状态 - Master需要维护这样的路由表记忆,而如果采取停机的方式,Master可以根据一致性Hash实时演算,不需要状态

总而言之,TOW负载均衡算法能够正确地转发请求,并且最终实现正确的数据切分。

负载均衡 数据切分


支持并发之分布式锁(多Client 单Master 多Data)

为了处理高并发的情况,我们需要避免多个写请求同时进入临界区,那么简单粗暴的分布式锁肯定是最佳选择。在这个系统中,最合理的分布式锁显然是读写锁。

zookeeper的临时顺序节点能有效帮助。(吐槽一句,Dubbo你实现的ZkClient里面怎么只有临时节点和永久节点,害得我专门copy源码加了个临时顺序节点的API)

顺序节点可以看成线性表,隐性的FIFO队列,因此我们可以套用读写锁的思路。

  • 读者等待写者离开临界区
  • 写者等待读写者离开临界区

这里就表现为: 读锁监听最近一个写锁节点的销毁,写锁监听上一个读写锁节点的销毁。同样为了避免泄露代码,放出参考思路:

在Master执行READ时,拿key粒度的读锁,在Master执行PUT/DELETE时拿写锁,从而避免了Race Condition的发生。

(这个实现果然很经不起推敲,崩溃的时候容易出现各种奇奇怪怪的东西,不过要高鲁棒性的算法那肯定不止这么几行代码)

代码语言:javascript复制
        Lock lock = new Lock("/rwlock" key);
        lock.lockWrite();
        RpcContext.getContext().setAttachment("key", key);
        Boolean ret =  slave.PUT(key,val);
        lock.unLockWrite();

数据备份(多Client 单Master 多Data Replicas)

我们还需要对Data节点进行多重备份,以便使得Data节点崩溃时存储的K-V数据不会丢失。维护备份数据一致性的方法有很多,在这里我使用基本的2PC协议进行实现。

2PC协议需要引入一个不存储数据的Coordinator节点,我使用Slave节点作为2PC的Coordinator,Data节点作为Participant。这里的Slave和上面出现区别,本身不存储数据,由DataAPI提供数据。

部署时,在property文件中配置group.id,然后在注解Service和Reference的时候增加group属性(这样做其实是不符合Reference规范的,他的意思是同一个接口的不同实现,这里单纯当分组用),group相同的节点视作处于同一2PC协议下。

这里需要注意的是,不能给Slave @DubboService加上group,因为根据Dubbo的规定

2.2.0 以上版本支持,总是只调一个可用组的实现

对于Master调Slave而言,我们需要发现所有的Slave节点;而对于Slave调Data才只需要关注少数。所以Data节点能加上group而Slave不行。

代码语言:javascript复制
@DubboService(version = "${demo.service.version}",group = "${group.id}")
DataService
@DubboReference(version = "${demo.service.version}",group = "${group.id}")
DataService

@DubboReference(version = "${demo.service.version}", loadbalance = "TOWLoadBalance")
    private SlaveAPI slave;

典型2PC协议

达成2PC协议,要求对三方进行修改,按照上面TOWLoadBalance的方式增加一个自定义LoadBalance以支持同时调用所有引用的服务。(这里是很Naive的实现,非礼勿视)

Slave - 先发出写请求,均返回yes,再发出COMMIT请求

代码语言:javascript复制
lock.lock();
if(data.DELETE(key))
    data.COMMIT();
lock.unlock();

LoadBalance - 写请求应当能够转发给所有Backup(下面这个代码只能实现转发,如果对方崩溃了不会导致整个函数崩溃,事实上没法做到ABORT,emmmm)

代码语言:javascript复制
if (invokers.size() > 1) {
     for (int i = 1; i < invokers.size(); i  ) {
       invokers.get(i).invoke(invocation);
      }
    }
return invokers.get(0);

Data - 支持事务回退,All-or-nothing

m是数据表,cas指的是Copy-And-Swap,是C 保证异常安全的手段,意思是修改时先在副本上进行修改,修改完成后再进行原子性的swap,从而替换数据。

在这里,PUT操作仅仅产生了一个新的CAS副本,并没有修改内存中的hash表,直到COMMIT时才完成修改。

(这里data的copy and swap其实很蠢,没必要copy整个表,其实只需要copy一个表项就行,不过我没怎么考虑性能,怎么实现方便怎么来就完事儿)

代码语言:javascript复制
    @Override
    public Boolean COMMIT() {
        m = cas;
        return true;
    }

    @Override
    public Boolean PUT(String key, String val) {
        cas = m;
        cas.put(key, val);
        return true;
    }

Primary/Backup(多Client 单Master 多Data Primary/Backup)

单纯的维护数据一致性是不够的,我们还需要确定谁是Primary。这里我采用了很naive的方法,列表最前的的就是Primary(不过要当心未同步的backup在index 0的特殊情况,后文处理),其他服务均是Backup。

PrimaryLoadBalance实现

读操作转发给Primary,写操作则调用所有服务,所有Backup的invoke均成功才能执行返回Primary并调用

代码语言:javascript复制
         if (method.equals("READ")||method.equals("SYNC")) {
            return invokers.get(0);
        } else if (method.equals("PUT") || method.equals("DELETE") || method.equals("COMMIT")) {
            if (invokers.size() > 1) {
                for (int i = 1; i < invokers.size(); i  ) {
                    invokers.get(i).invoke(invocation);
                }
            }
            return invokers.get(0);
        }

Primary/Backup

Primary节点负责读写,而Backup仅仅需要负责写。

Hot Fix(动态部署Backup)

即使按照最理想的情况也就是Replication State Machine复制状态机,我写的垃圾2PC实现真的保证了所有Data节点都按照相同的顺序执行了相同的操作(更何况这个2PC菜的起飞),那也仅仅保证了在相同的初态下节点之间能保持同步

如果我们需要动态增加某个group的backup来修复备份数目,那么他的初态是无数据,而其他的节点都已经存储了数据,此时2PC没有办法保证数据的一致性。我们需要让这个backup从primary获取数据进行同步。

同步数据,并且在Primary挂掉后顶替

简易实现,如果需要动态部署,那么配置文件中写入group.hotfix =1,通过hotfix变量判断Data节点是否需要同步。

代码语言:javascript复制
@Value("${group.hotfix}")
    private String hotfix;

Data在执行写操作前,通过SYNC指令访问自己的Coordinator(Slave)获取Primary的全部数据。由于Slave已经针对写操作加锁,所以不用担心同时出现多个写操作同时进入Check并进入临界区,这里不需要对Data的写操作加锁。

问题在于,Dubbo里的服务发现只能通过接口、组、版本来完成,但是Master对Slave的要求使得Slave不能具备组。事实上,现在是Slave单方面知道Data的组,而反过来一无所知

那么,Data应该如何获取Primary呢?毕竟Slave才是2PC协议的管理者,才知道真正的Primary。这就要谈到之前的设计了,为什么不在Backup创建时就直接同步Primary,而是在写的时候再去同步?

Slave单方面知道Data,但调用Data的时候能够顺带传递参数,使得Data临时知道Slave的地址。在这里我用了RPC上下文,把TOW时获得的Slave地址最终传递给了Data。每次RPC调用都会使得参数失效,因此调用后都需要重置。

代码语言:javascript复制
@DubboReference(version = "${demo.service.version}",loadbalance = "IPLoadBalance",check=false)
   private SlaveAPI slave;

void Check(){
        if(hotfix.equals("1")) {
            String address = RpcContext.getContext().getAttachment("URL");
            //RpcContext.getContext().setAttachment("URL",address);
            System.out.println("SYNC ");
            System.out.println(address);
            m = slave.SYNC();
            System.out.println(m.toString());
            System.out.println("SYNCED ");
            hotfix = "0";
        }
    }

与此同时,获得了地址的Backup需要找到Slave,这就需要我们继续重写负载均衡策略。这就是单纯从一组服务里筛选出地址相同的服务即可。

代码语言:javascript复制
public class IPLoadBalance extends AbstractLoadBalance {
    protected <T> Invoker<T> findInvoker(List<Invoker<T>> invokers, String addr){
        for( Invoker<T> invoke: invokers)
            if(invoke.getUrl().getAddress().equals(addr))
                return invoke;
        return null;
    }
    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        String address = RpcContext.getContext().getAttachment("URL");
        System.out.println(address);
        return findInvoker(invokers,address);
    }
}

除了hotfix之外,Data节点的所有配置都相同,且hotfix并不影响Primary/Backup,因此动态部署的Backup也能自动成为Primary,和其他节点没有区别。当心,Backup没准直接跑列表最前面去了,因此我们需要记忆primary以防止节点增加时错把新增的backup当成了primary,不然就变成我同步我自己了。

代码语言:javascript复制
if(primary.equals(""))
            primary = invokers.get(0).getUrl().getAddress();
        //handle加入新节点的情况,沿用之前的primary
        Invoker<T> pri= findInvoker(invokers,primary);
        if(pri == null) {
            //primary 挂了,决议新的
            pri = invokers.get(0);
            primary = invokers.get(0).getUrl().getAddress();
        }

        if (method.equals("READ")||method.equals("SYNC")) {
            return pri;
        } else if (method.equals("PUT") || method.equals("DELETE") || method.equals("COMMIT")) {
                for (int i = 0; i < invokers.size(); i  ) {
                    if(!invokers.get(i).getUrl().getAddress().equals(primary))
                    invokers.get(i).invoke(invocation);
                }
            return pri;
        }
        return pri;

分布式系统架构

问题在于,我们可以很明显的发现,在节点与节点之间,有这么几个部分是仅有一条路径的,用图论来表示就是bridge。他们是:

  • Master : 存储历史一致性hash结果的路由表用于惰性数据迁移
  • Slave Coordinator: 存储Primary信息以防止动态部署的Backup自己同步自己

用FP的说法来说,副作用是万恶之源,同样是函数,副作用的函数调用一百次可能有一百个结果,而纯函数的调用只要输入确定,输出始终如一。

这个思想在分布式中同样适用,想要维持100个一致的数据节点难上加难,但提供100个纯函数无状态服务器集群则易如反掌。只要把数据分离出去,Replicas就是板上定钉的事。

分离数据(多Client 多Master 单Router 多Data Primary/Backup)

现在,我们不在Master节点存储数据了。所有负载均衡中进行的读写操作都转为从Router中获取。

和往常一样,我们建立Router的RPC对象,根据下面的代码,需要实现三个原语:CONTAIN/GET/PUT。

代码语言:javascript复制
if(method.equals("READ")){
            //有数据,沿用之前的服务器
            if(router.CONTAIN(key))
                return findInvoker(invokers, router.GET(key));
                //无数据,直接使用Hash
            else {
                router.PUT(key,invoke.getUrl().getAddress());
                return invoke;
            }
        }
        else if(method.equals("PUT")||method.equals("DELETE")){
            //有数据
            if(router.CONTAIN(key)) {
                //节点增加,更新至新的服务器,这个时候delete是个假的delete,但是新服务器没数据所以等效,但是返回false
                if (!router.GET(key).equals( expectAddr))
                    router.PUT(key,  expectAddr);
                return invoke;
            }
            //无数据,直接使用hash
            else {
                router.PUT(key, expectAddr);
                return invoke;
            }
        }

已经实现了Data,这个东西只不过是照搬一下,而且因为不需要数据切分,也不需要group。啊,易如反掌,这不就是Copy and Paste么。

然而,问题来了!!

我们的router是在LoadBalance里面实现的,而这个东西本身通过@Reference注入SlaveAPI的RPC对象,也就是说,要在依赖注入之后进行第二次依赖注入。

爷吐了。心情见application名。

尝试了整个下午一无所获,issue也没啥人理,那就算了,我用noob的API配置来做,反正也没有性能需求。以下方式手动获得RPC对象。

代码语言:javascript复制
ApplicationConfig application = new ApplicationConfig();
        application.setName("NMSLWSND");
        RegistryConfig registry = new RegistryConfig();
        registry.setAddress("zookeeper://127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183");
        ReferenceConfig<RouterAPI> reference = new ReferenceConfig<RouterAPI>();
        reference.setApplication(application);
        reference.setRegistry(registry);
        reference.setInterface(RouterAPI.class);
        reference.setVersion("1.0.0");
        router = reference.get();

现在,路由表是从远端获取的了,然而这有什么好处呢?远端获取还容易失败。很简单,因为数据分离了,我们的Master节点已经是无状态的了,现在的它可以随意集群化,随意负载均衡,反正有读写锁的存在,不同Master之间也能保持并发的正确性。

Master的单点故障已经消除了,现在,单点故障转移到了Router这边。

路由表备份(多Client 多Master 多Router Primary/Backup 多Data Primary/Backup)

按照之前的2PC协议,进行同样的实现,引入RouterCoordinator和RouterData,和之前的Slave按照同样的方法实现。

现在,数据都是多备份的了,节点崩溃也没关系,反正我们还能动态部署并数据同步,让备份维持在安全的数量。

然而,最后再来看一看,2PC引入的Coordinator本身会不会就是原罪呢?如果Coordinator崩溃,如果我们及时重新部署,的确可以恢复服务,但是在崩溃的期间,显然系统是不可用的。上文已经提到了,Coordinator其实保存了Primary节点的地址,虽然很小,但也是不可忽略的,有状态就意味着复杂的数据一致性。

对于维持这种小数据的一致性,其实Zookeeper本身就有了支持。

分离Coordinator元数据(多Client 多Master 多Router Primary/Backup 多Data Primary/Backup 多Coordinator)

还没有进行备份的数据就只剩下每个2PC中Primary的地址了,只要分离出它,放到zookeeper的节点进行CP的存储,那么Coordinator不就是无状态的么?

继续利用Dubbo提供的Curator客户端,顺便加上非常奇怪但它就是没有的SetContent接口,我们每次先从远端读取primary地址,进行本地计算判断primary是否失效,然后把新决议的primary写回zookeeper。

代码语言:javascript复制
if (!zkClient.checkExists(nodeName)) {
            zkClient.createPersistent(nodeName);
        }
        primary = zkClient.getContent(nodeName);
        String method = invocation.getMethodName();
        if(primary.isEmpty()) {
            primary = invokers.get(0).getUrl().getAddress();
            zkClient.setContent(nodeName,primary);
        }
        Invoker<T> pri= findInvoker(invokers,primary);
        if(pri == null) {
            //primary 挂了,决议新的
            pri = invokers.get(0);
            primary = invokers.get(0).getUrl().getAddress();
            zkClient.setContent(nodeName,primary);
        }

现在,Coordinator已经无状态了,想要集群化 负载均衡也很容易,此时,如果我们需要修改路由表,负载会均摊到所有coordinator身上。

另一个问题出现了,正常跑的时候还好,一旦把原来的primary枪毙了,性能就下降很多。什么原因呢?

在primary消失的上面这段代码,其实是并发的,所有的请求都可以进入临界区。所有的指令都在执行着setContent这个指令,并且每个setContent都会被接受。我们实际上并不是更新一次primary,而是这段时间内所有请求的primary。对于coordinator集群来说,造成了庞大的并发写,本来就是CP协议的zookeeper自然不堪重负了。

在OS中也有类似的概念,对于同一缓存行的高度竞争会导致锁的性能大量下降,从而影响了scalability。

解决方法很简单,在setContent操作前后加锁。

最终架构

通过一堆花里胡哨的分离,最后从单Client 单Master 两Data的小系统变成了这张复杂的架构图,使用敏捷开发里强调的增量式开发。

全程实际上就是在不断地进行迭代,数据和服务进行分离,数据通过一致性协议完成备份,服务通过集群化完成备份,从而消除单点故障。

源代码如下,附带实验报告与文档。

演示视频如下,名字忽略就好反正没啥隐私,懒得打包主要太累,就直接IDEA运行了。


Issue 1: 解决单invoker时不触发负载均衡,导致路由表或Primary无法维护的问题

优先级:高,影响到了可缩性,限制数据集群数目下限至少是3(1个缓冲)

这个问题如果一直只有一个服务或者多个服务都不会出现,问题在于从一个服务增长到多个服务的情况,之前没有存储信息,后面突然要用,就会有问题。

Dubbo为了性能贴心地给我们做了集群节点数目为1的优化,然而我们并不想要他帮我们跳过,所以我们要重写框架。

服务发现源码分析

读Dubbo框架源码,Dubbo在单invoker情况下进行了优化。

代码语言:javascript复制
public abstract class AbstractLoadBalance implements LoadBalance {
     public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        if (CollectionUtils.isEmpty(invokers)) {
            return null;
        } else {
            return invokers.size() == 1 ? (Invoker)invokers.get(0) : this.doSelect(invokers, url, invocation);
        }
    }

我们在自定义LoadBalance实现中进行重载,结果还是不行

代码语言:javascript复制
    @Override
    public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        if (CollectionUtils.isEmpty(invokers)) {
            return null;
        } else {
            return this.doSelect(invokers, url, invocation);
        }
    }

这里就要讲Dubbo服务发现的原理了,Directory对应的是zookeeper中注册的服务列表,Router是按照条件筛选服务,LoadBalance是按照算法选择服务。

这里的Cluster就是把一组服务伪装成一个服务,也就是按照我们上面定义的Directory、Router、LoadBalance等规则选出Invoker,作为集群被调用的Invoker。

集群架构

我们来看一看Cluster是怎样通过这些规则进行计算的。

Directory:根据URL从Router Chain筛选Invoker

代码语言:javascript复制
public abstract class AbstractDirectory<T> implements Directory<T> {
    protected RouterChain<T> routerChain;
}
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
    private void refreshInvoker(List<URL> invokerUrls) {
            List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList(newUrlInvokerMap.values()));
            this.routerChain.setInvokers(newInvokers);
            this.invokers = this.multiGroup ? this.toMergeInvokerList(newInvokers) : newInvokers;
            this.urlInvokerMap = newUrlInvokerMap;
        }
    }

RouterChain: 每个路由依次进行筛选

代码语言:javascript复制
    public List<Invoker<T>> route(URL url, Invocation invocation) {
        List<Invoker<T>> finalInvokers = this.invokers;
        Router router;
        for(Iterator var4 = this.routers.iterator(); var4.hasNext(); finalInvokers = router.route(finalInvokers, url, invocation)) {
            router = (Router)var4.next();
        }
        return finalInvokers;
    }

Router:根据配置项进行筛选

代码语言:javascript复制
    public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
                Iterator var5 = invokers.iterator();
                while(var5.hasNext()) {
                    Invoker<T> invoker = (Invoker)var5.next();
                    if (this.matchThen(invoker.getUrl(), url)) {
                        result.add(invoker);
                    }

Cluster : 根据Directory创建集群Invoker,通过directory/router获取invokers

代码语言:javascript复制
public class FailoverCluster extends AbstractCluster { 
    public <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException {
        return new FailoverClusterInvoker(directory);
    }

ClusterInvoker:doInvoke->AbstractInvoker的select

代码语言:javascript复制
public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {
   
    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
      Invoker<T> invoker = this.select(loadbalance, invocation, copyInvokers, invoked);
    }
}

AbstractInvoker:select->doselect->loadBalance的select

代码语言:javascript复制
public abstract class AbstractClusterInvoker<T> implements Invoker<T> {

    protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
                Invoker<T> invoker = this.doSelect(loadbalance, invocation, invokers, selected);
                if (sticky) {
                    this.stickyInvoker = invoker;
                }
           }
    private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
        if (CollectionUtils.isEmpty(invokers)) {
            return null;
        } else if (invokers.size() == 1) {
            return (Invoker)invokers.get(0);
        } else {
            Invoker<T> invoker = loadbalance.select(invokers, this.getUrl(), invocation);

可以看出,关键在于AbstractInvoker的doSelect这一步,将单个服务的集群直接跳过了。我们需要重写Abstract Invoker,Dubbo是开源的,如果直接fork源码那这一步轻而易举,问题是maven项目里这些都是只读的。

WDNMD这个doSelect居然是个Pirvate!!

WDNMD这就是滥用继承么?写个Private强制继承实现?Protected会死么?

我们首先更换集群策略为FailureBack,只执行一次操作不重连,适合幂等操作,便于实现。原本的select方法无法重写,因此我们直接访问loadbalance,相当于单纯做转发,其他没法访问的代码就全部删除了。

代码语言:javascript复制
 public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        this.checkInvokers(invokers, invocation);
        Invoker invoker = loadbalance.select(invokers, this.getUrl(), invocation);

        try {
            return invoker.invoke(invocation);
        } catch (Throwable var6) {
            if (var6 instanceof RpcException && ((RpcException)var6).isBiz()) {
                throw (RpcException)var6;
            } else {
                throw new RpcException(var6 instanceof RpcException ? ((RpcException)var6).getCode() : 0, "Failfast invoke providers "   invoker.getUrl()   " "   loadbalance.getClass().getSimpleName()   " select from all providers "   invokers   " for service "   this.getInterface().getName()   " method "   invocation.getMethodName()   " on consumer "   NetUtils.getLocalHost()   " use dubbo version "   Version.getVersion()   ", but no luck to perform the invocation. Last error is: "   var6.getMessage(), var6.getCause() != null ? var6.getCause() : var6);
            }
        }
    }

等等,为什么还不行!!!!!!!!!!!!!!!!!!!!!!!!!!

难道是因为Cluster的dojoin一开始就没有被调用么?你为什么这么熟练啊,为什么一个边界检测能检查这么多次啊?测试的工资别发那么高行不行?天天测边界测个锤子。

经过了化身恶魔之后,我继续看哪里还有边界检测,答案是在引用监听里面。一开始的时候URL只有一条就没有集群。

代码语言:javascript复制
public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
           @SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
                  private T createProxy(Map<String, String> map) {
           if (urls.size() == 1) {
                invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
            }  
           else {
                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                URL registryURL = null;
                for (URL url : urls) {
                    invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
                    if (UrlUtils.isRegistry(url)) {
                        registryURL = url; // use last registry url
                    }
                }
                if (registryURL != null) { // registry url is available
                    // for multi-subscription scenario, use 'zone-aware' policy by default
                    String cluster = registryURL.getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME);
                    // The invoker wrap sequence would be: ZoneAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker
                    invoker = Cluster.getCluster(cluster, false).join(new StaticDirectory(registryURL, invokers));
                }

我选择死亡。我不走这条路了。我宣布,需要维护数据的集群节点不得小于2.

没法改maven项目的源码好气啊,这就是框架的痛么。


Issue 2: 解决2PC协议是异步invoke,导致backup故障primary依旧能正常调用的问题

优先级: 中,反正都已经发现服务了,调用失败概率很小,corner case摸了摸了

思路: 同步化,捕获调用异常,由Slave Abort请求

Issue 3: 解决Zookeeper集群在setContent时并发getContent,以返回旧版本的问题

优先级: 中,不搭zk集群完事儿

思路: 加细粒度锁,限制getContent时机防止脏读

Issue 4: 解决读写分离/按地址访问不太符合Load Balance的Balance的问题

优先级: 低,语义问题摸了摸了

思路:算法大概前置到路由层,不过因为这里路由没工作,为了实现简单我就统一扔这里了。

思路: Cluster里有BroadCast类型,大概写操作放那里?不过这也不是广播,难顶。

0 人点赞