前言
关于手写RPC框架的推文接近尾声了!大家一起撒花庆祝,动手实践起来吧!
RPC框架代码量较多,将仅对核心过程进行梳理,完整代码见:https://github.com/wdw87/wRpc
在这篇推文中,将介绍注册中心相关的内容。
在本项目的系统推文中,将对项目进行详细的介绍。
主要将按照下面的内容进行分配(蓝色字体可戳):
手写RPC框架(一) | RPC简介、技术栈介绍、测试Demo |
---|---|
手写RPC框架(二) | 远程通信实现 |
手写RPC框架(三) | 制定协议与编解码器、动态代理 |
手写RPC框架(四) | 注册中心 |
Rpc框架示意图
七、注册中心
我们已经梳理过了RPC框架所要实现的主要部分,还剩下最后一个问题:客户端如何得知服务所在的具体服务器?
这时候就需要注册中心出场了
服务端,即Provider将拥有的服务以及自己的ip注册到注册中心中,客户端,即Consumer监听注册中心,从而得知服务所在的服务器列表。
Zookeeper实现的注册中心
本框架采用Zookeeper实现注册中心。
ZooKeeper的数据模型很简单,就是一棵树,作为注册中心,ZooKeeper的数据模型完全够用。看下图:
支持事件监听
ZooKeeper有一个Watcher机制。用户可以在节点上注册一些Watcher,并且当一些特定事件触发的时候,ZooKeeper服务端会将事件通知到感兴趣的客户端上。正是因为Watcher机制,才可以满足服务订阅的需求:服务消费者可以订阅某个服务,当服务提供者地址更新或者该服务有新的节点加入到集群中时,订阅该服务的服务消费者可以感知到的需求。
崩溃恢复模式
ZooKeeper的崩溃恢复模式能保证注册中心崩溃或者断连后,重启可以自动恢复注册数据以及订阅请求,因为这个时候会有新的Leader服务器与该重启的服务器进行数据同步。
框架使用zkclient作为Zookeeper客户端。
服务注册
在服务端启动时,首先根据xml配置文件,将服务实现类注册为一个Spring Bean,交由Spring管理。同时将服务发布到Zookeeper
代码语言:javascript复制@Data
@Slf4j
public class ServiceConfig implements ApplicationContextAware, InitializingBean {
private String id;
private String name;
private String ref;
private ApplicationContext applicationContext;
private ServiceRegistry serviceRegistry = ServiceRegistry.getInstance();
@Override
public void afterPropertiesSet() throws Exception {
if (!applicationContext.containsBean("server")) {
log.info("没有配置server,不能发布到注册中心");
return;
}
if (!applicationContext.containsBean("registry")) {
log.info("registry,不能发布到注册中心");
return;
}
ApplicationContext context = SpringUtil.getApplicationContext();
//获取服务提供者信息,ip等
ServerConfig serverConfig = (ServerConfig)context.getBean("server");
//获取服务提供类的实例,实例通过xml配置的方式注册为一个Spring Bean,交由Spring管理
RegistryConfig registryConfig = (RegistryConfig)context.getBean("registry");
//连接zookeeper
serviceRegistry.connectServer(serverConfig,registryConfig);
//将服务发布到注册中心
serviceRegistry.register(name);
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
}
服务发现
客户端连接zookeeper,并监听相关服务,本部分代码较长,省略了相关部分:
代码语言:javascript复制public class ServiceDiscovery {
...
private volatile List<String> serviceList = new ArrayList<>();
private ConcurrentHashMap<String, List<String>> serverMap = new ConcurrentHashMap<>();
private ZkClient zkClient;
public void discovery(String serviceName) {
//获取本地所有服务
importService(serviceName);
// 发布客户端引用到注册中心
registerReference(serviceName);
// 获取引用
// 缓存引用
getReference(serviceName);
// 订阅服务变化
subscribeServiceChange(serviceName);
}
...
}
可以看出,在客户端初始化过程中,连接zookeeper后,会执行如下过程来发现服务:
- 查询本地服务存根
private void importService(String name) {
boolean flag = false;
List<Class<?>> services = ClassUtils.getClasses(name.substring(0,name.lastIndexOf(".")));
//扫描服务接口所在包,查询是否有配置文件中对应的借口
for (Class<?> clazz : services) {
if(clazz.getName().equals(name)){
serviceList.add(name);
flag = true;
break;
}
}
//如果服务接口信息与配置文件不符,报错
if(!flag){
log.error("No such interface {} ,please check your config..", name);
throw new RuntimeException("No such interface");
}
}
- 发布客户端引用到注册中心
将客户端信息作为一个临时节点,挂载在注册中心中相应服务目录下,表示引用了这个服务
代码语言:javascript复制private void registerReference(String service) {
String ip = RpcContext.getLocalIp();
String basePath = ZK_REGISTRY_PATH "/" service "/consumers";
String path = basePath "/" ip;
createPersistent(basePath);
//临时节点
if (!zkClient.exists(path)) {
zkClient.createEphemeral(path);
}
log.info("客户端引用发布成功:[{}]", path);
}
- 获取服务引用,并放入serverMap中缓存
private void getReference(String service) {
//删除旧的缓存
if(serverMap.containsKey(service)) {
serverMap.remove(service);
}
log.info("正在获取服务引用[{}]", service);
String path = ZK_REGISTRY_PATH "/" service "/providers";
List<String> serverList = null;
try {
serverList = zkClient.getChildren(path);
} catch (Exception e) {
log.error("服务器无此服务,请检查相关配置");
e.printStackTrace();
}
log.info("发现服务器列表" serverList);
serverMap.put(service, serverList);
log.info("引用服务获取完成[" path "]:" service);
;
}
- 监听服务变化
监听订阅的服务,当服务器列表发生变化时(例如有新的服务器加入了服务列表,或者有服务器发生故障),获取最新的服务列表
代码语言:javascript复制private void subscribeServiceChange(String service) {
String path = ZK_REGISTRY_PATH "/" service "/providers";
log.info("订阅服务变化[{}]", service);
zkClient.subscribeChildChanges(path, new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
serverMap.remove(service);
serverMap.put(service, currentChilds);
log.info("服务[{}]发生变化,当前服务节点为{}", service, currentChilds);
}
});
}
项目讲解系列至此结束!
完整代码见:https://github.com/wdw87/wRpc
作者:好吃懒做贪玩东
编辑:西瓜媛