源码阅读之dubbo服务发布过程分析
dubbo 是基于 spring 配置来实现服务的发布的,那么一定是基于 spring的扩展来写了一套自己的标签,那么 spring 是如何解析这些配置呢?总的来说,就是可以通过 spring 的扩展机制来扩展自己的标签。大家在dubbo 配置文件中看到的dubbo:service ,就是属于自定义扩展标签
dubbo 配置文件中看到的dubbo:service ,就是属于自定义扩展标签要实现自定义扩展,有三个步骤(在 spring 中定义了两个接口,用来实现扩展)
- NamespaceHandler: 注册一堆 BeanDefinitionParser,利用他们来进 行解析
- BeanDefinitionParser:用于解析每个 element 的内容
- Spring 默认会加载 jar 包下的 META-INF/spring.handlers 文件寻找对 应的 NamespaceHandler。 以下是 Dubbo-config 模块下的 dubbo-config-spring
Dubbo 的接入实现 Dubbo 中 spring 扩展就是使用 spring 的自定义类型,所以同样也有NamespaceHandler、BeanDefinitionParser。而 NamespaceHandler 是DubboNamespaceHandler
代码语言:javascript复制public class DubboNamespaceHandler extends NamespaceHandlerSupport {
static {
Version.checkDuplicate(DubboNamespaceHandler.class);
}
public void init() {
registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
registerBeanDefinitionParser("annotation", new DubboBeanDefinitionParser(AnnotationBean.class, true));
}
}
BeanDefinitionParser 全部都使用了 DubboBeanDefinitionParser,如果我们向看dubbo:service的配置,就直接看 DubboBeanDefinitionParser中 这个里面主要做了一件事,把不同的配置分别转化成spring容器中的bean对象 application 对应 ApplicationConfig registry 对应 RegistryConfig monitor 对应 MonitorConfig provider 对应 ProviderConfig consumer 对应 ConsumerConfig … 为了在 spring 启动的时候,也相应的启动 provider 发布服务注册服务的过程,而同时为了让客户端在启动的时候自动订阅发现服务,加入了两个bean ServiceBean、ReferenceBean。 分别继承了 ServiceConfig 和 ReferenceConfig 同 时 还 分 别 实 现 了 InitializingBean 、 DisposableBean,ApplicationContextAware, ApplicationListener, BeanNameAware InitializingBean 接口为 bean 提供了初始化方法的方式,它只包括afterPropertiesSet 方法,凡是继承该接口的类,在初始化 bean 的时候会执行该方法。 DisposableBean bean 被销毁的时候,spring 容器会自动执行 destory 方法,比如释放资源 ApplicationContextAware 实现了这个接口的 bean,当 spring 容器初始化的时候,会自动的将 ApplicationContext 注入进来 ApplicationListener ApplicationEvent 事件监听,spring 容器启动后会发一个事件通知 BeanNameAware 获得自身初始化时,本身的 bean 的 id 属性 那么基本的实现思路可以整理出来了
- 利用 spring 的解析收集 xml 中的配置信息,然后把这些配置信息存储 到 serviceConfig 中
- 调用 ServiceConfig 的 export 方法来进行服务的发布和注册
serviceBean 是服务发布的切入点,通过 afterPropertiesSet 方法,调用export()方法进行发布。 export 为父类 ServiceConfig 中的方法,所以跳转到 SeviceConfig 类中的export 方法
代码语言:javascript复制public synchronized void export() {
if (provider != null) {
if (export == null) {
export = provider.getExport();
}
if (delay == null) {
delay = provider.getDelay();
}
}
if (export != null && ! export.booleanValue()) {
return;
}
if (delay != null && delay > 0) {
Thread thread = new Thread(new Runnable() {
public void run() {
try {
Thread.sleep(delay);
} catch (Throwable e) {
}
doExport();
}
});
thread.setDaemon(true);
thread.setName("DelayExportServiceThread");
thread.start();
} else {
doExport();
}
}
我们发现,delay 的作用就是延迟暴露,而延迟的方式也很直截了当,Thread.sleep(delay)
- export 是 synchronized 修饰的方法。也就是说暴露的过程是原子操作,正常情况下不会出现锁竞争的问题,毕竟初始化过程大多数情况下都是单一线程操作,这里联想到了 spring 的初始化流程,也进行了加锁操作,这里也给我们平时设计一个不错的启示:初始化流程的性能调优优先级应该放的比较低,但是安全的优先级应该放的比较高!
- 继续看 doExport()方法。同样是一堆初始化代码
export 的过程 继续看 doExport(),最终会调用到 doExportUrls()中:
代码语言:javascript复制 @SuppressWarnings({ "unchecked", "rawtypes" })
private void doExportUrls() {
List<URL> registryURLs = loadRegistries(true);//是不是获得注册中心的配置
for (ProtocolConfig protocolConfig : protocols) { //是不是支持多协议发布
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
这 个 protocols 长 这 个 样 子 <dubbo:protocol name=“dubbo” port=“20888” id=“dubbo” /> protocols 也是根据配置装配出来的。接下来让我们进入 doExportUrlsFor1Protocol 方法看看 dubbo 具体是怎么样将服务暴露出去的
代码语言:javascript复制if (registryURLs != null && registryURLs.size() > 0
&& url.getParameter("register", true)) {
//拿到所有url 循环
for (URL registryURL : registryURLs) {
url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " interfaceClass.getName() " url " url " to registry " registryURL);
}
//通过proxyFactory来获取Invoker对象
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
//注册服务
Exporter<?> exporter = protocol.export(invoker);
//将exporter添加到list中
exporters.add(exporter);
}
} else {
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
Exporter<?> exporter = protocol.export(invoker);
exporters.add(exporter);
}
这个URL很眼熟,没错在注册中心看到的services的providers信息就是这个在上面这段代码中可以看到 Dubbo 的比较核心的抽象:Invoker, Invoker是一个代理类,从 ProxyFactory 中生成。 这个地方可以做一个小结
- Invoker - 执行具体的远程调用(这块后续单独讲)
- Protocol – 服务地址的发布和订阅
- Exporter – 暴露服务或取消暴露
protocol.export(invoker) protocol 这个地方,其实并不是直接调用 DubboProtocol 协议的 export,protocol 这个属性是在哪里实例化的?以及实例化的代码 是什么?
代码语言:javascript复制//Protocol$Adaptive.java
private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
实际上这个 Protocol 得到的应该是一个 Protocol
Adaptive 这个动态类的 export 方法。我们看看这段代码
代码语言:javascript复制public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" url.toString() ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}
ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName); 这 段 代 码 中 ,ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName); 当 extName 为 registry 的时候,我们不需要再次去阅读这块代码了,直接可以在扩展点中找到相应的实现扩展点[/dubbo-registry-api/src/main/resources/META- INF/dubbo/internal/com.alibaba.dubbo.rpc.Protocol] 配置如下
registry=com.alibaba.dubbo.registry.integration.RegistryProtocol 所以,我们可以定位到 RegistryProtocolRegistryProtocol 好这个类中的export 方法
代码语言:javascript复制public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//export invoker
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
//registry provider
final Registry registry = getRegistry(originInvoker);
final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
registry.register(registedProviderUrl);
// 订阅override数据
// FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
//保证每次export都返回一个新的exporter实例
return new Exporter<T>() {
public Invoker<T> getInvoker() {
return exporter.getInvoker();
}
public void unexport() {
try {
exporter.unexport();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
try {
registry.unregister(registedProviderUrl);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
try {
overrideListeners.remove(overrideSubscribeUrl);
registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
};
}
doLocalExport 本地先启动监听服务
代码语言:javascript复制@SuppressWarnings("unchecked")
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker){
String key = getCacheKey(originInvoker);
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
exporter = new ExporterChangeableWrapper<T>((Exporter<T>)protocol.export(invokerDelegete), originInvoker);
bounds.put(key, exporter);
}
}
}
return (ExporterChangeableWrapper<T>) exporter;
}
通过 Wrapper对 Protocol 进 行 装 饰 , 装 饰 器 分 别 为 : ProtocolFilterWrapper/ProtocolListenerWrapper;
ProtocolFilterWrapper 这个类非常重要,dubbo 机制里面日志记录、超时等等功能都是在这一部分实现的 这个类有 3 个特点, 第一它有一个参数为 Protocol protocol 的构造函数; 第二,它实现了 Protocol 接口; 第三,它使用责任链模式,对 export 和 refer 函数进行了封装;部分代码 如下
代码语言:javascript复制public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker);
}
return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
}
DubboProtocol类的export
代码语言:javascript复制public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//export an stub service for dispaching event
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY,Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice){
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0 ){
if (logger.isWarnEnabled()){
logger.warn(new IllegalStateException("consumer [" url.getParameter(Constants.INTERFACE_KEY)
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
//暴露服务
openServer(url);
return exporter;
}
dubboProtocol 的 export 方法:openServer(url)
代码语言:javascript复制private void openServer(URL url) {
// find server.
String key = url.getAddress();
//client 也可以暴露一个只有server可以调用的服务。
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true);
if (isServer) {
ExchangeServer server = serverMap.get(key);
if (server == null) {
//没有的话就创建服务
serverMap.put(key, createServer(url));
} else {
//server支持reset,配合override功能使用
server.reset(url);
}
}
}
createServer
代码语言:javascript复制private ExchangeServer createServer(URL url) {
//默认开启server关闭时发送readonly事件
url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
//默认开启heartbeat
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
throw new RpcException("Unsupported server type: " str ", url: " url);
url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
ExchangeServer server;
try {
//绑定服务
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " url ") " e.getMessage(), e);
}
str = url.getParameter(Constants.CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " str);
}
}
return server;
}
Exchangers. bind()
代码语言:javascript复制public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
return getExchanger(url).bind(url, handler);
}
GETEXCHANGER 通过 ExtensionLoader 获得指定的扩展点,type 默认为 header
代码语言:javascript复制public static Exchanger getExchanger(URL url) {
String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
return getExchanger(type);
}
HeaderExchanger.bind
代码语言:javascript复制public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
Transporters.bind
代码语言:javascript复制public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
return getTransporter().bind(url, handler);
}
代码语言:javascript复制public HeaderExchangeServer(Server server) {
if (server == null) {
throw new IllegalArgumentException("server == null");
}
this.server = server;
this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
if (heartbeatTimeout < heartbeat * 2) {
throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
}
//心跳
startHeatbeatTimer();
}
private void startHeatbeatTimer() {
//关闭心跳定时
stopHeartbeatTimer();
if (heartbeat > 0) {
//每隔heartbeat时间执行一次
heatbeatTimer = scheduled.scheduleWithFixedDelay(
new HeartBeatTask( new HeartBeatTask.ChannelProvider() {
public Collection<Channel> getChannels() {
return Collections.unmodifiableCollection(
HeaderExchangeServer.this.getChannels() );
}
}, heartbeat, heartbeatTimeout),
heartbeat, heartbeat,TimeUnit.MILLISECONDS);
}
}
private void stopHeartbeatTimer() {
try {
ScheduledFuture<?> timer = heatbeatTimer;
if (timer != null && ! timer.isCancelled()) {
timer.cancel(true);
}
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
} finally {
heatbeatTimer =null;
}
}
心跳线程 HeartBeatTask 在超时时间之内,发送数据 在超时时间在外,是客户端的话,重连;是服务端,那么关闭 服务发布总结 直接从官方网站上扒了一个图过来,,好这个图显示的很清楚了。
服务注册的过程
前面,我们已经知道,基于 spring 这个解析入口,到发布服务的过程,接着基于 DubboProtocol 去发布,最终调用 Netty 的 api 创建了一个NettyServer。
RegistryProtocol
代码语言:javascript复制public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//export invoker
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
//registry provider -----------------------这里
final Registry registry = getRegistry(originInvoker);
//得到需要注册到zk上的协议地址 也就是dubbo://
final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
//注册 这里=========================================
registry.register(registedProviderUrl);
// 订阅override数据
// FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
//保证每次export都返回一个新的exporter实例
return new Exporter<T>() {
public Invoker<T> getInvoker() {
return exporter.getInvoker();
}
public void unexport() {
try {
exporter.unexport();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
try {
registry.unregister(registedProviderUrl);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
try {
overrideListeners.remove(overrideSubscribeUrl);
registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
};
}
final Registry registry = getRegistry(originInvoker);
getRegistry
代码语言:javascript复制/**
* 根据invoker的地址获取registry实例
* @param originInvoker
* @return
*/
private Registry getRegistry(final Invoker<?> originInvoker){
获得registry://192.168.11.156:2181 的协议地址
URL registryUrl = originInvoker.getUrl();
if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) {
//得到 zookeeper 的协议地址
String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY);
//registryUrl 就会变成了 zookeeper://192.168.11.156
registryUrl = registryUrl.setProtocol(protocol).removeParameter(Constants.REGISTRY_KEY);
}
return registryFactory.getRegistry(registryUrl);
}
registryFactory.getRegistry 这段代码很明显了,通过前面这段代码的分析,其实就是把 registry 的协议头改成服务提供者配置的协议地址,也就是我们配置的 <dubbo:registry address=”zookeeper://192.168.11.156:2181”/>然后 registryFactory.getRegistry 的目的,就是通过协议地址匹配到对应的注册中心。那 registryFactory 是一个什么样的对象呢?,我们找一下这个代码的定义
代码语言:javascript复制private RegistryFactory registryFactory;
public void setRegistryFactory(RegistryFactory registryFactory) {
this.registryFactory = registryFactory;
}
RegistryFactory是一个扩展点
代码语言:javascript复制@SPI("dubbo")
public interface RegistryFactory {
/**
* 连接注册中心.
*
* 连接注册中心需处理契约:<br>
* 1. 当设置check=false时表示不检查连接,否则在连接不上时抛出异常。<br>
* 2. 支持URL上的username:password权限认证。<br>
* 3. 支持backup=10.20.153.10备选注册中心集群地址。<br>
* 4. 支持file=registry.cache本地磁盘文件缓存。<br>
* 5. 支持timeout=1000请求超时设置。<br>
* 6. 支持session=60000会话超时或过期设置。<br>
*
* @param url 注册中心地址,不允许为空
* @return 注册中心引用,总不返回空
*/
@Adaptive({"protocol"})
Registry getRegistry(URL url);
}
RegistryFactory$Adaptive 我们拿到这个动态生成的自适应扩展点,看看这段代码里面的实现
- 从 url 中拿到协议头信息,这个时候的协议头是 zookeeper://
- 通 过ExtensionLoader.getExtensionLoader(RegistryFactory.class).getExtension(“zookeeper”)去获得一个指定的扩展点,而这个扩展点的配置在dubbo-registry-zookeeper/resources/META-INF/dubbo/internal/com.alibaba.dubbo.registry.RegistryFactory。得 到一个 ZookeeperRegistryFactory
ZookeeperRegistryFactory
代码语言:javascript复制public class ZookeeperRegistryFactory extends AbstractRegistryFactory {
private ZookeeperTransporter zookeeperTransporter;
public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
this.zookeeperTransporter = zookeeperTransporter;
}
public Registry createRegistry(URL url) {
return new ZookeeperRegistry(url, zookeeperTransporter);
}
}
这个方法中并没有 getRegistry 方法,而是在父类 AbstractRegistryFactory
- 从缓存 REGISTRIES 中,根据 key 获得对应的 Registry
- 如果不存在,则创建 Registry
AbstractRegistryFactory
代码语言:javascript复制public Registry getRegistry(URL url) {
url = url.setPath(RegistryService.class.getName())
.addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
.removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
String key = url.toServiceString();
// 锁定注册中心获取过程,保证注册中心单一实例
LOCK.lock();
try {
Registry registry = REGISTRIES.get(key);
if (registry != null) {
return registry;
}
//这里======================
registry = createRegistry(url);
if (registry == null) {
throw new IllegalStateException("Can not create registry " url);
}
REGISTRIES.put(key, registry);
return registry;
} finally {
// 释放锁
LOCK.unlock();
}
}
createRegistry
创建一个注册中心,这个是一个抽象方法,具体的实现在对应的子类实例中实现的,在 ZookeeperRegistryFactory 中
代码语言:javascript复制public Registry createRegistry(URL url) {
return new ZookeeperRegistry(url, zookeeperTransporter);
}
getRegistry 得出了一个结论,根据当前注册中心的配置信息,获得一个匹配的注册中心,也就是 ZookeeperRegistry
registry.register(registedProviderUrl);
调用 registry.register 去讲 dubbo://的协议地址注册到zookeeper 上 这个方法会调用 FailbackRegistry 类中的 register. 为什么呢?因为ZookeeperRegistry 这个类中并没有 register 这个方法,但是他的父类FailbackRegistry中存在register方法,而这个类又重写了AbstractRegistry类中的 register 方法。所以我们可以直接定位大 FailbackRegistry 这个类中的 register 方法中 FailbackRegistry.register
- FailbackRegistry,从名字上来看,是一个失败重试机制
- 调用父类的 register 方法,讲当前 url 添加到缓存集合中
- 调用 doRegister 方法,这个方法很明显,是一个抽象方法,会由 ZookeeperRegistry 子类实现。
FailbackRegistry.register
代码语言:javascript复制@Override
public void register(URL url) {
super.register(url);
failedRegistered.remove(url);
failedUnregistered.remove(url);
try {
// 向服务器端发送注册请求 这里==================
doRegister(url);
} catch (Exception e) {
Throwable t = e;
// 如果开启了启动时检测,则直接抛出异常
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& ! Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if(skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register " url " to registry " getUrl().getAddress() ", cause: " t.getMessage(), t);
} else {
logger.error("Failed to register " url ", waiting for retry, cause: " t.getMessage(), t);
}
// 将失败的注册请求记录到失败列表,定时重试
failedRegistered.add(url);
}
}
ZookeeperRegistry.doRegister
代码语言:javascript复制protected void doRegister(URL url) {
try {
//创建dubbo url
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " url " to zookeeper " getUrl() ", cause: " e.getMessage(), e);
}
}
终于找到你了,调用 zkclient.create 在 zookeeper 中创建一个节点。
RegistryProtocol.export 这个方法中后续的代码就不用再分析了。就是去对服务提供端去注册一个 zookeeper 监听,当监听发生变化的时候,服务端做相应的处理。
启动一个nettyserver
获得注册中心 在zookeeper上注册协议地址
源码分析之消费端初始化流程
代码语言:javascript复制ReferenceBean(afterPropertiesSet) ->getObject() ->get()->init()->createProxy 最终会获得一个代理对象。
ReferenceConfig的createProxy
代码语言:javascript复制@SuppressWarnings({ "unchecked", "rawtypes", "deprecation" })
private T createProxy(Map<String, String> map) {
URL tmpUrl = new URL("temp", "localhost", 0, map);
final boolean isJvmRefer;
if (isInjvm() == null) {
if (url != null && url.length() > 0) { //指定URL的情况下,不做本地引用
isJvmRefer = false;
} else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
//默认情况下如果本地有服务暴露,则引用本地服务.
isJvmRefer = true;
} else {
isJvmRefer = false;
}
} else {
isJvmRefer = isInjvm().booleanValue();
}
if (isJvmRefer) {
URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
invoker = refprotocol.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using injvm service " interfaceClass.getName());
}
} else {
if (url != null && url.length() > 0) { // 用户指定URL,指定的URL可能是对点对直连地址,也可能是注册中心URL
String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
if (us != null && us.length > 0) {
for (String u : us) {
URL url = URL.valueOf(u);
if (url.getPath() == null || url.getPath().length() == 0) {
url = url.setPath(interfaceName);
}
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
} else {
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
} else { // 通过注册中心配置拼装URL
List<URL> us = loadRegistries(false);
if (us != null && us.size() > 0) {
for (URL u : us) {
URL monitorUrl = loadMonitor(u);
if (monitorUrl != null) {
map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
}
}
if (urls == null || urls.size() == 0) {
throw new IllegalStateException("No such any registry to reference " interfaceName " on the consumer " NetUtils.getLocalHost() " use dubbo version " Version.getVersion() ", please config <dubbo:registry address="..." /> to your spring config.");
}
}
if (urls.size() == 1) {
//获得invoke代理对象 ===============这里
invoker = refprotocol.refer(interfaceClass, urls.get(0));
} else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
invokers.add(refprotocol.refer(interfaceClass, url));
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // 用了最后一个registry url
}
}
if (registryURL != null) { // 有 注册中心协议的URL
// 对有注册中心的Cluster 只用 AvailableCluster
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
invoker = cluster.join(new StaticDirectory(u, invokers));
} else { // 不是 注册中心的URL
invoker = cluster.join(new StaticDirectory(invokers));
}
}
}
Boolean c = check;
if (c == null && consumer != null) {
c = consumer.isCheck();
}
if (c == null) {
c = true; // default true
}
if (c && ! invoker.isAvailable()) {
throw new IllegalStateException("Failed to check the status of the service " interfaceName ". No provider available for the service " (group == null ? "" : group "/") interfaceName (version == null ? "" : ":" version) " from the url " invoker.getUrl() " to the consumer " NetUtils.getLocalHost() " use dubbo version " Version.getVersion());
}
if (logger.isInfoEnabled()) {
logger.info("Refer dubbo service " interfaceClass.getName() " from url " invoker.getUrl());
}
// 创建服务代理
return (T) proxyFactory.getProxy(invoker);
}
refprotocol.refer(interfaceClass, urls.get(0));
refprotocol这个对象,定义的代码如下,是一个自适应扩展点,得到的是Protocol$Adaptive。
代码语言:javascript复制 public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
if (arg1 == null) throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg1;
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" url.toString() ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);
}
这段代码中,根据当前的协议url,得到一个指定的扩展点,传递进来的参数中,协议地址为registry://,所以,我们可以直接定位到RegistryProtocol.refer代码
代码语言:javascript复制 @SuppressWarnings("unchecked")
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
//url registry://192.168.2.11
url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
//获得注册中心 zookeeperRegister
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
// group="a,b" or group="*"
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
String group = qs.get(Constants.GROUP_KEY);
if (group != null && group.length() > 0 ) {
if ( ( Constants.COMMA_SPLIT_PATTERN.split( group ) ).length > 1
|| "*".equals( group ) ) {
return doRefer( getMergeableCluster(), registry, type, url );
}
}
//cluster=Cluster$Adaptive
return doRefer(cluster, registry, type, url);
}
doRefer方法中有一个参数是cluster,我们找到它的定义代码如下,。又是一个自动注入的扩展点。
代码语言:javascript复制从下面的代码可以看出,这个不仅仅是一个扩展点,而且方法层面上,还有一个@Adaptive,表示会动态生成一个自适应适配器Cluster$Adaptive
代码语言:javascript复制通过debug的方式,,获取到Cluster$Adaptive这个适配器,代码如下。我们知道cluster这个对象的实例以后,继续看doRefer方法;
注意:这里的Cluster$Adaptive也并不单纯,大家还记得在讲扩展点的时候有一个扩展点装饰器吗?如果这个扩展点存在一个构造函数,并且构造函数就是扩展接口本身,那么这个扩展点就会这个wrapper装饰,而Cluster被装饰的是:MockClusterWrapper
代码语言:javascript复制public class Cluster$Adaptive implements com.alibaba.dubbo.rpc.cluster.Cluster {
public com.alibaba.dubbo.rpc.Invoker join(com.alibaba.dubbo.rpc.cluster.Directory arg0) throws com.alibaba.dubbo.rpc.RpcException {
if (arg0 == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument getUrl() == null");
com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = url.getParameter("cluster", "failover");
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.cluster.Cluster) name from url(" url.toString() ") use keys([cluster])");
com.alibaba.dubbo.rpc.cluster.Cluster extension = (com.alibaba.dubbo.rpc.cluster.Cluster) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.cluster.Cluster.class).getExtension(extName);
return extension.join(arg0);
}
}
代码语言:javascript复制这段代码中,有一个RegistryDirectory,可能看不懂,我们暂时先忽略,等会单独讲.(基于注册中心动态发现服务提供者)
1. 将consumer://协议地址注册到注册中心
2. 订阅zookeeper地址的变化
3. 调用cluster.join()方法
代码语言:javascript复制private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
if (! Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
Constants.CHECK_KEY, String.valueOf(false)));
}
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
"," Constants.CONFIGURATORS_CATEGORY
"," Constants.ROUTERS_CATEGORY));
return cluster.join(directory);
}
cluster.join
由前面的Cluster$Adaptive这个类中的join方法的分析,得知cluster.join会调用MockClusterWrapper.join方法, 然后再调用FailoverCluster.join方法。
MockClusterWrapper.join
这个意思很明显了。也就是我们上节课讲过的mock容错机制,如果出现异常情况,会调用MockClusterInvoker,否则,调用FailoverClusterInvoker.
代码语言:javascript复制public class MockClusterWrapper implements Cluster {
private Cluster cluster;
public MockClusterWrapper(Cluster cluster) {
this.cluster = cluster;
}
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new MockClusterInvoker<T>(directory,
this.cluster.join(directory));
}
}
小结
refprotocol.ref,这个方法,会返回一个MockClusterInvoker(FailoverClusterInvoker)。这里面一定还有疑问,我们先把主线走完,再回过头看看什么是cluster、什么是directory
proxyFactory.getProxy(invoker);
再回到ReferenceConfig这个类,在createProxy方法的最后一行,调用proxyFactory.getProxy(invoker). 把前面生成的invoker对象作为参数,再通过proxyFactory工厂去获得一个代理对象。接下来我们分析下这段代码做了什么。
其实前面在分析服务发布的时候,基本分析过了,所以再看这段代码,应该会很熟悉
ProxyFactory, 会生成一个动态的自适应适配器。ProxyFactory$Adaptive,然后调用这个适配器中的getProxy方法,代码如下
代码语言:javascript复制public java.lang.Object getProxy(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = url.getParameter("proxy", "javassist");
if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" url.toString() ") use keys([proxy])");
com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
return extension.getProxy(arg0);
}
很显然,又是通过javassist实现的一个动态代理,我们来看看JavassistProxyFactory.getProxy
代码语言:javascript复制public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
在Proxy.getProxy这个类的如下代码中添加断点,在debug下可以看到动态字节码如下
代码语言:javascript复制public java.lang.String sayHello(java.lang.String arg0){
Object[] args = new Object[1];
args[0] = ($w)$1;
Object ret = handler.invoke(this, methods[0], args);
return (java.lang.String)ret;
}
handler在JavassistProxyFactory.getProxy中。传递的new InvokerInvocationHandler(invoker)
消费端调用过程流程图
消费端的调用过程
客户端什么时候建立和服务端的连接
前面我们通过代码分析到了,消费端的初始化过程,但是似乎没有看到客户端和服务端建立NIO连接。实际上,建立连接的过程在消费端初始化的时候就建立好的,只是前面我们没有分析,代码在RegistryProtocol.doRefer方法内的directory.subscribe方法中。
代码语言:javascript复制public void subscribe(URL url) {
setConsumerUrl(url);
//registry=ZookeeperRegistry consumer:// this:RegistryDirectory
registry.subscribe(url, this);
}
FailbackRegistry. subscribe
代码语言:javascript复制 @Override
public void subscribe(URL url, NotifyListener listener) {
super.subscribe(url, listener);
removeFailedSubscribed(url, listener);
try {
// 向服务器端发送订阅请求
doSubscribe(url, listener);
} catch (Exception e) {
Throwable t = e;
List<URL> urls = getCacheUrls(url);
if (urls != null && urls.size() > 0) {
notify(url, listener, urls);
logger.error("Failed to subscribe " url ", Using cached list: " urls " from cache file: " getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") "/dubbo-registry-" url.getHost() ".cache") ", cause: " t.getMessage(), t);
} else {
// 如果开启了启动时检测,则直接抛出异常
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true);
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if(skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to subscribe " url ", cause: " t.getMessage(), t);
} else {
logger.error("Failed to subscribe " url ", waiting for retry, cause: " t.getMessage(), t);
}
}
// 将失败的订阅请求记录到失败列表,定时重试
addFailedSubscribed(url, listener);
}
}
zookeeperRegistry. doSubscribe
调用zookeeperRegistry执行真正的订阅操作,这段代码太长,我就不贴出来了,这里面主要做两个操作
- 对providers/routers/configurator三个节点进行创建和监听
- 调用notify(url,listener,urls) 将已经可用的列表进行通知
protected void doSubscribe(final URL url, final NotifyListener listener) {
try {
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
String root = toRootPath();
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {
public void childChanged(String parentPath, List<String> currentChilds) {
for (String child : currentChilds) {
child = URL.decode(child);
if (! anyServices.contains(child)) {
anyServices.add(child);
subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
}
});
zkListener = listeners.get(listener);
}
zkClient.create(root, false);
List<String> services = zkClient.addChildListener(root, zkListener);
if (services != null && services.size() > 0) {
for (String service : services) {
service = URL.decode(service);
anyServices.add(service);
subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
} else {
List<URL> urls = new ArrayList<URL>();
//循环地址
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {
public void childChanged(String parentPath, List<String> currentChilds) {
ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
}
});
zkListener = listeners.get(listener);
}
zkClient.create(path, false);
//watch
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
//这里=============
notify(url, listener, urls);
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " url " to zookeeper " getUrl() ", cause: " e.getMessage(), e);
}
}
AbstractRegistry.notify
代码语言:javascript复制protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
if ((urls == null || urls.size() == 0)
&& ! Constants.ANY_VALUE.equals(url.getServiceInterface())) {
logger.warn("Ignore empty notify urls for subscribe url " url);
return;
}
if (logger.isInfoEnabled()) {
logger.info("Notify urls for subscribe url " url ", urls: " urls);
}
Map<String, List<URL>> result = new HashMap<String, List<URL>>();
for (URL u : urls) {
if (UrlUtils.isMatch(url, u)) {
String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
List<URL> categoryList = result.get(category);
if (categoryList == null) {
categoryList = new ArrayList<URL>();
result.put(category, categoryList);
}
categoryList.add(u);
}
}
if (result.size() == 0) {
return;
}
Map<String, List<URL>> categoryNotified = notified.get(url);
if (categoryNotified == null) {
notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
categoryNotified = notified.get(url);
}
//对provider、configuration、routers路径下进行notify 拿到地址缓存起来
//第一次调用 后续的watch机制通知
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
categoryNotified.put(category, categoryList);
saveProperties(url);
listener.notify(categoryList);
}
}
RegistryDirectory:
- 整合多个Invoker
- 监听注册中心的变化 刷新本地的List