微信公众号:PersistentCoder
一、概述
前边一篇文章分析了Dubbo服务的暴露和注册,那么消费端是如何引用和调用的呢?
从dubbo的架构设计中,我们可以看出服务启动时,除了本地暴露服务之外会把服务注册到注册中心,那么作为消费端,在服务启动的时候则会向注册中心订阅需要调用的服务,然后在调用的时候通过注册中心拿到的地址做负载后选择合适的服务,然后建立连接并实现调用。
消费端和服务端的数据交互是通过dubbo重写的netty实现。本篇文章将详细的分析Dubbo服务的调用原理,为了便于分析和理解,将内容拆分成了服务引用和服务调用两个模块。
以下是分析过程中可能会用到的概念:
- registry 注册中心层:封装服务地址的注册与发现,以服务 URL 为中心,扩展接口为 RegistryFactory, Registry, RegistryService
- cluster 路由层:封装多个提供者的路由及负载均衡,并桥接注册中心,以 Invoker 为中心,扩展接口为 Cluster, Directory, Router, LoadBalance
- protocol 远程调用层:封装 RPC 调用,以 Invocation, Result 为中心,扩展接口为 Protocol, Invoker, Exporter
- exchange 信息交换层:封装请求响应模式,同步转异步,以 Request, Response 为中心,扩展接口为 Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer
- transport 网络传输层:抽象 mina 和 netty 为统一接口,以 Message 为中心,扩展接口为 Channel, Transporter, Client, Server, Codec
注意
dubbo版本2.7.8
二、服务引用
所谓服务引用,就是消费端应用启动时,将需要调用的服务端接口生成代理,并实例化注入到消费端的服务中。官方给的服务引用时序图如下:
前篇文章分析道DubboComponentScan注解会导入DubboComponentScanRegistrar,它会注册启动过程中用到的基础组件,其中就包括ReferenceAnnotationBeanPostProcessor:
代码语言:javascript复制registerInfrastructureBean(registry, ReferenceAnnotationBeanPostProcessor.BEAN_NAME,
ReferenceAnnotationBeanPostProcessor.class);
ReferenceAnnotationBeanPostProcessor是一个继承自BeanPostProcessor的InstantiationAwareBeanPostProcessor,其父类重写了postProcessPropertyValues方法,会在刷新上下文创建完Bean实例后调用该方法进行属性填充,具体可参考《@Autowired注解原理分析》,看一下AbstractAnnotationBeanPostProcessor的重写实现:
代码语言:javascript复制@Override
public PropertyValues postProcessPropertyValues(
PropertyValues pvs, PropertyDescriptor[] pds, Object bean, String beanName) throws BeanCreationException {
InjectionMetadata metadata = findInjectionMetadata(beanName, bean.getClass(), pvs);
try {
metadata.inject(bean, beanName, pvs);
} catch (BeanCreationException ex) {
throw ex;
} catch (Throwable ex) {
throw new BeanCreationException(beanName, "Injection of @" getAnnotationType().getSimpleName()
" dependencies is failed", ex);
}
return pvs;
}
将bean的属性和方法带有的DubboReference和Reference注解封装成InjectionMetadata,然后执行注入,注解寻找和封装逻辑之前有分析过,此处不做分析,方法注解封装成AnnotatedMethodElement,属性注解封装成AnnotatedFieldElement,注入的时候会调用ReferenceAnnotationBeanPostProcessor的doGetInjectedBean获取要注入的bean,看一下实现:
代码语言:javascript复制@Override
protected Object doGetInjectedBean(AnnotationAttributes attributes, Object bean, String beanName, Class<?> injectedType,
InjectionMetadata.InjectedElement injectedElement) throws Exception {
String referencedBeanName = buildReferencedBeanName(attributes, injectedType);
String referenceBeanName = getReferenceBeanName(attributes, injectedType);
ReferenceBean referenceBean = buildReferenceBeanIfAbsent(referenceBeanName, attributes, injectedType);
boolean localServiceBean = isLocalServiceBean(referencedBeanName, referenceBean, attributes);
prepareReferenceBean(referencedBeanName, referenceBean, localServiceBean);
registerReferenceBean(referencedBeanName, referenceBean, attributes, localServiceBean, injectedType);
cacheInjectedReferenceBean(referenceBean, injectedElement);
return referenceBean.get();
}
该方法的作用是构造ReferenceBean实例并返回,从继承关系可以看出它是一个FactoryBean,所以最终返回的对象是其get方法实现。
先看一下ReferenceBean的构造,会调用AnnotatedInterfaceConfigBeanBuilder的build方法:
代码语言:javascript复制public final C build() throws Exception {
checkDependencies();
C configBean = doBuild();
configureBean(configBean);
return configBean;
}
然后调用子类ReferenceBeanBuilder的doBuild方法创建ReferenceBean并返回:
代码语言:javascript复制@Override
protected ReferenceBean doBuild() {
return new ReferenceBean<Object>();
}
继续看ReferenceBean#get实现:
代码语言:javascript复制
public synchronized T get() {
if (destroyed) {
throw new IllegalStateException("The invoker of ReferenceConfig(" url ") has already destroyed!");
}
if (ref == null) {
init();
}
return ref;
}
init方法会调用createProxy返回接口代理:
代码语言:javascript复制private T createProxy(Map<String, String> map) {
if (shouldJvmRefer(map)) {
URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
invoker = REF_PROTOCOL.refer(interfaceClass, url);
} else {
urls.clear();
if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
if (us != null && us.length > 0) {
for (String u : us) {
URL url = URL.valueOf(u);
if (StringUtils.isEmpty(url.getPath())) {
url = url.setPath(interfaceName);
}
if (UrlUtils.isRegistry(url)) {
urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
} else {
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
} else { // assemble URL from register center's configuration
// if protocols not injvm checkRegistry
if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {
checkRegistry();
List<URL> us = ConfigValidationUtils.loadRegistries(this, false);
if (CollectionUtils.isNotEmpty(us)) {
for (URL u : us) {
URL monitorUrl = ConfigValidationUtils.loadMonitor(this, u);
if (monitorUrl != null) {
map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
}
}
if (urls.isEmpty()) {
throw new IllegalStateException("No such any registry to reference " interfaceName " on the consumer " NetUtils.getLocalHost() " use dubbo version " Version.getVersion() ", please config <dubbo:registry address="..." /> to your spring config.");
}
}
}
if (urls.size() == 1) {
invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
} else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
if (UrlUtils.isRegistry(url)) {
registryURL = url; // use last registry url
}
}
if (registryURL != null) { // registry url is available
// for multi-subscription scenario, use 'zone-aware' policy by default
String cluster = registryURL.getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME);
// The invoker wrap sequence would be: ZoneAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker
invoker = Cluster.getCluster(cluster, false).join(new StaticDirectory(registryURL, invokers));
} else { // not a registry url, must be direct invoke.
String cluster = CollectionUtils.isNotEmpty(invokers)
? (invokers.get(0).getUrl() != null ? invokers.get(0).getUrl().getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME) : Cluster.DEFAULT)
: Cluster.DEFAULT;
invoker = Cluster.getCluster(cluster).join(new StaticDirectory(invokers));
}
}
}
String metadata = map.get(METADATA_KEY);
WritableMetadataService metadataService = WritableMetadataService.getExtension(metadata == null ? DEFAULT_METADATA_STORAGE_TYPE : metadata);
if (metadataService != null) {
URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);
metadataService.publishServiceDefinition(consumerURL);
}
// create service proxy
return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));
}
代码很多,不过逻辑比较清晰。首先根据配置检查是否为本地调用,如果是则调用InjvmProtocol 的 refer 方法生成 InjvmInvoker 实例。如果不是则读取直连配置项,或注册中心 url,并将读取到的 url 存储到 urls 中。然后根据 urls 元素数量进行后续操作。如果urls 元素数量为1,则直接通过 Protocol 自适应拓展类构建 Invoker 实例接口。如果urls 元素数量大于1,即存在多个注册中心或服务直连 url,此时先根据 url 构建 Invoker。然后再通过 Cluster 合并多个 Invoker,最后调用 ProxyFactory 生成代理类。
该方法我们分析两个核心的点,构造invoker和生成代理,分开做一下分析。
构造Invoker
对于非直连的服务调用,会使用RegistryProtocol#refer构造Invoker:
代码语言:javascript复制public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
//这里把url的protocol参数从registry替换成了dubbo。为下面的正在初始化做准备
url = getRegistryUrl(url);
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
// group="a,b" or group="*"
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
String group = qs.get(GROUP_KEY);
if (group != null && group.length() > 0) {
if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
return doRefer(Cluster.getCluster(MergeableCluster.NAME), registry, type, url);
}
}
Cluster cluster = Cluster.getCluster(qs.get(CLUSTER_KEY));
return doRefer(cluster, registry, type, url);
}
首先为 url 设置协议头,然后根据 url 参数加载注册中心实例,接着调用内部doRefer方法实现:
代码语言:javascript复制private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
// 创建 RegistryDirectory 实例
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
// 设置注册中心和协议
directory.setRegistry(registry);
directory.setProtocol(protocol);
Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
// 生成服务消费者链接
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
// 注册服务消费者,在 consumers 目录下新节点
if (directory.isShouldRegister()) {
directory.setRegisteredConsumerUrl(subscribeUrl);
registry.register(directory.getRegisteredConsumerUrl());
}
directory.buildRouterChain(subscribeUrl);
// 订阅 providers节点数据,会触发创建连接NettyClient
directory.subscribe(toSubscribeUrl(subscribeUrl));
// 一个注册中心可能有多个服务提供者,因此这里需要将多个服务提供者合并为一个
Invoker<T> invoker = cluster.join(directory);
List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
if (CollectionUtils.isEmpty(listeners)) {
return invoker;
}
RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker);
for (RegistryProtocolListener listener : listeners) {
listener.onRefer(this, registryInvokerWrapper);
}
return registryInvokerWrapper;
}
doRefer 方法创建一个 RegistryDirectory 实例,然后生成服务者消费者链接,并向注册中心进行注册。注册完毕后,紧接着订阅 providers等节点下的数据。完成订阅后,RegistryDirectory 会收到这几个节点下的子节点信息。由于一个服务可能部署在多台服务器上,这样就会在 providers 产生多个节点,这个时候就需要 Cluster 将多个服务节点合并为一个,并生成一个 Invoker。
但是这里并没有看到初始化NettyClient和创建连接的代码,重点在于directory.subscribe,此流程会触发NettyClient初始化和连接,看一下实现:
代码语言:javascript复制public void subscribe(URL url) {
setConsumerUrl(url);
CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this);
serviceConfigurationListener = new ReferenceConfigurationListener(this, url);
registry.subscribe(url, this);
}
此处的registry类型是ListenerRegistryWrapper,继续看其subscribe实现:
代码语言:javascript复制@Override
public void subscribe(URL url, NotifyListener listener) {
try {
registry.subscribe(url, listener);
} finally {
if (CollectionUtils.isNotEmpty(listeners)) {
RuntimeException exception = null;
for (RegistryServiceListener registryListener : listeners) {
if (registryListener != null) {
try {
registryListener.onSubscribe(url);
} catch (RuntimeException t) {
exception = t;
}
}
}
if (exception != null) {
throw exception;
}
}
}
}
然后会调用到FailbackRegistry的subscribe方法:
代码语言:javascript复制@Override
public void subscribe(URL url, NotifyListener listener) {
super.subscribe(url, listener);
removeFailedSubscribed(url, listener);
try {
// Sending a subscription request to the server side
doSubscribe(url, listener);
} catch (Exception e) {
Throwable t = e;
}
}
doSubscribe方法调用取决于用什么注册中心,我们使用Nacos分析,那么看一下NacosRegistry的doSubscribe实现:
代码语言:javascript复制private void doSubscribe(final URL url, final NotifyListener listener, final Set<String> serviceNames) {
execute(namingService -> {
if (isServiceNamesWithCompatibleMode(url)) {
List<Instance> allCorrespondingInstanceList = Lists.newArrayList();
for (String serviceName : serviceNames) {
List<Instance> instances = namingService.getAllInstances(serviceName,
getUrl().getParameter(GROUP_KEY, Constants.DEFAULT_GROUP));
NacosInstanceManageUtil.initOrRefreshServiceInstanceList(serviceName, instances);
allCorrespondingInstanceList.addAll(instances);
}
notifySubscriber(url, listener, allCorrespondingInstanceList);
for (String serviceName : serviceNames) {
subscribeEventListener(serviceName, url, listener);
}
} else {
List<Instance> instances = new LinkedList<>();
for (String serviceName : serviceNames) {
instances.addAll(namingService.getAllInstances(serviceName
, getUrl().getParameter(GROUP_KEY, Constants.DEFAULT_GROUP)));
notifySubscriber(url, listener, instances);
subscribeEventListener(serviceName, url, listener);
}
}
});
}
从Nacos的namespace获取接口的所有节点实例,然后通知订阅者(初始化),经过NacosRegistry和FailbackRegistry,最终调用到AbstractRegistry的notify方法:
代码语言:javascript复制protected void notify(URL url, NotifyListener listener, List<URL> urls) {
Map<String, List<URL>> result = new HashMap<>();
for (URL u : urls) {
if (UrlUtils.isMatch(url, u)) {
String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
categoryList.add(u);
}
}
Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
categoryNotified.put(category, categoryList);
listener.notify(categoryList);
saveProperties(url);
}
}
url是消费端的url,urls是服务提供者的url,listener是前边提到的RegistryDirectory,所以listener.notify会调用到RegistryDirectory的notify实现:
代码语言:javascript复制@Override
public synchronized void notify(List<URL> urls) {
//省略
refreshOverrideAndInvoker(providerURLs);
}
然后经过几层转换会调用到toInvokers方法:
代码语言:javascript复制private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>();
Set<String> keys = new HashSet<>();
String queryProtocols = this.queryMap.get(PROTOCOL_KEY);
for (URL providerUrl : urls) {
// 省略
Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
if (invoker == null) { // Not in the cache, refer again
try {
boolean enabled = true;
if (url.hasParameter(DISABLED_KEY)) {
enabled = !url.getParameter(DISABLED_KEY, false);
} else {
enabled = url.getParameter(ENABLED_KEY, true);
}
if (enabled) {
invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
}
} catch (Throwable t) {
}
}
keys.clear();
return newUrlInvokerMap;
}
这里又出现了protocol.refer,由于前边把url的protocol参数从registry替换成了dubbo,所以此处会调用DubboProtocol的refer实现:
代码语言:javascript复制public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
return new AsyncToSyncInvoker(this.protocolBindingRefer(type, url));
}
调用protocolBindingRefer方法构造并返回Invoker,看protocolBindingRefer实现:
代码语言:javascript复制@Override
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);
// create rpc invoker.
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
方法看起来比较简单,不过这里有一个调用需要我们注意一下,即 getClients。这个方法用于获取客户端实例,实例类型为 ExchangeClient。ExchangeClient 实际上并不具备通信能力,它需要基于更底层的客户端实例进行通信。比如 NettyClient、MinaClient 等,默认情况下,Dubbo 使用 NettyClient 进行通信。接下来,我们简单看一下 getClients 方法的逻辑。
代码语言:javascript复制private ExchangeClient[] getClients(URL url) {
// 是否共享连接
boolean service_share_connect = false;
// 获取连接数,默认为0,表示未配置
int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
// 如果未配置 connections,则共享连接
if (connections == 0) {
service_share_connect = true;
connections = 1;
}
ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i ) {
if (service_share_connect) {
// 获取共享客户端
clients[i] = getSharedClient(url);
} else {
// 初始化新的客户端
clients[i] = initClient(url);
}
}
return clients;
}
这里根据 connections 数量决定是获取共享客户端还是创建新的客户端实例,默认情况下,使用共享客户端实例。getSharedClient 方法中也会调用 initClient 方法,直接看initClient实现。
代码语言:javascript复制private ExchangeClient initClient(URL url) {
// 获取客户端类型,默认为 netty
String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
// 添加编解码和心跳包参数到 url 中
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
// 检测客户端类型是否存在,不存在则抛出异常
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported client type: ...");
}
ExchangeClient client;
try {
// 获取 lazy 配置,并根据配置值决定创建的客户端类型
if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
// 创建懒加载 ExchangeClient 实例
client = new LazyConnectExchangeClient(url, requestHandler);
} else {
// 创建普通 ExchangeClient 实例
client = Exchangers.connect(url, requestHandler);
}
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service...");
}
return client;
}
initClient方法首先获取用户配置的客户端类型,默认为 netty。然后检测用户配置的客户端类型是否存在,不存在则抛出异常。最后根据 lazy 配置决定创建什么类型的客户端。这里的 LazyConnectExchangeClient 代码并不复杂,会在request方法被调用时通过Exchangers的connect方法创建ExchangeClient客户端,此处就不展开分析了。下面我们分析一下 Exchangers的connect 方法。
代码语言:javascript复制public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
//省略
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
// 获取 Exchanger 实例,默认为 HeaderExchangeClient
return getExchanger(url).connect(url, handler);
}
getExchanger会通过SPI加载HeaderExchanger实例,方法比较简单。接下来分析 HeaderExchanger.connect 的实现。
代码语言:javascript复制public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
// 这里包含了多个调用,分别如下:
// 1. 创建 HeaderExchangeHandler 对象
// 2. 创建 DecodeHandler 对象
// 3. 通过 Transporters 构建 Client 实例
// 4. 创建 HeaderExchangeClient 对象
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
调用比较多,我们这里重点看一下 Transporters 的 connect 方法:
代码语言:javascript复制public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
//省略
// 获取 Transporter 自适应拓展类,并调用 connect 方法生成 Client 实例
return getTransporter().connect(url, handler);
}
getTransporter方法返回的是自适应拓展类,该类会在运行时根据客户端类型加载指定的Transporter实现类。若用户未配置客户端类型,则默认加载 NettyTransporter,并调用该类的 connect 方法。
代码语言:javascript复制public Client connect(URL url, ChannelHandler listener) throws RemotingException {
// 创建 NettyClient 对象
return new NettyClient(url, listener);
}
然后会调用NettyClient以及父类AbstractClient的构造函数做一些配置初始化,并且会调用NettyClient的doOpen方法开启客服端和打开连接(代码省略)。
代码语言:javascript复制@Override
protected void doOpen() throws Throwable {
final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
bootstrap = new Bootstrap();
bootstrap.group(NIO_EVENT_LOOP_GROUP)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
//.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
.channel(socketChannelClass());
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.max(3000, getConnectTimeout()));
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());
if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
ch.pipeline().addLast("negotiation", SslHandlerInitializer.sslClientHandler(getUrl(), nettyClientHandler));
}
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))
.addLast("handler", nettyClientHandler);
String socksProxyHost = ConfigUtils.getProperty(SOCKS_PROXY_HOST);
if(socksProxyHost != null) {
int socksProxyPort = Integer.parseInt(ConfigUtils.getProperty(SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT));
Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort));
ch.pipeline().addFirst(socks5ProxyHandler);
}
}
});
}
DubboProtocol的引用已经分析完了,RegistryProtocol的refer,doRefer 方法创建一个 RegistryDirectory 实例,然后生成服务者消费者链接,并向注册中心进行注册。注册完毕后,紧接着订阅 providers等节点下的数据。完成订阅后,RegistryDirectory 会收到这几个节点下的子节点信息。由于一个服务可能部署在多台服务器上,这样就会在 providers 产生多个节点,这个时候就需要 Cluster 将多个服务节点合并为一个,并生成一个 Invoker。也就是说同一个服务存在多个实例的情况下,会封装成Cluster类型的Invoker,里边会封装负载均衡逻辑,负载处理后选择合适的Invoker进行调用。
生成代理
Invoker 创建完毕后,接下来要做的事情是为服务接口生成代理对象。有了代理对象,即可进行远程调用,回到ReferenceConfig#createProxy方法,最后会调用PROXY_FACTORY.getProxy生成代理并返回:
代码语言:javascript复制public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException {
Class<?>[] interfaces = null;
// 获取接口列表
String config = invoker.getUrl().getParameter("interfaces");
if (config != null && config.length() > 0) {
// 切分接口列表
String[] types = Constants.COMMA_SPLIT_PATTERN.split(config);
if (types != null && types.length > 0) {
interfaces = new Class<?>[types.length 2];
// 设置服务接口类和 EchoService.class 到 interfaces 中
interfaces[0] = invoker.getInterface();
interfaces[1] = EchoService.class;
for (int i = 0; i < types.length; i ) {
// 加载接口类
interfaces[i 1] = ReflectUtils.forName(types[i]);
}
}
}
if (interfaces == null) {
interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class};
}
// 为 http 和 hessian 协议提供泛化调用支持,参考 pull request #1827
if (!invoker.getInterface().equals(GenericService.class) && generic) {
int len = interfaces.length;
Class<?>[] temp = interfaces;
// 创建新的 interfaces 数组
interfaces = new Class<?>[len 1];
System.arraycopy(temp, 0, interfaces, 0, len);
// 设置 GenericService.class 到数组中
interfaces[len] = GenericService.class;
}
// 调用重载方法
return getProxy(invoker, interfaces);
}
public abstract <T> T getProxy(Invoker<T> invoker, Class<?>[] types);
getProxy(Invoker, Class<?>[]) 这个方法是一个抽象方法,dubbo默认使用Javassist生成代理,下面我们到 JavassistProxyFactory 类中看一下该方法的实现代码。
代码语言:javascript复制public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
// 生成 Proxy 子类(Proxy 是抽象类)。并调用 Proxy 子类的 newInstance 方法创建 Proxy 实例
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
将invoker封装成InvokerInvocationHandler生成代理类实现并返回,服务调用的时候会调用其对InvocationHandler的invoke实现。
服务引用的时序图大致如下:
服务引用完成后consumer持有封装后的Invoker,并且里边多个Invoker和服务端NettyServer建立了连接。
三、服务调用
服务消费者通过代理对象 Proxy 发起远程调用,接着通过网络客户端 Client 将编码后的请求发送给服务提供方的网络层上,也就是 Server。Server 在收到请求后,首先要做的事情是对数据包进行解码。然后将解码后的请求发送至分发器 Dispatcher,再由分发器将请求派发到指定的线程池上,最后由线程池调用具体的服务。这就是一个远程调用请求的发送与接收过程。
dubbo默认使用javassist生成代理对象,那么consumer调用服务的时候,会通过Proxy持有的InvocationHandler发起,看下InvokerInvocationHandler的invoke方法实现:
代码语言:javascript复制public class InvokerInvocationHandler implements InvocationHandler {
private final Invoker<?> invoker;
public InvokerInvocationHandler(Invoker<?> handler) {
this.invoker = handler;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
// 拦截定义在 Object 类中的方法(未被子类重写),比如 wait/notify
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
// 如果 toString、hashCode 和 equals 等方法被子类重写了,这里也直接调用
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return invoker.toString();
}
if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
return invoker.hashCode();
}
if ("equals".equals(methodName) && parameterTypes.length == 1) {
return invoker.equals(args[0]);
}
// 将 method 和 args 封装到 RpcInvocation 中,并执行后续的调用
return invoker.invoke(new RpcInvocation(method, args)).recreate();
}
}
此处的Invoker就是前边服务引用中生成的ClusterInvoker,Dubbo 主要提供了以下几种ClusterInvoker实现方式:
- FailoverClusterInvoker - 失败自动切换
- FailfastClusterInvoker - 快速失败
- FailsafeClusterInvoker - 失败安全
- Failback ClusterInvoker - 失败自动恢复
- Forking Cluster - 并行调用多个服务提供者
- ZoneAwareClusterInvoker - 多注册中心
dubbo默认使用FailoverClusterInvoker。此处的invoke调用会调用到AbstractClusterInvoker的invoke 方法:
代码语言:javascript复制@Override
public Result invoke(final Invocation invocation) throws RpcException {
checkWhetherDestroyed();
// 绑定 attachments 到 invocation 中.
Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
((RpcInvocation) invocation).addObjectAttachments(contextAttachments);
}
// 列举 Invoker
List<Invoker<T>> invokers = list(invocation);
// 加载 LoadBalance
LoadBalance loadbalance = initLoadBalance(invokers, invocation);
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
// 调用 doInvoke 进行后续操作
return doInvoke(invocation, invokers, loadbalance);
}
// 抽象方法,由子类实现
protected abstract Result doInvoke(Invocation invocation, List<Invoker<T>> invokers,
LoadBalance loadbalance) throws RpcException;
AbstractClusterInvoker的invoke方法主要用于列举Invoker,以及加载LoadBalance。最后再调用模板方法doInvoke进行后续操作。我们来看一下FailoverClusterInvoker的doInvoke逻辑:
代码语言:javascript复制@Override
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyInvokers = invokers;
checkInvokers(copyInvokers, invocation);
String methodName = RpcUtils.getMethodName(invocation);
// 获取重试次数
int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) 1;
if (len <= 0) {
len = 1;
}
RpcException le = null; // last exception.
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
Set<String> providers = new HashSet<String>(len);
// 循环调用,失败重试
for (int i = 0; i < len; i ) {
//Reselect before retry to avoid a change of candidate `invokers`.
//NOTE: if `invokers` changed, then `invoked` also lose accuracy.
if (i > 0) {
checkWhetherDestroyed();
// 在进行重试前重新列举 Invoker,这样做的好处是,如果某个服务挂了,
// 通过调用 list 可得到最新可用的 Invoker 列表
copyInvokers = list(invocation);
// 对 copyinvokers 进行判空检查
checkInvokers(copyInvokers, invocation);
}
// 通过负载均衡选择 Invoker
Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
// 添加到 invoker 到 invoked 列表中
invoked.add(invoker);
// 设置 invoked 到 RPC 上下文中
RpcContext.getContext().setInvokers((List) invoked);
try {
// 调用目标 Invoker 的 invoke 方法
return invoker.invoke(invocation);
} catch (RpcException e) {
if (e.isBiz()) { // biz exception.
throw e;
}
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
// 若重试失败,则抛出异常
throw new RpcException(le);
}
doInvoke方法首先是获取重试次数,然后根据重试次数进行循环调用,失败后进行重试。在 for 循环内,首先是通过负载均衡组件选择一个 Invoker,然后再通过这个Invoker的invoke 方法进行远程调用。如果失败了,记录下异常,并进行重试。重试时会再次调用父类的list方法列举Invoker。
这里的invoke会调用到AbstractInvoker的invoke方法:
代码语言:javascript复制public abstract class AbstractInvoker<T> implements Invoker<T> {
public Result invoke(Invocation inv) throws RpcException {
RpcInvocation invocation = (RpcInvocation) inv;
// 设置 Invoker
invocation.setInvoker(this);
if (attachment != null && attachment.size() > 0) {
// 设置 attachment
invocation.addAttachmentsIfAbsent(attachment);
}
Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
// 添加 contextAttachments 到 RpcInvocation#attachment 变量中
invocation.addAttachments(contextAttachments);
}
if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) {
// 设置异步信息到 RpcInvocation#attachment 中
invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
}
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
try {
// 抽象方法,由子类实现
return doInvoke(invocation);
} catch (InvocationTargetException e) {
// ...
} catch (RpcException e) {
// ...
} catch (Throwable e) {
return new RpcResult(e);
}
}
protected abstract Result doInvoke(Invocation invocation) throws Throwable;
// 省略其他方法
}
doInvoke执行后续的调用。doInvoke 是一个抽象方法,需要由子类实现,看一下DubboInvoker的doInvoke实现。
代码语言:javascript复制@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
// 设置 path 和 version 到 attachment 中
inv.setAttachment(PATH_KEY, getUrl().getPath());
inv.setAttachment(VERSION_KEY, version);
ExchangeClient currentClient;
if (clients.length == 1) {
// 从 clients 数组中获取 ExchangeClient
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = calculateTimeout(invocation, methodName);
// 无返回值
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else {
//有返回值
ExecutorService executor = getCallbackExecutor(getUrl(), inv);
CompletableFuture<AppResponse> appResponseFuture =
currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
FutureContext.getContext().setCompatibleFuture(appResponseFuture);
AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
result.setExecutor(executor);
return result;
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, e);
}
}
request方法其实就是要调用NettyClient的channel发送请求了,会调用到AbstractClient的send方法:
代码语言:javascript复制@Override
public void send(Object message, boolean sent) throws RemotingException {
if (needReconnect && !isConnected()) {
connect();
}
Channel channel = getChannel();
//TODO Can the value returned by getChannel() be null? need improvement.
if (channel == null || !channel.isConnected()) {
throw new RemotingException(this, "message can not send, because channel is closed . url:" getUrl());
}
channel.send(message, sent);
}
先调用子类NettyClient的getChannel实现获取NettyChannel,然后使用NettyChannel发送数据,NettyChannel是对Netty原生Channel的封装,具体发送数据要委托给原生Channel执行。
到这里消费端的请求调用已经发送出去了,等服务端接收到请求处理完之后消费端则返回调用,请求调用的时序如下:
代码语言:javascript复制 —> InvokerInvocationHandler#invoke(Object, Method, Object[])
—> MockClusterInvoker#invoke(Invocation)
—> AbstractClusterInvoker#invoke(Invocation)
—> FailoverClusterInvoker#doInvoke(Invocation, List<Invoker<T>>, LoadBalance)
—> Filter#invoke(Invoker, Invocation) // 包含多个 Filter 调用
—> ListenerInvokerWrapper#invoke(Invocation)
—> AbstractInvoker#invoke(Invocation)
—> DubboInvoker#doInvoke(Invocation)
—> ReferenceCountExchangeClient#request(Object, int)
—> HeaderExchangeClient#request(Object, int)
—> HeaderExchangeChannel#request(Object, int)
—> AbstractPeer#send(Object)
—> AbstractClient#send(Object, boolean)
—> NettyChannel#send(Object, boolean)
—> NioClientSocketChannel#write(Object)
DubboInvoker服务调用后封装成CompletableFuture,底层由HeaderExchangeChannel封装成DefaultFuture来实现,DefaultFuture被创建时,会传入一个 Request 对象。此时 DefaultFuture 可从 Request 对象中获取调用编号,并将调用编号与DefaultFuture映射关系存入到静态 Map 中。线程池中的线程在收到Response 对象后,会根据 Response 对象中的调用编号取出相应的DefaultFuture 对象,然后再将Response对象设置到DefaultFuture对象中。最后再唤醒用户线程,这样用户线程即可从 DefaultFuture 对象中获取调用结果了。过程如下:
服务调用的整体时序图大致如下:
四、总结
本篇文章详细分析了dubbo消费端的服务引用和服务调用,回顾一下整个过程,可以简单描述为,消费端应用启动时,构造服务端接口代理,包装Invoker信息以及建立与服务端的Netty连接,然后在调用远程服务时,从集群Invoker中通过负载均衡找到合适的Invoker,通过Netty的Channel发送请求数据,然后服务端接收到请求数据,经过编解码,业务处理以及数据出站后回写到消费端的Channel,消费端把数据解析封装成指定的返回数据类型。至于服务端的请求数据接收和业务逻辑处理不在本篇文章分析范围,后续会单独拉出来做分析。
参考
https://github.com/apache/dubbo
https://dubbo.apache.org/zh/docs/v2.7/dev/source/refer-service/
https://dubbo.apache.org/zh/docs/v2.7/dev/source/service-invoking-process/
https://dubbo.apache.org/zh/docs/v2.7/dev/source/cluster/