引言
在使用Dubbo的时候你一定会好奇它是怎么实现RPC的,而要了解它的调用过程,必然需要先了解其服务发布/订阅的过程,本篇将详细讨论Dubbo的发布过程。
源码分析
发布服务
新学Dubbo大都会比较疑惑,服务启动时的入口在哪?是如何加载我们的配置的?由于Dubbo是基于Spring的自定义扩展标签来实现配置的,而发布服务时我们需要配置dubbo:service标签,因此我们可以从这里入手。 首先我们需要知道Spring的自定义扩展标签由xsd后缀的文件及spring.schemas(自定义标签)、spring.handlers及DubboNamespaceHandler(NamespaceHandler注册标签的命名空间,这个文件和类类似SPI机制)、以及DubboBeanDefinitionParser(标签解析类)组成。我们可以在resources/META-INF路径下找到spring.handlers、dubbo.xsd以及spring.schemas,spring.handlers文件中可以看到DubboNamespaceHandler的位置,直接看这个类:
代码语言:javascript复制public class DubboNamespaceHandler extends NamespaceHandlerSupport {
static {
Version.checkDuplicate(DubboNamespaceHandler.class);
}
public void init() {
registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
registerBeanDefinitionParser("annotation", new DubboBeanDefinitionParser(AnnotationBean.class, true));
}
}
主要逻辑在init方法,该方法又会调用registerBeanDefinitionParser方法注册一个个标签解析器,并通过DubboBeanDefinitionParser将配置解析到对应类的属性中,这里我们是分析服务发布的原理,因此直接找到service标签对应的类ServiceBean:
代码语言:javascript复制public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener, BeanNameAware
该类继承了ServiceConfig(实际的配置类)并实现了很多的接口,每个接口的作用如下:
- InitializingBean:实现了该接口的类会在bean初始化完成后调用afterPropertiesSet方法。
- DisposableBean:实现该接口的类会在bean销毁时调用destroy方法
- ApplicationContextAware:容器初始化完成后,会自动将applicationContext注入到该接口的子类
- ApplicationListener:容器初始化完成后会自动触发调用onApplicationEvent方法
- BeanNameAware:容器初始化完成后会调用setBeanName将容器中的唯一id告诉给bean本身
而服务的发布逻辑则主要是通过onApplicationEvent和afterPropertiesSet实现的。而具体使用哪一个方式来发布流程则是根据delay配置来决定的,该属性表示延迟发布服务的毫秒数,即是在ServiceBean初始化完成后就发布还是延迟相应时间后再发布,-1和null表示延迟到Spring容器启动完成后发布。在本版本中,默认是null,即等到Spring容器启动完成后发布服务(在后续版本中默认值改为0,即立即发布服务),所以直接看onApplicationEvent方法:
代码语言:javascript复制public void onApplicationEvent(ApplicationEvent event) {
if (ContextRefreshedEvent.class.getName().equals(event.getClass().getName())) {
// 是否延迟发布&&是否已经发布&&是否已经取消发布
if (isDelay() && ! isExported() && ! isUnexported()) {
if (logger.isInfoEnabled()) {
logger.info("The service ready on spring started. service: " getInterface());
}
export();
}
}
}
主要是调用父类ServiceConfig的export方法发布服务:
代码语言:javascript复制public synchronized void export() {
if (provider != null) {
if (export == null) {
export = provider.getExport();
}
if (delay == null) {
delay = provider.getDelay();
}
}
if (export != null && ! export.booleanValue()) {
return;
}
if (delay != null && delay > 0) {
Thread thread = new Thread(new Runnable() {
public void run() {
try {
Thread.sleep(delay);
} catch (Throwable e) {
}
doExport();
}
});
thread.setDaemon(true);
thread.setName("DelayExportServiceThread");
thread.start();
} else {
doExport();
}
}
在该方法中我们可以看到具体延迟多少毫秒发布服务是通过新建线程并睡眠相应的毫秒数实现的,如果没有配置延迟发布就直接调用doExport方法发布,而此方法中大部分逻辑都是在检验配置,关键点是调用的doExportUrls方法:
代码语言:javascript复制private void doExportUrls() {
// 从<dubbo:registry>配置中加载注册中心的地址
List<URL> registryURLs = loadRegistries(true);
// 多协议发布则会有多个protocol
for (ProtocolConfig protocolConfig : protocols) {
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
这里主要是从配置中加载注册中心的地址,并循环调用doExportUrlsFor1Protocol方法处理多协议配置,该方法很长,从开头我们就可以看出若未配置协议,默认使用dubbo协议:
代码语言:javascript复制String name = protocolConfig.getName();
if (name == null || name.length() == 0) {
name = "dubbo";
}
接着大部分代码是获取服务提供者的主机IP以及组装配置信息,最关键的是服务发布的逻辑:
代码语言:javascript复制// 获取上下文配置
String contextPath = protocolConfig.getContextpath();
if ((contextPath == null || contextPath.length() == 0) && provider != null) {
contextPath = provider.getContextpath();
}
URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath "/") path, map);
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}
String scope = url.getParameter(Constants.SCOPE_KEY);
//配置为none不暴露
if (! Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
//配置不是remote的情况下做本地暴露 (配置为remote,则表示只暴露远程服务)
if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
exportLocal(url);
}
//如果配置不是local则暴露为远程服务.(配置为local,则表示只暴露本地服务)
if (! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope) ){
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " interfaceClass.getName() " to url " url);
}
if (registryURLs != null && registryURLs.size() > 0
&& url.getParameter("register", true)) {
for (URL registryURL : registryURLs) {
url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " interfaceClass.getName() " url " url " to registry " registryURL);
}
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
Exporter<?> exporter = protocol.export(invoker);
exporters.add(exporter);
}
} else {
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
Exporter<?> exporter = protocol.export(invoker);
exporters.add(exporter);
}
}
}
首先从url中获取到scop信息,即是否发布服务:none不发布、remote只发布远程服务、local只发布本地jvm服务、null表示既然发布远程又要发布本地服务。首先来看本地服务发布:
代码语言:javascript复制private void exportLocal(URL url) {
if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
URL local = URL.valueOf(url.toFullString())
.setProtocol(Constants.LOCAL_PROTOCOL)
.setHost(NetUtils.LOCALHOST)
.setPort(0);
Exporter<?> exporter = protocol.export(
proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
exporters.add(exporter);
logger.info("Export dubbo service " interfaceClass.getName() " to local registry");
}
}
本地服务是通过protocol.export发布的,这个protocol是通过下面的代码获取的:
代码语言:javascript复制private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
这个是上一篇文章的主要内容自适应扩展,所以这里是Protocol$Adpative对象,而在这个类中主要是通过协议类型获取相应的扩展类,那这里的协议是什么呢?在exportLocal方法中可以看到setProtocol(Constants.LOCAL_PROTOCOL),而LOCAL_PROTOCOL=injvm,所以这里的export最终会进入到InjvmProtocol.export方法中,但不仅仅是这么简单,在分析SPI源码时,在ExtensionLoader.createExtension方法中有这样一段代码:
代码语言:javascript复制// loadFile中会判断当前扩展类是否包含有参构造函数,有的话就就将其赋值给cachedWrapperClasses
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (wrapperClasses != null && wrapperClasses.size() > 0) {
for (Class<?> wrapperClass : wrapperClasses) {
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
就是使用当前扩展接口的装饰扩展类(指包含有参构造,且参数为当前的扩展接口类型)装饰当前扩展类,而Protocol包含两个装饰类:ProtocolFilterWrapper和ProtocolListenerWrapper。所以这里实际应该为ProtocolListenerWrapper(ProtocolFilterWrapper(InjvmProtocol)),这里不用细看,filter和listener是过滤器和监听器,InjvmProtocol就是发布到jvm中,供同一个jvm的消费者调用,重点还是在远程发布服务中。同样的,远程发布服务一样是调用了protocol.export方法:
代码语言:javascript复制Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
Exporter<?> exporter = protocol.export(invoker);
Invoker对象暂时忽略,这里就先简单的看作是代理对象,后面会详细分析。关键是这里应该调用哪一个扩展的export方法呢?根据刚才的分析我们需要看url协议是什么,而这里是的协议是registry,所以会进入到RegistryProtocol(注意也是被包装过的)的export方法中(我想现在你应该能体会到自适应扩展的妙用了):
代码语言:javascript复制public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
// 启动本地服务
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
// 将远程服务url注册到zookeeper中
final Registry registry = getRegistry(originInvoker);
final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
registry.register(registedProviderUrl);
.......
}
这个方法就是暴露本地服务,并将服务信息注册到zookeeper,要了解Dubbo底层是如何通信的,就需要详细分析doLocalExport:
代码语言:javascript复制private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker){
// 拿到provider的url信息,如dubbo://.....
String key = getCacheKey(originInvoker);
// 已经暴露过的服务会缓存到bounds中
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
exporter = new ExporterChangeableWrapper<T>((Exporter<T>)protocol.export(invokerDelegete), originInvoker);
bounds.put(key, exporter);
}
}
}
return (ExporterChangeableWrapper<T>) exporter;
}
又是protocol.export,因为这里是dubbo协议,所以直接看DubboProtocol.export方法:
代码语言:javascript复制public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// 这里是获取到发布的接口
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
......
// 启动服务
openServer(url);
return exporter;
}
private void openServer(URL url) {
// find server.
String key = url.getAddress();
//client 也可以暴露一个只有server可以调用的服务。
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true);
if (isServer) {
// 先从缓存中获取server,没有就创建并缓存
ExchangeServer server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));
} else {
//server支持reset,配合override功能使用
server.reset(url);
}
}
}
private ExchangeServer createServer(URL url) {
//默认开启server关闭时发送readonly事件
url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
//默认开启heartbeat
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
throw new RpcException("Unsupported server type: " str ", url: " url);
url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
ExchangeServer server;
try {
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " url ") " e.getMessage(), e);
}
str = url.getParameter(Constants.CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " str);
}
}
return server;
}
最终是通过Exchangers.bind创建的server,而该方法同样是通过调用相应扩展的bind方法来开启服务,而Exchanger只有一个扩展HeaderExchanger:
代码语言:javascript复制public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
public HeaderExchangeServer(Server server) {
if (server == null) {
throw new IllegalArgumentException("server == null");
}
this.server = server;
this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
if (heartbeatTimeout < heartbeat * 2) {
throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
}
startHeatbeatTimer();
}
初始化代码中只是启动相应的心跳检测,真正创建服务是通过Transporters.bind实现的:
代码语言:javascript复制public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
.......
return getTransporter().bind(url, handler);
}
public static Transporter getTransporter() {
return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
}
这里首先是Transporter$Adpative对象不用在说了吧,而Transporter有三个扩展类,分别是netty、mina、grizzly,未配置默认使用的是netty方法作为底层通信:
代码语言:javascript复制public Server bind(URL url, ChannelHandler listener) throws RemotingException {
return new NettyServer(url, listener);
}
可以看到doLocalExport中原来做了这么多事,创建好服务后,接着才会将服务信息(url)注册到Zookeeper:
代码语言:javascript复制final Registry registry = getRegistry(originInvoker);
final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
registry.register(registedProviderUrl);
这段代码流程很清晰,就是拿到一个Registry注册中心对象,然后注册服务,主要看看这里是如何实现的,首先是getRegistry:
代码语言:javascript复制public static final String REGISTRY_PROTOCOL = "registry";
private Registry getRegistry(final Invoker<?> originInvoker){
URL registryUrl = originInvoker.getUrl();
// 如果是registry://...开头的url,就将其替换为zookeeper://...
if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) {
String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY);
registryUrl = registryUrl.setProtocol(protocol).removeParameter(Constants.REGISTRY_KEY);
}
return registryFactory.getRegistry(registryUrl);
}
可以看到是通过registryFactory对象获取具体的注册中心,那这个registryFactory是个啥?
代码语言:javascript复制private RegistryFactory registryFactory;
public void setRegistryFactory(RegistryFactory registryFactory) {
this.registryFactory = registryFactory;
}
这个应该就不陌生吧,就是通过injectExtension依赖注入注入的自适应扩展对象RegistryFactory$Adpative:
代码语言:javascript复制public com.alibaba.dubbo.registry.Registry getRegistry(com.alibaba.dubbo.common.URL arg0) {
if (arg0 == null) throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg0;
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.registry.RegistryFactory) name from url(" url.toString() ") use keys([protocol])");
com.alibaba.dubbo.registry.RegistryFactory extension = (com.alibaba.dubbo.registry.RegistryFactory) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.registry.RegistryFactory.class).getExtension(extName);
return extension.getRegistry(arg0);
}
同样的套路,也是基于url的协议获取相应的扩展,那按照这样的逻辑这里最终调用的应该是ZookeeperRegistryFactory.getRegistry方法,但是这个类根本没有这个方法啊。 别着急,我们可以看到这个类是继承了一个抽象类AbstractRegistryFactory,所以就是调用父类的这个方法:
代码语言:javascript复制public Registry getRegistry(URL url) {
url = url.setPath(RegistryService.class.getName())
.addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
.removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
String key = url.toServiceString();
// 锁定注册中心获取过程,保证注册中心单一实例
LOCK.lock();
try {
Registry registry = REGISTRIES.get(key);
if (registry != null) {
return registry;
}
// 这是一个模板方法,具体的逻辑是由子类实现
registry = createRegistry(url);
if (registry == null) {
throw new IllegalStateException("Can not create registry " url);
}
REGISTRIES.put(key, registry);
return registry;
} finally {
// 释放锁
LOCK.unlock();
}
}
这样做的好处就是使用模板方法模式抽离公共的代码,而具体创建注册中心逻辑则是由子类自己实现:
代码语言:javascript复制public Registry createRegistry(URL url) {
// 这里就不再多说了,就是通过Zookeeper客户端创建连接,而Zookeeper客户端
// 有curator和zkClient两个,默认使用的是zkClient。
return new ZookeeperRegistry(url, zookeeperTransporter);
}
看到这里就明白了getRegistry方法最终返回的是ZookeeperRegistry对象,然后调用register方法注册服务,同样的也是采用模板方法模式实现,所以去父类FailbackRegistry中找:
代码语言:javascript复制public void register(URL url) {
// 参数校验
super.register(url);
failedRegistered.remove(url);
failedUnregistered.remove(url);
try {
// 向服务器端发送注册请求
doRegister(url);
} catch (Exception e) {
Throwable t = e;
// 如果开启了启动时检测,则直接抛出异常
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& ! Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if(skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register " url " to registry " getUrl().getAddress() ", cause: " t.getMessage(), t);
} else {
logger.error("Failed to register " url ", waiting for retry, cause: " t.getMessage(), t);
}
// 将失败的注册请求记录到失败列表,定时重试
failedRegistered.add(url);
}
}
protected void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " url " to zookeeper " getUrl() ", cause: " e.getMessage(), e);
}
}
最终就是通过zookeeper客户端创建一个节点就完成,至此,整个服务发布流程就结束了。但刚刚还漏掉了一个Invoker没有分析,接下来就详细看看它是个啥!
Invoker分析
Invoker是一个非常重要的模型,在服务端和客户端都会用到它,它的作用可以类比为JDK动态代理中的InvocationHandler并且存储了url、服务接口等信息,可以通过proxyFactory.getInvoker创建。经过前面的学习,我们很容易就能定位到JavassistProxyFactory的getInvoker方法中:
代码语言:javascript复制public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper类不能正确处理带$的类名
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
该方法就是创建一个AbstractProxyInvoker的匿名实现类,重写doInvoke并调用wrapper的invokeMethod方法,而doInvoke是在Invoker的invoke方法被调用时触发的,那么invokeMethod是啥?该方法在Wrapper中是一个抽象方法,具体的实现是通过getWrapper -> makeWrapper生成的,这里生成的细节就不详细分析了,主要看看生成的invokeMethod代码:
代码语言:javascript复制public Object invokeMethod(Object o,String n,Class[]p,Object[]v)throws java.lang.reflect.InvocationTargetException {
cn.dark.api.IDemoService w;
try{
w=((cn.dark.api.IDemoService)$1);
}catch(Throwable e) {
throw new IllegalArgumentException(e);
}
try{
if("sayHello".equals($2)&&$3.length==1){
return($w)w.sayHello((java.lang.String)$4[0]);
}
}catch(Throwable e){
throw new java.lang.reflect.InvocationTargetException(e);
}
throw new com.alibaba.dubbo.common.bytecode.NoSuchMethodException("Not found method "" $2 "" in class cn.dark.api.IDemoService.");
}
没啥好说的,就是去调用具体的服务接口,所以Invoker就相当于是一个代理对象,当客户端发起调用时,就会通过该类转发请求到具体的实现类去。