深入剖析ribbon源码

2020-12-16 10:51:40 浏览数 (1)

开篇提示:本文的讲解中,ribbon底层依赖于OkHttpClient,配置如下:

代码语言:javascript复制
#ribbon配置
ribbon.okhttp.enabled=true
# 请求连接的超时时间 默认的时间为1秒,在RibbonClientConfiguration类
springboot-mybatis.ribbon.ConnectTimeout=2000
# 请求处理的超时时间
springboot-mybatis.ribbon.ReadTimeout=15000

我把ribbon官网上关于ribbon中组件的描述贴出来

代码语言:javascript复制
#决定怎样从ServerList中选择一个Server
Rule - a logic component to determine which server to return from a list
#定时判断服务是否存活
Ping - a component running in background to ensure liveness of servers
#服务列表
ServerList - this can be static or dynamic. If it is dynamic (as used by DynamicServerListLoadBalancer), a background thread will refresh and filter the list at certain interval

当我们使用openfeign作为eureka客户端时,我们在@FeignClient中是不用指定url的,ribbon帮我们做了负载均衡。

回顾一下上篇《再谈openfeign,聊聊它的源代码》,之前提到,不指定url的情况下,如果我们底层httpclient选择了okhttpclient,程序是不会直接调用okhttpclient的,而是会选择使用LoadBalancerFeignClient作为代理去调用,代码如下:

代码语言:javascript复制
public Response execute(Request request, Request.Options options) throws IOException {
  try {
    URI asUri = URI.create(request.url());
    String clientName = asUri.getHost();
    URI uriWithoutHost = cleanUrl(request.url(), clientName);
    FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
        this.delegate, request, uriWithoutHost);

    IClientConfig requestConfig = getClientConfig(options, clientName);
    return lbClient(clientName).executeWithLoadBalancer(ribbonRequest,
        requestConfig).toResponse();
  }
  catch (ClientException e) {
    //省略异常处理代码
  }
}

上面的executeWithLoadBalancer调用了AbstractLoadBalancerAwareClient的executeWithLoadBalancer方法,代码如下:

代码语言:javascript复制
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
    LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);

    try {
        return command.submit(
            new ServerOperation<T>() {
                @Override
                public Observable<T> call(Server server) {
                    //这里的finalUri是http://192.168.0.118:8083/feign/feignReadTimeout
                    URI finalUri = reconstructURIWithServer(server, request.getUri());//这个就是一个拼接url的方法,不细讲了
                    //下面的requestForServer是FeignLoadBalancer,它是AbstractLoadBalancerAwareClient的子类
                    S requestForServer = (S) request.replaceUri(finalUri);
                    try {
                        return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                    } 
                    catch (Exception e) {
                        return Observable.error(e);
                    }
                }
            })
            .toBlocking()
            .single();
    } //省略异常处理的代码
}

这个方法我们首先创建了一个LoadBalancerCommand对象,然后用这个对象的submit方法来执行请求。从我们上面的注解中我们可以看到,call方法的server参数里面有我们请求要发往的ip和port,这就是ribbon负载均衡器给我们的。我们看一下下面的UML类图:

从这里可以看到,RibbonClientConfiguration类中定义了RibbonLoadBalancerContext这个bean和ZoneAwareLoadBalancer这个bean的初始化,RibbonLoadBalancerContext包装了ZoneAwareLoadBalancer。

Server来源

LoadBalancerCommand在执行submit方法的时候,会选择一个Server(selectServer方法),这个Server里面有feign要发送http请求的地址和端口。

这里有个selectServer()方法,就是获取Server对象的,我们看一下下面的源代码:

代码语言:javascript复制
private Observable<Server> selectServer() {
    return Observable.create(new OnSubscribe<Server>() {
        @Override
        public void call(Subscriber<? super Server> next) {
            try {
                Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);   
                next.onNext(server);
                next.onCompleted();
            } catch (Exception e) {
                next.onError(e);
            }
        }
    });
}

上面的getServerFromLoadBalancer方法在LoadBalancerContext,代码如下:

代码语言:javascript复制
public Server getServerFromLoadBalancer(@Nullable URI original, @Nullable Object loadBalancerKey) throws ClientException {
    String host = null;
    int port = -1;
    if (original != null) {
      //这里取到是-1
        host = original.getHost();
    }
    if (original != null) {
        Pair<String, Integer> schemeAndPort = deriveSchemeAndPortFromPartialUri(original);      
        //这里返回-1    
        port = schemeAndPort.second();
    }

    // Various Supported Cases
    // The loadbalancer to use and the instances it has is based on how it was registered
    // In each of these cases, the client might come in using Full Url or Partial URL
  //这里是DynamicServerListLoadBalancer
    ILoadBalancer lb = getLoadBalancer();
    if (host == null) {
        // Partial URI or no URI Case
        // well we have to just get the right instances from lb - or we fall back
    //这里的lb是ZoneAwareLoadBalancer
        if (lb != null){
            Server svc = lb.chooseServer(loadBalancerKey);
            if (svc == null){
                throw new ClientException(ClientException.ErrorType.GENERAL,
                        "Load balancer does not have available server for client: "
                                  clientName);
            }
            host = svc.getHost();
            if (host == null){
                throw new ClientException(ClientException.ErrorType.GENERAL,
                        "Invalid Server for :"   svc);
            }
            logger.debug("{} using LB returned Server: {} for request {}", new Object[]{clientName, svc, original});
            return svc;
        } else {
        //省略vipAddresses的逻辑
        }
    } else {
      //省略这部分逻辑
    }
    // end of creating final URL
    if (host == null){
        throw new ClientException(ClientException.ErrorType.GENERAL,"Request contains no HOST to talk to");
    }
    // just verify that at this point we have a full URL

    return new Server(host, port);
}

这里的LoadBalancer我们用的是ZoneAwareLoadBalancer,我们看一下LoadBalancer的继承关系,UML类图如下:

chooseServer方法在ZoneAwareLoadBalancer的父类BaseLoadBalancer中,代码如下:

代码语言:javascript复制
public Server chooseServer(Object key) {
    if (counter == null) {
        counter = createCounter();
    }
  //内容是BasicCounter{config=MonitorConfig{name=LoadBalancer_ChooseServer, tags=COUNTER, policy=DefaultPublishingPolicy}, count=0}
    counter.increment();
    if (rule == null) {
        return null;
    } else {
        try {
        //这里的rule是ZoneAvoidanceRule,但是choose方法在父类PredicateBasedRule
            return rule.choose(key);
        } catch (Exception e) {
            logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);
            return null;
        }
    }
}

ZoneAvoidanceRule类的继承关系也比较复杂,我们后面再讲。

下面我们看一下PredicateBasedRule中的的choose方法:

代码语言:javascript复制
public Server choose(Object key) {
    ILoadBalancer lb = getLoadBalancer();
  //这里的LoadBalancer是ZoneAwareLoadBalancer
    Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
    if (server.isPresent()) {
        return server.get();
    } else {
        return null;
    }       
}

到这里,我们都看到了ribbon中一个关键的组件,叫ServerList。

从eureka拉取ServerList

从上节的介绍中我们看到,获取server实际上是从ZoneAwareLoadBalancer的getAllServers,这个方法在它的父类BaseLoadBalancer,代码如下:

代码语言:javascript复制
public List<Server> getAllServers() {
    return Collections.unmodifiableList(allServerList);
}

这个allServerList还是在ZoneAwareLoadBalancer的父类BaseLoadBalancer,那么它是怎么来的呢?我们下看下面这个UML类图:

其中DiscoveryClient是核心,这里CacheRefreshThread在定时线程池里面执行,会使用EurekaHttpClient定时的从eureka拉取服务列表并更新。这里的核心方法是fetchRegistry,这个方法在DiscoveryClient创建的时候也会调用,而DiscoveryClient的初始化在EurekaClientAutoConfiguration这个配置类里面。

代码语言:javascript复制
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
    Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

    try {
        // If the delta is disabled or if it is the first time, get all
        // applications
        Applications applications = getApplications();

        if (clientConfig.shouldDisableDelta()
                || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                || forceFullRegistryFetch
                || (applications == null)
                || (applications.getRegisteredApplications().size() == 0)
                || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
        {
            //去除日志打印
      //
            getAndStoreFullRegistry();
        } else {
            getAndUpdateDelta(applications);
        }
        applications.setAppsHashCode(applications.getReconcileHashCode());
        logTotalInstances();
    } catch (Throwable e) {
        logger.error(PREFIX   "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
        return false;
    } finally {
        if (tracer != null) {
            tracer.stop();
        }
    }

    // Notify about cache refresh before updating the instance remote status
    onCacheRefreshed();

    // Update remote status based on refreshed data held in the cache
    updateInstanceRemoteStatus();

    // registry was fetched successfully, so return true
    return true;
}

我们看一下上面方法里面的getAndStoreFullRegistry方法:

代码语言:javascript复制
private void getAndStoreFullRegistry() throws Throwable {
    long currentUpdateGeneration = fetchRegistryGeneration.get();

    logger.info("Getting all instance registry info from the eureka server");

    Applications apps = null;
  //这个调用EurekaHttpClientDecorator去向eureka发送请求
    EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
            ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
            : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
        apps = httpResponse.getEntity();
    }
    logger.info("The response status is {}", httpResponse.getStatusCode());

    if (apps == null) {
        logger.error("The application is null for some reason. Not storing this information");
    } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration   1)) {
      /**
      * 返回的Applications放到了localRegionApps上,
      * 类型是一个原子引用类AtomicReference<Applications>
      *
      * 这个filterAndShuffle方法一定要注意,这里调用了Applications的shuffleInstances方法,
      * 这个方法把拉取到的服务列表放到了virtualHostNameAppMap,从而给Balancer提供服务
      */
        localRegionApps.set(this.filterAndShuffle(apps));
        logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
    } else {
        logger.warn("Not updating applications as another thread is updating it already");
    }
}

EurekaHttpClientDecorator的getApplications方法最终调用了AbstractJerseyEurekaHttpClient类的getApplicationsInternal方法,代码如下:

代码语言:javascript复制
private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath, String[] regions) {
    ClientResponse response = null;
    String regionsParamValue = null;
    try {
        WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath);
        if (regions != null && regions.length > 0) {
            regionsParamValue = StringUtil.join(regions);
            webResource = webResource.queryParam("regions", regionsParamValue);
        }
        Builder requestBuilder = webResource.getRequestBuilder();
        addExtraHeaders(requestBuilder);
    //这里发送了一个get请求
        response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class);

        Applications applications = null;
        if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) {
            applications = response.getEntity(Applications.class);
        }
    //响应封装了applications
        return anEurekaHttpResponse(response.getStatus(), Applications.class)
                .headers(headersOf(response))
                .entity(applications)
                .build();
    } finally {
        //省略处理代码
    
}

而ZoneAwareLoadBalancer获取列表的时候,正是从Applications中的virtualHostNameAppMap获取,UML类图如下:

到这里,整个流程就通了,DiscoveryClient类似于生产者,负责从Eureka拉取服务列表并赋值给Applications,而ZoneAwareLoadBalancer则类似于消费者,从Applications获取服务列表。

缓存更新

ribbon的缓存更新有2个地方,一个是我们之前的配置:

代码语言:javascript复制
springboot-mybatis.ribbon.ServerListRefreshInterval=3

这个key定义在CommonClientConfigKey类,使用的地方在PollingServerListUpdater,代码如下:

代码语言:javascript复制
public synchronized void start(final UpdateAction updateAction) {
    if (isActive.compareAndSet(false, true)) {
        final Runnable wrapperRunnable = new Runnable() {
            @Override
            public void run() {
                if (!isActive.get()) {
                    if (scheduledFuture != null) {
                        scheduledFuture.cancel(true);
                    }
                    return;
                }
                try {
                    updateAction.doUpdate();
                    lastUpdated = System.currentTimeMillis();
                } catch (Exception e) {
                    logger.warn("Failed one update cycle", e);
                }
            }
        };

        scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
                wrapperRunnable,
                initialDelayMs,
        /**这里就是上面配置的ServerListRefreshInterval**/
                refreshIntervalMs,
                TimeUnit.MILLISECONDS
        );
    } else {
        logger.info("Already active, no-op");
    }
}

上面的updateAction的实现在DynamicServerListLoadBalancer类,代码如下:

代码语言:javascript复制
protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
    @Override
    public void doUpdate() {
      /**这里就进入了更新ServerList的代码**/
        updateListOfServers();
    }
};

另一个更新缓存的地方在DiscoveryClient,代码如下:

代码语言:javascript复制
private void initScheduledTasks() {
    if (clientConfig.shouldFetchRegistry()) {
        //这个时间默认是30s
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
    //定时线程池
        scheduler.schedule(
                new TimedSupervisorTask(
                        "cacheRefresh",
                        scheduler,
                        cacheRefreshExecutor,
                        registryFetchIntervalSeconds,
                        TimeUnit.SECONDS,
                        expBackOffBound,
            /**这个是定时线程池要执行的任务**/
                        new CacheRefreshThread()
                ),
                registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }
}

CacheRefreshThread的定义如下:

代码语言:javascript复制
class CacheRefreshThread implements Runnable {
    public void run() {
      //这里进入了更新ServerList的代码
        refreshRegistry();
    }
}

关于ping

上面讲的ribbon官方文档提到过ping,是用于探测服务列表中的服务是否正常,如果不正常,则从eureka拉取服务列表并更新。

ping任务的初始化在BaseLoadBalancer构造函数中初始化,代码如下:

代码语言:javascript复制
public BaseLoadBalancer() {
    this.name = DEFAULT_NAME;
    this.ping = null;
    setRule(DEFAULT_RULE);
  //这里开启了ping任务
    setupPingTask();
    lbStats = new LoadBalancerStats(DEFAULT_NAME);
}
void setupPingTask() {
    if (canSkipPing()) {
        return;
    }
    if (lbTimer != null) {
        lbTimer.cancel();
    }
    lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-"   name,
            true);
  //pingIntervalSeconds默认是10ms
    lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
    forceQuickPing();
}
class PingTask extends TimerTask {
    public void run() {
        try {
          new Pinger(pingStrategy).runPinger();
        } catch (Exception e) {
            logger.error("LoadBalancer [{}]: Error pinging", name, e);
        }
    }
}

我们看一下ping方法的核心逻辑:

代码语言:javascript复制
public void runPinger() throws Exception {
    if (!pingInProgress.compareAndSet(false, true)) { 
        return; // Ping in progress - nothing to do
    }
    
    // we are "in" - we get to Ping

    Server[] allServers = null;
    boolean[] results = null;

    Lock allLock = null;
    Lock upLock = null;

    try {
        /*
         * The readLock should be free unless an addServer operation is
         * going on...
         */
        allLock = allServerLock.readLock();
        allLock.lock();
        allServers = allServerList.toArray(new Server[allServerList.size()]);
        allLock.unlock();

        int numCandidates = allServers.length;
    //对所有的 server进行ping操作后返回一个boolean类型数组
        results = pingerStrategy.pingServers(ping, allServers);

        final List<Server> newUpList = new ArrayList<Server>();
        final List<Server> changedServers = new ArrayList<Server>();

        for (int i = 0; i < numCandidates; i  ) {
            boolean isAlive = results[i];
            Server svr = allServers[i];
            boolean oldIsAlive = svr.isAlive();

            svr.setAlive(isAlive);

            if (oldIsAlive != isAlive) {
                changedServers.add(svr);
                logger.debug("LoadBalancer [{}]:  Server [{}] status changed to {}", 
                name, svr.getId(), (isAlive ? "ALIVE" : "DEAD"));
            }

            if (isAlive) {
          //把在线状态的Server加入新的ServerList
                newUpList.add(svr);
            }
        }
        upLock = upServerLock.writeLock();
        upLock.lock();
    //覆盖掉旧的ServerList
        upServerList = newUpList;
        upLock.unlock();
        //这里的ServerStatusChangeListener没有找到实现类
        notifyServerStatusChangeListener(changedServers);
    } finally {
        pingInProgress.set(false);
    }
}

可见对于ping失败的Server,会从ServerList移除。

聊聊rule

上面讲chooseServer时我们知道,代码在ZoneAwareLoadBalancer的父类BaseLoadBalancer中,代码如下:

代码语言:javascript复制
public Server chooseServer(Object key) {
    if (counter == null) {
        counter = createCounter();
    }
  //内容是BasicCounter{config=MonitorConfig{name=LoadBalancer_ChooseServer, tags=COUNTER, policy=DefaultPublishingPolicy}, count=0}
    counter.increment();
    if (rule == null) {
        return null;
    } else {
        try {
        //这里的rule是ZoneAvoidanceRule,但是choose方法在父类PredicateBasedRule
            return rule.choose(key);
        } catch (Exception e) {
            logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);
            return null;
        }
    }
}

这里的Rule用于选择从服务列表上返回哪个Server。Rule的继承关系如下图所示:

我们来看一下PredicateBasedRule的choose方法,代码如下:

代码语言:javascript复制
public Server choose(Object key) {
    ILoadBalancer lb = getLoadBalancer();
  //这里getPredicate()返回AbstractServerPredicate
    Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
    if (server.isPresent()) {
        return server.get();
    } else {
        return null;
    }       
}

AbstractServerPredicate的chooseRoundRobinAfterFiltering方法如下:

代码语言:javascript复制
public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
    //这里的loadBalancerKey是null
    List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
    if (eligible.size() == 0) {
        return Optional.absent();
    }
    return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
}
public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
    if (loadBalancerKey == null) {
        return ImmutableList.copyOf(Iterables.filter(servers, this.getServerOnlyPredicate()));            
    } else {
        List<Server> results = Lists.newArrayList();
        for (Server server: servers) {
            if (this.apply(new PredicateKey(loadBalancerKey, server))) {
                results.add(server);
            }
        }
        return results;            
    }
}

public static AbstractServerPredicate ofKeyPredicate(final Predicate<PredicateKey> p) {
    return new AbstractServerPredicate() {
        @Override
        @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP")
        public boolean apply(PredicateKey input) {
            return p.apply(input);
        }            
    };        
}

上面方法的p有2个Predicates,debug的内容如下:

代码语言:javascript复制
p = {Predicates$AndPredicate@14490} "Predicates.and(com.netflix.loadbalancer.ZoneAvoidancePredicate@70d2b344,com.netflix.loadbalancer.AvailabilityPredicate@7380394)"
 components = {ArrayList@14673}  size = 2
  0 = {ZoneAvoidancePredicate@14675} 
  1 = {AvailabilityPredicate@14676}

我们看一下他们的apply方法,先看AvailabilityPredicate:

代码语言:javascript复制
public boolean apply(@Nullable PredicateKey input) {
    LoadBalancerStats stats = getLBStats();
    if (stats == null) {
        return true;
    }
    return !shouldSkipServer(stats.getSingleServerStat(input.getServer()));
}
private boolean shouldSkipServer(ServerStats stats) {  
    //根据ServerStats进行判断      
    if ((CIRCUIT_BREAKER_FILTERING.get() && stats.isCircuitBreakerTripped()) 
            || stats.getActiveRequestsCount() >= activeConnectionsLimit.get()) {
        return true;
    }
    return false;
}

我们再看一下ZoneAvoidancePredicate的apply方法,这段代码的注解很清晰,zone数量小于等于一个或者取不到stats返回true,如果可用的zone里面没有这个server,返回失败:

代码语言:javascript复制
public boolean apply(@Nullable PredicateKey input) {
    if (!ENABLED.get()) {
        return true;
    }
    String serverZone = input.getServer().getZone();
    if (serverZone == null) {
        // there is no zone information from the server, we do not want to filter
        // out this server
        return true;
    }
    LoadBalancerStats lbStats = getLBStats();
    if (lbStats == null) {
        // no stats available, do not filter
        return true;
    }
    if (lbStats.getAvailableZones().size() <= 1) {
        // only one zone is available, do not filter
        return true;
    }
    Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
    if (!zoneSnapshot.keySet().contains(serverZone)) {
        // The server zone is unknown to the load balancer, do not filter it out 
        return true;
    }
    logger.debug("Zone snapshots: {}", zoneSnapshot);
    Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
    logger.debug("Available zones: {}", availableZones);
    if (availableZones != null) {
        return availableZones.contains(input.getServer().getZone());
    } else {
        return false;
    }
}

这2个Predicate是在CompositePredicate的建造者中传入的,代码如下:

代码语言:javascript复制
Builder(AbstractServerPredicate ...primaryPredicates) {
    toBuild = new CompositePredicate();
    Predicate<PredicateKey> chain = Predicates.<PredicateKey>and(primaryPredicates);
    toBuild.delegate =  AbstractServerPredicate.ofKeyPredicate(chain);                
}

而调用这个建造者函数的地方是在ZoneAvoidanceRule初始化的时候传入的

代码语言:javascript复制
public ZoneAvoidanceRule() {
    super();
    ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this);
    AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this);
    compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate);
}

private CompositePredicate createCompositePredicate(ZoneAvoidancePredicate p1, AvailabilityPredicate p2) {
    return CompositePredicate.withPredicates(p1, p2)
                         .addFallbackPredicate(p2)
                         .addFallbackPredicate(AbstractServerPredicate.alwaysTrue())
                         .build();
    
}

当然,Predicate也是支持自定义配置的,感兴趣的大家可以研究,获取配置的代码如下:

代码语言:javascript复制
public void initWithNiwsConfig(IClientConfig clientConfig) {
    ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this, clientConfig);
    AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this, clientConfig);
    compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate);
}

ribbon重试

ribbon重试功能对服务的优雅发布有一定好处。这里首先要提示一下,ribbon负载均衡要依赖于spring的retry,需要pom中添加下面依赖:

代码语言:javascript复制
<dependency>
    <groupId>org.springframework.retry</groupId>
    <artifactId>spring-retry</artifactId>
    <version>1.2.5.RELEASE</version>
</dependency

重试的配置如下:

代码语言:javascript复制
#Ribbon缓存更新周期默认30s,改为3s
springboot-mybatis.ribbon.ServerListRefreshInterval=3
#同一台实例最大重试次数,不包括首次调用
springboot-mybatis.ribbon.MaxAutoRetries=1
#重试负载均衡其他的实例最大重试次数,不包括首次调用
springboot-mybatis.ribbon.MaxAutoRetriesNextServer=1
#对所有操作请求都进行重试,如果改为true,对post也重试
springboot-mybatis.ribbon.OkToRetryOnAllOperations=false
springboot-mybatis.ribbon.retryableStatusCodes=404,408,502,500

下面我给出一个UML类图:

重试的核心代码在LoadBalancerCommand类submit方法,下面的原生注释大家一定能要看一下:

代码语言:javascript复制
/**
 * Create an {@link Observable} that once subscribed execute network call asynchronously with a server chosen by load balancer.
 * If there are any errors that are indicated as retriable by the {@link RetryHandler}, they will be consumed internally by the
 * function and will not be observed by the {@link Observer} subscribed to the returned {@link Observable}. If number of retries has
 * exceeds the maximal allowed, a final error will be emitted by the returned {@link Observable}. Otherwise, the first successful
 * result during execution and retries will be emitted.
 */
public Observable<T> submit(final ServerOperation<T> operation) {
    final ExecutionInfoContext context = new ExecutionInfoContext();
    
    if (listenerInvoker != null) {
        try {
            listenerInvoker.onExecutionStart();
        } catch (AbortExecutionException e) {
            return Observable.error(e);
        }
    }
    //上面配置的MaxAutoRetries和MaxAutoRetriesNextServer,这里都是1
    final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
    final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();

    // Use the load balancer
    Observable<T> o = 
            (server == null ? selectServer() : Observable.just(server))
            .concatMap(new Func1<Server, Observable<T>>() {
                @Override
                // Called for each server being selected
                public Observable<T> call(Server server) {
                    context.setServer(server);
                    final ServerStats stats = loadBalancerContext.getServerStats(server);
                    
                    // Called for each attempt and retry
                    Observable<T> o = Observable
                            .just(server)
                            .concatMap(new Func1<Server, Observable<T>>() {
                                @Override
                                public Observable<T> call(final Server server) {
                                    context.incAttemptCount();
                                    loadBalancerContext.noteOpenConnection(stats);
                                    //省略部分代码                                                                  
                                    final Stopwatch tracer = loadBalancerContext.getExecuteTracer().start();
                                    
                                    return operation.call(server).doOnEach(new Observer<T>() {
                                        private T entity;
                                        @Override
                                        public void onCompleted() {
                                            recordStats(tracer, stats, entity, null);
                                            // TODO: What to do if onNext or onError are never called?
                                        }

                                        @Override
                                        public void onError(Throwable e) {
                                            recordStats(tracer, stats, null, e);
                                            logger.debug("Got error {} when executed on server {}", e, server);
                                            if (listenerInvoker != null) {
                                                listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo());
                                            }
                                        }

                                        @Override
                                        public void onNext(T entity) {
                                            this.entity = entity;
                                            if (listenerInvoker != null) {
                                                listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo());
                                            }
                                        }                            
                                        
                                        private void recordStats(Stopwatch tracer, ServerStats stats, Object entity, Throwable exception) {
                                            tracer.stop();
                                            loadBalancerContext.noteRequestCompletion(stats, entity, exception, tracer.getDuration(TimeUnit.MILLISECONDS), retryHandler);
                                        }
                                    });
                                }
                            });
                    
                    if (maxRetrysSame > 0) 
                        o = o.retry(retryPolicy(maxRetrysSame, true));
                    return o;
                }
            });
    //初始化会走到这里
    if (maxRetrysNext > 0 && server == null) 
      //retryPolicy会判断错误码类型
        o = o.retry(retryPolicy(maxRetrysNext, false));
    
    return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
        @Override
        public Observable<T> call(Throwable e) {
            if (context.getAttemptCount() > 0) {
          //重试次数超过后要抛出异常
                if (maxRetrysNext > 0 && context.getServerAttemptCount() == (maxRetrysNext   1)) {
                    e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED,
                            "Number of retries on next server exceeded max "   maxRetrysNext
                              " retries, while making a call for: "   context.getServer(), e);
                }
                else if (maxRetrysSame > 0 && context.getAttemptCount() == (maxRetrysSame   1)) {
                    e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED,
                            "Number of retries exceeded max "   maxRetrysSame
                              " retries, while making a call for: "   context.getServer(), e);
                }
            }
            if (listenerInvoker != null) {
                listenerInvoker.onExecutionFailed(e, context.toFinalExecutionInfo());
            }
            return Observable.error(e);
        }
    });
}

这个方法里面的retryPolicy会捕获异常并判断是不是RetriableException,而RetriableException就是我们配置的4个错误码:404,408,502,500

我们再看下RetryableFeignLoadBalancer的execute方法:

代码语言:javascript复制
public RibbonResponse execute(final RibbonRequest request, IClientConfig configOverride)
    throws IOException {
  final Request.Options options;
  if (configOverride != null) {
    RibbonProperties ribbon = RibbonProperties.from(configOverride);
    options = new Request.Options(
        ribbon.connectTimeout(this.connectTimeout),
        ribbon.readTimeout(this.readTimeout));
  }
  else {
    options = new Request.Options(this.connectTimeout, this.readTimeout);
  }
  final LoadBalancedRetryPolicy retryPolicy = loadBalancedRetryFactory.createRetryPolicy(this.getClientName(), this);
  RetryTemplate retryTemplate = new RetryTemplate();
  BackOffPolicy backOffPolicy = loadBalancedRetryFactory.createBackOffPolicy(this.getClientName());
  retryTemplate.setBackOffPolicy(backOffPolicy == null ? new NoBackOffPolicy() : backOffPolicy);
  RetryListener[] retryListeners = this.loadBalancedRetryFactory.createRetryListeners(this.getClientName());
  if (retryListeners != null && retryListeners.length != 0) {
    retryTemplate.setListeners(retryListeners);
  }
  //retryPolicy是RibbonLoadBalancedRetryPolicy,里面的lbContext里面有我们的参数
  retryTemplate.setRetryPolicy(retryPolicy == null ? new NeverRetryPolicy()
      : new FeignRetryPolicy(request.toHttpRequest(), retryPolicy, this, this.getClientName()));
  //retryTemplate.execute方法里失败后会直接进行重试,用while循环进行重试
  return retryTemplate.execute(new RetryCallback<RibbonResponse, IOException>() {
    @Override
    public RibbonResponse doWithRetry(RetryContext retryContext) throws IOException {
      Request feignRequest = null;
      //on retries the policy will choose the server and set it in the context
      //extract the server and update the request being made
      if (retryContext instanceof LoadBalancedRetryContext) {
        ServiceInstance service = ((LoadBalancedRetryContext) retryContext).getServiceInstance();
        if (service != null) {
          feignRequest = ((RibbonRequest) request.replaceUri(reconstructURIWithServer(new Server(service.getHost(), service.getPort()), request.getUri()))).toRequest();
        }
      }
      if (feignRequest == null) {
        feignRequest = request.toRequest();
      }
      Response response = request.client().execute(feignRequest, options);
      /**
       * 判断错误码类型是不是我们配置的,是的话抛出RetryableStatusCodeException,
       * 这个是RetryableStatusCodeException的子类
       */
      if (retryPolicy != null && retryPolicy.retryableStatusCode(response.status())) {
        byte[] byteArray = response.body() == null ? new byte[]{} : StreamUtils.copyToByteArray(response.body().asInputStream());
        response.close();
        throw new RibbonResponseStatusCodeException(RetryableFeignLoadBalancer.this.clientName, response,
            byteArray, request.getUri());
      }
      return new RibbonResponse(request.getUri(), response);
    }
  }, new LoadBalancedRecoveryCallback<RibbonResponse, Response>() {
    @Override
    protected RibbonResponse createResponse(Response response, URI uri) {
      return new RibbonResponse(uri, response);
    }
  });
}

总结

ribbon作为openfeign的负载均衡器,是懒加载的,只有openfeign第一次获取服务列表的时候,才会初始化LoadBalancer,也才会初始化ping和缓存刷新任务。

ribbon重试对优雅发布会有作用,所以建议把MaxAutoRetries设置为0,即当前服务失败时直接请求下一个服务。

ribbon的源代码比较复杂,有一些RxJava的基础会容易一些。

ribbon的源代码非常烧脑,有问题欢迎大家交流。

0 人点赞