Spring Cloud Ribbon in Depth (5) - Source Code of the Basic Components (3)

2021-04-12 15:05:07

This article is based on Spring Cloud Dalston.SR5.

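We continue our analysis of the following components:

  1. IClient, the interface that all Ribbon load balancers must implement
  2. ServerList, the interface for maintaining the list of service instances
  3. LoadBalancerStats, the load-balancing statistics record
  4. ILoadBalancer, the interface responsible for choosing a Server
  5. IRule, the interface for load-balancing selection rules
  6. IPing, the interface for checking whether an instance is alive
  7. ServerListUpdater, the interface for updating the list of service instances
  8. ServerListFilter, the mechanism for filtering the list of service instances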

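5. IRule, the interface for load-balancing selection rules

Here we only look at the default implementation, ZoneAvoidanceRule, and the classes related to it.

An IRule cannot work without load-balancing statistics. As mentioned earlier, that data is part of BaseLoadBalancer, the ILoadBalancer implementation. The abstract base class for IRule therefore needs an ILoadBalancer set on it so that it can read those statistics: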

AbstractLoadBalancerRule

public abstract class AbstractLoadBalancerRule implements IRule, IClientConfigAware {

    private ILoadBalancer lb;

    @Override
    public void setLoadBalancer(ILoadBalancer lb){
        this.lb = lb;
    }

    @Override
    public ILoadBalancer getLoadBalancer(){
        return lb;
    }      
}

RoundRobinRule.java

This is the implementation of the most common and most basic load-balancing rule, round robin:

public Server choose(ILoadBalancer lb, Object key) {
    if (lb == null) {
        log.warn("no load balancer");
        return null;
    }

    Server server = null;
    int count = 0;
    while (server == null && count++ < 10) {
        List<Server> reachableServers = lb.getReachableServers();
        List<Server> allServers = lb.getAllServers();
        int upCount = reachableServers.size();
        int serverCount = allServers.size();

        // Bail out if there are no servers at all, or none in the UP state
        if ((upCount == 0) || (serverCount == 0)) {
            log.warn("No up servers available from load balancer: " + lb);
            return null;
        }

        int nextServerIndex = incrementAndGetModulo(serverCount);
        server = allServers.get(nextServerIndex);

        if (server == null) {
            /* Transient. */
            Thread.yield();
            continue;
        }

        // Check that the server picked in this round is alive and ready to serve
        if (server.isAlive() && (server.isReadyToServe())) {
            return (server);
        }

        // Next.
        server = null;
    }

    if (count >= 10) {
        log.warn("No available alive servers after 10 tries from load balancer: "
                + lb);
    }
    return server;
}

In short, it walks the full server list in round-robin order and returns the next server that is available.
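
To make the round-robin step concrete, here is a minimal, self-contained sketch. It is not Ribbon's class: `RoundRobinSketch`, its string-based server list, and the parallel `alive` list are hypothetical stand-ins for `Server`, `isAlive()` and `isReadyToServe()`. The counter helper follows the usual compare-and-set pattern that the name `incrementAndGetModulo` suggests.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a round-robin rule; names are illustrative only.
class RoundRobinSketch {
    // Index of the most recently handed-out slot, shared by all callers.
    private final AtomicInteger nextIndex = new AtomicInteger(0);

    // Atomically advances the counter and returns the new index in [0, modulo).
    int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = nextIndex.get();
            int next = (current + 1) % modulo;
            if (nextIndex.compareAndSet(current, next)) {
                return next;
            }
        }
    }

    // Tries up to 10 slots and returns the first server marked alive, or null.
    String choose(List<String> allServers, List<Boolean> alive) {
        if (allServers.isEmpty()) {
            return null;
        }
        for (int attempt = 0; attempt < 10; attempt++) {
            int i = incrementAndGetModulo(allServers.size());
            if (alive.get(i)) {
                return allServers.get(i);
            }
        }
        return null;
    }

    public static void main(String[] args) {
        RoundRobinSketch rule = new RoundRobinSketch();
        List<String> servers = Arrays.asList("a:80", "b:80", "c:80");
        List<Boolean> alive = Arrays.asList(true, false, true);
        for (int n = 0; n < 4; n++) {
            System.out.println(rule.choose(servers, alive)); // c:80, a:80, c:80, a:80
        }
    }
}
```

Note that the index always advances over the full list, so a dead server costs one of the ten attempts rather than being removed from the rotation.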

ClientConfigEnabledRoundRobinRule.java

This is essentially a wrapper that holds a RoundRobinRule and adds a few configuration methods.

ZoneAvoidanceRule.java

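The core logic of this rule was already covered in the earlier discussion of how ZoneAwareLoadBalancer selects available zones: it works out which zones are available and then round-robins over the servers in those zones.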

6. IPing, the interface for checking whether an instance is alive

NoOpPing.java

This is the default IPing implementation; it always returns true.

public class NoOpPing implements IPing {

    @Override
    public boolean isAlive(Server server) {
        return true;
    }

}

PingConstant.java

It simply returns a fixed value, either true or false.

public class PingConstant implements IPing {
    boolean constant = true;

    public void setConstant(String constantStr) {
            constant = (constantStr != null) && (constantStr.toLowerCase().equals("true"));
    }

    public void setConstant(boolean constant) {
            this.constant = constant;
    }

    public boolean getConstant() {
            return constant;
    }

    public boolean isAlive(Server server) {
            return constant;
    }
}

PingUrl.java

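It sends a single GET request to the server's URL and returns true if the request succeeds (HTTP response status code 200). If an expected content string is configured, the response body must equal it.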

public boolean isAlive(Server server) {
    String urlStr = "";
    if (this.isSecure) {
        urlStr = "https://";
    } else {
        urlStr = "http://";
    }

    urlStr = urlStr + server.getId();
    urlStr = urlStr + this.getPingAppendString();
    boolean isAlive = false;
    HttpClient httpClient = new DefaultHttpClient();
    HttpUriRequest getRequest = new HttpGet(urlStr);
    String content = null;

    try {
        HttpResponse response = httpClient.execute(getRequest);
        content = EntityUtils.toString(response.getEntity());
        isAlive = response.getStatusLine().getStatusCode() == 200;
        if (this.getExpectedContent() != null) {
            LOGGER.debug("content:" + content);
            if (content == null) {
                isAlive = false;
            } else if (content.equals(this.getExpectedContent())) {
                isAlive = true;
            } else {
                isAlive = false;
            }
        }
    } catch (IOException var11) {
        var11.printStackTrace();
    } finally {
        getRequest.abort();
    }

    return isAlive;
}
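
The decision made by the method above can be isolated from the HTTP call. The sketch below is an assumption-laden restatement, not Ribbon code: `PingUrlDecisionSketch` and its parameters are hypothetical, and it only mirrors the branch structure shown above. It makes one detail easier to see: once an expected content string is set, the body comparison alone decides the result and the status code no longer matters.

```java
// Hypothetical helper that mirrors only the alive/dead decision of a URL ping.
class PingUrlDecisionSketch {
    // statusCode: HTTP status of the GET; content: response body (may be null);
    // expectedContent: configured body to match, or null to rely on the status code.
    static boolean isAlive(int statusCode, String content, String expectedContent) {
        boolean alive = statusCode == 200;
        if (expectedContent != null) {
            // With an expected body configured, the comparison overrides the status check.
            alive = content != null && content.equals(expectedContent);
        }
        return alive;
    }

    public static void main(String[] args) {
        System.out.println(isAlive(200, "anything", null)); // true
        System.out.println(isAlive(404, "anything", null)); // false
        System.out.println(isAlive(200, "DOWN", "OK"));     // false
        System.out.println(isAlive(500, "OK", "OK"));       // true
    }
}
```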

AbstractLoadBalancerPing.java

The purpose of this abstract class is to support ping implementations that depend on a particular LoadBalancer.

public abstract class AbstractLoadBalancerPing implements IPing, IClientConfigAware{

    AbstractLoadBalancer lb;

    @Override
    public boolean isAlive(Server server) {
        return true;
    }

    public void setLoadBalancer(AbstractLoadBalancer lb){
        this.lb = lb;
    }

    public AbstractLoadBalancer getLoadBalancer(){
        return lb;
    }

}

DummyPing.java

It is LoadBalancer-based, but simply returns true.

public class DummyPing extends AbstractLoadBalancerPing {

    public DummyPing() {
    }

    public boolean isAlive(Server server) {
        return true;
    }

    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
    }
}

NIWSDiscoveryPing.java

This is the ping used when Eureka is the service-discovery middleware. It checks whether the InstanceStatus of the corresponding instance is UP.

public boolean isAlive(Server server) {
    boolean isAlive = true;
    if (server!=null && server instanceof DiscoveryEnabledServer){
        DiscoveryEnabledServer dServer = (DiscoveryEnabledServer)server;                
        InstanceInfo instanceInfo = dServer.getInstanceInfo();
        if (instanceInfo!=null){                    
            InstanceStatus status = instanceInfo.getStatus();
            if (status!=null){
                isAlive = status.equals(InstanceStatus.UP);
            }
        }
    }
    return isAlive;
}

7. ServerListUpdater, the interface for updating the list of service instances

PollingServerListUpdater.java

This is the default, basic implementation of ServerListUpdater; if you configure nothing, this is the ServerListUpdater you get. Its main fields are:

// Core scheduling thread pool, LazyHolder._serverListRefreshExecutor (LazyHolder is just a holder that adds some configuration wrapping plus startup and shutdown handling)
private static ScheduledThreadPoolExecutor getRefreshExecutor() {
    return LazyHolder._serverListRefreshExecutor;
}
// Flag recording whether this updater is active
private final AtomicBoolean isActive = new AtomicBoolean(false);
// Time of the last update
private volatile long lastUpdated = System.currentTimeMillis();
// Delay before the first update
private final long initialDelayMs;
// Delay between scheduled updates
private final long refreshIntervalMs;
// Handle returned by the scheduler, kept so the task can be cancelled later
private volatile ScheduledFuture<?> scheduledFuture;

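It is essentially a mechanism that re-reads the ServerList on a timer. Recall the EurekaServer -> service consumer EurekaClient flow from the earlier Eureka chapter: in a Spring Cloud environment, service consumers usually rely on Ribbon for load balancing, and the copy from Eureka's cache of all instances of all services into Ribbon's cache of all instances of one service is also done by a scheduled task, which runs once every Ribbon server-list refresh interval. That interval, ribbon.ServerListRefreshInterval, is what sets refreshIntervalMs here.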

@Override
public synchronized void start(final UpdateAction updateAction) {
    // Ensure the updater can only be started once
    if (isActive.compareAndSet(false, true)) {
        // Define the scheduled task: run updateAction, then record the time of the update
        final Runnable wrapperRunnable = new Runnable() {
            @Override
            public void run() {
                if (!isActive.get()) {
                    if (scheduledFuture != null) {
                        scheduledFuture.cancel(true);
                    }
                    return;
                }
                try {
                    updateAction.doUpdate();
                    lastUpdated = System.currentTimeMillis();
                } catch (Exception e) {
                    logger.warn("Failed one update cycle", e);
                }
            }
        };

        // Schedule the task and keep the returned future
        scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
                wrapperRunnable,
                initialDelayMs,
                refreshIntervalMs,
                TimeUnit.MILLISECONDS
        );
    } else {
        logger.info("Already active, no-op");
    }
}

@Override
public synchronized void stop() {
    // Stopping just cancels the scheduled task
    if (isActive.compareAndSet(true, false)) {
        if (scheduledFuture != null) {
            scheduledFuture.cancel(true);
        }
    } else {
        logger.info("Not active, no-op");
    }
}

@Override
public String getLastUpdate() {
    return new Date(lastUpdated).toString();
}

@Override
public long getDurationSinceLastUpdateMs() {
    return System.currentTimeMillis() - lastUpdated;
}

@Override
public int getNumberMissedCycles() {
    if (!isActive.get()) {
        return 0;
    }
    // Computed from the time elapsed since the last update
    return (int) ((int) (System.currentTimeMillis() - lastUpdated) / refreshIntervalMs);
}

@Override
public int getCoreThreads() {
    if (isActive.get()) {
        if (getRefreshExecutor() != null) {
            return getRefreshExecutor().getCorePoolSize();
        }
    }
    return 0;
}
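
The start/stop pattern above (a compare-and-set guard plus a fixed-delay task whose future is kept for cancellation) can be tried out in isolation. The following is a minimal sketch under stated assumptions: `PollingUpdaterSketch` is a hypothetical class, it takes a plain `Runnable` instead of Ribbon's `UpdateAction`, and it owns a single daemon-thread scheduler rather than a shared static pool.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified polling updater; not Ribbon's implementation.
class PollingUpdaterSketch {
    // Daemon thread so the scheduler never keeps the JVM alive on its own.
    private final ScheduledExecutorService executor =
            Executors.newScheduledThreadPool(1, r -> {
                Thread t = new Thread(r, "sketch-server-list-refresh");
                t.setDaemon(true);
                return t;
            });
    private final AtomicBoolean active = new AtomicBoolean(false);
    private final long initialDelayMs;
    private final long refreshIntervalMs;
    private volatile long lastUpdated = System.currentTimeMillis();
    private volatile ScheduledFuture<?> future;

    PollingUpdaterSketch(long initialDelayMs, long refreshIntervalMs) {
        this.initialDelayMs = initialDelayMs;
        this.refreshIntervalMs = refreshIntervalMs;
    }

    // Returns true if this call started the updater, false if it was already active.
    synchronized boolean start(Runnable updateAction) {
        if (!active.compareAndSet(false, true)) {
            return false;
        }
        future = executor.scheduleWithFixedDelay(() -> {
            try {
                updateAction.run();
                lastUpdated = System.currentTimeMillis();
            } catch (Exception e) {
                // Swallow the failure so that later cycles still run.
            }
        }, initialDelayMs, refreshIntervalMs, TimeUnit.MILLISECONDS);
        return true;
    }

    // Returns true if this call stopped the updater, false if it was not active.
    synchronized boolean stop() {
        if (!active.compareAndSet(true, false)) {
            return false;
        }
        if (future != null) {
            future.cancel(true);
        }
        return true;
    }

    long millisSinceLastUpdate() {
        return System.currentTimeMillis() - lastUpdated;
    }

    public static void main(String[] args) throws InterruptedException {
        PollingUpdaterSketch updater = new PollingUpdaterSketch(0, 100);
        updater.start(() -> System.out.println("refreshing server list"));
        Thread.sleep(350);
        updater.stop();
    }
}
```

Catching the exception inside the task matters with `scheduleWithFixedDelay`: an uncaught exception would suppress all subsequent executions.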

EurekaNotificationServerListUpdater.java

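This class updates the server list specifically for Eureka. It does not refresh on its own timer. Instead, once EurekaClient has finished reading the service registry, it triggers a CacheRefreshedEvent:

The EurekaClient code that refreshes the service registry: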

private boolean fetchRegistry(boolean forceFullRegistryFetch) {
    // Code that fetches the service registry omitted

    // Notify about cache refresh before updating the instance remote status
    onCacheRefreshed();

    // registry was fetched successfully, so return true
    return true;
}
protected void onCacheRefreshed() {
    fireEvent(new CacheRefreshedEvent());
}

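The ServerList update mechanism of EurekaNotificationServerListUpdater is therefore to read from EurekaClient's local cache whenever a CacheRefreshedEvent arrives, rather than fetching actively on a timer as PollingServerListUpdater does.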

@Override
public synchronized void start(final UpdateAction updateAction) {
    if (isActive.compareAndSet(false, true)) {
        this.updateListener = new EurekaEventListener() {
            @Override
            public void onEvent(EurekaEvent event) {
                // Listen for CacheRefreshedEvent
                if (event instanceof CacheRefreshedEvent) {
                    // If an update is already queued, do not trigger another one
                    if (!updateQueued.compareAndSet(false, true)) {  // if an update is already queued
                        logger.info("an update action is already queued, returning as no-op");
                        return;
                    }

                    try {
                        // Run updateAction asynchronously and update lastUpdated once it finishes
                        refreshExecutor.submit(new Runnable() {
                            @Override
                            public void run() {
                                try {
                                    updateAction.doUpdate();
                                    lastUpdated.set(System.currentTimeMillis());
                                } catch (Exception e) {
                                    logger.warn("Failed to update serverList", e);
                                } finally {
                                    updateQueued.set(false);
                                }
                            }
                        });  // fire and forget
                    } catch (Exception e) {
                        logger.warn("Error submitting update task to executor, skipping one round of updates", e);
                        updateQueued.set(false);  // if submit fails, need to reset updateQueued to false
                    }
                }
            }
        };
        if (eurekaClient == null) {
            eurekaClient = eurekaClientProvider.get();
        }
        if (eurekaClient != null) {
            eurekaClient.registerEventListener(updateListener);
        } else {
            logger.error("Failed to register an updateListener to eureka client, eureka client is null");
            throw new IllegalStateException("Failed to start the updater, unable to register the update listener due to eureka client being null.");
        }
    } else {
        logger.info("Update listener already registered, no-op");
    }
}

@Override
public synchronized void stop() {
    if (isActive.compareAndSet(true, false)) {
        if (eurekaClient != null) {
            eurekaClient.unregisterEventListener(updateListener);
        }
    } else {
        logger.info("Not currently active, no-op");
    }
}

@Override
public String getLastUpdate() {
    return new Date(lastUpdated.get()).toString();
}

@Override
public long getDurationSinceLastUpdateMs() {
    return System.currentTimeMillis() - lastUpdated.get();
}

@Override
public int getNumberMissedCycles() {
    return 0;
}

@Override
public int getCoreThreads() {
    if (isActive.get()) {
        if (refreshExecutor != null && refreshExecutor instanceof ThreadPoolExecutor) {
            return ((ThreadPoolExecutor) refreshExecutor).getCorePoolSize();
        }
    }
    return 0;
}
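
The key idea in the listener above is the `updateQueued` flag: events that arrive while an update is still pending are dropped instead of piling up. Here is a minimal sketch of just that guard, with assumptions stated plainly: `EventDrivenUpdaterSketch` and `onCacheRefreshed` are hypothetical names, the Eureka event types are left out, and the executor is injected so the behaviour can be observed deterministically.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified event-driven updater; not Ribbon's implementation.
class EventDrivenUpdaterSketch {
    private final Executor refreshExecutor;
    private final AtomicBoolean updateQueued = new AtomicBoolean(false);
    private final AtomicLong lastUpdated = new AtomicLong(System.currentTimeMillis());

    EventDrivenUpdaterSketch(Executor refreshExecutor) {
        this.refreshExecutor = refreshExecutor;
    }

    // Called once per cache-refresh event. Returns true if an update was queued,
    // false if one was already pending or the executor rejected the task.
    boolean onCacheRefreshed(Runnable updateAction) {
        if (!updateQueued.compareAndSet(false, true)) {
            return false; // an update is already queued, skip this event
        }
        try {
            refreshExecutor.execute(() -> {
                try {
                    updateAction.run();
                    lastUpdated.set(System.currentTimeMillis());
                } catch (Exception e) {
                    // A failed update is ignored; the next event will retry.
                } finally {
                    updateQueued.set(false);
                }
            });
            return true;
        } catch (RuntimeException e) {
            updateQueued.set(false); // submission failed, allow the next event through
            return false;
        }
    }

    long millisSinceLastUpdate() {
        return System.currentTimeMillis() - lastUpdated.get();
    }

    public static void main(String[] args) {
        List<Runnable> pending = new ArrayList<>();
        EventDrivenUpdaterSketch updater = new EventDrivenUpdaterSketch(pending::add);
        System.out.println(updater.onCacheRefreshed(() -> {})); // true
        System.out.println(updater.onCacheRefreshed(() -> {})); // false, still pending
        pending.remove(0).run();                                // the update completes
        System.out.println(updater.onCacheRefreshed(() -> {})); // true
    }
}
```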

8. ServerListFilter, the mechanism for filtering the list of service instances

AbstractServerListFilter.java

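Every filter needs LoadBalancerStats in order to filter. LoadBalancerStats holds the statistics recorded for load balancing: whether each load-balanced call succeeded or failed, and how each Server has been used.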

public abstract class AbstractServerListFilter<T extends Server> implements ServerListFilter<T> {

    private volatile LoadBalancerStats stats;

    public void setLoadBalancerStats(LoadBalancerStats stats) {
        this.stats = stats;
    }

    public LoadBalancerStats getLoadBalancerStats() {
        return stats;
    }

}

ZoneAffinityServerListFilter.java

This is the default ServerListFilter; if nothing is configured, this is the implementation in use.

The filter works by zone, removing any Server that is not in the same zone:

@Override
public List<T> getFilteredListOfServers(List<T> servers) {
    if (zone != null && (zoneAffinity || zoneExclusive) && servers !=null && servers.size() > 0){
        // Filter with ZoneAffinityPredicate
        List<T> filteredServers = Lists.newArrayList(Iterables.filter(
                servers, this.zoneAffinityPredicate.getServerOnlyPredicate()));
        // Check whether the filtered result should actually be used
        if (shouldEnableZoneAffinity(filteredServers)) {
            return filteredServers;
        } else if (zoneAffinity) {
            overrideCounter.increment();
        }
    }
    return servers;
}

ZoneAffinityPredicate simply checks whether the server's zone matches the zone in the current configuration:

public class ZoneAffinityPredicate extends AbstractServerPredicate {

    private final String zone = ConfigurationManager.getDeploymentContext().getValue(ContextKey.zone);

    public ZoneAffinityPredicate() {        
    }

    @Override
    public boolean apply(PredicateKey input) {
        Server s = input.getServer();
        String az = s.getZone();
        if (az != null && zone != null && az.toLowerCase().equals(zone.toLowerCase())) {
            return true;
        } else {
            return false;
        }
    }
}

Under what circumstances is the list not filtered?

private boolean shouldEnableZoneAffinity(List<T> filtered) {    
    // If zone awareness is not enabled, do not filter; it is disabled by default
    if (!zoneAffinity && !zoneExclusive) {
        return false;
    }
    if (zoneExclusive) {
        return true;
    }
    LoadBalancerStats stats = getLoadBalancerStats();
    if (stats == null) {
        return zoneAffinity;
    } else {
        logger.debug("Determining if zone affinity should be enabled with given server list: {}", filtered);
        ZoneSnapshot snapshot = stats.getZoneSnapshot(filtered);
        double loadPerServer = snapshot.getLoadPerServer();
        int instanceCount = snapshot.getInstanceCount();            
        int circuitBreakerTrippedCount = snapshot.getCircuitTrippedCount();

        // Do not filter when the fraction of circuit-broken servers reaches its threshold, the load per server reaches its threshold, or too few servers remain available
        if (((double) circuitBreakerTrippedCount) / instanceCount >= blackOutServerPercentageThreshold.get() 
                || loadPerServer >= activeReqeustsPerServerThreshold.get()
                || (instanceCount - circuitBreakerTrippedCount) < availableServersThreshold.get()) {
            logger.debug("zoneAffinity is overriden. blackOutServerPercentage: {}, activeReqeustsPerServer: {}, availableServers: {}", 
                    new Object[] {(double) circuitBreakerTrippedCount / instanceCount,  loadPerServer, instanceCount - circuitBreakerTrippedCount});
            return false;
        } else {
            return true;
        }

    }
}
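
The three-way threshold check at the end of that method is easy to exercise on its own. The sketch below restates it as a pure function; `ZoneAffinityDecisionSketch` and its parameter names are hypothetical, and the threshold values used in `main` (0.8, 0.6 and 2) are chosen for illustration rather than taken from the code above.

```java
// Hypothetical restatement of the zone-affinity override check as a pure function.
class ZoneAffinityDecisionSketch {
    // Returns true when the zone-filtered list should be kept,
    // false when zone affinity should be overridden and all servers returned.
    static boolean keepZoneFilter(int instanceCount,
                                  int circuitTrippedCount,
                                  double loadPerServer,
                                  double maxBlackOutFraction,
                                  double maxLoadPerServer,
                                  int minAvailableServers) {
        double blackOutFraction = (double) circuitTrippedCount / instanceCount;
        int available = instanceCount - circuitTrippedCount;
        boolean override = blackOutFraction >= maxBlackOutFraction
                || loadPerServer >= maxLoadPerServer
                || available < minAvailableServers;
        return !override;
    }

    public static void main(String[] args) {
        // Healthy zone: 1 of 10 tripped, light load.
        System.out.println(keepZoneFilter(10, 1, 0.2, 0.8, 0.6, 2)); // true
        // 8 of 10 servers tripped: the black-out fraction hits the threshold.
        System.out.println(keepZoneFilter(10, 8, 0.2, 0.8, 0.6, 2)); // false
        // Load per server too high.
        System.out.println(keepZoneFilter(10, 0, 0.7, 0.8, 0.6, 2)); // false
        // Only one server left in the zone.
        System.out.println(keepZoneFilter(1, 0, 0.0, 0.8, 0.6, 2));  // false
    }
}
```

Any one of the three conditions is enough to fall back to the unfiltered list, so a zone has to be healthy on all three measures for affinity to hold.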

ServerListSubsetFilter.java

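When the number of servers is very large (several hundred, say), we may not want to load-balance across all of them. Instead, we take a random subset each time and load-balance within it. ServerListSubsetFilter is the filter that does this; it also drops servers that are unhealthy or relatively busy.

Four main configuration properties control the selection: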

// Size of the subset to keep
private DynamicIntProperty sizeProp =
        new DynamicIntProperty(DefaultClientConfigImpl.DEFAULT_PROPERTY_NAME_SPACE + ".ServerListSubsetFilter.size", 20);
// Minimum percentage of servers that must be eliminated
private DynamicFloatProperty eliminationPercent =
        new DynamicFloatProperty(DefaultClientConfigImpl.DEFAULT_PROPERTY_NAME_SPACE + ".ServerListSubsetFilter.forceEliminatePercent", 0.1f);
// Drop a server once the number of failed load-balanced calls to it reaches this threshold
private DynamicIntProperty eliminationFailureCountThreshold =
        new DynamicIntProperty(DefaultClientConfigImpl.DEFAULT_PROPERTY_NAME_SPACE + ".ServerListSubsetFilter.eliminationFailureThresold", 0);
// Drop a server once the number of connections to it exceeds this threshold
private DynamicIntProperty eliminationConnectionCountThreshold =
        new DynamicIntProperty(DefaultClientConfigImpl.DEFAULT_PROPERTY_NAME_SPACE + ".ServerListSubsetFilter.eliminationConnectionThresold", 0);
