1 Consumer发起请求
在前文的分析中可知消费端注册的ReferenceBean实现了FactoryBean接口,获取实例时是调用其getObject方法获取到真正的实例,在getObject中使用Spring的ProxyFactory生成代理对象,如果是接口的话,默认使用jdk的动态代理来实现,得到的实例如下
可以看到demoService就是使用jdk的动态代理生成的对象,jdk的动态代理需要实现InvocationHandler接口,调用代理对象的方法时,统一走到其invoke方法
代码语言:javascript复制public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Object oldProxy = null;
boolean setProxyContext = false;
TargetSource targetSource = this.advised.targetSource;
Object target = null;
try {
......
Object retVal;
......
// 获取代理的目标对象
target = targetSource.getTarget();
Class<?> targetClass = (target != null ? target.getClass() : null);
// 获取有没有切面方法
List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);
if (chain.isEmpty()) {
// 没有切面就利用反射直接执行target的method方法
Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);
retVal = AopUtils.invokeJoinpointUsingReflection(target, method, argsToUse);
}
else {
// 有切面就封装一个MethodInvocation,在方法前或者方法后执行切面的方法
MethodInvocation invocation =
new ReflectiveMethodInvocation(proxy, target, method, args, targetClass, chain);
retVal = invocation.proceed();
}
......
return retVal;
}
finally {
......
}
}
发起调用的是在AopUtils.invokeJoinpointUsingReflection方法
代码语言:javascript复制public static Object invokeJoinpointUsingReflection(@Nullable Object target, Method method, Object[] args)
throws Throwable {
// Use reflection to invoke the method.
try {
ReflectionUtils.makeAccessible(method);
return method.invoke(target, args);
}
catch (InvocationTargetException ex) {
throw ex.getTargetException();
}
catch (IllegalArgumentException ex) {
throw new AopInvocationException("AOP configuration seems to be invalid: tried calling method ["
method "] on target [" target "]", ex);
}
catch (IllegalAccessException ex) {
throw new AopInvocationException("Could not access method [" method "]", ex);
}
}
核心代码就一行method.invoke(target, args),target也是一个代理对象,Dubbo默认是使用javassist生成,所以最终会走到InvokerInvocationHandler的invoke方法
代码语言:javascript复制public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length == 0) {
if ("toString".equals(methodName)) {
return invoker.toString();
} else if ("$destroy".equals(methodName)) {
invoker.destroy();
return null;
} else if ("hashCode".equals(methodName)) {
return invoker.hashCode();
}
} else if (parameterTypes.length == 1 && "equals".equals(methodName)) {
return invoker.equals(args[0]);
}
RpcInvocation rpcInvocation = new RpcInvocation(serviceModel, method.getName(), invoker.getInterface().getName(), protocolServiceKey, method.getParameterTypes(), args);
if (serviceModel instanceof ConsumerModel) {
rpcInvocation.put(Constants.CONSUMER_MODEL, serviceModel);
rpcInvocation.put(Constants.METHOD_MODEL, ((ConsumerModel) serviceModel).getMethodModel(method));
}
return InvocationUtil.invoke(invoker, rpcInvocation);
}
核心代码也是就一行,先构造一个RpcInvocation,然后再调用InvocationUtil.invoke方法,这个方法的第一个参数invoker即为上篇文章最后展示的最终生成的MigrationInvoker的实例,看下InvocationUtil.invoke方法
代码语言:javascript复制public static Object invoke(Invoker<?> invoker, RpcInvocation rpcInvocation) throws Throwable {
......
if (ProfilerSwitch.isEnableSimpleProfiler()) {
ProfilerEntry parentProfiler = Profiler.getBizProfiler();
ProfilerEntry bizProfiler;
if (parentProfiler != null) {
bizProfiler = Profiler.enter(parentProfiler,
"Receive request. Client invoke begin. ServiceKey: " serviceKey " MethodName:" rpcInvocation.getMethodName());
} else {
bizProfiler = Profiler.start("Receive request. Client invoke begin. ServiceKey: " serviceKey " " "MethodName:" rpcInvocation.getMethodName());
}
rpcInvocation.put(Profiler.PROFILER_KEY, bizProfiler);
try {
return invoker.invoke(rpcInvocation).recreate();
} finally {
Profiler.release(bizProfiler);
......
}
}
return invoker.invoke(rpcInvocation).recreate();
}
Profiler是请求耗时统计,此处为SimpleProfiler,默认开启,然后再调用invoker.invoke(rpcInvocation).recreate()方法,先看下invoke(rpcInvocation)方法
代码语言:javascript复制public Result invoke(Invocation invocation) throws RpcException {
if (currentAvailableInvoker != null) {
if (step == APPLICATION_FIRST) {
// call ratio calculation based on random value
if (promotion < 100 && ThreadLocalRandom.current().nextDouble(100) > promotion) {
// fall back to interface mode
return invoker.invoke(invocation);
}
// check if invoker available for each time
return decideInvoker().invoke(invocation);
}
return currentAvailableInvoker.invoke(invocation);
}
switch (step) {
case APPLICATION_FIRST:
currentAvailableInvoker = decideInvoker();
break;
case FORCE_APPLICATION:
currentAvailableInvoker = serviceDiscoveryInvoker;
break;
case FORCE_INTERFACE:
default:
currentAvailableInvoker = invoker;
}
return currentAvailableInvoker.invoke(invocation);
}
这里主要是对Dubbo2.x和Dubbo3.x的invoker做一些适配,因为在 Dubbo 3 之前地址注册模型是以接口级粒度注册到注册中心的,而 Dubbo 3 全新的应用级注册模型注册到注册中心的粒度是应用级的。从注册中心的实现上来说是几乎不一样的,这导致了对于从接口级注册模型获取到的 invokers 是无法与从应用级注册模型获取到的 invokers 进行合并的。为了帮助用户从接口级往应用级迁移,所以Dubbo 3 设计了 Migration 机制,这里先不研究Migration的机制,将dubbo.application.service-discovery.migrations属性设置成FORCE_INTERFACE,因为provider也是按照interface注册的,所以上诉代码会走到第一个currentAvailableInvoker.invoke(invocation)方法,看看其逻辑
代码语言:javascript复制public Result invoke(Invocation invocation) throws RpcException {
if (peerFlag) {
// If it's a point-to-point direct connection, invoke the original Invoker
return invoker.invoke(invocation);
}
if (isInjvmExported()) {
// If it's exported to the local JVM, invoke the corresponding Invoker
return injvmInvoker.invoke(invocation);
}
// Otherwise, delegate the invocation to the original Invoker
return invoker.invoke(invocation);
}
然后会走到ScopeClusterInvoker的invoke方法,如上所示,该方法中没啥特殊逻辑,继续往下看
代码语言:javascript复制public Result invoke(Invocation invocation) throws RpcException {
Result result;
String value = getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
if (ConfigUtils.isEmpty(value)) {
//no mock
result = this.invoker.invoke(invocation);
} else if (value.startsWith(FORCE_KEY)) {
if (logger.isWarnEnabled()) {
logger.warn(CLUSTER_FAILED_MOCK_REQUEST,"force mock","","force-mock: " invocation.getMethodName() " force-mock enabled , url : " getUrl());
}
//force:direct mock
result = doMockInvoke(invocation, null);
} else {
//fail-mock
try {
result = this.invoker.invoke(invocation);
//fix:#4585
if (result.getException() != null && result.getException() instanceof RpcException) {
RpcException rpcException = (RpcException) result.getException();
if (rpcException.isBiz()) {
throw rpcException;
} else {
result = doMockInvoke(invocation, rpcException);
}
}
} catch (RpcException e) {
if (e.isBiz()) {
throw e;
}
if (logger.isWarnEnabled()) {
logger.warn(CLUSTER_FAILED_MOCK_REQUEST,"failed to mock invoke","","fail-mock: " invocation.getMethodName() " fail-mock enabled , url : " getUrl(),e);
}
result = doMockInvoke(invocation, e);
}
}
return result;
}
之后就走到了MockClusterInvoker的invoke方法中,该方法中如果需要mock,则直接构造一个mockResult,否则继续调用Invoker,mock就不看了,继续看Invoker的逻辑
代码语言:javascript复制public Result invoke(Invocation invocation) throws RpcException {
return filterInvoker.invoke(invocation);
}
这里就来到了ClusterFilterInvoker的invoke方法,啥也没做直接调用filterInvoker.invoke
代码语言:javascript复制public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult = filterInvoker.invoke(invocation);
asyncResult.whenCompleteWithContext((r, t) -> {
RuntimeException filterRuntimeException = null;
for (int i = filters.size() - 1; i >= 0; i--) {
FILTER filter = filters.get(i);
try {
InvocationProfilerUtils.releaseDetailProfiler(invocation);
if (filter instanceof ListenableFilter) {
ListenableFilter listenableFilter = ((ListenableFilter) filter);
Filter.Listener listener = listenableFilter.listener(invocation);
try {
if (listener != null) {
if (t == null) {
listener.onResponse(r, filterInvoker, invocation);
} else {
listener.onError(t, filterInvoker, invocation);
}
}
} finally {
listenableFilter.removeListener(invocation);
}
} else if (filter instanceof FILTER.Listener) {
FILTER.Listener listener = (FILTER.Listener) filter;
if (t == null) {
listener.onResponse(r, filterInvoker, invocation);
} else {
listener.onError(t, filterInvoker, invocation);
}
}
} catch (RuntimeException runtimeException) {
LOGGER.error(CLUSTER_EXECUTE_FILTER_EXCEPTION, "the custom filter is abnormal", "", String.format("Exception occurred while executing the %s filter named %s.", i, filter.getClass().getSimpleName()));
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(String.format("Whole filter list is: %s", filters.stream().map(tmpFilter -> tmpFilter.getClass().getSimpleName()).collect(Collectors.toList())));
}
filterRuntimeException = runtimeException;
t = runtimeException;
}
}
if (filterRuntimeException != null) {
throw filterRuntimeException;
}
});
return asyncResult;
}
然后是ClusterCallbackRegistrationInvoker的invoke方法,也是啥也没做直接调用filterInvoker.invoke
代码语言:javascript复制public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult;
try {
InvocationProfilerUtils.enterDetailProfiler(invocation, () -> "Filter " filter.getClass().getName() " invoke.");
asyncResult = filter.invoke(nextNode, invocation);
} catch (Exception e) {
InvocationProfilerUtils.releaseDetailProfiler(invocation);
if (filter instanceof ListenableFilter) {
ListenableFilter listenableFilter = ((ListenableFilter) filter);
try {
Filter.Listener listener = listenableFilter.listener(invocation);
if (listener != null) {
listener.onError(e, originalInvoker, invocation);
}
} finally {
listenableFilter.removeListener(invocation);
}
} else if (filter instanceof FILTER.Listener) {
FILTER.Listener listener = (FILTER.Listener) filter;
listener.onError(e, originalInvoker, invocation);
}
throw e;
} finally {
}
return asyncResult;
}
然后是CopyOfClusterFilterChainNode的invoker方法,从这里开始Filter的处理,并且如果开启了DetailProfilter,在这里设置Filter请求耗时的统计,Filter的处理逻辑就不仔细看了
代码语言:javascript复制public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
// 每个提供者的invoker,从属性directory中获取
List<Invoker<T>> copyInvokers = invokers;
// invoker非空校验
checkInvokers(copyInvokers, invocation);
String methodName = RpcUtils.getMethodName(invocation);
// 获取调用的次数,失败了重试
int len = calculateInvokeTimes(methodName);
// retry loop.
RpcException le = null; // last exception.
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
Set<String> providers = new HashSet<String>(len);
for (int i = 0; i < len; i ) {
if (i > 0) {
// 如果i>0,说明之前的调用失败了,正在重试,重试之前重新获取一遍所有的invoker
checkWhetherDestroyed();
copyInvokers = list(invocation);
// invoker非空校验
checkInvokers(copyInvokers, invocation);
}
// 选择一个invoker,并且把选过的invoker加入到invoked列表,防止重试的时候选到一样的节点
Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
invoked.add(invoker);
RpcContext.getServiceContext().setInvokers((List) invoked);
boolean success = false;
try {
Result result = invokeWithContext(invoker, invocation);
if (le != null && logger.isWarnEnabled()) {
logger.warn(CLUSTER_FAILED_MULTIPLE_RETRIES,"failed to retry do invoke","","Although retry the method " methodName
" in the service " getInterface().getName()
" was successful by the provider " invoker.getUrl().getAddress()
", but there have been failed providers " providers
" (" providers.size() "/" copyInvokers.size()
") from the registry " directory.getUrl().getAddress()
" on the consumer " NetUtils.getLocalHost()
" using the dubbo version " Version.getVersion() ". Last error is: "
le.getMessage(),le);
}
success = true;
return result;
} catch (RpcException e) {
if (e.isBiz()) { // biz exception.
throw e;
}
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
if (!success) {
providers.add(invoker.getUrl().getAddress());
}
}
}
// 如果重试都出错,直接抛出异常
throw new RpcException(le.getCode(), "Failed to invoke the method "
methodName " in the service " getInterface().getName()
". Tried " len " times of the providers " providers
" (" providers.size() "/" copyInvokers.size()
") from the registry " directory.getUrl().getAddress()
" on the consumer " NetUtils.getLocalHost() " using the dubbo version "
Version.getVersion() ". Last error is: "
le.getMessage(), le.getCause() != null ? le.getCause() : le);
}
Filerter的逻辑处理完成之后,最后走到FailoverClusterInvoker的doInvoke方法,如上所示,其实就是根据路由策略选一个节点调用,失败选择另外一个节点重试,这里选出来的invoker也是一个带有Filter的invoker,待Filter的逻辑处理完了之后在其最后调用的是DubboInvoker.invoke方法,其实是其父类的invoke方法,如下
代码语言:javascript复制public Result invoke(Invocation inv) throws RpcException {
// if invoker is destroyed due to address refresh from registry, let's allow the current invoke to proceed
if (isDestroyed()) {
logger.warn(PROTOCOL_FAILED_REQUEST, "", "", "Invoker for service " this " on consumer " NetUtils.getLocalHost() " is destroyed, " ", dubbo version is " Version.getVersion() ", this invoker should not be used any longer");
}
RpcInvocation invocation = (RpcInvocation) inv;
// prepare rpc invocation
prepareInvocation(invocation);
// do invoke rpc invocation and return async result
AsyncRpcResult asyncResult = doInvokeAndReturn(invocation);
// wait rpc result if sync
waitForResultIfSync(asyncResult, invocation);
return asyncResult;
}
来看看doInvokeAndReturn方法,最终其实调用子类DubboInvoker.doInvoke方法
代码语言:javascript复制protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
......
// 获取client
ExchangeClient currentClient;
List<? extends ExchangeClient> exchangeClients = clientsProvider.getClients();
if (exchangeClients.size() == 1) {
currentClient = exchangeClients.get(0);
} else {
currentClient = exchangeClients.get(index.getAndIncrement() % exchangeClients.size());
}
try {
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
......
// 封装Request请求
Request request = new Request();
if (payload != null) {
request.setPayload(payload);
}
request.setData(inv);
request.setVersion(Version.getProtocolVersion());
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
request.setTwoWay(false);
currentClient.send(request, isSent);
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else {
// 先看看有返回值的情况,获取执行请求的executor,返回一个Future
// 可以看出,不论是同步调用还是异步调用,都是返回的Future
request.setTwoWay(true);
ExecutorService executor = getCallbackExecutor(getUrl(), inv);
CompletableFuture<AppResponse> appResponseFuture =
currentClient.request(request, timeout, executor).thenApply(AppResponse.class::cast);
// save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
FutureContext.getContext().setCompatibleFuture(appResponseFuture);
AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
result.setExecutor(executor);
return result;
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " invocation.getMethodName() ", provider: " getUrl() ", cause: " e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " invocation.getMethodName() ", provider: " getUrl() ", cause: " e.getMessage(), e);
}
}
先看看获取执行线程池的方法getCallbackExecutor,这里获取的是回调线程池,即提供者返回数据后的结果处理线程池,而发送请求的是NettyClient的worker线程池
代码语言:javascript复制protected ExecutorService getCallbackExecutor(URL url, Invocation inv) {
if (InvokeMode.SYNC == RpcUtils.getInvokeMode(getUrl(), inv)) {
return new ThreadlessExecutor();
}
return ExecutorRepository.getInstance(url.getOrDefaultApplicationModel()).getExecutor(url);
}
如果是同步调用,每次都新建一个ThreadlessExecutor返回,如果是异步的话,返回的是一个consumer端所有链接共享的CacheThreadPool(这个在2.7.5版本之前是一个链接对应一个线程池)。CacheThreadPool很好理解,就是一个没有队列且对线程数不限制的线程池,那ThreadlessExecutor是个啥呢?顾名思义,ThreadlessExecutor是一个没有线程的线程池,通过execute(Runnable)方法提交给这个执行器的任务不会被调度到特定线程,而其他的Executor就把Runnable交给线程去执行了。这些任务存储在阻塞队列中,只有当thead调用waitAndDrain()方法时才会真正执行。简单来说就是,执行task的thead与调用waitAndDrain()方法的thead完全相同。这是dubbo在2.7.5之后对消费端线程池做的另外一个优化,引用网上的图做个对比
老的线程池模型如下
1、业务线程发出请求,拿到一个 Future 实例。
2、业务线程紧接着调用 future.get 阻塞等待业务结果返回。
3、当业务数据返回后,交由独立的 Consumer 端线程池进行反序列化等处理,并调用 future.set 将反序列化后的业务结果置回。
4、业务线程拿到结果直接返回。
新的线程池模型如下
1、业务线程发出请求,拿到一个 Future 实例。
2、在调用 future.get() 之前,先调用 ThreadlessExecutor.wait(),wait 会使业务线程在一个阻塞队列上等待,直到队列中被加入元素。
3、当业务数据返回后,生成一个 Runnable Task 并放ThreadlessExecutor 队列。
4、业务线程将 Task 取出并在本线程中执行反序列化业务数据并 set 到 Future。
5、业务线程拿到结果直接返回。
可以看到,相比于老的线程池模型,新的线程模型由业务线程自己负责监测并解析返回结果,免去了额外的消费端线程池开销。
再来看看currentClient.request方法,经过一系列调用走到HeaderExchangeChannel.request方法
代码语言:javascript复制public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " request ", cause: The channel " this " is closed!");
}
Request req;
if (request instanceof Request) {
req = (Request) request;
} else {
// create request.
req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
}
DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
try {
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
该方法中直接新建了一个DefaultFuture,然后再调用channel.send将请求发送出去。看看新建的逻辑
代码语言:javascript复制public static DefaultFuture newFuture(Channel channel, Request request, int timeout, ExecutorService executor) {
final DefaultFuture future = new DefaultFuture(channel, request, timeout);
future.setExecutor(executor);
// timeout check
timeoutCheck(future);
return future;
}
private DefaultFuture(Channel channel, Request request, int timeout) {
this.channel = channel;
this.request = request;
this.id = request.getId();
this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
// put into waiting map.
FUTURES.put(id, this);
CHANNELS.put(id, channel);
}
可以看出,新建的时候将DefaultFuture以request的id为key放入到了FUTURES(Map结构)当中。
这样消费端的请求就发送出去了,消息的编解码部分后面的文章再分析,这里先搞清楚整体的流程。
2 Provider接收请求
接收请求先跳过编解码部分,直接看Netty最后一个handler,NettyServerHandler的处理逻辑
代码语言:javascript复制public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
handler.received(channel, msg);
// trigger qos handler
ctx.fireChannelRead(msg);
}
通过handler的channelRead方法处理,直接调用handler处理,handler.received方法中调用了很多handler处理,也是一个链表结构,跳过不重要的handler,直接看AllChannelHanlder.receive方法
代码语言:javascript复制public void received(Channel channel, Object message) throws RemotingException {
ExecutorService executor = getPreferredExecutorService(message);
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
if(message instanceof Request && t instanceof RejectedExecutionException){
sendFeedback(channel, (Request) message, t);
return;
}
throw new ExecutionException(message, channel, getClass() " error when process received event .", t);
}
}
首先获取处理请求的线程池,然后提交任务,来看看获取线程池的逻辑
代码语言:javascript复制public ExecutorService getSharedExecutorService(Object msg) {
Executor executor = executorSupport.getExecutor(msg);
return executor != null ? (ExecutorService) executor : getSharedExecutorService();
}
获取线程池最终调用getSharedExecutorService方法来获取,首先调用executorSupport.getExecutor方法获取,这个方法中主要就是根据协议去获取对应的线程池以及从exporterMap中根据serviceKey获取到对应的invoker(带有过滤器链的invoker)填入msg中,所以provider端不同的协议使用的是不同的线程池,如果获取不到,就获取全局的共用的线程池。
再来看看提交的任务ChannelEventRunnable的run方法
代码语言:javascript复制public void run() {
InternalThreadLocalMap internalThreadLocalMap = InternalThreadLocalMap.getAndRemove();
try {
if (state == ChannelState.RECEIVED) {
try {
handler.received(channel, message);
} catch (Exception e) {
logger.warn(INTERNAL_ERROR, "unknown error in remoting module", "", "ChannelEventRunnable handle " state " operation error, channel is " channel
", message is " message, e);
}
} else {
......
}
} finally {
InternalThreadLocalMap.set(internalThreadLocalMap);
}
}
又是继续调用handler.receive方法,同样跳过不重要的handler,来到HeaderExchangeHandler.handleRequest方法
代码语言:javascript复制void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
Response res = new Response(req.getId(), req.getVersion());
......
// find handler by message class.
Object msg = req.getData();
try {
CompletionStage<Object> future = handler.reply(channel, msg);
future.whenComplete((appResult, t) -> {
try {
if (t == null) {
res.setStatus(Response.OK);
res.setResult(appResult);
} else {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
channel.send(res);
} catch (RemotingException e) {
logger.warn(TRANSPORT_FAILED_RESPONSE, "", "", "Send result to consumer failed, channel is " channel ", msg is " e);
}
});
} catch (Throwable e) {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
channel.send(res);
}
}
这个方法中的handler为DubboProtocol中的一个匿名内部类,在provider启动时新建,reply方法处理完成之后由channel返回处理结果,reply方法如下
代码语言:javascript复制public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
if (!(message instanceof Invocation)) {
throw new RemotingException(channel, "Unsupported request: "
(message == null ? null : (message.getClass().getName() ": " message))
", channel: consumer: " channel.getRemoteAddress() " --> provider: " channel.getLocalAddress());
}
Invocation inv = (Invocation) message;
Invoker<?> invoker = inv.getInvoker() == null ? getInvoker(channel, inv) : inv.getInvoker();
// switch TCCL
if (invoker.getUrl().getServiceModel() != null) {
Thread.currentThread().setContextClassLoader(invoker.getUrl().getServiceModel().getClassLoader());
}
// need to consider backward-compatibility if it's a callback
if (Boolean.TRUE.toString().equals(inv.getObjectAttachmentWithoutConvert(IS_CALLBACK_SERVICE_INVOKE))) {
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null || !methodsStr.contains(",")) {
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(",");
for (String method : methods) {
if (inv.getMethodName().equals(method)) {
hasMethod = true;
break;
}
}
}
if (!hasMethod) {
logger.warn(PROTOCOL_FAILED_REFER_INVOKER, "", "", new IllegalStateException("The methodName " inv.getMethodName()
" not found in callback service interface ,invoke will be ignored."
" please update the api interface. url is:"
invoker.getUrl()) " ,invocation is :" inv);
return null;
}
}
RpcContext.getServiceContext().setRemoteAddress(channel.getRemoteAddress());
Result result = invoker.invoke(inv);
return result.thenApply(Function.identity());
}
核心就是获取到对应的invoker,在获取invoker时,先从Invocation 中的invoker属性获取,因为之前的代码已经填充过该属性,如果没获取到再重新获取一次,然后执行invoke方法,这里的invoker就是provider启动时封装的带有过滤器链处理的invoker,最后一个元素为具体的service的实现类
3 Consumer接收返回结果
消费端接收结果的入口在NettyClientHanlder.channelRead方法(同样编解码的部分先跳过)
代码语言:javascript复制public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
handler.received(channel, msg);
}
同样经过一系列不重要的handler处理之后,直接看AllChannelHanlder.receive方法
代码语言:javascript复制public void received(Channel channel, Object message) throws RemotingException {
ExecutorService executor = getPreferredExecutorService(message);
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
if(message instanceof Request && t instanceof RejectedExecutionException){
sendFeedback(channel, (Request) message, t);
return;
}
throw new ExecutionException(message, channel, getClass() " error when process received event .", t);
}
}
首先获取处理请求的线程池,然后提交任务,来看看获取线程池的逻辑
代码语言:javascript复制public ExecutorService getPreferredExecutorService(Object msg) {
if (msg instanceof Response) {
Response response = (Response) msg;
DefaultFuture responseFuture = DefaultFuture.getFuture(response.getId());
// a typical scenario is the response returned after timeout, the timeout response may have completed the future
if (responseFuture == null) {
return getSharedExecutorService();
} else {
ExecutorService executor = responseFuture.getExecutor();
if (executor == null || executor.isShutdown()) {
executor = getSharedExecutorService(msg);
}
return executor;
}
} else {
return getSharedExecutorService(msg);
}
}
先从DeFaultFuture中获取线程池(新建DeFaultFuture时会设置请求处理的线程池),如果获取不到采取获取消费端共用的线程池,同样提交任务之后由ChannelEventRunnable的run方法处理,最终来到来到HeaderExchangeHandler.handleResponse方法
代码语言:javascript复制static void handleResponse(Channel channel, Response response) throws RemotingException {
if (response != null && !response.isHeartbeat()) {
DefaultFuture.received(channel, response);
}
}
handleResponse继续调用DefaultFuture.received方法
代码语言:javascript复制public static void received(Channel channel, Response response, boolean timeout) {
try {
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
Timeout t = future.timeoutCheckTask;
if (!timeout) {
// decrease Time
t.cancel();
}
future.doReceived(response);
} else {
logger.warn(PROTOCOL_TIMEOUT_SERVER, "", "", "The timeout response finally returned at "
(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
", response status is " response.getStatus()
(channel == null ? "" : ", channel: " channel.getLocalAddress()
" -> " channel.getRemoteAddress()) ", please check provider side for detailed result.");
}
} finally {
CHANNELS.remove(response.getId());
}
}
然后从FUTURES中获取到对应的DefaultFuture,获取失败记录异常日志,成功则调用future.doReceived方法根据返回结果设置CompletableFuture
代码语言:javascript复制private void doReceived(Response res) {
if (res == null) {
throw new IllegalStateException("response cannot be null");
}
if (res.getStatus() == Response.OK) {
this.complete(res.getResult());
} else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
} else {
this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
}
}
设置完结果之后再回过头来看看DubboInvoker.invoke方法,DubboInvoker是消费端最后发起远程请求的invoker,其实是其父类的invoke方法,如下
代码语言:javascript复制public Result invoke(Invocation inv) throws RpcException {
// if invoker is destroyed due to address refresh from registry, let's allow the current invoke to proceed
if (isDestroyed()) {
logger.warn(PROTOCOL_FAILED_REQUEST, "", "", "Invoker for service " this " on consumer " NetUtils.getLocalHost() " is destroyed, " ", dubbo version is " Version.getVersion() ", this invoker should not be used any longer");
}
RpcInvocation invocation = (RpcInvocation) inv;
// prepare rpc invocation
prepareInvocation(invocation);
// do invoke rpc invocation and return async result
AsyncRpcResult asyncResult = doInvokeAndReturn(invocation);
// wait rpc result if sync
waitForResultIfSync(asyncResult, invocation);
return asyncResult;
}
该方法在doInvokeAndReturn之后获取到一个异步处理结果,然后调用waitForResultIfSync方法
代码语言:javascript复制private void waitForResultIfSync(AsyncRpcResult asyncResult, RpcInvocation invocation) {
if (InvokeMode.SYNC != invocation.getInvokeMode()) {
return;
}
try {
/*
* NOTICE!
* must call {@link java.util.concurrent.CompletableFuture#get(long, TimeUnit)} because
* {@link java.util.concurrent.CompletableFuture#get()} was proved to have serious performance drop.
*/
Object timeoutKey = invocation.getObjectAttachmentWithoutConvert(TIMEOUT_KEY);
long timeout = RpcUtils.convertToNumber(timeoutKey, Integer.MAX_VALUE);
asyncResult.get(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RpcException("Interrupted unexpectedly while waiting for remote result to return! method: " invocation.getMethodName() ", provider: " getUrl() ", cause: " e.getMessage(), e);
} catch (ExecutionException e) {
Throwable rootCause = e.getCause();
if (rootCause instanceof TimeoutException) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " invocation.getMethodName() ", provider: " getUrl() ", cause: " e.getMessage(), e);
} else if (rootCause instanceof RemotingException) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " invocation.getMethodName() ", provider: " getUrl() ", cause: " e.getMessage(), e);
} else {
throw new RpcException(RpcException.UNKNOWN_EXCEPTION, "Fail to invoke remote method: " invocation.getMethodName() ", provider: " getUrl() ", cause: " e.getMessage(), e);
}
} catch (java.util.concurrent.TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " invocation.getMethodName() ", provider: " getUrl() ", cause: " e.getMessage(), e);
} catch (Throwable e) {
throw new RpcException(e.getMessage(), e);
}
}
如果是异步调用直接返回,如果是同步调用,则需要调用asyncResult.get方法等待返回结果
代码语言:javascript复制public Result get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
if (executor != null && executor instanceof ThreadlessExecutor) {
ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
try {
while (!responseFuture.isDone()) {
threadlessExecutor.waitAndDrain();
}
} finally {
threadlessExecutor.shutdown();
}
}
return responseFuture.get(timeout, unit);
}
可以看到,如果执行请求的线程池类型是ThreadlessExecutor ,那么就调用waitAndDrain方法一直等待结果直到完成,最后再调用responseFuture.get获取结果,再来看看waitAndDrain方法,如下
代码语言:javascript复制public void waitAndDrain() throws InterruptedException {
throwIfInterrupted();
Runnable runnable = queue.poll();
if (runnable == null) {
waiter = Thread.currentThread();
try {
while ((runnable = queue.poll()) == null) {
LockSupport.park(this);
throwIfInterrupted();
}
} finally {
waiter = null;
}
}
do {
runnable.run();
} while ((runnable = queue.poll()) != null);
}
当队列中没有任务的时候就park住当前线程(也就是业务线程 ),服务端返回结果,调用execute方法,如下
代码语言:javascript复制public void execute(Runnable runnable) {
RunnableWrapper run = new RunnableWrapper(runnable);
queue.add(run);
if (waiter != SHUTDOWN) {
LockSupport.unpark((Thread) waiter);
} else if (queue.remove(run)) {
throw new RejectedExecutionException();
}
}
execute方法就是简单的将任务入队,然后再唤醒业务线程,业务线程唤醒之后就去执行任务,调用runnable.run()方法设置responseFuture的结果,方法退出。
至此调用接收的完整流程结束。