Spring Cloud Ribbon Explained (4) - Source Code of the Basic Components (2)

2021-04-12 15:04:56

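This article is based on Spring Cloud Dalston.SR5.

We continue to analyze the components one by one:

  1. IClient, the interface that all Ribbon load balancers need to implement
  2. ServerList, the interface for maintaining the list of service instances
  3. LoadBalancerStats, which records load-balancing statistics
  4. ILoadBalancer, the interface responsible for choosing a Server
  5. IRule, the interface for load-balancing selection rules
  6. IPing, the interface for checking whether an instance is alive
  7. ServerListUpdater, the interface for the mechanism that updates the list of service instances
  8. ServerListFilter, the mechanism that filters the list of service instances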

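4. ILoadBalancer, the interface responsible for choosing a Server

ILoadBalancer stores and updates the list of service instances, and calls IRule (that is, the configured load-balancing rule) to return a Server for the service call.

Here we only look at the classes related to the default implementation, ZoneAwareLoadBalancer.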

AbstractLoadBalancer.java

public abstract class AbstractLoadBalancer implements ILoadBalancer {

    public enum ServerGroup{
        ALL,
        STATUS_UP,
        STATUS_NOT_UP        
    }

    public Server chooseServer() {
        return chooseServer(null);
    }

    public abstract List<Server> getServerList(ServerGroup serverGroup);

    public abstract LoadBalancerStats getLoadBalancerStats();    
}

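AbstractLoadBalancer extends the ILoadBalancer interface with a method that returns Servers by group; there are three groups: ALL, STATUS_UP, and STATUS_NOT_UP. It also adds LoadBalancerStats, which records the per-Server load-balancing statistics (ServerStats) for each request, along with some other real-time information. These will be used later when we look at the concrete LoadBalancer implementations.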
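
To make the three groups concrete, here is a minimal, self-contained sketch of how a group could be resolved from a list of servers. The types `ServerGroupSketch`, `ServerSketch` and `ServerGroupDemo` are hypothetical stand-ins invented for this illustration; they are not Ribbon classes, and the real implementation may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for AbstractLoadBalancer.ServerGroup.
enum ServerGroupSketch { ALL, STATUS_UP, STATUS_NOT_UP }

// Hypothetical stand-in for a server with an alive flag.
class ServerSketch {
    final String id;
    final boolean alive;

    ServerSketch(String id, boolean alive) {
        this.id = id;
        this.alive = alive;
    }
}

class ServerGroupDemo {
    // Returns the ids of the servers that belong to the requested group.
    static List<String> idsInGroup(List<ServerSketch> all, ServerGroupSketch group) {
        List<String> ids = new ArrayList<>();
        for (ServerSketch s : all) {
            boolean match;
            switch (group) {
                case STATUS_UP:
                    match = s.alive;
                    break;
                case STATUS_NOT_UP:
                    match = !s.alive;
                    break;
                default: // ALL
                    match = true;
            }
            if (match) {
                ids.add(s.id);
            }
        }
        return ids;
    }
}
```

In this sketch STATUS_NOT_UP is simply everything in ALL that is not in STATUS_UP.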

BaseLoadBalancer.java

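BaseLoadBalancer is the basic load balancer implementation. It contains the following elements:

1) Two lists: the list of all Servers and the list of all Servers that are up: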

protected volatile List<Server> allServerList = Collections.synchronizedList(new ArrayList<Server>());
protected volatile List<Server> upServerList = Collections.synchronizedList(new ArrayList<Server>());

// Locks for updating the two lists
protected ReadWriteLock allServerLock = new ReentrantReadWriteLock();
protected ReadWriteLock upServerLock = new ReentrantReadWriteLock();

2) Elements for the scheduled ping task, which periodically checks whether each Server is up:

private final static SerialPingStrategy DEFAULT_PING_STRATEGY = new SerialPingStrategy();
protected IPingStrategy pingStrategy = DEFAULT_PING_STRATEGY;
protected IPing ping = null;

protected Timer lbTimer = null;
protected int pingIntervalSeconds = 10;
protected int maxTotalPingTimeSeconds = 5;
protected Comparator<Server> serverComparator = new ServerComparator();
protected AtomicBoolean pingInProgress = new AtomicBoolean(false);

Typically, constructing a BaseLoadBalancer calls setupPingTask to create the scheduled ping task:

void setupPingTask() {
    // Check whether pinging is needed at all
    if (canSkipPing()) {
        return;
    }

    // Cancel any previously started ping timer
    if (lbTimer != null) {
        lbTimer.cancel();
    }

    // Schedule the PingTask; by default it runs every 10 seconds
    lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
            true);
    lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);

    // Arguably redundant: the timer task also starts immediately and pings
    forceQuickPing();
}

// If ping is null or is a DummyPing, no scheduled ping is needed
private boolean canSkipPing() {
    if (ping == null
            || ping.getClass().getName().equals(DummyPing.class.getName())) {
        // default ping, no need to set up timer
        return true;
    } else {
        return false;
    }
}

Pinger contains the logic for pinging all Servers:

class Pinger {

    // How each server is pinged; the default is a plain sequential loop
    private final IPingStrategy pingerStrategy;

    public Pinger(IPingStrategy pingerStrategy) {
        this.pingerStrategy = pingerStrategy;
    }

    public void runPinger() throws Exception {
        // If the CAS fails, a ping round is still running (it took longer than the timer period)
        if (!pingInProgress.compareAndSet(false, true)) { 
            return; // Ping in progress - nothing to do
        }

        // we are "in" - we get to Ping

        Server[] allServers = null;
        boolean[] results = null;

        Lock allLock = null;
        Lock upLock = null;

        try {
            // Reading the list of all Servers requires the read lock
            allLock = allServerLock.readLock();
            allLock.lock();
            // Copy the list first and iterate afterwards, to shorten the time the lock is held
            allServers = allServerList.toArray(new Server[allServerList.size()]);
            allLock.unlock();

            // Ping every Server
            int numCandidates = allServers.length;
            results = pingerStrategy.pingServers(ping, allServers);

            final List<Server> newUpList = new ArrayList<Server>();
            final List<Server> changedServers = new ArrayList<Server>();

            // Update the status of every Server
            for (int i = 0; i < numCandidates; i++) {
                boolean isAlive = results[i];
                Server svr = allServers[i];
                boolean oldIsAlive = svr.isAlive();

                svr.setAlive(isAlive);

                if (oldIsAlive != isAlive) {
                    changedServers.add(svr);
                    logger.debug("LoadBalancer [{}]:  Server [{}] status changed to {}", 
                        name, svr.getId(), (isAlive ? "ALIVE" : "DEAD"));
                }

                if (isAlive) {
                    newUpList.add(svr);
                }
            }

            // Replacing the list of up Servers requires the write lock
            upLock = upServerLock.writeLock();
            upLock.lock();
            upServerList = newUpList;
            upLock.unlock();

            // Notify the listeners that watch for Server status changes
            notifyServerStatusChangeListener(changedServers);
        } finally {
            // The round is finished, so reset the in-progress flag
            pingInProgress.set(false);
        }
    }
}
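
The pattern used by runPinger (a compare-and-set guard against overlapping rounds, a snapshot taken under the read lock, and a swap of the up list under the write lock) can be sketched in isolation as follows. This is a simplified illustration under stated assumptions, not Ribbon code: `PingRoundSketch` is a hypothetical class, servers are plain strings, and the ping is any `Predicate<String>`. Unlike the listing above, the sketch releases each lock in a finally block.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Predicate;

// Hypothetical, simplified model of one ping round.
class PingRoundSketch {
    private final List<String> allServers = new ArrayList<>();
    private volatile List<String> upServers = new ArrayList<>();
    private final ReadWriteLock allLock = new ReentrantReadWriteLock();
    private final ReadWriteLock upLock = new ReentrantReadWriteLock();
    private final AtomicBoolean inProgress = new AtomicBoolean(false);

    void addServer(String id) {
        allLock.writeLock().lock();
        try {
            allServers.add(id);
        } finally {
            allLock.writeLock().unlock();
        }
    }

    // Returns false if another round is still running, true if this call ran a round.
    boolean runOnce(Predicate<String> isAlive) {
        if (!inProgress.compareAndSet(false, true)) {
            return false; // a round is already in progress, nothing to do
        }
        try {
            // Copy under the read lock so the (possibly slow) pings run without it.
            String[] snapshot;
            allLock.readLock().lock();
            try {
                snapshot = allServers.toArray(new String[0]);
            } finally {
                allLock.readLock().unlock();
            }

            List<String> newUp = new ArrayList<>();
            for (String server : snapshot) {
                if (isAlive.test(server)) {
                    newUp.add(server);
                }
            }

            // Swap the whole list under the write lock.
            upLock.writeLock().lock();
            try {
                upServers = newUp;
            } finally {
                upLock.writeLock().unlock();
            }
            return true;
        } finally {
            inProgress.set(false);
        }
    }

    List<String> upServers() {
        return upServers;
    }
}
```

Building a new list and swapping the reference keeps the time spent under the write lock very short, which matters because readers of the up list are on the request path.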

IPingStrategy currently has only one default implementation, SerialPingStrategy, which pings each Server serially, one after another:

private static class SerialPingStrategy implements IPingStrategy {
    @Override
    public boolean[] pingServers(IPing ping, Server[] servers) {
        int numCandidates = servers.length;
        boolean[] results = new boolean[numCandidates];

        logger.debug("LoadBalancer:  PingTask executing [{}] servers configured", numCandidates);

        for (int i = 0; i < numCandidates; i++) {
            results[i] = false; /* Default answer is DEAD. */
            try {
                // In a Spring Cloud environment the default ping reads the Eureka instance list cached in local memory, so pinging the servers serially has little performance impact
                if (ping != null) {
                    results[i] = ping.isAlive(servers[i]);
                }
            } catch (Exception e) {
                logger.error("Exception while pinging Server: '{}'", servers[i], e);
            }
        }
        return results;
    }
}
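
One detail of the listing above is worth isolating: every result starts as false, so a ping that throws leaves its server marked dead instead of aborting the whole round. The following self-contained sketch shows that behavior; `SerialPingSketch` is a hypothetical class for illustration, with servers modeled as strings and the ping as a `Predicate<String>`.

```java
import java.util.function.Predicate;

// Hypothetical, simplified model of a serial ping strategy.
class SerialPingSketch {
    static boolean[] pingAll(Predicate<String> ping, String[] servers) {
        // A new boolean array is all false, so the default answer is DEAD.
        boolean[] results = new boolean[servers.length];
        for (int i = 0; i < servers.length; i++) {
            try {
                if (ping != null) {
                    results[i] = ping.test(servers[i]);
                }
            } catch (RuntimeException e) {
                // A failing ping only affects this server; its result stays false.
            }
        }
        return results;
    }
}
```

With a null ping every server is reported as dead, which matches the null check in the loop above.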

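3) LoadBalancerStats, which records the statistics for each load-balancing decision. We set it aside for now; its statistics will be used later when we look at the concrete load-balancing rules.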

protected LoadBalancerStats lbStats;

4) PrimeConnections, the class with the logic for warming up connections

private PrimeConnections primeConnections;
private volatile boolean enablePrimingConnections = false;

This option is disabled by default, however.

5) Listeners for changes to the Server list and for changes to Server status

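private List<ServerListChangeListener> changeListeners = new CopyOnWriteArrayList<ServerListChangeListener>();

private List<ServerStatusChangeListener> serverStatusListeners = new CopyOnWriteArrayList<ServerStatusChangeListener>();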

The method that chooses a Server is very simple; it just delegates to IRule:

public Server chooseServer(Object key) {
    if (counter == null) {
        counter = createCounter();
    }
    counter.increment();
    if (rule == null) {
        return null;
    } else {
        try {
            return rule.choose(key);
        } catch (Exception e) {
            logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);

            return null;
        }
    }
}

DynamicServerListLoadBalancer.java

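DynamicServerListLoadBalancer builds on BaseLoadBalancer and adds the pieces mentioned earlier: ServerList (the interface for maintaining the list of service instances), ServerListUpdater (the interface for the update mechanism) and ServerListFilter (the filter mechanism). It uses these to keep the list of service instances up to date.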

ZoneAwareLoadBalancer.java

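ZoneAwareLoadBalancer goes a step further and adds zone awareness. It uses a ConcurrentHashMap to maintain a separate load balancer for each Zone, and each Zone's load balancer can be given its own Rule: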

 private ConcurrentHashMap<String, BaseLoadBalancer> balancers = new ConcurrentHashMap<String, BaseLoadBalancer>();

The setRule method sets the rule on this load balancer and gives the load balancer of every Zone its own cloned copy of that IRule:

public void setRule(IRule rule) {
    super.setRule(rule);
    if (balancers != null) {
        for (String zone: balancers.keySet()) {
            balancers.get(zone).setRule(cloneRule(rule));
        }
    }
}
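
One plausible reason for cloning the rule per zone instead of sharing a single instance is that rules can carry state, such as a round-robin position. The sketch below illustrates that idea only; `RoundRobinRuleSketch` and `ZoneRulesSketch` are hypothetical classes, and the way Ribbon's cloneRule actually builds the copy is not shown in this article and is not modeled here.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stateful rule: a round-robin position counter (not thread-safe, for brevity).
class RoundRobinRuleSketch {
    private int next = 0;

    // Returns the index of the server to use among serverCount servers.
    int choose(int serverCount) {
        int index = next % serverCount;
        next++;
        return index;
    }

    // Creates a fresh rule of the same kind with its own counter.
    RoundRobinRuleSketch copy() {
        return new RoundRobinRuleSketch();
    }
}

// Hypothetical holder of one rule per zone.
class ZoneRulesSketch {
    private final Map<String, RoundRobinRuleSketch> ruleByZone = new ConcurrentHashMap<>();

    void addZone(String zone) {
        ruleByZone.put(zone, new RoundRobinRuleSketch());
    }

    // Same shape as setRule above: every known zone receives its own copy.
    void setRule(RoundRobinRuleSketch rule) {
        for (String zone : ruleByZone.keySet()) {
            ruleByZone.put(zone, rule.copy());
        }
    }

    int choose(String zone, int serverCount) {
        return ruleByZone.get(zone).choose(serverCount);
    }
}
```

Because each zone owns its copy, choosing servers in one zone does not advance the position used by another zone.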

Now look at the method that chooses a Server:

public Server chooseServer(Object key) {
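    // If zone awareness is disabled (it is enabled by default and can be changed via ZoneAwareNIWSDiscoveryLoadBalancer.enabled), or there is at most one available zone, fall back to BaseLoadBalancer's chooseServer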
    if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
        logger.debug("Zone aware logic disabled or there is only one zone");
        return super.chooseServer(key);
    }
    Server server = null;
    try {
        LoadBalancerStats lbStats = getLoadBalancerStats();
        // Take a snapshot of the current load-balancing statistics
        Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
        logger.debug("Zone snapshots: {}", zoneSnapshot);
        // Read the configured maximum load threshold
        if (triggeringLoad == null) {
            triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty(
                    "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d);
        }
        // Read the configured threshold for the fraction of circuit-breaker-tripped instances
        if (triggeringBlackoutPercentage == null) {
            triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty(
                    "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d);
        }
        // Compute the currently usable zones from the load threshold and the tripped-instance threshold
        Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
        logger.debug("Available zones: {}", availableZones);
        if (availableZones != null &&  availableZones.size() < zoneSnapshot.keySet().size()) {
            // Pick a zone at random
            String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
            logger.debug("Zone chosen: {}", zone);
            if (zone != null) {
                BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
                // Choose a server within the selected zone
                server = zoneLoadBalancer.chooseServer(key);
            }
        }
    } catch (Exception e) {
        logger.error("Error choosing server using zone aware logic for load balancer={}", name, e);
    }
    if (server != null) {
        return server;
    } else {
        logger.debug("Zone avoidance logic is not invoked.");
        return super.chooseServer(key);
    }
}

How the available zones are filtered:

public static Set<String> getAvailableZones(
        Map<String, ZoneSnapshot> snapshot, double triggeringLoad,
        double triggeringBlackoutPercentage) {
    if (snapshot.isEmpty()) {
        return null;
    }
    Set<String> availableZones = new HashSet<String>(snapshot.keySet());
    if (availableZones.size() == 1) {
        return availableZones;
    }
    Set<String> worstZones = new HashSet<String>();
    double maxLoadPerServer = 0;
    boolean limitedZoneAvailability = false;

    for (Map.Entry<String, ZoneSnapshot> zoneEntry : snapshot.entrySet()) {
        String zone = zoneEntry.getKey();
        ZoneSnapshot zoneSnapshot = zoneEntry.getValue();
        int instanceCount = zoneSnapshot.getInstanceCount();
        // The zone has no instances, so it is removed from the available zones
        if (instanceCount == 0) {
            availableZones.remove(zone);
            limitedZoneAvailability = true;
        } else {
            double loadPerServer = zoneSnapshot.getLoadPerServer();
            // If (circuit-breaker-tripped instances / total instances) is at least triggeringBlackoutPercentage, or the load is negative, the zone is removed
            if (((double) zoneSnapshot.getCircuitTrippedCount())
                    / instanceCount >= triggeringBlackoutPercentage
                    || loadPerServer < 0) {
                availableZones.remove(zone);
                limitedZoneAvailability = true;
            } else {
                // Track the zone(s) with the highest average load per server and put them in worstZones
                if (Math.abs(loadPerServer - maxLoadPerServer) < 0.000001d) {
                    // they are the same considering double calculation
                    // round error
                    worstZones.add(zone);
                } else if (loadPerServer > maxLoadPerServer) {
                    maxLoadPerServer = loadPerServer;
                    worstZones.clear();
                    worstZones.add(zone);
                }
            }
        }
    }

    // If the highest load is below the load threshold and no zone has been removed, return the current result as is
    if (maxLoadPerServer < triggeringLoad && !limitedZoneAvailability) {
        // zone override is not needed here
        return availableZones;
    }
    String zoneToAvoid = randomChooseZone(snapshot, worstZones);
    if (zoneToAvoid != null) {
        availableZones.remove(zoneToAvoid);
    }
    return availableZones;

}
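
To see the filtering with concrete numbers, here is a condensed, self-contained re-statement of the logic above. It is a sketch, not the Ribbon source: `ZoneStat` and `AvailableZonesSketch` are hypothetical names, and where the original picks the zone to avoid at random among the worst zones, this sketch deterministically takes the alphabetically first one so that the outcome is reproducible.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-in for ZoneSnapshot, holding only the fields used here.
class ZoneStat {
    final int instanceCount;
    final int circuitTrippedCount;
    final double loadPerServer;

    ZoneStat(int instanceCount, int circuitTrippedCount, double loadPerServer) {
        this.instanceCount = instanceCount;
        this.circuitTrippedCount = circuitTrippedCount;
        this.loadPerServer = loadPerServer;
    }
}

class AvailableZonesSketch {
    static Set<String> availableZones(Map<String, ZoneStat> snapshot,
                                      double triggeringLoad,
                                      double triggeringBlackoutPercentage) {
        if (snapshot.isEmpty()) {
            return null;
        }
        Set<String> available = new HashSet<>(snapshot.keySet());
        if (available.size() == 1) {
            return available;
        }
        TreeSet<String> worst = new TreeSet<>();
        double maxLoad = 0;
        boolean limited = false;
        for (Map.Entry<String, ZoneStat> e : snapshot.entrySet()) {
            ZoneStat z = e.getValue();
            // Unusable: no instances, (almost) all circuits tripped, or a negative load.
            boolean unusable = z.instanceCount == 0
                    || (double) z.circuitTrippedCount / z.instanceCount >= triggeringBlackoutPercentage
                    || z.loadPerServer < 0;
            if (unusable) {
                available.remove(e.getKey());
                limited = true;
            } else if (Math.abs(z.loadPerServer - maxLoad) < 0.000001d) {
                worst.add(e.getKey()); // ties with the current maximum
            } else if (z.loadPerServer > maxLoad) {
                maxLoad = z.loadPerServer;
                worst.clear();
                worst.add(e.getKey());
            }
        }
        if (maxLoad < triggeringLoad && !limited) {
            return available; // everything is healthy, keep all zones
        }
        // Simplification: the original chooses randomly among the worst zones.
        if (!worst.isEmpty()) {
            available.remove(worst.first());
        }
        return available;
    }
}
```

With thresholds 0.2 and 0.99999, zones a (load 0.1) and b (load 0.5) with 10 healthy instances each yield {a}, because b exceeds the load threshold. If instead b has all 10 circuits tripped and a third zone c has load 0.05, the result is {c}: b is removed as unusable, and because a zone was removed, the most loaded remaining zone (a) is dropped as well.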
