Spring Cloud Ribbon 全解 (3) - 基本组件实现源码(1)

2021-04-12 15:00:27 浏览数 (1)

本文基于SpringCloud-Dalston.SR5

上一篇我们了解到Ribbon主要由如下几个组件组成:

  1. 所有Ribbon负载均衡器需要实现的接口IClient
  2. 服务实例列表维护机制实现的接口ServerList
  3. 负载均衡数据记录LoadBalancerStats
  4. 负责选取Server的接口ILoadBalancer
  5. 负载均衡选取规则实现的接口IRule
  6. 检查实例是否存活实现的接口IPing
  7. 服务实例列表更新机制实现的接口ServerListUpdater
  8. 服务实例列表过滤机制ServerListFilter

我们会逐个分析

1. 所有Ribbon负载均衡器需要实现的接口IClient

对于这个IClient,之前我们说到执行器逻辑,例如重试还有异常处理,都在这里处理。我们看他的默认抽象类实现AbstractLoadBalancerAwareClient:

AbstractLoadBalancerAwareClient.java

代码语言:javascript复制
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
    //获取重试处理器,这个由其他实现类动态实现
    RequestSpecificRetryHandler handler = getRequestSpecificRetryHandler(request, requestConfig);
    //构造LoadBalancerCommand,RxJava风格
    LoadBalancerCommand command = LoadBalancerCommand.builder()
            .withLoadBalancerContext(this)
            .withRetryHandler(handler)
            .withLoadBalancerURI(request.getUri())
            .build();

    try {

        return command.submit(
            new ServerOperation() {
                @Override
                public Observable call(Server server) {
                    //修改原始url为实际的url
                    URI finalUri = reconstructURIWithServer(server, request.getUri());
                    S requestForServer = (S) request.replaceUri(finalUri);
                    try {
                        //执行请求
                        return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                    } 
                    catch (Exception e) {
                        return Observable.error(e);
                    }
                }
            })
            .toBlocking()
            .single();
    } catch (Exception e) {
        Throwable t = e.getCause();
        if (t instanceof ClientException) {
            throw (ClientException) t;
        } else {
            throw new ClientException(e);
        }
    }

}

public abstract RequestSpecificRetryHandler getRequestSpecificRetryHandler(S request, IClientConfig requestConfig);

这个构造的LoadBalancerCommand是一个RxJava风格的,它包含了重试和异常处理机制:

LoadBalancerCommand.java

代码语言:javascript复制
//返回一个只包含一个Server的Observable,但是每次从负载均衡器中获取一个
private Observable selectServer() {
    return Observable.create(new OnSubscribe() {
        @Override
        public void call(Subscribersuper Server> next) {
            try {
                Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);   
                next.onNext(server);
                next.onCompleted();
            } catch (Exception e) {
                next.onError(e);
            }
        }
    });
}
public Observable submit(final ServerOperation operation) {
    final ExecutionInfoContext context = new ExecutionInfoContext();

    if (listenerInvoker != null) {
        try {
            listenerInvoker.onExecutionStart();
        } catch (AbortExecutionException e) {
            return Observable.error(e);
        }
    }

    //获取在每个服务实例重试的的次数
    final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
    //最多尝试几个服务实例
    final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();

    //对于每个服务实例的调用逻辑
    //默认field server是null,通过selectServer()方法获取一个Server
    Observable o = 
            (server == null ? selectServer() : Observable.just(server))
            .concatMap(new Func1>() {
                @Override
                //对于每个Server,按顺序映射为对于每个Server包含重试逻辑的请求调用
                public Observable call(Server server) {
                    //设置上下文
                    context.setServer(server);
                    final ServerStats stats = loadBalancerContext.getServerStats(server);

                    //每个Server包含重试逻辑的请求调用
                    Observable o = Observable
                            .just(server)
                            .concatMap(new Func1>() {
                                @Override
                                public Observable call(final Server server) {
                                    context.incAttemptCount();
                                    //增加Server正在处理的请求计数
                                    loadBalancerContext.noteOpenConnection(stats);

                                    //监听器
                                    if (listenerInvoker != null) {
                                        try {
                                            listenerInvoker.onStartWithServer(context.toExecutionInfo());
                                        } catch (AbortExecutionException e) {
                                            return Observable.error(e);
                                        }
                                    }

                                    //计时器
                                    final Stopwatch tracer = loadBalancerContext.getExecuteTracer().start();
                                    //operation.call(server)就是刚刚分析的AbstractLoadBalancerAwareClient传过来的ServerOperation,就是直接对这个Server调用请求
                                    //doOnEach的操作就是记录请求前后的一些数据用于负载均衡数据统计
                                    return operation.call(server).doOnEach(new Observer() {
                                        private T entity;
                                        @Override
                                        public void onCompleted() {
                                            //记录请求完成
                                            recordStats(tracer, stats, entity, null);
                                        }

                                        @Override
                                        public void onError(Throwable e) {
                                            //记录请求结束
                                            recordStats(tracer, stats, null, e);
                                            logger.debug("Got error {} when executed on server {}", e, server);
                                            //发生了错误,通知listener
                                            if (listenerInvoker != null) {
                                                listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo());
                                            }
                                        }

                                        @Override
                                        public void onNext(T entity) {
                                            //因为只有调用请求成功只有一个结果(只有一个请求), 这里的entity就是结果,只要收到结果就代表请求成功
                                            this.entity = entity;
                                            if (listenerInvoker != null) {
                                                listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo());
                                            }
                                        }                            

                                        private void recordStats(Stopwatch tracer, ServerStats stats, Object entity, Throwable exception) {
                                            tracer.stop();
                                            loadBalancerContext.noteRequestCompletion(stats, entity, exception, tracer.getDuration(TimeUnit.MILLISECONDS), retryHandler);
                                        }
                                    });
                                }
                            });

                    if (maxRetrysSame > 0)
                        //是否retry
                        o = o.retry(retryPolicy(maxRetrysSame, true));
                    return o;
                }
            });

    if (maxRetrysNext > 0 && server == null)
        //是否retry,如果retry回调用selectServer()返回下一个Server
        o = o.retry(retryPolicy(maxRetrysNext, false));

    //异常处理
    return o.onErrorResumeNext(new Func1>() {
        @Override
        public Observable call(Throwable e) {
            if (context.getAttemptCount() > 0) {
                //如果超过重试次数,则抛异常
                if (maxRetrysNext > 0 && context.getServerAttemptCount() == (maxRetrysNext   1)) {
                    e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED,
                            "Number of retries on next server exceeded max "   maxRetrysNext
                              " retries, while making a call for: "   context.getServer(), e);
                }
                else if (maxRetrysSame > 0 && context.getAttemptCount() == (maxRetrysSame   1)) {
                    e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED,
                            "Number of retries exceeded max "   maxRetrysSame
                              " retries, while making a call for: "   context.getServer(), e);
                }
            }
            if (listenerInvoker != null) {
                listenerInvoker.onExecutionFailed(e, context.toFinalExecutionInfo());
            }
            return Observable.error(e);
        }
    });
}

2. 服务实例列表维护机制实现的接口ServerList

AbstractServerList.java

其实这个抽象类一是在实现ServerList接口的同时,实现了IClientConfigAware这个接口,代表是可配置的。 同时,提供了一个生成默认ServerListFilter(这个Filter的实现类是由NIWSServerListFilterClassName这个配置决定,默认是ZoneAffinityServerListFilter)的方法

代码语言:javascript复制
public abstract class AbstractServerList<T extends Server> implements ServerList<T>, IClientConfigAware {   

    public AbstractServerListFilter getFilterImpl(IClientConfig niwsClientConfig) throws ClientException{
        try {
            String niwsServerListFilterClassName = niwsClientConfig
                    .getProperty(
                            CommonClientConfigKey.NIWSServerListFilterClassName,
                            ZoneAffinityServerListFilter.class.getName())
                    .toString();

            AbstractServerListFilter abstractNIWSServerListFilter = 
                    (AbstractServerListFilter) ClientFactory.instantiateInstanceWithClientConfig(niwsServerListFilterClassName, niwsClientConfig);
            return abstractNIWSServerListFilter;
        } catch (Throwable e) {
            throw new ClientException(
                    ClientException.ErrorType.CONFIGURATION,
                    "Unable to get an instance of CommonClientConfigKey.NIWSServerListFilterClassName. Configured class:"
                              niwsClientConfig
                                    .getProperty(CommonClientConfigKey.NIWSServerListFilterClassName), e);
        }
    }
}

ConfigurationBasedServerList.java

这个是默认的实现,如果没有特殊配置,ServerList的实现类就是ConfigurationBasedServerList;这个实际上就是从配置中读取ServerList,这个配置可以是动态配置,例如是Archaius

代码语言:javascript复制
public class ConfigurationBasedServerList extends AbstractServerList<Server>  {

    private IClientConfig clientConfig;

    @Override
    public List getInitialListOfServers() {
        return getUpdatedListOfServers();
    }

    @Override
    public List getUpdatedListOfServers() {
        String listOfServers = clientConfig.get(CommonClientConfigKey.ListOfServers);
        return derive(listOfServers);
    }

    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
        this.clientConfig = clientConfig;
    }
    //可以看出这个配置就是以逗号分隔的字符串
    private List derive(String value) {
        List list = Lists.newArrayList();
        if (!Strings.isNullOrEmpty(value)) {
            for (String s: value.split(",")) {
                list.add(new Server(s.trim()));
            }
        }
        return list;
    }
}

DiscoveryEnabledNIWSServerList.java

这个就是从Eureka上面获取Server列表的类,构造的时候需要传入相关配置以及最重要的EurekaClient的Provider来获取合适的EurekaClient以便于获取Server列表。

实现ServerList接口的方法都是基于obtainServersViaDiscovery这个方法:

代码语言:javascript复制
@Override
public List getInitialListOfServers(){
    return obtainServersViaDiscovery();
}

@Override
public List getUpdatedListOfServers(){
    return obtainServersViaDiscovery();
}

private List obtainServersViaDiscovery() {
    List serverList = new ArrayList();

    //如果EurekaClient没有被初始化,则日志报警并返回空的列表
    if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
        logger.warn("EurekaClient has not been initialized yet, returning an empty list");
        return new ArrayList();
    }
    EurekaClient eurekaClient = eurekaClientProvider.get();

    //这里的vipAddresses其实就是微服务名称的各种形式,但是注意,它们代表的是同一个微服务
    if (vipAddresses!=null){
        for (String vipAddress : vipAddresses.split(",")) {
            // if targetRegion is null, it will be interpreted as the same region of client
            List listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
            for (InstanceInfo ii : listOfInstanceInfo) {
                if (ii.getStatus().equals(InstanceStatus.UP)) {

                    //是否覆盖port
                    if(shouldUseOverridePort){
                        if(logger.isDebugEnabled()){
                            logger.debug("Overriding port on client name: "   clientName   " to "   overridePort);
                        }

                        //这里复制一份是因为不希望其他的地方修改原有的实例信息
                        InstanceInfo copy = new InstanceInfo(ii);

                        if(isSecure){
                            ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
                        }else{
                            ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
                        }
                    }

                    DiscoveryEnabledServer des = new DiscoveryEnabledServer(ii, isSecure, shouldUseIpAddr);
                    des.setZone(DiscoveryClient.getZone(ii));
                    serverList.add(des);
                }
            }

            //如果有一个vipAddress有服务列表,我们就不用获取剩余的了
            if (serverList.size()>0 && prioritizeVipAddressBasedServers){
                break; 
            }
        }
    }
    return serverList;
}

到这里我们可以看出,Ribbon和Eureka的配合其实就是Ribbon从Eureka中利用微服务名称获取Server列表;那么这个列表是如何更新的呢,在Eureka的章节我们提到过,Ribbon定时从EurekaClient获取服务实例列表更新,这就涉及到了下一个我们要讲到的Ribbon元素 - 服务实例列表更新机制实现的接口ServerListUpdater

0 人点赞