一、服务消费端应该做哪些事?
- 生成代理对象(帮我们实现通信细节)
- 建立通信连接(netty)
- 从zk获取服务提供者地址(订阅提供者)
- 负载均衡
- 容错
- 序列化
- ...
二、两个步骤
上面的逻辑可以大致分为两个步骤
- 服务启动阶段
构建通信连接,创建代理对象
- 远程调用阶段
远程调用服务提供者方法
三、服务启动阶段
Dubbo 服务引用的时机有两个:
- 第一个是在 Spring 容器调用 ReferenceBean 的 afterPropertiesSet 方法时引用服务
- 第二个是在 ReferenceBean对应的服务被注入到其他类中时引用
这两个引用服务的时机区别在于,第一个是饿汉式的,第二个是懒汉式的。 默认情况下,Dubbo 使用懒汉式引用服务。如果需要使用饿汉式,可通过配置 dubbo:reference 的 init 属性开启。
通常我们在代码中通过如下方式使用dubbo服务:
代码语言:javascript复制@Reference
private ISayHelloService iSayHelloService;
ISayHelloService.hello();
可以通过注解@Reference
或者配置文件dubbo-consumer.xml
的方式,我这里用的是注解。
我们先来debug看一下,这个ISayHelloService
对象是个什么东西。
可以看到,ISayHelloService
是个代理对象,并且是个包装类,里面包了很多层。
ReferenceAnnotationBeanPostProcessor -> ReferenceBean -> InvokerHandler -> mockClusterInvoker -> RegistryDirectory
熟悉spring的同学应该知道,BeanPostProcessor
这样的类会在spring启动时执行,所以我们从这个类入手。
//ReferenceAnnotationBeanPostProcessor.java
private static class ReferenceBeanInvocationHandler implements InvocationHandler {
private final ReferenceBean referenceBean;
private Object bean;
private ReferenceBeanInvocationHandler(ReferenceBean referenceBean) {
this.referenceBean = referenceBean;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
return method.invoke(bean, args);
}
//初始化方法
private void init() {
this.bean = referenceBean.get();
}
}
复制代码
ReferenceBean.get()
继续追踪 referenceBean.get():
代码语言:javascript复制//ReferenceConfig.java
//获取服务引用
public synchronized T get() {
if (destroyed) {
throw new IllegalStateException("Already destroyed!");
}
// 检测 ref 是否为空,为空则通过 init 方法创建
if (ref == null) {
// init 方法主要用于处理配置,以及调用 createProxy 生成代理类
init();
}
return ref;
}
复制代码
继续追踪 init():
代码语言:javascript复制//ReferenceConfig.java
private void init() {
if (initialized) {
return;
}
initialized = true;
if (interfaceName == null || interfaceName.length() == 0) {
throw new IllegalStateException("<dubbo:reference interface="" /> interface not allow null!");
}
// get consumer's global configuration
checkDefault();
//填充ConsumerConfig
appendProperties(this);
//...省略
//组装URL
Map<String, String> map = new HashMap<String, String>();
Map<Object, Object> attributes = new HashMap<Object, Object>();
map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);
map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
//...省略
//获取注册中心host
String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY);
if (hostToRegistry == null || hostToRegistry.length() == 0) {
hostToRegistry = NetUtils.getLocalHost();
} else if (isInvalidLocalHost(hostToRegistry)) {
throw new IllegalArgumentException("Specified invalid registry ip from property:" Constants.DUBBO_IP_TO_REGISTRY ", value:" hostToRegistry);
}
map.put(Constants.REGISTER_IP_KEY, hostToRegistry);
//attributes are stored by system context.
StaticContext.getSystemContext().putAll(attributes);
//重点!! 创建代理对象
ref = createProxy(map);
ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods());
ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);
}
复制代码
这里的代码非常多,我省略了一些,我们主要看大致流程。 首先是通过系统变量或 dubbo.properties 配置文件填充 ConsumerConfig 的字段。 然后收集各种配置,并将配置存储到 map,用以组装URL。 最后创建代理对象。
createProxy()
我们重点来看创建代理对象的方法:
代码语言:javascript复制//ReferenceConfig.java
private T createProxy(Map<String, String> map) {
URL tmpUrl = new URL("temp", "localhost", 0, map);
final boolean isJvmRefer;
//...省略
//本地引用
if (isJvmRefer) {
//...省略
} else {
//远程引用-直连
if (url != null && url.length() > 0) {
//...省略
} else { //远程引用-走注册中心
// 加载注册中心 url
List<URL> us = loadRegistries(false);
if (us != null && !us.isEmpty()) {
for (URL u : us) {
URL monitorUrl = loadMonitor(u);
if (monitorUrl != null) {
map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
// 添加 refer 参数到 url 中,并将 url 添加到 urls 中
urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
}
}
if (urls.isEmpty()) {
throw new IllegalStateException("No such any registry to reference " interfaceName " on the consumer " NetUtils.getLocalHost() " use dubbo version " Version.getVersion() ", please config <dubbo:registry address="..." /> to your spring config.");
}
}
// 单个注册中心或服务提供者(服务直连,下同)
if (urls.size() == 1) {
//自适应 -> wrapper(filter(RegisterProtocol)).refer
// 调用 RegistryProtocol 的 refer 构建 Invoker 实例
invoker = refprotocol.refer(interfaceClass, urls.get(0));
} else {
//...省略
}
}
//生成代理类
// create service proxy
return (T) proxyFactory.getProxy(invoker);
}
复制代码
这里的逻辑比较简单,对服务调用的方式进行逻辑处理,我们直接看最重要的refprotocol.refer(interfaceClass, urls.get(0))
方法。
这个方法的入参urls.get(0)
是注册中心的url,解析出来应该是类似于registry://registry-host/org.apache.dubbo.registry.RegistryService?refer=URL.encode("consumer://consumer-host/com.foo.FooService?version=1.0.0")
基于扩展点自适应机制,通过 URL 的 registry:// 协议头识别,就会调用 RegistryProtocol.refer()
方法,基于 refer 参数中的条件,查询提供者 URL,如: dubbo://service-host/com.foo.FooService?version=1.0.0
然后通过提供者 URL 的 dubbo:// 协议头识别,就会调用 DubboProtocol.refer()
方法,得到提供者引用。
RegistryProtocol.refer()
所以我们接着看refer()
方法:
//RegistryProtocol.java
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
//获取注册中心对象
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
//...省略
// 调用 doRefer 继续执行服务引用逻辑
return doRefer(cluster, registry, type, url);
}
复制代码
继续追踪 doRefer()
//RegistryProtocol.java
/**
*
* @param cluster
* @param registry 注册中心对象
* @param type
* @param url 注册中心url
* @param <T>
* @return
*/
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
//获得 RegistryDirectory 对象,即服务目录
//服务目录类似于注册中心,管理生产者ip、port等,实际上是一个invoker集合
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry); //设置注册中心
directory.setProtocol(protocol); //设置服务提供者
// 创建订阅 URL 即 消费者url
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
// 向注册中心注册自己(服务消费者)
//把自己写入zk中的 /dubbo/com.foo.BarService/consumers 目录
if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
Constants.CHECK_KEY, String.valueOf(false)));
}
// 订阅 providers、configurators、routers 等节点数据
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
"," Constants.CONFIGURATORS_CATEGORY
"," Constants.ROUTERS_CATEGORY));
// 一个注册中心可能有多个服务提供者,因此这里需要将多个服务提供者合并为一个
//MockClusterWrapper(FailoverCluster)
Invoker invoker = cluster.join(directory);
// 向本地注册表,注册消费者
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
复制代码
这里主要做了以下事情:
- 连接注册中心
- 向注册中心注册自己(服务消费者)
- 订阅注册中心providers等节点
subscribe()
我们看最重要的directory.subscribe()
方法,即订阅:
//ZookeeperRegistry.java
/**
*
* @param url 消费者url consumer://
* @param listener RegistryDirectory
*/
@Override
protected void doSubscribe(final URL url, final NotifyListener listener) {
try {
// interface = *,即订阅全局
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
//...省略
} else {
// interface = 特定接口,只有这个接口的子节点改变时,才触发回调
// 如:interface = com.lol.test.SayFacade
// Service 层下的所有 URL
List<URL> urls = new ArrayList<URL>();
//path是zk中生产者的目录:path -> /dubbo/com.foo.BarService/providers
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
// 获得 ChildListener 对象
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {
@Override
public void childChanged(String parentPath, List<String> currentChilds) {
// 服务提供者url变更时,调用 `#notify(...)` 方法,通过监听器回调
ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
}
});
zkListener = listeners.get(listener);
}
zkClient.create(path, false);
//children 是 子节点 -> 服务提供者
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
//urls 是真正的服务提供者url
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
//第一次订阅,全量通知,创建对应invoker
notify(url, listener, urls);
}
}
}
复制代码
这里的大致逻辑就是:
- 通过zk获取对应service的服务提供者urls
- 然后通过
RegistryDirectory
对这些urls进行监听,如果有变动,则调用notify()
方法 - 第一次则对所有urls调用
notify()
方法
notify()
接下来我们重点看notify()
:
//AbstractRegistry.java
/**
*
* @param url 服务消费者url consumer://
* @param listener RegistryDirectory
* @param urls 服务提供者urls
*/
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
//...省略
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
//categoryList 服务提供者url
List<URL> categoryList = entry.getValue();
categoryNotified.put(category, categoryList);
saveProperties(url);
listener.notify(categoryList);
}
}
复制代码
继续追踪
代码语言:javascript复制//RegistryDirectory.java
/**
* 接收服务变更通知
* @param urls 服务提供者url集合
*/
@Override
public synchronized void notify(List<URL> urls) {
// 定义三个集合,分别用于存放服务提供者 url,路由 url,配置器 url
List<URL> invokerUrls = new ArrayList<URL>();
List<URL> routerUrls = new ArrayList<URL>();
List<URL> configuratorUrls = new ArrayList<URL>();
for (URL url : urls) {
String protocol = url.getProtocol();
// 获取 category 参数
String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
// 根据 category 参数将 url 分别放到不同的列表中
if (Constants.ROUTERS_CATEGORY.equals(category)
|| Constants.ROUTE_PROTOCOL.equals(protocol)) {
routerUrls.add(url);
} else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
|| Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
configuratorUrls.add(url);
} else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
// 添加服务提供者 url
invokerUrls.add(url);
} else {
logger.warn("Unsupported category " category " in notified url: " url " from registry " getUrl().getAddress() " to consumer " NetUtils.getLocalHost());
}
}
// 将 url 转成 Configurator
if (configuratorUrls != null && !configuratorUrls.isEmpty()) {
this.configurators = toConfigurators(configuratorUrls);
}
// 将 url 转成 Router
if (routerUrls != null && !routerUrls.isEmpty()) {
List<Router> routers = toRouters(routerUrls);
if (routers != null) { // null - do nothing
setRouters(routers);
}
}
List<Configurator> localConfigurators = this.configurators; // local reference
// merge override parameters
this.overrideDirectoryUrl = directoryUrl;
if (localConfigurators != null && !localConfigurators.isEmpty()) {
for (Configurator configurator : localConfigurators) {
this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
}
}
//重点!! 刷新 Invoker 列表
refreshInvoker(invokerUrls);
}
复制代码
这里我们只需要看最后一行代码refreshInvoker(invokerUrls)
,它的作用是刷新invoker。
该方法是保证RegistryDirectory
中的服务提供者集合methodInvokerMap
随注册中心变化而变化的关键。
refreshInvoker()
代码语言:javascript复制//RegistryDirectory.java
/**
*
* @param invokerUrls 服务提供者url
*/
private void refreshInvoker(List<URL> invokerUrls) {
// 如果invokerUrls 仅有一个元素,且 url 协议头为 empty,此时表示禁用所有服务
if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
&& Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
// 设置 forbidden 为 true
this.forbidden = true;
this.methodInvokerMap = null;
// 销毁所有 Invoker
destroyAllInvokers();
} else {
this.forbidden = false;
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap;
if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
// 添加缓存 url 到 invokerUrls 中
invokerUrls.addAll(this.cachedInvokerUrls);
} else {
this.cachedInvokerUrls = new HashSet<URL>();
// 缓存 invokerUrls
this.cachedInvokerUrls.addAll(invokerUrls);
}
if (invokerUrls.isEmpty()) {
return;
}
// 将 url 转成 Invoker
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
// 将 newUrlInvokerMap 转成方法名到 Invoker 列表的映射
Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap);
// state change
if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" invokerUrls.size() ", invoker.size :0. urls :" invokerUrls.toString()));
return;
}
// 合并多个组的 Invoker
this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
this.urlInvokerMap = newUrlInvokerMap;
try {
// 销毁无用 Invoker
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
} catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
}
}
复制代码
这里的大致逻辑为:
- 根据协议头来判断是否禁用服务
- 将url转为invoker
- 销毁无用的 Invoker
首先当url 协议头为empty://
,此时表示禁用所有服务,会销毁所有Invoker。
然后把将url转为invoker,得到 <url, Invoker> 的映射关系。然后进一步进行转换,得到 <methodName, Invoker 列表> 映射关系。
之后进行多组 Invoker 合并操作,并将合并结果赋值给 methodInvokerMap。
最后销毁无用的 Invoker,避免服务消费者调用已下线的服务的服务
Invoker 是 Dubbo 的核心模型,代表一个可执行体。在服务提供方,Invoker 用于调用服务提供类。在服务消费方,Invoker 用于执行远程调用。
此时的重点就是url转为invoker的过程了,因为dubbo远程调用是通过invoker来进行的,所以在转变的过程中肯定有很多重要的内容。
toInvokers()
我们接着看toInvokers(invokerUrls)
:
//RegistryDirectory.java
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
if (urls == null || urls.isEmpty()) {
return newUrlInvokerMap;
}
Set<String> keys = new HashSet<String>();
// 获取服务消费端配置的协议
String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
for (URL providerUrl : urls) {
if (queryProtocols != null && queryProtocols.length() > 0) {
boolean accept = false;
String[] acceptProtocols = queryProtocols.split(",");
// 检测服务提供者协议是否被服务消费者所支持
for (String acceptProtocol : acceptProtocols) {
if (providerUrl.getProtocol().equals(acceptProtocol)) {
accept = true;
break;
}
}
if (!accept) {
// 若服务消费者协议头不被消费者所支持,则忽略当前 providerUrl
continue;
}
}
// 忽略 empty 协议
if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
continue;
}
// 通过 SPI 检测服务端协议是否被消费端支持,不支持则抛出异常
if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
logger.error(new IllegalStateException("Unsupported protocol..."));
continue;
}
// 合并 url
URL url = mergeUrl(providerUrl);
String key = url.toFullString();
if (keys.contains(key)) {
// 忽略重复 url
continue;
}
keys.add(key);
// 将本地 Invoker 缓存赋值给 localUrlInvokerMap
Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap;
// 获取与 url 对应的 Invoker
Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
// 缓存未命中
if (invoker == null) {
try {
boolean enabled = true;
if (url.hasParameter(Constants.DISABLED_KEY)) {
// 获取 disable 配置,取反,然后赋值给 enable 变量
enabled = !url.getParameter(Constants.DISABLED_KEY, false);
} else {
// 获取 enable 配置,并赋值给 enable 变量
enabled = url.getParameter(Constants.ENABLED_KEY, true);
}
if (enabled) {
// 调用 refer 获取 Invoker
invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
}
} catch (Throwable t) {
logger.error("Failed to refer invoker for interface...");
}
if (invoker != null) {
// 缓存 Invoker 实例
newUrlInvokerMap.put(key, invoker);
}
// 缓存命中
} else {
// 将 invoker 存储到 newUrlInvokerMap 中
newUrlInvokerMap.put(key, invoker);
}
}
keys.clear();
return newUrlInvokerMap;
}
这里的逻辑很简单:
- 首先会对服务提供者 url 进行检测,若服务消费端的配置不支持服务端的协议,或服务端 url 协议头为 empty 时,toInvokers 均会忽略服务提供方 url。
- 合并 url,然后访问缓存,尝试获取与 url 对应的 invoker。
- 如果缓存命中,直接将 Invoker 存入 newUrlInvokerMap 中。
- 如果未命中,则需新建 Invoker。
DubboProtocol.refer()
invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
这段代码就是创建invoker的代码,这里通过自适应拿到的是DubboProtocol
, 所以我们深入看一下DubboProtocol.refer()
:
//DubboProtocol.java
//客户端实例
private final ExchangeClient[] clients;
private final AtomicPositiveInteger index = new AtomicPositiveInteger();
private final String version;
private final ReentrantLock destroyLock = new ReentrantLock();
private final Set<Invoker<?>> invokers;
/**
* url 服务提供者url
*/
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);
// 创建 DubboInvoker
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
复制代码
这里的代码很简单,只是new了一个Invoker。 重点是对DubboInvoker
的属性进行填充。
我们先来看下ExchangeClient[] clients
属性,它是客户端实例集合,通过getClients(url)
方法获取。
ExchangeClient 实际上并不具备通信能力,它需要基于更底层的客户端实例进行通信。比如 NettyClient、MinaClient 等,默认情况下,Dubbo 使用 NettyClient 进行通信。接下来,我们简单看一下 getClients
方法的逻辑。
getClients()
代码语言:javascript复制//DubboProtocol.java
private ExchangeClient[] getClients(URL url) {
// 是否共享连接
boolean service_share_connect = false;
// 获取连接数,默认为0,表示未配置
int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
// 如果未配置 connections,则共享连接
if (connections == 0) {
service_share_connect = true;
connections = 1;
}
ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i ) {
if (service_share_connect) {
// 获取共享客户端
clients[i] = getSharedClient(url);
} else {
// 初始化新的客户端
clients[i] = initClient(url);
}
}
return clients;
}
这里根据 connections
数量决定是获取共享客户端还是创建新的客户端实例,默认情况下,使用共享客户端实例。getSharedClient
方法中也会调用 initClient
方法,因此下面我们一起看一下这两个方法。
//DubboProtocol.java
private ExchangeClient getSharedClient(URL url) {
String key = url.getAddress();
// 获取带有“引用计数”功能的 ExchangeClient
ReferenceCountExchangeClient client = referenceClientMap.get(key);
if (client != null) {
if (!client.isClosed()) {
// 增加引用计数
client.incrementAndGetCount();
return client;
} else {
referenceClientMap.remove(key);
}
}
locks.putIfAbsent(key, new Object());
synchronized (locks.get(key)) {
if (referenceClientMap.containsKey(key)) {
return referenceClientMap.get(key);
}
// 创建 ExchangeClient 客户端
ExchangeClient exchangeClient = initClient(url);
// 将 ExchangeClient 实例传给 ReferenceCountExchangeClient,这里使用了装饰模式
client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap);
referenceClientMap.put(key, client);
ghostClientMap.remove(key);
locks.remove(key);
return client;
}
}
复制代码
这里先尝试获取有引用计数功能的共享实例,如果获取不到,则通过initClient(url)
新创建一个。
initClient()
代码语言:javascript复制//DubboProtocol.java
private ExchangeClient initClient(URL url) {
// 获取客户端类型,默认为 netty4
String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
// 添加编解码和心跳包参数到 url 中
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
// 检测客户端类型是否存在,不存在则抛出异常
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported client type: ...");
}
ExchangeClient client;
try {
// 获取 lazy 配置,并根据配置值决定创建的客户端类型
if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
// 创建懒加载 ExchangeClient 实例
client = new LazyConnectExchangeClient(url, requestHandler);
} else {
// 创建普通 ExchangeClient 实例
client = Exchangers.connect(url, requestHandler);
}
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service...");
}
return client;
}
复制代码
initClient
方法首先获取用户配置的客户端类型,默认为 netty4。然后检测用户配置的客户端类型是否存在,不存在则抛出异常。最后根据 lazy 配置决定创建什么类型的客户端。
接下来看创建client的方法Exchangers.connect(url, requestHandler)
//Exchangers.java
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
// 获取 Exchanger 实例,默认为 HeaderExchangeClient
return getExchanger(url).connect(url, handler);
}
复制代码
这里通过自适应,调用HeaderExchanger.connect()
//HeaderExchanger.java
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
// 这里包含了多个调用,分别如下:
// 1. 创建 HeaderExchangeHandler 对象
// 2. 创建 DecodeHandler 对象
// 3. 通过 Transporters 构建 Client 实例
// 4. 创建 HeaderExchangeClient 对象
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
复制代码
这里的调用比较多,我们这里重点看一下 Transporters 的 connect 方法。如下:
代码语言:javascript复制//Transporters.java
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
ChannelHandler handler;
if (handlers == null || handlers.length == 0) {
handler = new ChannelHandlerAdapter();
} else if (handlers.length == 1) {
handler = handlers[0];
} else {
// 如果 handler 数量大于1,则创建一个 ChannelHandler 分发器
handler = new ChannelHandlerDispatcher(handlers);
}
// 获取 Transporter 自适应拓展类,并调用 connect 方法生成 Client 实例
return getTransporter().connect(url, handler);
}
复制代码
getTransporter()
方法返回的是自适应拓展类,该类会在运行时根据客户端类型加载指定的 Transporter 实现类。若用户未配置客户端类型,则默认加载 NettyTransporter,并调用该类的 connect 方法。如下:
connect()
代码语言:javascript复制//NettyTransporter.java
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
// 创建 NettyClient 对象
return new NettyClient(url, listener);
}
复制代码
到这里就不继续跟下去了,在往下就是通过 Netty 提供的 API 构建 Netty 客户端了,大家有兴趣可以自己看看。
createProxy()
到这里,服务提供者的Invoker已经创建好了,接下来就是创建代理对象。
也就是ReferenceConfig.createProxy()
方法,这里就不多赘述了。
总结
代理对象创建完毕后,dubbo服务引用的服务启动阶段已经完成了,再来回顾下我们都做了哪些事:
- 连接注册中心
- 向注册中心注册自己(服务消费者)
- 订阅注册中心服务提供者,并获取服务提供者url
- 创建Invoker
- 建立通信,连接netty
- 创建代理对象
服务启动之后,接下来就是远程调用阶段,我们将在下篇文章中详细分析。
最后,简单的画个流程图,方便理解。
参考: Dubbo官网