This article is based on SpringCloud-Dalston.SR5.
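We continue analyzing the following components:
- IClient, the interface every Ribbon load-balancing client implements
- ServerList, the interface for maintaining the server-instance list
- LoadBalancerStats, the load-balancing statistics
- ILoadBalancer, the interface responsible for choosing a Server
- IRule, the interface implemented by load-balancing selection rules
- IPing, the interface for checking whether an instance is alive
- ServerListUpdater, the interface for the server-instance list update mechanism
- ServerListFilter, the server-instance list filtering mechanism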
5. IRule: the interface implemented by load-balancing selection rules
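Here we only look at the default implementation, ZoneAvoidanceRule, and the classes related to it.
An IRule cannot work without load-balancing data. As mentioned earlier, that data is part of BaseLoadBalancer, the ILoadBalancer implementation, so the abstract base class for IRule needs an ILoadBalancer to be set in order to reach the load-balancing statistics: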
AbstractLoadBalancerRule
```java
public abstract class AbstractLoadBalancerRule implements IRule, IClientConfigAware {
    private ILoadBalancer lb;

    @Override
    public void setLoadBalancer(ILoadBalancer lb) {
        this.lb = lb;
    }

    @Override
    public ILoadBalancer getLoadBalancer() {
        return lb;
    }
}
```
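To make the rule/load-balancer back-reference concrete, here is a minimal sketch built on hypothetical, simplified stand-ins (`SimpleRule`, `AbstractSimpleRule`, `SimpleLoadBalancer`, `FirstServerRule`). These are not Ribbon's real interfaces. The load balancer hands itself to the rule when the rule is set, which is roughly what BaseLoadBalancer does when its IRule is assigned. The rule then reads the server list through that reference at choose time.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified stand-in for IRule (not Ribbon's real interface).
interface SimpleRule {
    void setLoadBalancer(SimpleLoadBalancer lb);
    SimpleLoadBalancer getLoadBalancer();
    String choose(Object key);
}

// Mirrors AbstractLoadBalancerRule: it only stores the back-reference.
abstract class AbstractSimpleRule implements SimpleRule {
    private SimpleLoadBalancer lb;

    @Override
    public void setLoadBalancer(SimpleLoadBalancer lb) {
        this.lb = lb;
    }

    @Override
    public SimpleLoadBalancer getLoadBalancer() {
        return lb;
    }
}

// Hypothetical stand-in for a load balancer that owns the server list.
class SimpleLoadBalancer {
    private final List<String> servers = new ArrayList<>();
    private SimpleRule rule;

    void setRule(SimpleRule rule) {
        this.rule = rule;
        rule.setLoadBalancer(this); // lets the rule reach servers (and, in Ribbon, stats)
    }

    void addServer(String server) {
        servers.add(server);
    }

    List<String> getAllServers() {
        return Collections.unmodifiableList(servers);
    }

    String chooseServer(Object key) {
        return rule.choose(key);
    }
}

// Hypothetical rule: always picks the first server, or null if there is none.
class FirstServerRule extends AbstractSimpleRule {
    @Override
    public String choose(Object key) {
        List<String> all = getLoadBalancer().getAllServers();
        return all.isEmpty() ? null : all.get(0);
    }
}
```

The rule holds no server state of its own. Everything it needs comes through `getLoadBalancer()`, which is why AbstractLoadBalancerRule carries nothing but that reference.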
RoundRobinRule.java
This implements the most common and basic load-balancing rule, round robin:
```java
public Server choose(ILoadBalancer lb, Object key) {
    if (lb == null) {
        log.warn("no load balancer");
        return null;
    }
    Server server = null;
    int count = 0;
    while (server == null && count++ < 10) {
        List<Server> reachableServers = lb.getReachableServers();
        List<Server> allServers = lb.getAllServers();
        int upCount = reachableServers.size();
        int serverCount = allServers.size();
        // No Servers at all, or no Server in the UP state
        if ((upCount == 0) || (serverCount == 0)) {
            log.warn("No up servers available from load balancer: " + lb);
            return null;
        }
        int nextServerIndex = incrementAndGetModulo(serverCount);
        server = allServers.get(nextServerIndex);
        if (server == null) {
            /* Transient. */
            Thread.yield();
            continue;
        }
        // Check whether the Server picked this round is alive and ready to serve
        if (server.isAlive() && (server.isReadyToServe())) {
            return (server);
        }
        // Next.
        server = null;
    }
    if (count >= 10) {
        log.warn("No available alive servers after 10 tries from load balancer: "
                + lb);
    }
    return server;
}
```
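In short, it cycles through all Servers (not only the reachable ones) and returns the next one that is alive and ready to serve. It gives up after 10 attempts.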
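`incrementAndGetModulo` is not shown above. The sketch below is one lock-free way to write such a counter: a compare-and-set retry loop on an `AtomicInteger`, which to my knowledge is also how RoundRobinRule does it. It adds a simplified `choose()` over a hypothetical `SimpleServer` type, where a single `alive` flag stands in for `isAlive() && isReadyToServe()`.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of round-robin selection; RoundRobinSketch and SimpleServer are hypothetical.
class RoundRobinSketch {

    static final class SimpleServer {
        final String id;
        final boolean alive; // stands in for isAlive() && isReadyToServe()

        SimpleServer(String id, boolean alive) {
            this.id = id;
            this.alive = alive;
        }
    }

    private final AtomicInteger nextServerCyclicCounter = new AtomicInteger(0);

    // Lock-free "increment, then wrap": a thread that loses the CAS race re-reads and retries.
    int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = nextServerCyclicCounter.get();
            int next = (current + 1) % modulo;
            if (nextServerCyclicCounter.compareAndSet(current, next)) {
                return next;
            }
        }
    }

    // Try at most 10 positions, like the loop above, and return the first alive server.
    SimpleServer choose(List<SimpleServer> allServers) {
        if (allServers.isEmpty()) {
            return null;
        }
        for (int count = 0; count < 10; count++) {
            SimpleServer candidate = allServers.get(incrementAndGetModulo(allServers.size()));
            if (candidate.alive) {
                return candidate;
            }
        }
        return null;
    }
}
```

Storing the value already reduced modulo the list size keeps the counter bounded, so it never overflows into negative indexes.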
ClientConfigEnabledRoundRobinRule.java
This essentially wraps a RoundRobinRule and adds a few configuration methods.
ZoneAvoidanceRule.java
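The core logic of this Rule was covered earlier, in the part on how ZoneAwareLoadBalancer selects the AvailableZones. It determines the available zones and then round-robins among the Servers in them.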
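The sketch below shows "pick the available zones, then round-robin over their Servers". Ribbon derives the zone decision from LoadBalancerStats. Here it is simply passed in as a set. `ZoneFilterThenRoundRobin`, `ZonedServer` and the `availableZones` parameter are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: filter by available zone, then round-robin over what is left.
class ZoneFilterThenRoundRobin {

    static final class ZonedServer {
        final String id;
        final String zone;

        ZonedServer(String id, String zone) {
            this.id = id;
            this.zone = zone;
        }
    }

    private final AtomicInteger counter = new AtomicInteger(0);

    ZonedServer choose(List<ZonedServer> servers, Set<String> availableZones) {
        List<ZonedServer> eligible = new ArrayList<>();
        for (ZonedServer s : servers) {
            if (availableZones.contains(s.zone)) {
                eligible.add(s);
            }
        }
        if (eligible.isEmpty()) {
            return null;
        }
        // floorMod keeps the index valid even after the int counter wraps around.
        int index = Math.floorMod(counter.getAndIncrement(), eligible.size());
        return eligible.get(index);
    }
}
```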
6. IPing: the interface for checking whether an instance is alive
NoOpPing.java
This is the default IPing implementation. It always returns true.
```java
public class NoOpPing implements IPing {
    @Override
    public boolean isAlive(Server server) {
        return true;
    }
}
```
PingConstant.java
It simply returns a fixed true or false. `setConstant(String)` treats only "true" (in any case) as true.
```java
public class PingConstant implements IPing {
    boolean constant = true;

    public void setConstant(String constantStr) {
        constant = (constantStr != null) && (constantStr.toLowerCase().equals("true"));
    }

    public void setConstant(boolean constant) {
        this.constant = constant;
    }

    public boolean getConstant() {
        return constant;
    }

    public boolean isAlive(Server server) {
        return constant;
    }
}
```
PingUrl.java
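It sends a single GET request to the Server's URL and returns true if the request succeeds (HTTP status code 200). If expectedContent is configured, the result is decided instead by whether the response body equals it.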
```java
public boolean isAlive(Server server) {
    String urlStr = "";
    if (this.isSecure) {
        urlStr = "https://";
    } else {
        urlStr = "http://";
    }
    urlStr = urlStr + server.getId();
    urlStr = urlStr + this.getPingAppendString();
    boolean isAlive = false;
    HttpClient httpClient = new DefaultHttpClient();
    HttpUriRequest getRequest = new HttpGet(urlStr);
    String content = null;
    try {
        HttpResponse response = httpClient.execute(getRequest);
        content = EntityUtils.toString(response.getEntity());
        isAlive = response.getStatusLine().getStatusCode() == 200;
        // If an expected body is configured, it overrides the status-code result
        if (this.getExpectedContent() != null) {
            LOGGER.debug("content:" + content);
            if (content == null) {
                isAlive = false;
            } else if (content.equals(this.getExpectedContent())) {
                isAlive = true;
            } else {
                isAlive = false;
            }
        }
    } catch (IOException var11) {
        var11.printStackTrace();
    } finally {
        // Release the connection whether or not the request succeeded
        getRequest.abort();
    }
    return isAlive;
}
```
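The decision logic above can be separated from the HTTP call and checked on its own. This sketch uses hypothetical helper names (`PingUrlLogic`, `buildUrl`, `isAlive(int, String, String)`) and follows the branches of the code above. It keeps one quirk of that code: when expectedContent is set, the body comparison overwrites the status-code result. A non-200 response whose body matches therefore still counts as alive.

```java
// Hypothetical helpers that pull PingUrl's decision logic out of the HTTP call.
final class PingUrlLogic {

    private PingUrlLogic() {
    }

    // Scheme + server id ("host:port") + configured suffix, as in isAlive() above.
    static String buildUrl(boolean secure, String serverId, String pingAppendString) {
        return (secure ? "https://" : "http://") + serverId + pingAppendString;
    }

    // Status 200 means alive, unless expectedContent is set: then only the body decides.
    static boolean isAlive(int statusCode, String body, String expectedContent) {
        boolean alive = statusCode == 200;
        if (expectedContent != null) {
            alive = body != null && body.equals(expectedContent);
        }
        return alive;
    }
}
```

If the status code should matter as well, the content branch would have to be `alive = alive && ...`. That would be a change of behaviour, not a description of PingUrl.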
AbstractLoadBalancerPing.java
This abstract class exists to provide Ping implementations bound to a particular LoadBalancer. Its own isAlive() simply returns true:
```java
public abstract class AbstractLoadBalancerPing implements IPing, IClientConfigAware {
    AbstractLoadBalancer lb;

    @Override
    public boolean isAlive(Server server) {
        return true;
    }

    public void setLoadBalancer(AbstractLoadBalancer lb) {
        this.lb = lb;
    }

    public AbstractLoadBalancer getLoadBalancer() {
        return lb;
    }
}
```
DummyPing.java
It is bound to a LoadBalancer but simply returns true.
```java
public class DummyPing extends AbstractLoadBalancerPing {
    public DummyPing() {
    }

    public boolean isAlive(Server server) {
        return true;
    }

    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
    }
}
```
NIWSDiscoveryPing.java
This ping is for environments where Eureka serves as the discovery middleware. It checks whether the instance's InstanceStatus is UP.
```java
public boolean isAlive(Server server) {
    boolean isAlive = true;
    if (server != null && server instanceof DiscoveryEnabledServer) {
        DiscoveryEnabledServer dServer = (DiscoveryEnabledServer) server;
        InstanceInfo instanceInfo = dServer.getInstanceInfo();
        if (instanceInfo != null) {
            InstanceStatus status = instanceInfo.getStatus();
            if (status != null) {
                isAlive = status.equals(InstanceStatus.UP);
            }
        }
    }
    return isAlive;
}
```
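The important detail in NIWSDiscoveryPing is its default. A Server that is not a DiscoveryEnabledServer, or that has no InstanceInfo or no status, is reported alive. Only an explicit non-UP status marks it dead. Here is a sketch of that decision, with hypothetical stand-in types (`InstanceStatusSketch`, whose values mirror Eureka's InstanceStatus, and `DiscoveryPingSketch.Instance`) in place of Eureka's classes:

```java
// Hypothetical stand-in for Eureka's InstanceStatus.
enum InstanceStatusSketch { UP, DOWN, STARTING, OUT_OF_SERVICE, UNKNOWN }

class DiscoveryPingSketch {

    // Hypothetical stand-in for InstanceInfo; status may be null.
    static final class Instance {
        final InstanceStatusSketch status;

        Instance(InstanceStatusSketch status) {
            this.status = status;
        }
    }

    // instance == null models "not a DiscoveryEnabledServer" or "no InstanceInfo".
    static boolean isAlive(Instance instance) {
        if (instance == null || instance.status == null) {
            return true; // no information: optimistically alive, as in the original
        }
        return instance.status == InstanceStatusSketch.UP;
    }
}
```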
7. ServerListUpdater: the interface for the server-instance list update mechanism
PollingServerListUpdater.java
This is the default, basic implementation of ServerListUpdater. If you configure nothing, this is the ServerListUpdater you get. Its main parts:
```java
// Core scheduling thread pool: LazyHolder._serverListRefreshExecutor
// (LazyHolder only adds some configuration wrapping plus startup and shutdown handling)
private static ScheduledThreadPoolExecutor getRefreshExecutor() {
    return LazyHolder._serverListRefreshExecutor;
}
// Whether the updater is active
private final AtomicBoolean isActive = new AtomicBoolean(false);
// Time of the last update
private volatile long lastUpdated = System.currentTimeMillis();
// Delay before the first update
private final long initialDelayMs;
// Delay between scheduled updates
private final long refreshIntervalMs;
// Handle returned by the scheduler, kept so the task can be cancelled
private volatile ScheduledFuture<?> scheduledFuture;
```
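In essence it is a timer-driven mechanism that periodically re-reads the ServerList. Recall the chain from the Eureka chapter: EurekaServer -> the service consumer's EurekaClient. In a Spring Cloud environment, service consumers usually rely on Ribbon for load balancing. The step from EurekaClient's cache (all instances of all services) to Ribbon's cache (all instances of one service) is also a scheduled task, which syncs once every **server-list refresh interval**. That interval, ribbon.ServerListRefreshInterval, is what sets refreshIntervalMs here.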
```java
@Override
public synchronized void start(final UpdateAction updateAction) {
    // Ensure the updater can only be started once
    if (isActive.compareAndSet(false, true)) {
        // The scheduled task: run updateAction, then record the update time
        final Runnable wrapperRunnable = new Runnable() {
            @Override
            public void run() {
                if (!isActive.get()) {
                    if (scheduledFuture != null) {
                        scheduledFuture.cancel(true);
                    }
                    return;
                }
                try {
                    updateAction.doUpdate();
                    lastUpdated = System.currentTimeMillis();
                } catch (Exception e) {
                    logger.warn("Failed one update cycle", e);
                }
            }
        };
        // Schedule the task and keep the handle so stop() can cancel it
        scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
                wrapperRunnable,
                initialDelayMs,
                refreshIntervalMs,
                TimeUnit.MILLISECONDS
        );
    } else {
        logger.info("Already active, no-op");
    }
}

@Override
public synchronized void stop() {
    // stop() just cancels the scheduled task
    if (isActive.compareAndSet(true, false)) {
        if (scheduledFuture != null) {
            scheduledFuture.cancel(true);
        }
    } else {
        logger.info("Not active, no-op");
    }
}

@Override
public String getLastUpdate() {
    return new Date(lastUpdated).toString();
}

@Override
public long getDurationSinceLastUpdateMs() {
    return System.currentTimeMillis() - lastUpdated;
}

@Override
public int getNumberMissedCycles() {
    if (!isActive.get()) {
        return 0;
    }
    // Estimated from the time elapsed since the last successful update
    return (int) ((int) (System.currentTimeMillis() - lastUpdated) / refreshIntervalMs);
}

@Override
public int getCoreThreads() {
    if (isActive.get()) {
        if (getRefreshExecutor() != null) {
            return getRefreshExecutor().getCorePoolSize();
        }
    }
    return 0;
}
```
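The same start-once / schedule / cancel pattern can be reproduced with only the JDK. In this sketch, `PollingUpdaterSketch` is a hypothetical class. It owns a single daemon thread instead of Ribbon's shared LazyHolder pool, and its `start()` returns a boolean where the original only logs.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical JDK-only sketch of the PollingServerListUpdater pattern.
class PollingUpdaterSketch {

    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "server-list-refresh-sketch");
        t.setDaemon(true); // do not keep the JVM alive
        return t;
    });
    private final AtomicBoolean isActive = new AtomicBoolean(false);
    private final long initialDelayMs;
    private final long refreshIntervalMs;
    private volatile long lastUpdated = System.currentTimeMillis();
    private volatile ScheduledFuture<?> scheduledFuture;

    PollingUpdaterSketch(long initialDelayMs, long refreshIntervalMs) {
        this.initialDelayMs = initialDelayMs;
        this.refreshIntervalMs = refreshIntervalMs;
    }

    // Returns false when already active (the original only logs "Already active, no-op").
    synchronized boolean start(Runnable updateAction) {
        if (!isActive.compareAndSet(false, true)) {
            return false;
        }
        Runnable wrapper = () -> {
            try {
                updateAction.run();
                lastUpdated = System.currentTimeMillis();
            } catch (RuntimeException e) {
                // Swallow the failure so later cycles still run.
            }
        };
        scheduledFuture = executor.scheduleWithFixedDelay(
                wrapper, initialDelayMs, refreshIntervalMs, TimeUnit.MILLISECONDS);
        return true;
    }

    synchronized void stop() {
        if (isActive.compareAndSet(true, false) && scheduledFuture != null) {
            scheduledFuture.cancel(true);
        }
    }

    // Same idea as getNumberMissedCycles(): elapsed time divided by the interval.
    int getNumberMissedCycles() {
        if (!isActive.get()) {
            return 0;
        }
        return (int) ((System.currentTimeMillis() - lastUpdated) / refreshIntervalMs);
    }
}
```

The try/catch inside the wrapper matters. Under the ScheduledExecutorService contract, if one execution of a periodic task throws, all subsequent executions are suppressed. An unguarded exception in `doUpdate()` would therefore silently stop every future refresh.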
EurekaNotificationServerListUpdater.java
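This class updates the Server list for Eureka. It does not refresh on its own timer. Instead, it relies on EurekaClient firing a CacheRefreshedEvent after it finishes fetching the service list.
The EurekaClient code that updates the service list: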
```java
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
    // (code that fetches the service list omitted)

    // Notify about cache refresh before updating the instance remote status
    onCacheRefreshed();

    // registry was fetched successfully, so return true
    return true;
}

protected void onCacheRefreshed() {
    fireEvent(new CacheRefreshedEvent());
}
```
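EurekaNotificationServerListUpdater's ServerList update is triggered by this CacheRefreshedEvent. When the event arrives, the updater reads from EurekaClient's local cache, rather than fetching actively on a timer the way PollingServerListUpdater does.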
```java
@Override
public synchronized void start(final UpdateAction updateAction) {
    if (isActive.compareAndSet(false, true)) {
        this.updateListener = new EurekaEventListener() {
            @Override
            public void onEvent(EurekaEvent event) {
                // Only react to CacheRefreshedEvent
                if (event instanceof CacheRefreshedEvent) {
                    // Coalesce: skip this event if an update is already queued or running
                    if (!updateQueued.compareAndSet(false, true)) {  // if an update is already queued
                        logger.info("an update action is already queued, returning as no-op");
                        return;
                    }
                    try {
                        // Run updateAction asynchronously, then record lastUpdated
                        refreshExecutor.submit(new Runnable() {
                            @Override
                            public void run() {
                                try {
                                    updateAction.doUpdate();
                                    lastUpdated.set(System.currentTimeMillis());
                                } catch (Exception e) {
                                    logger.warn("Failed to update serverList", e);
                                } finally {
                                    updateQueued.set(false);
                                }
                            }
                        });  // fire and forget
                    } catch (Exception e) {
                        logger.warn("Error submitting update task to executor, skipping one round of updates", e);
                        updateQueued.set(false);  // if submit fails, need to reset updateQueued to false
                    }
                }
            }
        };
        if (eurekaClient == null) {
            eurekaClient = eurekaClientProvider.get();
        }
        if (eurekaClient != null) {
            eurekaClient.registerEventListener(updateListener);
        } else {
            logger.error("Failed to register an updateListener to eureka client, eureka client is null");
            throw new IllegalStateException("Failed to start the updater, unable to register the update listener due to eureka client being null.");
        }
    } else {
        logger.info("Update listener already registered, no-op");
    }
}

@Override
public synchronized void stop() {
    if (isActive.compareAndSet(true, false)) {
        if (eurekaClient != null) {
            eurekaClient.unregisterEventListener(updateListener);
        }
    } else {
        logger.info("Not currently active, no-op");
    }
}

@Override
public String getLastUpdate() {
    return new Date(lastUpdated.get()).toString();
}

@Override
public long getDurationSinceLastUpdateMs() {
    return System.currentTimeMillis() - lastUpdated.get();
}

@Override
public int getNumberMissedCycles() {
    return 0;
}

@Override
public int getCoreThreads() {
    if (isActive.get()) {
        if (refreshExecutor != null && refreshExecutor instanceof ThreadPoolExecutor) {
            return ((ThreadPoolExecutor) refreshExecutor).getCorePoolSize();
        }
    }
    return 0;
}
```
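The key trick in the listener is the updateQueued flag. It is set when an update is submitted and cleared only when that update finishes or the submission fails. A burst of CacheRefreshedEvents therefore leads to at most one queued-or-running refresh. Below is a JDK-only sketch of that coalescing. The names are hypothetical: `NotificationUpdaterSketch`, with `onCacheRefreshed()` standing in for receiving the event.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the event coalescing in EurekaNotificationServerListUpdater.
class NotificationUpdaterSketch {

    private final AtomicBoolean updateQueued = new AtomicBoolean(false);
    private final AtomicLong lastUpdated = new AtomicLong(System.currentTimeMillis());
    private final ExecutorService refreshExecutor;
    private final Runnable updateAction;

    NotificationUpdaterSketch(ExecutorService refreshExecutor, Runnable updateAction) {
        this.refreshExecutor = refreshExecutor;
        this.updateAction = updateAction;
    }

    // Returns true if this event submitted an update, false if it was coalesced or rejected.
    boolean onCacheRefreshed() {
        if (!updateQueued.compareAndSet(false, true)) {
            return false; // an update is already queued or running
        }
        try {
            refreshExecutor.submit(() -> {
                try {
                    updateAction.run();
                    lastUpdated.set(System.currentTimeMillis());
                } finally {
                    updateQueued.set(false); // let the next event trigger a new update
                }
            });
            return true;
        } catch (RuntimeException e) {
            updateQueued.set(false); // submission failed (e.g. executor shut down): reset
            return false;
        }
    }

    long getLastUpdatedMillis() {
        return lastUpdated.get();
    }
}
```

Dropping events is acceptable here because each update re-reads EurekaClient's current local cache instead of applying anything carried by the event. A change that lands while an update is already running is picked up on the next event.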
8. ServerListFilter: the server-instance list filtering mechanism
AbstractServerListFilter.java
Every Filter relies on LoadBalancerStats. LoadBalancerStats records whether each load-balanced call succeeded or failed, along with per-Server call statistics.
```java
public abstract class AbstractServerListFilter<T extends Server> implements ServerListFilter<T> {
    private volatile LoadBalancerStats stats;

    public void setLoadBalancerStats(LoadBalancerStats stats) {
        this.stats = stats;
    }

    public LoadBalancerStats getLoadBalancerStats() {
        return stats;
    }
}
```
ZoneAffinityServerListFilter.java
This is the default ServerListFilter. If you configure nothing, this is the implementation used.
It filters by Zone, removing Servers that are not in the same Zone:
```java
@Override
public List<T> getFilteredListOfServers(List<T> servers) {
    if (zone != null && (zoneAffinity || zoneExclusive) && servers != null && servers.size() > 0) {
        // Filter with ZoneAffinityPredicate
        List<T> filteredServers = Lists.newArrayList(Iterables.filter(
                servers, this.zoneAffinityPredicate.getServerOnlyPredicate()));
        // Decide whether the filtered list should actually be used
        if (shouldEnableZoneAffinity(filteredServers)) {
            return filteredServers;
        } else if (zoneAffinity) {
            overrideCounter.increment();
        }
    }
    return servers;
}
```
ZoneAffinityPredicate simply checks whether a Server's Zone matches the Zone in the current configuration (ignoring case):
```java
public class ZoneAffinityPredicate extends AbstractServerPredicate {

    private final String zone = ConfigurationManager.getDeploymentContext().getValue(ContextKey.zone);

    public ZoneAffinityPredicate() {
    }

    @Override
    public boolean apply(PredicateKey input) {
        Server s = input.getServer();
        String az = s.getZone();
        if (az != null && zone != null && az.toLowerCase().equals(zone.toLowerCase())) {
            return true;
        } else {
            return false;
        }
    }
}
```
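The predicate's test can be written as a plain filter. In this sketch (hypothetical names `ZoneAffinitySketch` and `sameZone`), `equalsIgnoreCase` replaces `toLowerCase().equals(...)`. The result is the same for ordinary zone names, and it avoids `toLowerCase()`'s dependence on the JVM's default locale.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of ZoneAffinityPredicate-style filtering.
class ZoneAffinitySketch {

    static final class ZonedServer {
        final String id;
        final String zone;

        ZonedServer(String id, String zone) {
            this.id = id;
            this.zone = zone;
        }
    }

    // Keep servers whose zone equals the local zone, ignoring case;
    // a null zone on either side never matches, as in the predicate above.
    static List<ZonedServer> sameZone(List<ZonedServer> servers, String localZone) {
        List<ZonedServer> result = new ArrayList<>();
        for (ZonedServer s : servers) {
            if (s.zone != null && localZone != null && s.zone.equalsIgnoreCase(localZone)) {
                result.add(s);
            }
        }
        return result;
    }
}
```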
So under what conditions is the filtered list not used?
```java
private boolean shouldEnableZoneAffinity(List<T> filtered) {
    // Zone affinity not enabled (the default): don't filter
    if (!zoneAffinity && !zoneExclusive) {
        return false;
    }
    // Exclusive mode: always use the same-zone list
    if (zoneExclusive) {
        return true;
    }
    LoadBalancerStats stats = getLoadBalancerStats();
    if (stats == null) {
        return zoneAffinity;
    } else {
        logger.debug("Determining if zone affinity should be enabled with given server list: {}", filtered);
        ZoneSnapshot snapshot = stats.getZoneSnapshot(filtered);
        double loadPerServer = snapshot.getLoadPerServer();
        int instanceCount = snapshot.getInstanceCount();
        int circuitBreakerTrippedCount = snapshot.getCircuitTrippedCount();
        // Fall back to the unfiltered list if the share of circuit-tripped Servers reaches its threshold,
        // OR the load per Server reaches its threshold, OR too few Servers remain available
        if (((double) circuitBreakerTrippedCount) / instanceCount >= blackOutServerPercentageThreshold.get()
                || loadPerServer >= activeReqeustsPerServerThreshold.get()
                || (instanceCount - circuitBreakerTrippedCount) < availableServersThreshold.get()) {
            logger.debug("zoneAffinity is overriden. blackOutServerPercentage: {}, activeReqeustsPerServer: {}, availableServers: {}",
                    new Object[] {(double) circuitBreakerTrippedCount / instanceCount, loadPerServer, instanceCount - circuitBreakerTrippedCount});
            return false;
        } else {
            return true;
        }
    }
}
```
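Without the logging and dynamic-property lookups, the override check is a pure function of the same-zone snapshot and three thresholds. This sketch uses hypothetical names (`ZoneAffinityDecision`, `keepZoneFiltering`). The thresholds are plain parameters, and no specific values are claimed to be Ribbon's defaults.

```java
// Hypothetical pure-function version of the override check in shouldEnableZoneAffinity().
final class ZoneAffinityDecision {

    private ZoneAffinityDecision() {
    }

    // true = keep the same-zone list; false = fall back to all servers.
    static boolean keepZoneFiltering(int instanceCount, int circuitTrippedCount, double loadPerServer,
                                     double maxBlackOutPercentage, double maxLoadPerServer,
                                     int minAvailableServers) {
        double blackOutPercentage = (double) circuitTrippedCount / instanceCount;
        int availableServers = instanceCount - circuitTrippedCount;
        // Any one condition is enough to abandon zone affinity.
        boolean override = blackOutPercentage >= maxBlackOutPercentage
                || loadPerServer >= maxLoadPerServer
                || availableServers < minAvailableServers;
        return !override;
    }
}
```

For example, take 4 same-zone instances with 3 circuit-tripped and a minimum of 2 available servers. Only 1 server is available, so the zone filter is abandoned and the full list is used, even though 75% tripped might sit below the black-out threshold.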
ServerListSubsetFilter.java
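When there are many Servers (say, several hundred), we may not want to load-balance across all of them. Instead, we randomly pick a subset to load-balance over, and ServerListSubsetFilter does exactly that. It also evicts Servers that are unhealthy or too busy.
Four settings drive the selection: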
```java
// Size of the subset
private DynamicIntProperty sizeProp =
        new DynamicIntProperty(DefaultClientConfigImpl.DEFAULT_PROPERTY_NAME_SPACE + ".ServerListSubsetFilter.size", 20);
// Minimum share of the subset to evict on each refresh
private DynamicFloatProperty eliminationPercent =
        new DynamicFloatProperty(DefaultClientConfigImpl.DEFAULT_PROPERTY_NAME_SPACE + ".ServerListSubsetFilter.forceEliminatePercent", 0.1f);
// Evict a Server once its failure count exceeds this threshold (note the key's spelling "Thresold")
private DynamicIntProperty eliminationFailureCountThreshold =
        new DynamicIntProperty(DefaultClientConfigImpl.DEFAULT_PROPERTY_NAME_SPACE + ".ServerListSubsetFilter.eliminationFailureThresold", 0);
// Evict a Server once its connection count exceeds this threshold
private DynamicIntProperty eliminationConnectionCountThreshold =
        new DynamicIntProperty(DefaultClientConfigImpl.DEFAULT_PROPERTY_NAME_SPACE + ".ServerListSubsetFilter.eliminationConnectionThresold", 0);
```
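To show how these four settings interact, here is a simplified subset-refresh sketch. It is not Ribbon's exact algorithm. `SubsetSketch`, `Node` and `nextSubset` are hypothetical, health is given as plain counters instead of LoadBalancerStats, and there is no fallback when healthy candidates run out. Each refresh does three things:
- drops current members that disappeared or exceed the failure or connection threshold
- forces out the least healthy members so that at least `eliminationPercent` of the target size rotates
- tops up with random picks

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;

// Hypothetical, simplified sketch of a subset filter driven by the four settings above.
class SubsetSketch {

    static final class Node {
        final String id;
        final long failures;
        final int activeRequests;

        Node(String id, long failures, int activeRequests) {
            this.id = id;
            this.failures = failures;
            this.activeRequests = activeRequests;
        }
    }

    // Least healthy first: more failures, then more active requests.
    private static final Comparator<Node> WORST_FIRST =
            Comparator.comparingLong((Node n) -> n.failures).reversed()
                    .thenComparing(Comparator.comparingInt((Node n) -> n.activeRequests).reversed());

    static List<Node> nextSubset(List<Node> currentSubset, List<Node> allServers, int targetSize,
                                 float eliminationPercent, long failureThreshold,
                                 int connectionThreshold, Random random) {
        Set<Node> candidates = new LinkedHashSet<>(allServers);
        Set<Node> subset = new LinkedHashSet<>();
        // 1. Keep current members that are still listed and under both thresholds.
        for (Node n : currentSubset) {
            boolean unhealthy = n.failures > failureThreshold || n.activeRequests > connectionThreshold;
            if (unhealthy) {
                candidates.remove(n); // do not pick it again this round
            } else if (candidates.contains(n)) {
                subset.add(n);
            }
        }
        // 2. Force out the worst members: shrink to the target, or rotate the minimum share.
        int eliminated = currentSubset.size() - subset.size();
        int minElimination = (int) (targetSize * eliminationPercent);
        int toForce = subset.size() > targetSize
                ? subset.size() - targetSize
                : Math.max(0, minElimination - eliminated);
        List<Node> sorted = new ArrayList<>(subset);
        sorted.sort(WORST_FIRST);
        for (Node n : sorted.subList(0, Math.min(toForce, sorted.size()))) {
            subset.remove(n);
            candidates.remove(n);
        }
        // 3. Top up with random picks from the remaining candidates.
        candidates.removeAll(subset);
        List<Node> pool = new ArrayList<>(candidates);
        Collections.shuffle(pool, random);
        for (Node n : pool) {
            if (subset.size() >= targetSize) {
                break;
            }
            subset.add(n);
        }
        return new ArrayList<>(subset);
    }
}
```

Forced rotation means that even a perfectly healthy subset changes a little on every refresh. Over time, the load therefore spreads across the whole server pool instead of pinning to the first 20 Servers picked.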