作者:vivo 互联网服务器团队- Wang Fei、LinYupan
Dubbo泛化调用特性可以在不依赖服务接口API包的场景中发起远程调用, 这种特性特别适合框架集成和网关类应用开发。
本文结合在实际开发过程中所遇到的需要远程调用多个三方系统的问题,阐述了如何利用Dubbo泛化调用来简化开发降低系统耦合性的项目实践,最后对Dubbo泛化调用的原理进行了深度解析。
一、背景
统一配置平台是一个提供终端设备各个模块进行文件配置和文件下发能力的平台,模块开发在后台服务器进行文件配置,然后终端设备可以按照特定规则获取对应的配置文件,文件下发可以按照多种设备维度进行下发,具体项目框架可以参加下图:
现有的下发策略,都是由模块开发在统一配置后台服务器进行下发维度配置,文件是否下发到对应终端设备,由用户在本平台所选择的维度所确定。
但是其他业务方也存在在公司内部的A/B实验平台配置下发规则,来借助统一配置平台每天轮询服务器请求新文件的能力,但是在统一配置平台配置的文件是否能够下发由A/B实验平台来确定,A/B实验平台内会配置对应的规则以及配置统一配置平台对应的文件id,然后统一配置平台就需要针对请求调用A/B实验平台接口来判断文件是否可以下发。
随着公司内部实验平台的增加,越来越多这种由三方平台来决定文件是否下发的对接需求,如何更好更快的应对这种类似的对接需求,是我们需要去深入思考的问题。
二、方案选型
原有统一配置的下发逻辑是先找到所有可以下发的文件,然后判断单个配置文件是否满足设备维度,如果满足则可以下发。现在在对接A/B实验平台以后,文件是否能下发还需要由外部系统来确定,当时设计时考虑过两种方案:
- 方案一:
同样先找到所有可以下发的文件,然后针对单个文件按照①设备维度判断是否匹配,然后②调用A/B实验平台的接口获取这台设备可以下发的文件Id, 再调用③灰度实验平台获取这台设备可以下发的文件id, 最后将前三步获取到的配置文件id进行汇总得到可以下发的文件,如下图所示。
方案一打破了原来文件是否能够下发的判断逻辑,现在除了原有的判断逻辑,还需要额外步骤调用其他系统来追加另外可以下发的文件。并且后续不可避免对接其他三方系统,方案一需要不断增加调用三方接口的逻辑来追加可以下发的文件id。此外常规的dubbo调用在provider端需要引入其他实验系统的二方库以及模型类型,增加了统一配置系统和其他系统的强耦合性。
- 方案二:
利用 Dubbo 泛化调用高级特性抽象一个下发维度(远程调用),专门用于其他想由三方实验系统来决定是否下发文件的场景,如下图所示:
方案二统一抽象一个远程调用下发维度,可以保持原有的判断逻辑,也就是先把系统中所有可以下发的文件先查找出来,然后根据设备维度进行匹配,如果某一个文件配置的是远程调用维度,那么查找这个远程调用维度所包含的函数名称、参数类型数组和参数值对象数组,然后调用三方接口,从而判断这个文件是否可以下发到设备,最终获取到可以下发的文件id列表。
此外,利用Dubbo泛化调用高级特性,调用方并不关心提供者的接口的详细定义,只需要关注调用哪个方法,传什么参数以及接收到什么返回结果即可,这样避免需要依赖服务提供者的二方库以及模型类元,这样可以大大降低consumer端和provider端的耦合性。
综合上面的分析,我们最终确定了方案二采取利用Dubbo泛化调用来抽象一个统一维度的方式,下面来看一下具体的实现。
三、具体实现
1.GenericService是Dubbo提供的泛化接口,用来进行泛化调用。只提供了一个$invoke方法,三个入口参数分别为函数名称、参数类型数组和参数值对象数组。
代码语言:java复制package com.alibaba.dubbo.rpc.service;
/**
* Generic service interface
*
* @export
*/
public interface GenericService {
/**
* Generic invocation
*
* @param method Method name, e.g. findPerson. If there are overridden methods, parameter info is
* required, e.g. findPerson(java.lang.String)
* @param parameterTypes Parameter types
* @param args Arguments
* @return invocation return value
* @throws Throwable potential exception thrown from the invocation
*/
Object $invoke(String method, String[] parameterTypes, Object[] args) throws GenericException;
- 创建服务引用配置对象ReferenceConfig。
private ReferenceConfig<GenericService> buildReferenceConfig(RemoteDubboRestrictionConfig config) {
ReferenceConfig<GenericService> referenceConfig = new ReferenceConfig<>();
referenceConfig.setApplication(applicationConfig);
referenceConfig.setRegistry(registryConfig);
referenceConfig.setInterface(config.getInterfaceName());
referenceConfig.setVersion(config.getVersion());
referenceConfig.setGeneric(Boolean.TRUE.toString());
referenceConfig.setCheck(false);
referenceConfig.setTimeout(DUBBO_INVOKE_TIMEOUT);
referenceConfig.setRetries(DUBBO_INVOKE_RETRIES);
return referenceConfig;
}
3.设置请求参数及服务调用, 这里利用在后台所配置的完整方法名、参数类型数组和参数值数组就可以进行服务调用。
代码语言:java复制public List<Integer> invoke(RemoteDubboRestrictionConfig config, ConfigFileListQuery listQuery) {
//由于ReferenceConfig很重量,里面封装了所有与注册中心及服务提供方连接,所以这里做了缓存
GenericService genericService = prepareGenericService(config);
//构建参数
Map<String, Object> params = buildParams(listQuery);
String method = config.getMethod();
String[] parameterTypeArray = new String[]{Map.class.getName()};
Object[] parameterValueArray = new Object[]{params};
long begin = System.currentTimeMillis();
Assert.notNull(genericService, "cannot find genericService");
//具体调用
Object result = genericService.$invoke(method, parameterTypeArray, parameterValueArray);
if (logger.isDebugEnabled()) {
long duration = System.currentTimeMillis() - begin;
logger.debug("Dubbo调用结果:{}, 耗时: {}", result, duration);
}
return result == null ? Collections.emptyList() : (List<Integer>) result;
}
那么为什么Dubbo泛化调用所涉及的调用方并不关心提供者的接口的详细定义,只需要关注调用哪个方法,传什么参数以及接收到什么返回结果即可呢?
在讲解泛化调用的实现原理之前,先简单讲述一下直接调用的原理。
四、 Dubbo 直接调用相关原理
Dubbo的直接调用相关原理涉及到两个方面:Dubbo服务暴露原理和Dubbo服务消费原理
4.1 Dubbo 服务暴露原理
4.1.1 服务远程暴露的整体流程
在整体上看,Dubbo框架做服务暴露分为两大部分,第一步将持有的服务实例通过代理转 换成Invoker,第二步会把Invoker通过具体的协议(比如Dubbo)转换成Exporter,框架做了 这层抽象也大大方便了功能扩展。
这里的Invoker可以简单理解成一个真实的服务对象实例,是 Dubbo框架实体域,所有模型都会向它靠拢,可向它发起invoke调用。它可能是一个本地的实 现,也可能是一个远程的实现,还可能是一个集群实现。
源代码如下:
代码语言:java复制if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
// export to local if the config is not remote (export to remote only when config is remote)
if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
exportLocal(url);
}
// export to remote if the config is not local (export to local only when config is local)
if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " interfaceClass.getName() " to url " url);
}
if (registryURLs != null && !registryURLs.isEmpty()) {
for (URL registryURL : registryURLs) {
url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " interfaceClass.getName() " url " url " to registry " registryURL);
}
// For providers, this is used to enable custom proxy to generate invoker
String proxy = url.getParameter(Constants.PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
}
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
//向注册中心注册服务信息
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
} else {
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
}
首先将实现类ref封装为Invoker,之后将invoker转换为exporter,最后将exporter放入缓存Listexporters中。
4.1.2 服务暴露的细节
4.1.2.1 将实现类ref封装为Invoker
代码语言:java复制Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
① dubbo远程暴露的入口在ServiceBean的export()方法,由于servicebean'继承了serviceconfig类,于是真正执行暴露的逻辑是serviceconfig的doExport()方法。
② Dubbo支持相同服务暴露多个协议,比如同时暴露Dubbo和REST协议,也支持多个注册中心,比如zookeeper和nacos,框架内部会依次 对使用的协议都做一次服务暴露,每个协议注册元数据都会写入多个注册中心,具体是执行doExportUrlsFor1Protocol。
③ 然后通过动态代理的方式创建Invoker对象,在服务端生成AbstractProxylnvoker实例,所有真实的方法调用都会委托给代理,然后代理转发给服务实现者 ref 调用;动态代理一般有:JavassistProxyFactory 和 JdkProxyFactory两种方式,这里所选用的JavassistProxyFactory 。
4.1.2.2 将invoker转换为exporter
代码语言:java复制Exporter exporter= protocol.export(wrapperInvoker);
Exporter<?> exporter = protocol.export(wrapperInvoker);
在将服务实例ref转换成Invoker之后,开始执行服务暴露过程。
这里会经过一系列的过滤器链路,最终会通过RegistryProtocol#export 进行更细粒度的控制,比如先进行服务暴露再注册服务元数据。注册中心在做服务暴露时依次 做了以下几件事情:
- 委托具体协议(Dubbo)进行服务暴露,创建NettyServer监听端口和保存服务实例。
- 创建注册中心对象,与注册中心创建TCP连接。
- 注册服务元数据到注册中心。
- 订阅configurators节点,监听服务动态属性变更事件。
- 服务销毁收尾工作,比如关闭端口、反注册服务信息等。
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//export invoker
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
URL registryUrl = getRegistryUrl(originInvoker);
//registry provider
final Registry registry = getRegistry(originInvoker);
final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);
//to judge to delay publish whether or not
boolean register = registeredProviderUrl.getParameter("register", true);
ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
if (register) {
//TODO 注册服务元数据
register(registryUrl, registeredProviderUrl);
ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
}
// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
}
这里我们重点讲解委托具体协议进行服务暴露的过程doLocalExport(final InvokeroriginInvoker)。
代码语言:java复制private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
String key = getCacheKey(originInvoker);
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
bounds.put(key, exporter);
}
}
}
return exporter;
}
(Exporter) protocol.export(invokerDelegete)方法又会经过一系列的拦截器进行处理,最终调用DubboProtocol的export方法。
代码语言:java复制@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" url.getParameter(Constants.INTERFACE_KEY)
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
openServer(url);
optimizeSerialization(url);
return exporter;
}
这里很重要的一点就是将exporter放到了缓存,这里的key是serviceGroup/serviceName:serviceVersion:port这样形式,这里最后获取到的是com.alibaba.dubbo.demo.DemoService:20880,之后创建DubboExporter。这里的内存缓存exporterMap是很重要的一个属性,在后续消费者调用服务提供者时会被被再次使用到。
至此,服务器提供者的远程暴露流程就基本介绍完毕。
4.2 Dubbo服务消费的实现原理
4.2.1 服务消费的整体流程
在整体上看,Dubbo框架做服务消费也分为两大部分,第一步通过持有远程服务实例生成 Invoker,这个Invoker在客户端是核心的远程代理对象。第二步会把Invoker通过动态代理转换 成实现用户接口的动态代理引用。这里的Invoker承载了网络连接、服务调用和重试等功能,在 客户端,它可能是一个远程的实现,也可能是一个集群实现。
源代码如下:
代码语言:java复制public Object getObject() throws Exception {
return get();
}
public synchronized T get() {
if (destroyed) {
throw new IllegalStateException("Already destroyed!");
}
if (ref == null) {
init();
}
return ref;
}
private void init() {
...
ref = createProxy(map);
}
private T createProxy(Map<String, String> map) {
...
if (urls.size() == 1) {
invoker = refprotocol.refer(interfaceClass, urls.get(0));
}
...
// 创建服务代理
return (T) proxyFactory.getProxy(invoker);
}
4.2.2 服务消费的细节
4.2.2.1 使用Protocol将interfaceClass转化为Invoker
invoker = refprotocol.refer(interfaceClass, url);
① 服务引用的入口点在 ReferenceBean#getObject,由于Referencebean'继承了serviceconfig类,接着会调用Reference的get方法。
② 然后根据引用的接口类型将持有远程服务实例生成 Invoker。
③ 通过一系列的过滤器链,最后调用RegistryProtocol的doRefer方法。
代码语言:java复制private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
URL registeredConsumerUrl = getRegisteredConsumerUrl(subscribeUrl, url);
registry.register(registeredConsumerUrl);
directory.setRegisteredConsumerUrl(registeredConsumerUrl);
}
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
"," Constants.CONFIGURATORS_CATEGORY
"," Constants.ROUTERS_CATEGORY));
Invoker invoker = cluster.join(directory);
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
这段逻辑主要完成了注册中心实例的创建,元数据注册到注册中心及订阅的功能。
具体远程Invoker是在哪里创建的呢?客户端调用拦截器又是在哪里构造的呢?
当在directory.subscrib()中 第一次发起订阅时会进行一次数据拉取操作,同时触发RegistryDirectory#notify方法,这里 的通知数据是某一个类别的全量数据,比如providers和routers类别数据。当通知providers数 据时,在RegistryDirectory#toInvokers方法内完成Invoker转换。
代码语言:java复制private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
if (urls == null || urls.isEmpty()) {
return newUrlInvokerMap;
}
Set<String> keys = new HashSet<String>();
String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
for (URL providerUrl : urls) {
// If protocol is configured at the reference side, only the matching protocol is selected
......
URL url = mergeUrl(providerUrl);
String key = url.toFullString(); // The parameter urls are sorted
if (keys.contains(key)) { // Repeated url
continue;
}
keys.add(key);
// Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again
Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
if (invoker == null) { // Not in the cache, refer again
try {
boolean enabled = true;
.........
if (enabled) {
invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
}
} catch (Throwable t) {
logger.error("Failed to refer invoker for interface:" serviceType ",url:(" url ")" t.getMessage(), t);
}
if (invoker != null) { // Put new invoker in cache
newUrlInvokerMap.put(key, invoker);
}
} else {
newUrlInvokerMap.put(key, invoker);
}
}
keys.clear();
return newUrlInvokerMap;
}
核心代码
代码语言:java复制invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
这里会经过一系列的过滤器链,然后最终调用DubboProtocol的refer方法,来创建具体的invoker。
代码语言:java复制@Override
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);
// create rpc invoker.
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
这里返回的invoker会用来更新RegistryDirectory的methodInvokerMap 属性,最终在实际调用消费端方法时,会根据method找到对应的invoker列表。
代码语言:java复制private void refreshInvoker(List<URL> invokerUrls) {
if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
&& Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
this.forbidden = true; // Forbid to access
this.methodInvokerMap = null; // Set the method invoker map to null
destroyAllInvokers(); // Close all invokers
} else {
this.forbidden = false; // Allow to access
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
invokerUrls.addAll(this.cachedInvokerUrls);
} else {
this.cachedInvokerUrls = new HashSet<URL>();
this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
}
if (invokerUrls.isEmpty()) {
return;
}
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map
// state change
// If the calculation is wrong, it is not processed.
if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" invokerUrls.size() ", invoker.size :0. urls :" invokerUrls.toString()));
return;
}
this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
this.urlInvokerMap = newUrlInvokerMap;
try {
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
} catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
}
}
4.2.2.2 使用ProxyFactory创建代理
代码语言:java复制(T) proxyFactory.getProxy(invoker)
上述的proxyFactory是ProxyFactory$Adaptive实例,其getProxy内部最终得到是一个被StubProxyFactoryWrapper包装后的JavassistProxyFactory。直接来看JavassistProxyFactory.getProxy方法。
代码语言:java复制public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
【invoker】:MockClusterInvoker实例 【interfaces】:[interface com.alibaba.dubbo.demo.DemoService, interface com.alibaba.dubbo.rpc.service.EchoService]
我们最终返回的代理对象其实是一个proxy0对象,当我们调用其sayHello方法时,其调用内部的handler.invoke方法。
代码语言:java复制package com.alibaba.dubbo.common.bytecode;
import com.alibaba.dubbo.demo.DemoService;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
public class proxy0 implements DemoService {
public static Method[] methods;
private InvocationHandler handler;
public String sayHello(String paramString) {
Object[] arrayOfObject = new Object[1];
arrayOfObject[0] = paramString;
Object localObject = this.handler.invoke(this, methods[0], arrayOfObject);
return (String) localObject;
}
public proxy0() {
}
public proxy0(InvocationHandler paramInvocationHandler) {
this.handler = paramInvocationHandler;
}
}
Dubbo泛化调用一般是服务器提供者都采用直接暴露的形式,消费者端采用服务泛化调用的形式,所以这里重点讨论Dubbo泛化调用与直接调用在消费者端的服务引用和发起消费的区别与联系。
五、Dubbo泛化调用与直接调用的区别与联系
5.1 通过持有远程服务实例生成 Invoker
代码语言:java复制private T createProxy(Map<String, String> map) {
...
if (urls.size() == 1) {
invoker = refprotocol.refer(interfaceClass, urls.get(0));
}
...
// 创建服务代理
return (T) proxyFactory.getProxy(invoker);
}
这里的interfaceClass的来源不一样,createProxy(Mapmap) 是在ReferenceConfig的init()方法中调用的,具体的interfaceClass根据是否是返回调用会有所区别,具体看如下代码:
代码语言:java复制private void init() {
...
if (ProtocolUtils.isGeneric(getGeneric())) {
interfaceClass = GenericService.class;
} else {
try {
interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
.getContextClassLoader());
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
checkInterfaceAndMethods(interfaceClass, methods);
}
...
ref = createProxy(map);
}
直接调用:interfaceClass→com.alibaba.dubbo.demo.DemoService
泛化调用:interfaceClass→com.alibaba.dubbo.rpc.service.GenericService
最终获取的invoker也不一样
直接调用:
interface com.alibaba.dubbo.demo.DemoService -> dubbo://xx.xx.xx.xx:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-consumer&bean.name=com.alibaba.dubbo.demo.DemoService&check=false&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=24932&qos.port=33333®ister.ip=xx.xx.xx.xx&remote.timestamp=1640744945905&side=consumer×tamp=1640745033688
泛化调用:
interface com.alibaba.dubbo.rpc.service.GenericService -> dubbo://xx.xx.xx.xx:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=test&bean.name=com.alibaba.dubbo.demo.DemoService&check=false&dubbo=2.0.2&generic=true&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=27952&qos.port=33333®ister.ip=xx.xx.xx.xx&remote.timestamp=1640748337173&side=consumer×tamp=1640748368427
5.2 服务发起消费流程
在4.2.2 服务消费者发起请求细节第①步是将请求参数(方法名,方法参数类型,方法参数值,服务名,附加参数)封装成一个Invocation。
直接调用的RpcInvoaction如下:
RpcInvocation [methodName=sayHello, parameterTypes=[class java.lang.String], arguments=[world], attachments={}]
泛化调用的RpcInvoaction如下:
RpcInvocation [methodName=$invoke, parameterTypes=[class java.lang.String, class [Ljava.lang.String;, class [Ljava.lang.Object;], arguments=[sayHello, [Ljava.lang.String;@22c1bff0, [Ljava.lang.Object;@30ae230f], attachments={path=com.alibaba.dubbo.demo.DemoService, input=296, dubbo=2.0.2, interface=com.alibaba.dubbo.demo.DemoService, version=0.0.0, generic=true}]
我们可以发现这里生成的RpcInvocation对象是有区别的,但是服务提供者暴露的服务是不会发生变化的,所以这里必然有一个转换过程,这里的参数转换关键就在于服务提供者端的GenericImplFilter类。
代码语言:java复制@Activate(group = Constants.PROVIDER, order = -20000)
public class GenericFilter implements Filter {
protected final Logger logger = LoggerFactory.getLogger(getClass());
@Override
public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
logger.info("----------------GenericFilter-------------------------");
if (inv.getMethodName().equals(Constants.$INVOKE)
&& inv.getArguments() != null
&& inv.getArguments().length == 3
&& !invoker.getInterface().equals(GenericService.class)) {
String name = ((String) inv.getArguments()[0]).trim();
String[] types = (String[]) inv.getArguments()[1];
Object[] args = (Object[]) inv.getArguments()[2];
try {
Method method = ReflectUtils.findMethodByMethodSignature(invoker.getInterface(), name, types);
Class<?>[] params = method.getParameterTypes();
if (args == null) {
args = new Object[params.length];
}
String generic = inv.getAttachment(Constants.GENERIC_KEY);
if (StringUtils.isBlank(generic)) {
generic = RpcContext.getContext().getAttachment(Constants.GENERIC_KEY);
}
if (StringUtils.isEmpty(generic)
|| ProtocolUtils.isDefaultGenericSerialization(generic)) {
args = PojoUtils.realize(args, params, method.getGenericParameterTypes());
} else if (ProtocolUtils.isJavaGenericSerialization(generic)) {
...
} else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
...
}
Result result = invoker.invoke(new RpcInvocation(method, args, inv.getAttachments()));
if (result.hasException()
&& !(result.getException() instanceof GenericException)) {
return new RpcResult(new GenericException(result.getException()));
}
RpcResult rpcResult;
if (ProtocolUtils.isJavaGenericSerialization(generic)) {
...
} else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
...
} else {
rpcResult = new RpcResult(PojoUtils.generalize(result.getValue()));
}
rpcResult.setAttachments(result.getAttachments());
return rpcResult;
} catch (NoSuchMethodException e) {
throw new RpcException(e.getMessage(), e);
} catch (ClassNotFoundException e) {
throw new RpcException(e.getMessage(), e);
}
}
return invoker.invoke(inv);
}
}
核心流程:
① 是否是泛化调用判断
代码语言:java复制if (inv.getMethodName().equals(Constants.$INVOKE)
&& inv.getArguments() != null
&& inv.getArguments().length == 3
&& !invoker.getInterface().equals(GenericService.class)) {
② 参数的提取
代码语言:java复制String name = ((String) inv.getArguments()[0]).trim();
String[] types = (String[]) inv.getArguments()[1];
Object[] args = (Object[]) inv.getArguments()[2];
③ 参数的序列化,再构造新的RpcInvocation对象
代码语言:java复制Method method = ReflectUtils.findMethodByMethodSignature(invoker.getInterface(), name, types);
Class<?>[] params = method.getParameterTypes();
if (StringUtils.isEmpty(generic)
|| ProtocolUtils.isDefaultGenericSerialization(generic)) {
args = PojoUtils.realize(args, params, method.getGenericParameterTypes());
}
...
Result result = invoker.invoke(new RpcInvocation(method, args, inv.getAttachments()));
序列化前RpcInvocation对象:
RpcInvocation [methodName=$invoke, parameterTypes=[class java.lang.String, class [Ljava.lang.String;, class [Ljava.lang.Object;], arguments=[sayHello, [Ljava.lang.String;@22c1bff0, [Ljava.lang.Object;@30ae230f], attachments={path=com.alibaba.dubbo.demo.DemoService, input=296, dubbo=2.0.2, interface=com.alibaba.dubbo.demo.DemoService, version=0.0.0, generic=true}]
序列化后RpcInvocation对象:
RpcInvocation [methodName=sayHello, parameterTypes=[class java.lang.String], arguments=[world], attachments={path=com.alibaba.dubbo.demo.DemoService, input=296, dubbo=2.0.2, interface=com.alibaba.dubbo.demo.DemoService, version=0.0.0, generic=true}]
后面的调用逻辑就和直接调用的是一致的了,比如从本地缓存从本地缓存中Map<string, list<invoker>> methodInvokerMap中获取key为sayHello(指定方法名)的List<invoker>,接着进行后续的调用。
那么什么时候触发GenericFilter的invoke方法呢,这里其实就和过滤器的调用链建立有关系了,从GenericFilter类上的注解,我们可以看到@Activate(group = Constants.PROVIDER, order = -20000),说明是在服务提供者端生效的。
另外,服务提供者端是如何知道调用是直接调用还是泛化调用的,这里就涉及到与服务提供者端GenericFilter对应的消费者端的GenericImplFilter类,代码如下:
代码语言:java复制/**
* GenericImplInvokerFilter
*/
@Activate(group = Constants.CONSUMER, value = Constants.GENERIC_KEY, order = 20000)
public class GenericImplFilter implements Filter {
private static final Logger logger = LoggerFactory.getLogger(GenericImplFilter.class);
private static final Class<?>[] GENERIC_PARAMETER_TYPES = new Class<?>[]{String.class, String[].class, Object[].class};
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
String generic = invoker.getUrl().getParameter(Constants.GENERIC_KEY);
if (ProtocolUtils.isGeneric(generic)
&& !Constants.$INVOKE.equals(invocation.getMethodName())
&& invocation instanceof RpcInvocation) {
...
}
if (invocation.getMethodName().equals(Constants.$INVOKE)
&& invocation.getArguments() != null
&& invocation.getArguments().length == 3
&& ProtocolUtils.isGeneric(generic)) {
Object[] args = (Object[]) invocation.getArguments()[2];
if (ProtocolUtils.isJavaGenericSerialization(generic)) {
for (Object arg : args) {
if (!(byte[].class == arg.getClass())) {
error(generic, byte[].class.getName(), arg.getClass().getName());
}
}
} else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
for (Object arg : args) {
if (!(arg instanceof JavaBeanDescriptor)) {
error(generic, JavaBeanDescriptor.class.getName(), arg.getClass().getName());
}
}
}
((RpcInvocation) invocation).setAttachment(
Constants.GENERIC_KEY, invoker.getUrl().getParameter(Constants.GENERIC_KEY));
}
return invoker.invoke(invocation);
}
5.3 泛化调用的整体流程图
六、总结
高内聚低耦合是我们架构设计的一个重要目标,而Dubbo的泛化调用特性仅仅只需知道服务的完整接口路径、请求参数类型和请求参数值就可以直接进行调用获取请求结果,能够避免依赖特定三方jar包,从而降低了系统的耦合性。在日常学习和开发的过程中,我们除了需要关注一门技术的常规使用方法以外,还需要关注一些高级特性,从而做出更合适的架构设计。
参考资料:
- 《深入理解Apache Dubbo与实战》 诣极,林琳
- Dubbo源码分析-泛化调用
- Dubbo泛化调用使用及原理解析