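This article is based on Spring Cloud Dalston.SR5.
We continue working through the following components one by one:
- IClient, the interface that every Ribbon load-balancing client must implement
- ServerList, the interface behind the mechanism that maintains the service instance list
- LoadBalancerStats, which records load-balancing statistics
- ILoadBalancer, the interface responsible for choosing a Server
- IRule, the interface for load-balancing selection rules
- IPing, the interface for checking whether an instance is alive
- ServerListUpdater, the interface behind the mechanism that updates the service instance list
- ServerListFilter, the mechanism that filters the service instance list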
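4. ILoadBalancer, the interface responsible for choosing a Server
ILoadBalancer stores and updates the list of service instances, and calls IRule (that is, the configured load-balancing rule) to return a Server for the service call.
Here we only look at the classes related to the default implementation, ZoneAwareLoadBalancer.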
AbstractLoadBalancer.java
public abstract class AbstractLoadBalancer implements ILoadBalancer {
public enum ServerGroup{
ALL,
STATUS_UP,
STATUS_NOT_UP
}
public Server chooseServer() {
return chooseServer(null);
}
public abstract List<Server> getServerList(ServerGroup serverGroup);
public abstract LoadBalancerStats getLoadBalancerStats();
}
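AbstractLoadBalancer extends the ILoadBalancer interface with a method that returns Servers by group; the three groups are ALL, STATUS_UP and STATUS_NOT_UP. It also exposes LoadBalancerStats, which records the per-Server load-balancing statistics (ServerStats) for each request, along with some other real-time data. These statistics come up again later, when the concrete LoadBalancer implementations are introduced.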
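To make the three groups concrete, here is a minimal sketch that does not use Ribbon's classes. `UpDownServer` and `ServerGroupSketch` are hypothetical stand-ins, and mapping the groups onto a simple alive flag is an assumption about how an implementation might answer such a query.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a Ribbon Server: just an id and an alive flag.
class UpDownServer {
    final String id;
    final boolean alive;

    UpDownServer(String id, boolean alive) {
        this.id = id;
        this.alive = alive;
    }
}

// Hypothetical helper mirroring the three groups of AbstractLoadBalancer.ServerGroup.
class ServerGroupSketch {
    enum ServerGroup { ALL, STATUS_UP, STATUS_NOT_UP }

    // Returns the ids of the servers that belong to the requested group.
    static List<String> idsInGroup(List<UpDownServer> servers, ServerGroup group) {
        List<String> ids = new ArrayList<>();
        for (UpDownServer s : servers) {
            boolean matches = group == ServerGroup.ALL
                    || (group == ServerGroup.STATUS_UP && s.alive)
                    || (group == ServerGroup.STATUS_NOT_UP && !s.alive);
            if (matches) {
                ids.add(s.id);
            }
        }
        return ids;
    }
}
```

With one dead server among three, `STATUS_UP` returns the two live ids and `STATUS_NOT_UP` returns the remaining one.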
BaseLoadBalancer.java
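BaseLoadBalancer is the basic load balancer implementation. It contains the following elements:
1) Two lists: the list of all Servers and the list of all Servers that are up:
protected volatile List<Server> allServerList = Collections.synchronizedList(new ArrayList<Server>());
protected volatile List<Server> upServerList = Collections.synchronizedList(new ArrayList<Server>());
// Locks guarding updates to the two lists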
protected ReadWriteLock allServerLock = new ReentrantReadWriteLock();
protected ReadWriteLock upServerLock = new ReentrantReadWriteLock();
2) The elements of the scheduled ping task, which periodically checks whether each Server is up:
private final static SerialPingStrategy DEFAULT_PING_STRATEGY = new SerialPingStrategy();
protected IPingStrategy pingStrategy = DEFAULT_PING_STRATEGY;
protected IPing ping = null;
protected Timer lbTimer = null;
protected int pingIntervalSeconds = 10;
protected int maxTotalPingTimeSeconds = 5;
protected Comparator<Server> serverComparator = new ServerComparator();
protected AtomicBoolean pingInProgress = new AtomicBoolean(false);
Normally, constructing a BaseLoadBalancer calls setupPingTask, which sets up the scheduled ping task:
void setupPingTask() {
// Check whether pinging can be skipped
if (canSkipPing()) {
return;
}
// Cancel any ping timer that was started earlier
if (lbTimer != null) {
lbTimer.cancel();
}
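// Schedule the PingTask; the default interval is 10 seconds
lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
true);
lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
// Possibly redundant: the timer task is scheduled with zero delay, so it pings right away anyway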
forceQuickPing();
}
// If ping is null or a DummyPing, no scheduled ping is needed
private boolean canSkipPing() {
if (ping == null
|| ping.getClass().getName().equals(DummyPing.class.getName())) {
// default ping, no need to set up timer
return true;
} else {
return false;
}
}
Pinger contains the logic for pinging all Servers:
class Pinger {
// Strategy for pinging the servers; the default simply iterates over them
private final IPingStrategy pingerStrategy;
public Pinger(IPingStrategy pingerStrategy) {
this.pingerStrategy = pingerStrategy;
}
public void runPinger() throws Exception {
// If the compare-and-set fails, a ping run is already in progress (it is taking longer than the timer period)
if (!pingInProgress.compareAndSet(false, true)) {
return; // Ping in progress - nothing to do
}
// we are "in" - we get to Ping
Server[] allServers = null;
boolean[] results = null;
Lock allLock = null;
Lock upLock = null;
try {
// Reading the list of all Servers requires the read lock
allLock = allServerLock.readLock();
allLock.lock();
// Copy the list first and iterate afterwards, so the lock is held as briefly as possible
allServers = allServerList.toArray(new Server[allServerList.size()]);
allLock.unlock();
// Ping every Server
int numCandidates = allServers.length;
results = pingerStrategy.pingServers(ping, allServers);
final List<Server> newUpList = new ArrayList<Server>();
final List<Server> changedServers = new ArrayList<Server>();
// Update the state of every Server
for (int i = 0; i < numCandidates; i++) {
boolean isAlive = results[i];
Server svr = allServers[i];
boolean oldIsAlive = svr.isAlive();
svr.setAlive(isAlive);
if (oldIsAlive != isAlive) {
changedServers.add(svr);
logger.debug("LoadBalancer [{}]: Server [{}] status changed to {}",
name, svr.getId(), (isAlive ? "ALIVE" : "DEAD"));
}
if (isAlive) {
newUpList.add(svr);
}
}
// Replacing the up-Server list requires the write lock
upLock = upServerLock.writeLock();
upLock.lock();
upServerList = newUpList;
upLock.unlock();
// Notify the listeners registered for Server status changes
notifyServerStatusChangeListener(changedServers);
} finally {
// The run is over; reset the in-progress flag
pingInProgress.set(false);
}
}
}
IPingStrategy currently has a single default implementation, SerialPingStrategy, which pings each Server serially, one after another:
private static class SerialPingStrategy implements IPingStrategy {
@Override
public boolean[] pingServers(IPing ping, Server[] servers) {
int numCandidates = servers.length;
boolean[] results = new boolean[numCandidates];
logger.debug("LoadBalancer: PingTask executing [{}] servers configured", numCandidates);
for (int i = 0; i < numCandidates; i++) {
results[i] = false; /* Default answer is DEAD. */
try {
// In a Spring Cloud environment the default ping reads Eureka's instance list cached in local memory, so pinging the servers serially has little performance impact
if (ping != null) {
results[i] = ping.isAlive(servers[i]);
}
} catch (Exception e) {
logger.error("Exception while pinging Server: '{}'", servers[i], e);
}
}
return results;
}
}
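3) LoadBalancerStats, which records the load-balancing statistics. We set it aside for now; its statistics are used later, when the load-balancing selection rules are covered in detail.
protected LoadBalancerStats lbStats;
4) PrimeConnections, the helper class holding the logic for warming up connections:
private PrimeConnections primeConnections;
private volatile boolean enablePrimingConnections = false;
However, this option is disabled by default.
5) Listeners for changes to the Server list and for changes in Server status:
private List<ServerListChangeListener> changeListeners = new CopyOnWriteArrayList<ServerListChangeListener>();
private List<ServerStatusChangeListener> serverStatusListeners = new CopyOnWriteArrayList<ServerStatusChangeListener>();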
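Going back to element 2), the ping round can be condensed into a small sketch. This is not Ribbon's code: `PingRoundSketch` is a hypothetical class, servers are plain strings, and a `Predicate<String>` stands in for IPing. It only illustrates two ideas from the listings above: the compare-and-set guard that skips a round while the previous one is still running, and the serial ping in which an exception leaves the answer at "dead".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;

// Hypothetical, simplified model of one ping round.
class PingRoundSketch {
    private final AtomicBoolean pingInProgress = new AtomicBoolean(false);
    private volatile List<String> upServers = new ArrayList<>();

    // Pings the servers one by one; a null ping or an exception leaves the result at false (dead).
    static boolean[] pingSerially(Predicate<String> ping, String[] servers) {
        boolean[] results = new boolean[servers.length];
        for (int i = 0; i < servers.length; i++) {
            try {
                if (ping != null) {
                    results[i] = ping.test(servers[i]);
                }
            } catch (RuntimeException e) {
                results[i] = false;
            }
        }
        return results;
    }

    // Returns false if a previous round is still running, true after a completed round.
    boolean runRound(Predicate<String> ping, String[] allServers) {
        if (!pingInProgress.compareAndSet(false, true)) {
            return false; // another round holds the flag; do nothing
        }
        try {
            boolean[] results = pingSerially(ping, allServers);
            List<String> newUp = new ArrayList<>();
            for (int i = 0; i < allServers.length; i++) {
                if (results[i]) {
                    newUp.add(allServers[i]);
                }
            }
            upServers = newUp; // swap in the new list in one assignment
            return true;
        } finally {
            pingInProgress.set(false); // always release the flag
        }
    }

    List<String> upServers() {
        return upServers;
    }
}
```

Because the flag is released in `finally`, a ping implementation that throws cannot leave the balancer permanently stuck in the "in progress" state.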
Choosing a Server is very simple: the method just delegates to IRule:
public Server chooseServer(Object key) {
if (counter == null) {
counter = createCounter();
}
counter.increment();
if (rule == null) {
return null;
} else {
try {
return rule.choose(key);
} catch (Exception e) {
logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e);
return null;
}
}
}
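The delegation pattern above can be sketched without Ribbon on the classpath. `ChooseRule`, `RoundRobinSketch` and `DelegatingBalancerSketch` are hypothetical names; the round-robin rule is only an example of what a rule might do and is not meant to reproduce any Ribbon rule.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for IRule: picks one server from the list.
interface ChooseRule {
    String choose(List<String> servers, Object key);
}

// Example rule: cycles through the servers in order.
class RoundRobinSketch implements ChooseRule {
    private final AtomicInteger next = new AtomicInteger(0);

    @Override
    public String choose(List<String> servers, Object key) {
        if (servers.isEmpty()) {
            return null;
        }
        int index = Math.floorMod(next.getAndIncrement(), servers.size());
        return servers.get(index);
    }
}

// Hypothetical balancer: holds the server list and defers every choice to the rule.
class DelegatingBalancerSketch {
    private final List<String> servers;
    private final ChooseRule rule;

    DelegatingBalancerSketch(List<String> servers, ChooseRule rule) {
        this.servers = servers;
        this.rule = rule;
    }

    // Mirrors the shape of chooseServer: no rule or a failing rule yields null.
    String chooseServer(Object key) {
        if (rule == null) {
            return null;
        }
        try {
            return rule.choose(servers, key);
        } catch (RuntimeException e) {
            return null;
        }
    }
}
```

Keeping the selection policy behind an interface means the balancer itself never changes when the policy does; only the rule object is swapped.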
DynamicServerListLoadBalancer.java
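DynamicServerListLoadBalancer builds on BaseLoadBalancer by adding the components mentioned earlier: ServerList (the interface behind the mechanism that maintains the service instance list), ServerListUpdater (the interface behind the update mechanism) and ServerListFilter (the filtering mechanism). Together, these keep the service instance list up to date.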
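How the three pieces cooperate can be shown with a rough sketch, under heavy simplification. `DynamicListSketch` is a hypothetical class: a `Supplier` plays the role of ServerList, a `UnaryOperator` plays the role of ServerListFilter, and the caller invokes `refresh()` by hand where Ribbon would rely on a ServerListUpdater to trigger the update.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;

// Hypothetical model of "fetch, filter, then replace the list".
class DynamicListSketch {
    private final Supplier<List<String>> serverListSource; // stands in for ServerList
    private final UnaryOperator<List<String>> filter;       // stands in for ServerListFilter
    private volatile List<String> current = Collections.emptyList();

    DynamicListSketch(Supplier<List<String>> serverListSource, UnaryOperator<List<String>> filter) {
        this.serverListSource = serverListSource;
        this.filter = filter;
    }

    // In Ribbon an updater would trigger this periodically; here it is called explicitly.
    void refresh() {
        List<String> fetched = new ArrayList<>(serverListSource.get()); // work on a copy
        List<String> filtered = filter.apply(fetched);
        current = Collections.unmodifiableList(new ArrayList<>(filtered));
    }

    List<String> servers() {
        return current;
    }
}
```

Until `refresh()` runs again, callers keep seeing the previous list, which is why the update interval bounds how stale the balancer's view of the registry can be.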
ZoneAwareLoadBalancer.java
ZoneAwareLoadBalancer goes one step further and adds zone awareness. It uses a ConcurrentHashMap to keep a separate load balancer for each zone, and each zone's load balancer holds its own Rule instance:
private ConcurrentHashMap<String, BaseLoadBalancer> balancers = new ConcurrentHashMap<String, BaseLoadBalancer>();
The setRule method propagates a rule to every zone: each per-zone load balancer receives its own clone of the given IRule.
public void setRule(IRule rule) {
super.setRule(rule);
if (balancers != null) {
for (String zone: balancers.keySet()) {
balancers.get(zone).setRule(cloneRule(rule));
}
}
}
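One plausible reason for cloning instead of sharing the rule is that rules can carry state, such as a round-robin position. The sketch below illustrates that idea only; `CounterRule` and `PerZoneRulesSketch` are hypothetical, and a `Supplier` is used here in place of Ribbon's cloneRule.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Supplier;

// Hypothetical stateful rule: remembers how many times it has been asked.
class CounterRule {
    private int calls;

    int next() {
        return calls++;
    }
}

// Hypothetical per-zone holder: every zone gets its own rule instance.
class PerZoneRulesSketch {
    private final Map<String, CounterRule> ruleByZone = new TreeMap<>();

    PerZoneRulesSketch(String... zones) {
        for (String zone : zones) {
            ruleByZone.put(zone, new CounterRule());
        }
    }

    // Gives each zone a fresh instance from the factory, so no state is shared across zones.
    void setRule(Supplier<CounterRule> ruleFactory) {
        ruleByZone.replaceAll((zone, old) -> ruleFactory.get());
    }

    CounterRule ruleFor(String zone) {
        return ruleByZone.get(zone);
    }
}
```

With separate instances, traffic sent to one zone does not advance the counter another zone uses for its own choices.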
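Now look at the load-balancing method itself:
public Server chooseServer(Object key) {
// If zone awareness is disabled (it is enabled by default and can be changed through ZoneAwareNIWSDiscoveryLoadBalancer.enabled), or there is at most one available zone, fall back to BaseLoadBalancer's chooseServer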
if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
logger.debug("Zone aware logic disabled or there is only one zone");
return super.chooseServer(key);
}
Server server = null;
try {
LoadBalancerStats lbStats = getLoadBalancerStats();
// Take a snapshot of the current load-balancing statistics
Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
logger.debug("Zone snapshots: {}", zoneSnapshot);
// Read the configured maximum load threshold
if (triggeringLoad == null) {
triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty(
"ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d);
}
// Read the configured ratio of circuit-tripped instances
if (triggeringBlackoutPercentage == null) {
triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty(
"ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d);
}
// Work out which zones are currently usable, based on the two thresholds above
Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
logger.debug("Available zones: {}", availableZones);
if (availableZones != null && availableZones.size() < zoneSnapshot.keySet().size()) {
// Pick a zone at random among the available ones
String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
logger.debug("Zone chosen: {}", zone);
if (zone != null) {
BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
// Choose a server within the selected zone
server = zoneLoadBalancer.chooseServer(key);
}
}
} catch (Exception e) {
logger.error("Error choosing server using zone aware logic for load balancer={}", name, e);
}
if (server != null) {
return server;
} else {
logger.debug("Zone avoidance logic is not invoked.");
return super.chooseServer(key);
}
}
How the available zones are filtered:
public static Set<String> getAvailableZones(
Map<String, ZoneSnapshot> snapshot, double triggeringLoad,
double triggeringBlackoutPercentage) {
if (snapshot.isEmpty()) {
return null;
}
Set<String> availableZones = new HashSet<String>(snapshot.keySet());
if (availableZones.size() == 1) {
return availableZones;
}
Set<String> worstZones = new HashSet<String>();
double maxLoadPerServer = 0;
boolean limitedZoneAvailability = false;
for (Map.Entry<String, ZoneSnapshot> zoneEntry : snapshot.entrySet()) {
String zone = zoneEntry.getKey();
ZoneSnapshot zoneSnapshot = zoneEntry.getValue();
int instanceCount = zoneSnapshot.getInstanceCount();
// The zone has no instances, so it must be removed
if (instanceCount == 0) {
availableZones.remove(zone);
limitedZoneAvailability = true;
} else {
double loadPerServer = zoneSnapshot.getLoadPerServer();
// If circuit-tripped instances / total instances reaches triggeringBlackoutPercentage, or the load is negative, the zone must be removed
if (((double) zoneSnapshot.getCircuitTrippedCount())
/ instanceCount >= triggeringBlackoutPercentage
|| loadPerServer < 0) {
availableZones.remove(zone);
limitedZoneAvailability = true;
} else {
// Track the zone(s) with the highest average load in the worstZones set
if (Math.abs(loadPerServer - maxLoadPerServer) < 0.000001d) {
// they are the same considering double calculation
// round error
worstZones.add(zone);
} else if (loadPerServer > maxLoadPerServer) {
maxLoadPerServer = loadPerServer;
worstZones.clear();
worstZones.add(zone);
}
}
}
}
// If the highest load is below the load threshold and no zone has been removed, return the current result as is
if (maxLoadPerServer < triggeringLoad && !limitedZoneAvailability) {
// zone override is not needed here
return availableZones;
}
String zoneToAvoid = randomChooseZone(snapshot, worstZones);
if (zoneToAvoid != null) {
availableZones.remove(zoneToAvoid);
}
return availableZones;
}
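A worked example helps to follow this filtering. The sketch below reimplements the same steps on a simplified model, assuming a hypothetical `ZoneStat` class in place of ZoneSnapshot. One deliberate simplification: where the listing calls randomChooseZone on the worst zones, the sketch removes the first worst zone in sorted order so that the result is deterministic.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-in for ZoneSnapshot.
class ZoneStat {
    final int instanceCount;
    final int circuitTrippedCount;
    final double loadPerServer;

    ZoneStat(int instanceCount, int circuitTrippedCount, double loadPerServer) {
        this.instanceCount = instanceCount;
        this.circuitTrippedCount = circuitTrippedCount;
        this.loadPerServer = loadPerServer;
    }
}

class AvailableZonesSketch {
    static Set<String> availableZones(Map<String, ZoneStat> snapshot,
                                      double triggeringLoad,
                                      double triggeringBlackoutPercentage) {
        if (snapshot.isEmpty()) {
            return null;
        }
        Set<String> available = new TreeSet<>(snapshot.keySet());
        if (available.size() == 1) {
            return available;
        }
        TreeSet<String> worst = new TreeSet<>();
        double maxLoad = 0;
        boolean limited = false;
        for (Map.Entry<String, ZoneStat> entry : snapshot.entrySet()) {
            ZoneStat z = entry.getValue();
            if (z.instanceCount == 0) {
                available.remove(entry.getKey()); // empty zone
                limited = true;
            } else if ((double) z.circuitTrippedCount / z.instanceCount >= triggeringBlackoutPercentage
                    || z.loadPerServer < 0) {
                available.remove(entry.getKey()); // too many tripped instances
                limited = true;
            } else if (Math.abs(z.loadPerServer - maxLoad) < 0.000001d) {
                worst.add(entry.getKey());        // ties with the current maximum
            } else if (z.loadPerServer > maxLoad) {
                maxLoad = z.loadPerServer;        // new maximum
                worst.clear();
                worst.add(entry.getKey());
            }
        }
        if (maxLoad < triggeringLoad && !limited) {
            return available; // nothing to avoid
        }
        // Simplification (assumption): drop the first worst zone instead of a random one.
        if (!worst.isEmpty()) {
            available.remove(worst.first());
        }
        return available;
    }
}
```

Take three zones with 10 instances each and the default thresholds 0.2 and 0.99999: zone-a has load 0.1, zone-b has load 0.5, and zone-c has all 10 instances circuit-tripped. zone-c is removed because its tripped ratio is 1.0, zone-b is then dropped as the most loaded zone because 0.5 is not below 0.2, and only zone-a remains. If instead every zone is healthy and the highest load stays below 0.2, all zones are returned unchanged.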