本文基于SpringCloud-Dalston.SR5
上一篇我们了解到Ribbon主要由如下几个组件组成:
- 所有Ribbon负载均衡器需要实现的接口IClient
- 服务实例列表维护机制实现的接口ServerList
- 负载均衡数据记录LoadBalancerStats
- 负责选取Server的接口ILoadBalancer
- 负载均衡选取规则实现的接口IRule
- 检查实例是否存活实现的接口IPing
- 服务实例列表更新机制实现的接口ServerListUpdater
- 服务实例列表过滤机制ServerListFilter
我们会逐个分析
1. 所有Ribbon负载均衡器需要实现的接口IClient
对于这个IClient,之前我们说到执行器逻辑,例如重试还有异常处理,都在这里处理。我们看他的默认抽象类实现AbstractLoadBalancerAwareClient:
AbstractLoadBalancerAwareClient.java
代码语言:javascript复制public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
//获取重试处理器,这个由其他实现类动态实现
RequestSpecificRetryHandler handler = getRequestSpecificRetryHandler(request, requestConfig);
//构造LoadBalancerCommand,RxJava风格
LoadBalancerCommand command = LoadBalancerCommand.builder()
.withLoadBalancerContext(this)
.withRetryHandler(handler)
.withLoadBalancerURI(request.getUri())
.build();
try {
return command.submit(
new ServerOperation() {
@Override
public Observable call(Server server) {
//修改原始url为实际的url
URI finalUri = reconstructURIWithServer(server, request.getUri());
S requestForServer = (S) request.replaceUri(finalUri);
try {
//执行请求
return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
}
catch (Exception e) {
return Observable.error(e);
}
}
})
.toBlocking()
.single();
} catch (Exception e) {
Throwable t = e.getCause();
if (t instanceof ClientException) {
throw (ClientException) t;
} else {
throw new ClientException(e);
}
}
}
public abstract RequestSpecificRetryHandler getRequestSpecificRetryHandler(S request, IClientConfig requestConfig);
这个构造的LoadBalancerCommand是一个RxJava风格的,它包含了重试和异常处理机制:
LoadBalancerCommand.java
代码语言:javascript复制//返回一个只包含一个Server的Observable,但是每次从负载均衡器中获取一个
private Observable selectServer() {
return Observable.create(new OnSubscribe() {
@Override
public void call(Subscribersuper Server> next) {
try {
Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);
next.onNext(server);
next.onCompleted();
} catch (Exception e) {
next.onError(e);
}
}
});
}
public Observable submit(final ServerOperation operation) {
final ExecutionInfoContext context = new ExecutionInfoContext();
if (listenerInvoker != null) {
try {
listenerInvoker.onExecutionStart();
} catch (AbortExecutionException e) {
return Observable.error(e);
}
}
//获取在每个服务实例重试的的次数
final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
//最多尝试几个服务实例
final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();
//对于每个服务实例的调用逻辑
//默认field server是null,通过selectServer()方法获取一个Server
Observable o =
(server == null ? selectServer() : Observable.just(server))
.concatMap(new Func1>() {
@Override
//对于每个Server,按顺序映射为对于每个Server包含重试逻辑的请求调用
public Observable call(Server server) {
//设置上下文
context.setServer(server);
final ServerStats stats = loadBalancerContext.getServerStats(server);
//每个Server包含重试逻辑的请求调用
Observable o = Observable
.just(server)
.concatMap(new Func1>() {
@Override
public Observable call(final Server server) {
context.incAttemptCount();
//增加Server正在处理的请求计数
loadBalancerContext.noteOpenConnection(stats);
//监听器
if (listenerInvoker != null) {
try {
listenerInvoker.onStartWithServer(context.toExecutionInfo());
} catch (AbortExecutionException e) {
return Observable.error(e);
}
}
//计时器
final Stopwatch tracer = loadBalancerContext.getExecuteTracer().start();
//operation.call(server)就是刚刚分析的AbstractLoadBalancerAwareClient传过来的ServerOperation,就是直接对这个Server调用请求
//doOnEach的操作就是记录请求前后的一些数据用于负载均衡数据统计
return operation.call(server).doOnEach(new Observer() {
private T entity;
@Override
public void onCompleted() {
//记录请求完成
recordStats(tracer, stats, entity, null);
}
@Override
public void onError(Throwable e) {
//记录请求结束
recordStats(tracer, stats, null, e);
logger.debug("Got error {} when executed on server {}", e, server);
//发生了错误,通知listener
if (listenerInvoker != null) {
listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo());
}
}
@Override
public void onNext(T entity) {
//因为只有调用请求成功只有一个结果(只有一个请求), 这里的entity就是结果,只要收到结果就代表请求成功
this.entity = entity;
if (listenerInvoker != null) {
listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo());
}
}
private void recordStats(Stopwatch tracer, ServerStats stats, Object entity, Throwable exception) {
tracer.stop();
loadBalancerContext.noteRequestCompletion(stats, entity, exception, tracer.getDuration(TimeUnit.MILLISECONDS), retryHandler);
}
});
}
});
if (maxRetrysSame > 0)
//是否retry
o = o.retry(retryPolicy(maxRetrysSame, true));
return o;
}
});
if (maxRetrysNext > 0 && server == null)
//是否retry,如果retry回调用selectServer()返回下一个Server
o = o.retry(retryPolicy(maxRetrysNext, false));
//异常处理
return o.onErrorResumeNext(new Func1>() {
@Override
public Observable call(Throwable e) {
if (context.getAttemptCount() > 0) {
//如果超过重试次数,则抛异常
if (maxRetrysNext > 0 && context.getServerAttemptCount() == (maxRetrysNext 1)) {
e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED,
"Number of retries on next server exceeded max " maxRetrysNext
" retries, while making a call for: " context.getServer(), e);
}
else if (maxRetrysSame > 0 && context.getAttemptCount() == (maxRetrysSame 1)) {
e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED,
"Number of retries exceeded max " maxRetrysSame
" retries, while making a call for: " context.getServer(), e);
}
}
if (listenerInvoker != null) {
listenerInvoker.onExecutionFailed(e, context.toFinalExecutionInfo());
}
return Observable.error(e);
}
});
}
2. 服务实例列表维护机制实现的接口ServerList
AbstractServerList.java
其实这个抽象类一是在实现ServerList接口的同时,实现了IClientConfigAware这个接口,代表是可配置的。 同时,提供了一个生成默认ServerListFilter(这个Filter的实现类是由NIWSServerListFilterClassName这个配置决定,默认是ZoneAffinityServerListFilter)的方法
代码语言:javascript复制public abstract class AbstractServerList<T extends Server> implements ServerList<T>, IClientConfigAware {
public AbstractServerListFilter getFilterImpl(IClientConfig niwsClientConfig) throws ClientException{
try {
String niwsServerListFilterClassName = niwsClientConfig
.getProperty(
CommonClientConfigKey.NIWSServerListFilterClassName,
ZoneAffinityServerListFilter.class.getName())
.toString();
AbstractServerListFilter abstractNIWSServerListFilter =
(AbstractServerListFilter) ClientFactory.instantiateInstanceWithClientConfig(niwsServerListFilterClassName, niwsClientConfig);
return abstractNIWSServerListFilter;
} catch (Throwable e) {
throw new ClientException(
ClientException.ErrorType.CONFIGURATION,
"Unable to get an instance of CommonClientConfigKey.NIWSServerListFilterClassName. Configured class:"
niwsClientConfig
.getProperty(CommonClientConfigKey.NIWSServerListFilterClassName), e);
}
}
}
ConfigurationBasedServerList.java
这个是默认的实现,如果没有特殊配置,ServerList的实现类就是ConfigurationBasedServerList;这个实际上就是从配置中读取ServerList,这个配置可以是动态配置,例如是Archaius
代码语言:javascript复制public class ConfigurationBasedServerList extends AbstractServerList<Server> {
private IClientConfig clientConfig;
@Override
public List getInitialListOfServers() {
return getUpdatedListOfServers();
}
@Override
public List getUpdatedListOfServers() {
String listOfServers = clientConfig.get(CommonClientConfigKey.ListOfServers);
return derive(listOfServers);
}
@Override
public void initWithNiwsConfig(IClientConfig clientConfig) {
this.clientConfig = clientConfig;
}
//可以看出这个配置就是以逗号分隔的字符串
private List derive(String value) {
List list = Lists.newArrayList();
if (!Strings.isNullOrEmpty(value)) {
for (String s: value.split(",")) {
list.add(new Server(s.trim()));
}
}
return list;
}
}
DiscoveryEnabledNIWSServerList.java
这个就是从Eureka上面获取Server列表的类,构造的时候需要传入相关配置以及最重要的EurekaClient的Provider来获取合适的EurekaClient以便于获取Server列表。
实现ServerList接口的方法都是基于obtainServersViaDiscovery这个方法:
代码语言:javascript复制@Override
public List getInitialListOfServers(){
return obtainServersViaDiscovery();
}
@Override
public List getUpdatedListOfServers(){
return obtainServersViaDiscovery();
}
private List obtainServersViaDiscovery() {
List serverList = new ArrayList();
//如果EurekaClient没有被初始化,则日志报警并返回空的列表
if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
logger.warn("EurekaClient has not been initialized yet, returning an empty list");
return new ArrayList();
}
EurekaClient eurekaClient = eurekaClientProvider.get();
//这里的vipAddresses其实就是微服务名称的各种形式,但是注意,它们代表的是同一个微服务
if (vipAddresses!=null){
for (String vipAddress : vipAddresses.split(",")) {
// if targetRegion is null, it will be interpreted as the same region of client
List listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
for (InstanceInfo ii : listOfInstanceInfo) {
if (ii.getStatus().equals(InstanceStatus.UP)) {
//是否覆盖port
if(shouldUseOverridePort){
if(logger.isDebugEnabled()){
logger.debug("Overriding port on client name: " clientName " to " overridePort);
}
//这里复制一份是因为不希望其他的地方修改原有的实例信息
InstanceInfo copy = new InstanceInfo(ii);
if(isSecure){
ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
}else{
ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
}
}
DiscoveryEnabledServer des = new DiscoveryEnabledServer(ii, isSecure, shouldUseIpAddr);
des.setZone(DiscoveryClient.getZone(ii));
serverList.add(des);
}
}
//如果有一个vipAddress有服务列表,我们就不用获取剩余的了
if (serverList.size()>0 && prioritizeVipAddressBasedServers){
break;
}
}
}
return serverList;
}
到这里我们可以看出,Ribbon和Eureka的配合其实就是Ribbon从Eureka中利用微服务名称获取Server列表;那么这个列表是如何更新的呢,在Eureka的章节我们提到过,Ribbon定时从EurekaClient获取服务实例列表更新,这就涉及到了下一个我们要讲到的Ribbon元素 - 服务实例列表更新机制实现的接口ServerListUpdater