项目推荐 I 手写RPC框架(四)

2022-04-11 18:49:21 浏览数 (1)

前言

关于手写RPC框架的推文接近尾声了!大家一起撒花庆祝,动手实践起来吧!

RPC框架代码量较多,将仅对核心过程进行梳理,完整代码见:https://github.com/wdw87/wRpc

在这篇推文中,将介绍注册中心相关的内容。

在本项目的系统推文中,将对项目进行详细的介绍

主要将按照下面的内容进行分配(蓝色字体可戳):

手写RPC框架(一)

RPC简介、技术栈介绍、测试Demo

手写RPC框架(二)

远程通信实现

手写RPC框架(三)

制定协议与编解码器、动态代理

手写RPC框架(四)

注册中心

Rpc框架示意图

七、注册中心

我们已经梳理过了RPC框架所要实现的主要部分,还剩下最后一个问题:客户端如何得知服务所在的具体服务器?

这时候就需要注册中心出场了

服务端,即Provider将拥有的服务以及自己的ip注册到注册中心中,客户端,即Consumer监听注册中心,从而得知服务所在的服务器列表。

Zookeeper实现的注册中心

本框架采用Zookeeper实现注册中心。

ZooKeeper的数据模型很简单,就是一棵树,作为注册中心,ZooKeeper的数据模型完全够用。看下图:

支持事件监听

ZooKeeper有一个Watcher机制。用户可以在节点上注册一些Watcher,并且当一些特定事件触发的时候,ZooKeeper服务端会将事件通知到感兴趣的客户端上。正是因为Watcher机制,才可以满足服务订阅的需求:服务消费者可以订阅某个服务,当服务提供者地址更新或者该服务有新的节点加入到集群中时,订阅该服务的服务消费者可以感知到的需求。

崩溃恢复模式

ZooKeeper的崩溃恢复模式能保证注册中心崩溃或者断连后,重启可以自动恢复注册数据以及订阅请求,因为这个时候会有新的Leader服务器与该重启的服务器进行数据同步。

框架使用zkclient作为Zookeeper客户端。

服务注册

在服务端启动时,首先根据xml配置文件,将服务实现类注册为一个Spring Bean,交由Spring管理。同时将服务发布到Zookeeper

代码语言:javascript复制
@Data
@Slf4j
public class ServiceConfig implements ApplicationContextAware, InitializingBean {

   private String id;
   private String name;
   private String ref;
   private ApplicationContext applicationContext;
   private ServiceRegistry serviceRegistry = ServiceRegistry.getInstance();

   @Override
   public void afterPropertiesSet() throws Exception {
       if (!applicationContext.containsBean("server")) {
           log.info("没有配置server,不能发布到注册中心");
           return;
      }
       if (!applicationContext.containsBean("registry")) {
           log.info("registry,不能发布到注册中心");
           return;
      }
       ApplicationContext context = SpringUtil.getApplicationContext();
       //获取服务提供者信息,ip等
       ServerConfig serverConfig = (ServerConfig)context.getBean("server");
       //获取服务提供类的实例,实例通过xml配置的方式注册为一个Spring Bean,交由Spring管理
       RegistryConfig registryConfig = (RegistryConfig)context.getBean("registry");
       //连接zookeeper
       serviceRegistry.connectServer(serverConfig,registryConfig);
       //将服务发布到注册中心
       serviceRegistry.register(name);
  }
   @Override
   public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
       this.applicationContext = applicationContext;
  }
}

服务发现

客户端连接zookeeper,并监听相关服务,本部分代码较长,省略了相关部分:

代码语言:javascript复制
public class ServiceDiscovery {

...

   private volatile List<String> serviceList = new ArrayList<>();
   private ConcurrentHashMap<String, List<String>> serverMap = new ConcurrentHashMap<>();
   private ZkClient zkClient;

   public void discovery(String serviceName) {
       //获取本地所有服务
       importService(serviceName);
       // 发布客户端引用到注册中心
       registerReference(serviceName);
       // 获取引用
       // 缓存引用
       getReference(serviceName);
       // 订阅服务变化
       subscribeServiceChange(serviceName);
  }

  ...
}

可以看出,在客户端初始化过程中,连接zookeeper后,会执行如下过程来发现服务:

  • 查询本地服务存根
代码语言:javascript复制
private void importService(String name) {
   boolean flag = false;
   List<Class<?>> services = ClassUtils.getClasses(name.substring(0,name.lastIndexOf(".")));
   //扫描服务接口所在包,查询是否有配置文件中对应的借口
   for (Class<?> clazz : services) {
       if(clazz.getName().equals(name)){
           serviceList.add(name);
           flag = true;
           break;
      }
  }
   //如果服务接口信息与配置文件不符,报错
   if(!flag){
       log.error("No such interface {} ,please check your config..", name);
       throw new RuntimeException("No such interface");
  }
}
  • 发布客户端引用到注册中心

将客户端信息作为一个临时节点,挂载在注册中心中相应服务目录下,表示引用了这个服务

代码语言:javascript复制
private void registerReference(String service) {
   String ip = RpcContext.getLocalIp();

   String basePath = ZK_REGISTRY_PATH   "/"   service   "/consumers";
   String path = basePath   "/"   ip;

   createPersistent(basePath);
   //临时节点
   if (!zkClient.exists(path)) {
       zkClient.createEphemeral(path);
  }
   log.info("客户端引用发布成功:[{}]", path);
}
  • 获取服务引用,并放入serverMap中缓存
代码语言:javascript复制
private void getReference(String service) {
   //删除旧的缓存
   if(serverMap.containsKey(service)) {
       serverMap.remove(service);
  }
   log.info("正在获取服务引用[{}]", service);
   String path = ZK_REGISTRY_PATH   "/"   service   "/providers";
   List<String> serverList = null;
   try {
       serverList = zkClient.getChildren(path);
  } catch (Exception e) {
       log.error("服务器无此服务,请检查相关配置");
       e.printStackTrace();
  }
   log.info("发现服务器列表"   serverList);
   serverMap.put(service, serverList);
   log.info("引用服务获取完成["   path   "]:"   service);
  ;
}
  • 监听服务变化

监听订阅的服务,当服务器列表发生变化时(例如有新的服务器加入了服务列表,或者有服务器发生故障),获取最新的服务列表

代码语言:javascript复制
private void subscribeServiceChange(String service) {
   String path = ZK_REGISTRY_PATH   "/"   service   "/providers";
   log.info("订阅服务变化[{}]", service);
   zkClient.subscribeChildChanges(path, new IZkChildListener() {
       @Override
       public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
           serverMap.remove(service);
           serverMap.put(service, currentChilds);
           log.info("服务[{}]发生变化,当前服务节点为{}", service, currentChilds);
      }
  });
}

项目讲解系列至此结束!

完整代码见:https://github.com/wdw87/wRpc

作者:好吃懒做贪玩东

编辑:西瓜媛

0 人点赞