Dubbo源码学习五-服务消费者发现
首先思考服务消费者发现
从类图中,我们可以看到ReferenceBean继承了Referenceconfig,同时实现了FactoryBean、DisposableBean、ApplicationContextAware、InitializingBean,因此可以看到里面会有相应的aware方法、相应的destroy方法、 AfterPropertiesSet方法,这里我们重点关注AfterPropertiesSet方法。因为其是spring留给我们进行扩展的一个通道。
ReferenceBean#afterPropertiesSet()
代码语言:javascript复制public class ReferenceBean<T> extends ReferenceConfig<T> implements FactoryBean,
ApplicationContextAware, InitializingBean, DisposableBean {
/**================重点分析 getObject重点分析====================**/
@Override
@SuppressWarnings({"unchecked"})
public void afterPropertiesSet() throws Exception {
// Initializes Dubbo's Config Beans before @Reference bean autowiring
//初始化dubbo的配置Bean之前@Reference bean注入
prepareDubboConfigBeans();
// lazy init by default.
//如果init表示是否采用饿汉式进行加载,如果设置为false,进行表示采用懒汉式进行初始化
if (init == null) {
init = false;
}
// eager init if necessary.
//饿汉式加载方式
if (shouldInit()) {
getObject();
}
}
一种是使用服务直连的方式引用服务,一种是基于注册中心进行引用。从服务中心获取服务配置,经历invoker创建、代理类创建等步骤 服务引用时机:
1.在Spring容器调用ReferenceBean的afterPropertiesSet方法时引用服务 2.在RefrenceBean对应的服务被注入到其他类中时引用。
区别:
第一种是采用饿汉式的,第二种是采用懒汉式的,默认情况下,Dubbo采用懒汉式引用服务。 如果需要使用饿汉式,可通过配置的init属性开启。 整个过程从Reference的getObject()方法开始
ReferenceBean#getObject()
代码语言:javascript复制//获取bean对象信息
@Override
public Object getObject() {
return get();
}
ReferenceConfig#get()
代码语言:javascript复制//同步方法
//进行初始化
public synchronized T get() {
if (destroyed) {
throw new IllegalStateException("The invoker of ReferenceConfig(" url ") has already destroyed!");
}
//检查ref是否为空,如果为空,则进行初始化
if (ref == null) {
//进行初始化
init();
}
return ref;
}
ReferenceConfig#init()
进行初始化,首先进行校验,对是否已经初始化、boostrap是否为空进行校验,同时检查配置信息,本地存根local和stub。检查接口是否为泛型。加载配置信息,将其放入到map中。最终将配置信息放入到serviceMetadata中。创建代理。构建ConsumerModel,进行初始化。
代码语言:javascript复制 //进行初始化
public synchronized void init() {
//如果已经初始化,则直接返回
if (initialized) {
return;
}
//服务器是否为空,如果为空,则获取dubbo服务器实例
if (bootstrap == null) {
bootstrap = DubboBootstrap.getInstance();
//进行服务器初始化话
bootstrap.init();
}
//检查配置
checkAndUpdateSubConfigs();
//检查本地存根 local与stub
checkStubAndLocal(interfaceClass);
ConfigValidationUtils.checkMock(interfaceClass, this);
Map<String, String> map = new HashMap<String, String>();
map.put(SIDE_KEY, CONSUMER_SIDE);
//追加运行时参数信息
ReferenceConfigBase.appendRuntimeParameters(map);
//检查是否为泛型接口
if (!ProtocolUtils.isGeneric(generic)) {
//获取版本信息
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put(REVISION_KEY, revision);
}
//获取接口方法列表,添加到map中
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {
logger.warn("No method found in service interface " interfaceClass.getName());
map.put(METHODS_KEY, ANY_VALUE);
} else {
map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), COMMA_SEPARATOR));
}
}
//通过class加载配置信息
map.put(INTERFACE_KEY, interfaceName);
AbstractConfig.appendParameters(map, getMetrics());
AbstractConfig.appendParameters(map, getApplication());
AbstractConfig.appendParameters(map, getModule());
// remove 'default.' prefix for configs from ConsumerConfig
// appendParameters(map, consumer, Constants.DEFAULT_KEY);
AbstractConfig.appendParameters(map, consumer);
AbstractConfig.appendParameters(map, this);
MetadataReportConfig metadataReportConfig = getMetadataReportConfig();
//将元数据配置信息放入到map中
if (metadataReportConfig != null && metadataReportConfig.isValid()) {
map.putIfAbsent(METADATA_KEY, REMOTE_METADATA_STORAGE_TYPE);
}
Map<String, AsyncMethodInfo> attributes = null;
if (CollectionUtils.isNotEmpty(getMethods())) {
attributes = new HashMap<>();
//遍历MethodConfig列表
for (MethodConfig methodConfig : getMethods()) {
AbstractConfig.appendParameters(map, methodConfig, methodConfig.getName());
String retryKey = methodConfig.getName() ".retry";
//检测map是否包含methodName.retry
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
if ("false".equals(retryValue)) {
//添加重试次数配置methodName.retries
map.put(methodConfig.getName() ".retries", "0");
}
}
AsyncMethodInfo asyncMethodInfo = AbstractConfig.convertMethodConfig2AsyncInfo(methodConfig);
if (asyncMethodInfo != null) {
// consumerModel.getMethodModel(methodConfig.getName()).addAttribute(ASYNC_KEY, asyncMethodInfo);
attributes.put(methodConfig.getName(), asyncMethodInfo);
}
}
}
//获取服务消费者ip地址
String hostToRegistry = ConfigUtils.getSystemProperty(DUBBO_IP_TO_REGISTRY);
if (StringUtils.isEmpty(hostToRegistry)) {
hostToRegistry = NetUtils.getLocalHost();
} else if (isInvalidLocalHost(hostToRegistry)) {
throw new IllegalArgumentException("Specified invalid registry ip from property:" DUBBO_IP_TO_REGISTRY ", value:" hostToRegistry);
}
map.put(REGISTER_IP_KEY, hostToRegistry);
//存储配置信息到serviceMetadata中
serviceMetadata.getAttachments().putAll(map);
/**=================创建代理 重要===================================**/
ref = createProxy(map);
serviceMetadata.setTarget(ref);
serviceMetadata.addAttribute(PROXY_CLASS_REF, ref);
//根据服务名,ReferenceConfig,代理类构建ConsumerModel,并将ConsumerModel存入到ApplicationModel中
ConsumerModel consumerModel = repository.lookupReferredService(serviceMetadata.getServiceKey());
consumerModel.setProxyObject(ref);
consumerModel.init(attributes);
initialized = true;
// dispatch a ReferenceConfigInitializedEvent since 2.7.4
dispatch(new ReferenceConfigInitializedEvent(this, invoker));
}
将相关配置信息准备好之后,都会放到ServiceMetadata中,然后进行proxy的创建,创建代理之后,将ref引用信息设置目标对象,同时放入到serviceMetada中,根据服务名,ReferenceConfig,代理类构建ConsumerModel,并将ConsumerModel存入到ApplicationModel中。对引用配置初始化事件进行分发。下面关注怎么创建代理的。
ReferenceConfig#createProxy()
首先根据配置检查是否为本地调用,如果是,则调用InjvmProtocol的refer方法生产InjvmInvoker实例。如果不是,则读取直连配置项,或注册中心,并将读取到的url存储到urls中。然后根据urls元素数量进行后续操作。若urls元素数量为1时,则直接通过Protocol自适应扩展类构建Invoker实例接口。若urls元素数量大于1,即存在多个注册中心或服务直连url,此时先根据url构建invoker,然后再通过Cluster合并多个Invoker,最后调用ProxyFactory生产代理类。Invoker的构建过程过程以及代理类的过程比较重要!
创建代理里面重要的两步:REF_PROTOCOL.refer(interfaceClass,url)、PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic))
创建代理的三种情况:jvm本地服务(生成本地引用url,协议为injvm)、直连服务、远程服务(带注册中心)
创建代理的过程getProxy:首先准备好配置信息,然后对其进行组装,然后采用模板生成代理信息
代码语言:javascript复制/**=============================创建代理 1.REF_PROTOCOL.refer(interfaceClass, url) 2.createProxy(Map<String, String> map)=============================**/
//创建代理:jvm本地服务,直连服务,远程服务
@SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
private T createProxy(Map<String, String> map) {
//jvm引用 本地服务引用,协议为injvm
if (shouldJvmRefer(map)) {
//生成本地引用url,协议为injvm
URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
//获取invoker 构建InjvmInvoker实例
/**====================获取 invoker 重要=====本地refer操作==================**/
invoker = REF_PROTOCOL.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using injvm service " interfaceClass.getName());
}
//远程引用
} else {
urls.clear();
//url不为空,表明用户可能想进行点对点调用
if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
//当需要配置多个url时,可用分号进行分割,这里会进行切分
String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
if (us != null && us.length > 0) {
for (String u : us) {
URL url = URL.valueOf(u);
if (StringUtils.isEmpty(url.getPath())) {
//设置接口名称url路径
url = url.setPath(interfaceName);
}
if (UrlUtils.isRegistry(url)) {
//将map转换为查询字符串,并作为refer参数的值添加到url中
urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
} else {
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
} else { // assemble URL from register center's configuration
// if protocols not injvm checkRegistry
//注册中心引用远程服务
if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {
//检查注册中心
checkRegistry();
//加载注册中心url
List<URL> us = ConfigValidationUtils.loadRegistries(this, false);
//如果注册中心url不为空,则对注册中心url进行遍历,加载监控url
//如果监控url不为空,对其进行编码,将其放入到map中
if (CollectionUtils.isNotEmpty(us)) {
for (URL u : us) {
URL monitorUrl = ConfigValidationUtils.loadMonitor(this, u);
if (monitorUrl != null) {
map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
}
}
//如果为空,则说明没有配置注册中心,则抛异常
if (urls.isEmpty()) {
throw new IllegalStateException("No such any registry to reference " interfaceName " on the consumer " NetUtils.getLocalHost() " use dubbo version " Version.getVersion() ", please config <dubbo:registry address="..." /> to your spring config.");
}
}
}
//如果url长度为1,则取第一个
if (urls.size() == 1) {
//调用RegistryProtocol
invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
//多个注册中心或多个服务提供者,或者两者混合
} else {
//否者,对urls进行遍历,将信息添加到invoker中
//通过refprotocol调用refer构建Invoker,refprotocol会在运行时
//根据url协议头加载指定的Protocol实例,并调用实例的refer法
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
/**==================进行refer操作,在进行invoker添加 重要 =============**/
//多个注册中心或者多个服务提供者,或者两者混合
invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
if (UrlUtils.isRegistry(url)) {
registryURL = url; // use last registry url
}
}
//如果注册中心url不为空,则注册中心可用
if (registryURL != null) { // registry url is available
// for multi-subscription scenario, use 'zone-aware' policy by default
//对于多订阅方案,默认情况下使用“区域感知”策略
//注册中心不为空,则将使用AvailableCluster
URL u = registryURL.addParameterIfAbsent(CLUSTER_KEY, ZoneAwareCluster.NAME);
// The invoker wrap relation would be like: ZoneAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker
//invoker包装关系类似:
//区域感知ClusterInvoker(静态目录)->故障转移群集Invoker(注册目录,路由在此处发生)-> Invoker
//创建StaticDirectory实例,并由Cluster对多个Invoker进行合并
//StaticDirectory:将传入的Invoekr列表封装成静态的Directory对象,里面的列表不会改变
/**=======================join方法 需要研究===================**/
invoker = CLUSTER.join(new StaticDirectory(u, invokers));
} else { // not a registry url, must be direct invoke.
//如果没有注册中心url,直接invoker
invoker = CLUSTER.join(new StaticDirectory(invokers));
}
}
}
//不可用时
if (shouldCheck() && !invoker.isAvailable()) {
throw new IllegalStateException("Failed to check the status of the service "
interfaceName
". No provider available for the service "
(group == null ? "" : group "/")
interfaceName
(version == null ? "" : ":" version)
" from the url "
invoker.getUrl()
" to the consumer "
NetUtils.getLocalHost() " use dubbo version " Version.getVersion());
}
if (logger.isInfoEnabled()) {
logger.info("Refer dubbo service " interfaceClass.getName() " from url " invoker.getUrl());
}
/**
* @since 2.7.0
* ServiceData Store
*/
//存储服务数据
String metadata = map.get(METADATA_KEY);
WritableMetadataService metadataService = WritableMetadataService.getExtension(metadata == null ? DEFAULT_METADATA_STORAGE_TYPE : metadata);
if (metadataService != null) {
URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);
metadataService.publishServiceDefinition(consumerURL);
}
// create service proxy
/**=================创建服务代理 重要 创建代理的过程:首先准备好配置信息,然后对其进行组装,然后采用模板生成代理信息 -==================**/
//创建服务代理
return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));
}
protocol#refer(Class type, URL url)
代码语言:javascript复制//SPI扩展
@SPI("dubbo")
public interface Protocol {
//引用服务 这里分析常用的RegisterProtool、AbstractProtocol->dubboProtocol
@Adaptive
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
}
RegistryProtocol#refer(Class type, URL url)
代码语言:javascript复制 //进行引用
//首先获取注册中心url信息,然后拿到注册中心实例,匹配注册中心服务类型,如果匹配,进行getInvoker,也即代理信息的生成
//接着将url查询字符串转成map,同时获取group配置,通过SPI加载MergeableCluster实例,并调用doRefer继续执行服务引用逻辑
@Override
@SuppressWarnings("unchecked")
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
//拿到注册中心url信息
url = getRegistryUrl(url);
//拿到注册中心实例
Registry registry = registryFactory.getRegistry(url);
//注册服务的类型如果匹配,也即表示协议匹配,则进行getInvoker操作
if (RegistryService.class.equals(type)) {
//获取invoker,对invoker进行包装,同时生成代理方法,这里的方式类似服务提供者
return proxyFactory.getInvoker((T) registry, type, url);
}
//将url查询字符串转为Map
// group="a,b" or group="*"
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
//获取group配置
String group = qs.get(GROUP_KEY);
if (group != null && group.length() > 0) {
if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
//通过SPI加载MergeableCluster实例,并调用doRefer继续执行服务引用逻辑
/**================重要 ===================**/
return doRefer(getMergeableCluster(), registry, type, url);
}
}
//进行doRefer操作 继续执行服务引用逻辑
return doRefer(cluster, registry, type, url);
}
RegistryFactory#getRegistry(URL url)
代码语言:javascript复制//SPI 默认dubbo
@SPI("dubbo")
public interface RegistryFactory {
//默认dubboProtocol协议,进入AbstractRegistryFactory
@Adaptive({"protocol"})
Registry getRegistry(URL url);
}
AbstractRegistryFactory#getRegistry(URL url)
代码语言:javascript复制/**-=================zookeeper注册中心 ==================**/
//进行zookeeper注册中心进行分析
@Override
public Registry getRegistry(URL url) {
//如果已经进行销毁,则进行警告,说服务已经被销毁,返回
if (destroyed.get()) {
LOGGER.warn("All registry instances have been destroyed, failed to fetch any instance. "
"Usually, this means no need to try to do unnecessary redundant resource clearance, all registries has been taken care of.");
return DEFAULT_NOP_REGISTRY;
}
//url信息构建
url = URLBuilder.from(url)
.setPath(RegistryService.class.getName())
.addParameter(INTERFACE_KEY, RegistryService.class.getName())
.removeParameters(EXPORT_KEY, REFER_KEY)
.build();
//创建注册中心缓存key
String key = createRegistryCacheKey(url);
// Lock the registry access process to ensure a single instance of the registry
LOCK.lock();
try {
//访问缓存
Registry registry = REGISTRIES.get(key);
if (registry != null) {
return registry;
}
//create registry by spi/ioc
//缓存未命中,创建Registry实例 重要
registry = createRegistry(url);
if (registry == null) {
throw new IllegalStateException("Can not create registry " url);
}
//写入缓存中
REGISTRIES.put(key, registry);
return registry;
} finally {
// Release the lock
LOCK.unlock();
}
}
RegistryProtocol#doRefer
代码语言:javascript复制 //进行Refer操作,此时涉及到Cluster与directory
/**
* doRefer方法创建一个RegistryDirectory实例,然后生成服务者消费者连接,并向注册中心进行注册。注册完毕后,紧接着订阅providers、configurators、roters
* 等节点下的数据。完成订阅后,RegistryDirectory会受到这几个节点下的子节点信息。由于一个服务可能部署在多台服务器上,这样就会在providers产生多个节点,
* 这个时候就需要Cluster将多个服务节点合并为一个,并生成一个invoker。
*/
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
//创建RegistryDirectory实例
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
//设置注册中心和协议
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
//生成服务消费者连接
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
//如果需要注册,则进行设置注册消费者url
if (directory.isShouldRegister()) {
directory.setRegisteredConsumerUrl(subscribeUrl);
//将信息注册到注册中心
//需要注册才能支持:
// 1.URL设置check = false参数时。 注册失败时,不会在后台引发异常并重试异常。 否则,将引发异常。
// 2.URL设置dynamic = false参数时,它必须永久存储,否则,当注册者的出口异常退出时,应将其自动删除。
// 3.当URL设置category = routers时,表示分类存储,默认分类为提供者,并且可以通过分类部分通知数据。
// 4.重新启动注册表时,网络抖动,数据不会丢失,包括从虚线自动删除数据。
// 5.允许具有相同URL但参数不同的URL共存,它们不能互相覆盖。
registry.register(directory.getRegisteredConsumerUrl());
}
/**====================构建 Router路由 =============**/
//构建路由
directory.buildRouterChain(subscribeUrl);
//进行订阅
//subscribe订阅信息消费url、通知监听、配置监听、订阅url
//toSubscribeUrl:订阅信息:category、providers、configurators、routers
directory.subscribe(toSubscribeUrl(subscribeUrl));
//一个注册中心可能有多个服务提供者,因此这里需要将多个服务提供者合并成一个
/**=====================重要 cluster.join(directory) ==========================**/
Invoker<T> invoker = cluster.join(directory);
List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
if (CollectionUtils.isEmpty(listeners)) {
return invoker;
}
RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker, subscribeUrl);
//进行监听
for (RegistryProtocolListener listener : listeners) {
listener.onRefer(this, registryInvokerWrapper);
}
//返回注册中心Invoker保证
return registryInvokerWrapper;
}
AbstractProcotol#refer#protocolBindingRefer
代码语言:javascript复制/**==========================重要 protocolBindingRefer(type, url)=================================**/
//获取引用
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
}
protected abstract <T> Invoker<T> protocolBindingRefer(Class<T> type, URL url) throws RpcException;
DubboProtocol#protocolBindingRefer
代码语言:javascript复制//dubbo协议绑定引用
@Override
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
//进行序列化,多种序列化方式
optimizeSerialization(url);
// create rpc invoker.
//创建rpc invoker
/**======================关注getClients(url) 重要 **=================================**/
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
//添加invoker
invokers.add(invoker);
return invoker;
}
DubboProtocol#getClients
代码语言:javascript复制//获取客户端:分两种情况,一种是共享客户端,一种是创建新的客户端
private ExchangeClient[] getClients(URL url) {
// whether to share connection
//是否共享连接
boolean useShareConnect = false;
//获取连接数,默认是0.表示未配置
int connections = url.getParameter(CONNECTIONS_KEY, 0);
List<ReferenceCountExchangeClient> shareClients = null;
// if not configured, connection is shared, otherwise, one connection for one service
//如果不配置,连接是被分享的,否者,一个服务提供一个连接
if (connections == 0) {
useShareConnect = true;
/*
* The xml configuration should have a higher priority than properties.
*/
//xml配置应该优先于 配置
String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY,
DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
//获取共享客户端
/**======================重要 getSharedClient(url, connections)=====================**/
shareClients = getSharedClient(url, connections);
}
ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i ) {
if (useShareConnect) {
//获取共享客户端
clients[i] = shareClients.get(i);
} else {
//初始化新的客户端
clients[i] = initClient(url);
}
}
return clients;
}
DubboProtocol#getSharedClient(URL url, int connectNum)
代码语言:javascript复制/**
* Get shared connection
*
* @param url
* @param connectNum connectNum must be greater than or equal to 1
*/
//获取共享连接
private List<ReferenceCountExchangeClient> getSharedClient(URL url, int connectNum) {
//获取地址
String key = url.getAddress();
//获取客户端
//ReferenceCountExchangeClient的构造函数:客户端、引用计数、url
List<ReferenceCountExchangeClient> clients = referenceClientMap.get(key);
//检查客户端是否可以用,如果可用的话,则进行计数,同时返回客户端
if (checkClientCanUse(clients)) {
batchClientRefIncr(clients);
return clients;
}
locks.putIfAbsent(key, new Object());
synchronized (locks.get(key)) {
clients = referenceClientMap.get(key);
// dubbo check
if (checkClientCanUse(clients)) {
batchClientRefIncr(clients);
return clients;
}
// connectNum must be greater than or equal to 1
//连接数必须>=1,如果小于1,默认1个
connectNum = Math.max(connectNum, 1);
// If the clients is empty, then the first initialization is
//如果客户端为空,则首先进行初始化
if (CollectionUtils.isEmpty(clients)) {
//创建客户端
clients = buildReferenceCountExchangeClientList(url, connectNum);
//放到map中
referenceClientMap.put(key, clients);
} else {
//否者
for (int i = 0; i < clients.size(); i ) {
ReferenceCountExchangeClient referenceCountExchangeClient = clients.get(i);
// If there is a client in the list that is no longer available, create a new one to replace him.
/**==如果客户端list不可用,则创建一个新的代替它,buildReferenceCountExchangeClient(url)==**/
if (referenceCountExchangeClient == null || referenceCountExchangeClient.isClosed()) {
clients.set(i, buildReferenceCountExchangeClient(url));
continue;
}
//进行引用计数
referenceCountExchangeClient.incrementAndGetCount();
}
}
/*
* I understand that the purpose of the remove operation here is to avoid the expired url key
* always occupying this memory space.
*/
locks.remove(key);
return clients;
}
}
DubboProtocol#buildReferenceCountExchangeClient
代码语言:javascript复制//创建单个客户端
private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url) {
//初始化客户端
ExchangeClient exchangeClient = initClient(url);
return new ReferenceCountExchangeClient(exchangeClient);
}
DubboProtocol#initClient
代码语言:javascript复制//创建新的连接
private ExchangeClient initClient(URL url) {
// client type setting.
//获取客户端类型,默认为netty
String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));
//添加编解码和心跳包参数到url中
url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
// enable heartbeat by default
url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));
// BIO is not allowed since it has severe performance issue.
//自从它有服务性能问题之后BIO不被允许
//检测客户端类型是否存在,不存在则抛出异常
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported client type: " str ","
" supported client type is " StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
}
ExchangeClient client;
try {
// connection should be lazy
//获取lazy配置,并根据配置值决定创建的客户端类型
if (url.getParameter(LAZY_CONNECT_KEY, false)) {
//创建懒加载ExchangeClient实例
client = new LazyConnectExchangeClient(url, requestHandler);
} else {
//创建普通ExchangeClient实例
client = Exchangers.connect(url, requestHandler);
}
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service(" url "): " e.getMessage(), e);
}
return client;
}
在前面我们的服务提供者开启服务的时候,是采用的bind方法,而服务消费者则采用的是connect方法。
Exchangers#connect
代码语言:javascript复制//进行connect,url和handler不能为空,同时将codec的编码信息放入到url中
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
return getExchanger(url).connect(url, handler);
}
Exchanger#connect
代码语言:javascript复制@SPI(HeaderExchanger.NAME)
public interface Exchanger {
//进行连接的过程中包含多个调用
//1.创建HeaderExchangeHandler对象
//2.创建DecodeHandler对象
//3.通过Transporters创建client实例
//4.创建HeaderExchangeClient对象
@Adaptive({Constants.EXCHANGER_KEY})
ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException;
}
HeaderExchanger#connect
代码语言:javascript复制//进行连接操作,调用Transporters的connect方法
@Override
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
HeaderExchanger#connect
代码语言:javascript复制/**================重要=====================**/
//校验url和handlers不为空,如果handler为空,则创建一个通道handler适配器
//如果handler长度为1,则取第一个,如果大于1,创建一个 ChannelHandler分发器
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
ChannelHandler handler;
if (handlers == null || handlers.length == 0) {
handler = new ChannelHandlerAdapter();
} else if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
//获取Transporter自适应扩展类,并调用connect方法生成客户端实例
return getTransporter().connect(url, handler);
}
Transporter#connect
代码语言:javascript复制@SPI("netty")
public interface Transporter {
@Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
Client connect(URL url, ChannelHandler handler) throws RemotingException;
}
NettyTransporter#connect
代码语言:javascript复制//Netty客户端
@Override
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
return new NettyClient(url, listener);
}
NettyClient
代码语言:javascript复制//Netty客户端
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
super(url, wrapChannelHandler(url, handler));
}
AbstractNettyClient
代码语言:javascript复制public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
//抽象Endpoint:url、channelHandler、codec、timeout、connectTimeout
super(url, handler);
//是否需要重连接
needReconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);
//初始化Executor
initExecutor(url);
try {
//打开NettyClient
doOpen();
} catch (Throwable t) {
//出现异常,则进行关闭
close();
throw new RemotingException(url.toInetSocketAddress(), null,
"Failed to start " getClass().getSimpleName() " " NetUtils.getLocalAddress()
" connect to the server " getRemoteAddress() ", cause: " t.getMessage(), t);
}
try {
// connect.
//进行连接
connect();
if (logger.isInfoEnabled()) {
logger.info("Start " getClass().getSimpleName() " " NetUtils.getLocalAddress() " connect to the server " getRemoteAddress());
}
} catch (RemotingException t) {
if (url.getParameter(Constants.CHECK_KEY, true)) {
close();
throw t;
} else {
logger.warn("Failed to start " getClass().getSimpleName() " " NetUtils.getLocalAddress()
" connect to the server " getRemoteAddress() " (check == false, ignore and retry later!), cause: " t.getMessage(), t);
}
} catch (Throwable t) {
close();
throw new RemotingException(url.toInetSocketAddress(), null,
"Failed to start " getClass().getSimpleName() " " NetUtils.getLocalAddress()
" connect to the server " getRemoteAddress() ", cause: " t.getMessage(), t);
}
}
进行服务开启doOepn,连接doConnect以及关闭操作close
NettyClient#doOpen
代码语言:javascript复制//打开NettyClient
@Override
protected void doOpen() throws Throwable {
//创建bootstrap,包含keepAlive、tcpNoDelay、connectTimeoutMills、设置Pipeline工厂,获取pipeline
NettyHelper.setNettyLoggerFactory();
bootstrap = new ClientBootstrap(CHANNEL_FACTORY);
// config
// @see org.jboss.netty.channel.socket.SocketChannelConfig
bootstrap.setOption("keepAlive", true);
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("connectTimeoutMillis", getConnectTimeout());
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
}
NettyClient#doConnect
代码语言:javascript复制//进行连接
@Override
protected void doConnect() throws Throwable {
long start = System.currentTimeMillis();
//进行连接操作
ChannelFuture future = bootstrap.connect(getConnectAddress());
try {
boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);
if (ret && future.isSuccess()) {
Channel newChannel = future.getChannel();
newChannel.setInterestOps(Channel.OP_READ_WRITE);
try {
// Close old channel
Channel oldChannel = NettyClient.this.channel; // copy reference
if (oldChannel != null) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close old netty channel " oldChannel " on create new netty channel " newChannel);
}
oldChannel.close();
} finally {
NettyChannel.removeChannelIfDisconnected(oldChannel);
}
}
} finally {
if (NettyClient.this.isClosed()) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close new netty channel " newChannel ", because the client closed.");
}
newChannel.close();
} finally {
NettyClient.this.channel = null;
NettyChannel.removeChannelIfDisconnected(newChannel);
}
} else {
NettyClient.this.channel = newChannel;
}
}
} else if (future.getCause() != null) {
throw new RemotingException(this, "client(url: " getUrl() ") failed to connect to server "
getRemoteAddress() ", error message is:" future.getCause().getMessage(), future.getCause());
} else {
throw new RemotingException(this, "client(url: " getUrl() ") failed to connect to server "
getRemoteAddress() " client-side timeout "
getConnectTimeout() "ms (elapsed: " (System.currentTimeMillis() - start) "ms) from netty client "
NetUtils.getLocalHost() " using dubbo version " Version.getVersion());
}
} finally {
if (!isConnected()) {
future.cancel();
}
}
}
AbstractNettyClient#close
代码语言:javascript复制@Override
public void close() {
try {
super.close();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
try {
if (executor != null) {
ExecutorUtil.shutdownNow(executor, 100);
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
try {
//断开连接
disconnect();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
try {
//进行关闭
doClose();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
进行优雅关闭
代码语言:javascript复制//进行优雅关闭
@Override
public void close(int timeout) {
ExecutorUtil.gracefulShutdown(executor, timeout);
close();
}
/**
* Use the shutdown pattern from:
* https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html
*
* @param executor the Executor to shutdown
* @param timeout the timeout in milliseconds before termination
*/
//进行优雅关闭
public static void gracefulShutdown(Executor executor, int timeout) {
if (!(executor instanceof ExecutorService) || isTerminated(executor)) {
return;
}
final ExecutorService es = (ExecutorService) executor;
try {
// Disable new tasks from being submitted
es.shutdown();
} catch (SecurityException ex2) {
return;
} catch (NullPointerException ex2) {
return;
}
try {
// Wait a while for existing tasks to terminate
if (!es.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
es.shutdownNow();
}
} catch (InterruptedException ex) {
es.shutdownNow();
Thread.currentThread().interrupt();
}
if (!isTerminated(es)) {
newThreadToCloseExecutor(es);
}
}
下面总结一下
首先我们通过标签找到服务提供者的bean,也即ReferenceBean,看到AfterPropertiesSet方法,从而定位getObject方法,从而看到init()初始化方法,里面有我们需要看到的方法,在创建代理之前,会先将配置信息准备好,放入到serviceMetadata中,创建代理createProxy方法,创建好之后,代理类构建代理ConsumerModel,并将ConsumerModel存入到ApplicationModel中,进行初始化,对引用配置初始化事件进行分发。
而这里最重要的是创建代理createProxy方法:
创建代理里面重要的两个方法:REF_PROTOCOL.refer(interfaceClass,url)、PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic))
而REF_PROTOCOL.refer(interface,url)中会进行三种服务的引用:本地服务(生成本地引用url,协议为injvm)、直连服务、远程服务(带注册中心)。常用的RegisterProtool#refer、AbstractProtocol->dubboProtocol#refer。
RegisterProtocol:首先获取注册中心url信息,然后拿到注册中心实例,匹配注册中心服务类型,如果匹配,进行getInvoker,也即代理信息的生成。接着将url查询字符串转成map,同时获取group配置,通过SPI加载MergeableCluster实例,并调用doRefer继续执行服务引用逻辑。而dorefer中方法创建一个RegistryDirectory实例,然后生成服务者消费者连接,并向注册中心进行注册。注册完毕后,紧接着订阅providers、configurators、roters等节点下的数据。完成订阅后,RegistryDirectory会受到这几个节点下的子节点信息。由于一个服务可能部署在多台服务器上,这样就会在providers产生多个节点,这个时候就需要Cluster将多个服务节点合并为一个,并生成一个invoker。这里涉及到Directory和Router。
AbstractRegistryProtocol:这里采用DubboProtocol看源码。首先进行协议适配,将协议绑定到引用上,这里适配dubbo协议,SPI默认采用dubbo协议。DubboInvoker invoker = new DubboInvoker(serviceType, url, getClients(url), invokers)中的getClient会进行客户端的获取,获取客户端:分两种情况,一种是共享客户端,一种是创建新的客户端。进行共享客户端的获取时,会首先进行检查,然后进行计数,将获取的客户端放入到referenceClientMap中。如果客户端list不可用,则创建一个新的代替它,buildReferenceCountExchangeClient(url),此时会进行initClient操作。此时默认采用Netty创建服务器,在前面我们的服务提供者开启服务的时候,是采用的bind方法,而服务消费者则采用的是connect方法。采用conntect获取NettyClient,从而进行doOpen、doConnect、close操作。
getProxy(invoker,ProtocolUtils.isGeneric(generic))是先准备好配置信息,将其进行组装,然后进行协议适配,主要有两个:JavassistProxyFactory、JDKProxyFactory,适配之后,采用模板进行代理方法的生成。这个和服务提供者类似。