Note up front: throughout this article, Ribbon uses OkHttpClient as its underlying HTTP client, configured as follows:
#ribbon configuration
ribbon.okhttp.enabled=true
# Connection timeout; the default is 1 second, set in the RibbonClientConfiguration class
springboot-mybatis.ribbon.ConnectTimeout=2000
# Read timeout for processing the request
springboot-mybatis.ribbon.ReadTimeout=15000
Here are the descriptions of Ribbon's components from the Ribbon project documentation:
#Decides how to choose a Server from the ServerList
Rule - a logic component to determine which server to return from a list
#Periodically checks whether servers are alive
Ping - a component running in background to ensure liveness of servers
#The server list
ServerList - this can be static or dynamic. If it is dynamic (as used by DynamicServerListLoadBalancer), a background thread will refresh and filter the list at certain interval
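When we use OpenFeign as a Eureka client, we don't need to specify a url in @FeignClient; Ribbon does the load balancing for us.
Recall the previous article, "Revisiting OpenFeign: a look at its source code". As mentioned there, when no url is specified and OkHttpClient is chosen as the underlying HTTP client, the program does not call OkHttpClient directly; instead it calls through LoadBalancerFeignClient as a proxy, with the following code:
public Response execute(Request request, Request.Options options) throws IOException {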
try {
URI asUri = URI.create(request.url());
String clientName = asUri.getHost();
URI uriWithoutHost = cleanUrl(request.url(), clientName);
FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
this.delegate, request, uriWithoutHost);
IClientConfig requestConfig = getClientConfig(options, clientName);
return lbClient(clientName).executeWithLoadBalancer(ribbonRequest,
requestConfig).toResponse();
}
catch (ClientException e) {
//exception handling omitted
}
}
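To make the URL handling above concrete, here is a small plain-Java sketch that uses only java.net.URI. It assumes, as the comments later in this article show, that the service id sits in the host position of the Feign URL, that LoadBalancerFeignClient strips it before handing the request to Ribbon, and that reconstructURIWithServer later puts a concrete host and port back. The class and method names (UrlRewriteSketch, clientName, stripHost, withServer) are hypothetical, not Feign or Ribbon APIs.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.regex.Pattern;

// Hypothetical sketch of the URL rewriting around Ribbon; not Feign/Ribbon code.
public class UrlRewriteSketch {

    // The service id Feign placed in the host position, e.g. "springboot-mybatis"
    static String clientName(String url) {
        return URI.create(url).getHost();
    }

    // Drop the service id so only scheme + path (+ query) remain: "http:///feign/..."
    static URI stripHost(String url, String host) {
        return URI.create(url.replaceFirst(Pattern.quote(host), ""));
    }

    // Put a concrete instance's host and port into the host-less URI
    static URI withServer(URI original, String host, int port) {
        try {
            return new URI(original.getScheme(), original.getUserInfo(), host, port,
                    original.getPath(), original.getQuery(), original.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        String url = "http://springboot-mybatis/feign/feignReadTimeout";
        String name = clientName(url);
        URI partial = stripHost(url, name);
        System.out.println(name);              // springboot-mybatis
        System.out.println(partial);           // http:///feign/feignReadTimeout
        System.out.println(partial.getHost()); // null
        System.out.println(withServer(partial, "192.168.0.118", 8083));
        // http://192.168.0.118:8083/feign/feignReadTimeout
    }
}
```

A host of null on the stripped URI is what sends getServerFromLoadBalancer (shown later) down the "ask the load balancer" branch.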
The executeWithLoadBalancer call above goes to the executeWithLoadBalancer method of AbstractLoadBalancerAwareClient:
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);
try {
return command.submit(
new ServerOperation<T>() {
@Override
public Observable<T> call(Server server) {
//finalUri here is http://192.168.0.118:8083/feign/feignReadTimeout
URI finalUri = reconstructURIWithServer(server, request.getUri());//just a helper that assembles the URL; not covered in detail
//the execute() call below runs on FeignLoadBalancer, a subclass of AbstractLoadBalancerAwareClient
S requestForServer = (S) request.replaceUri(finalUri);
try {
return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
}
catch (Exception e) {
return Observable.error(e);
}
}
})
.toBlocking()
.single();
} //exception handling omitted
}
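The method first creates a LoadBalancerCommand object and then executes the request through its submit method. As the comments above show, the server parameter of call() carries the IP and port the request will be sent to; that is what the Ribbon load balancer gives us. Here is the UML class diagram:
As the diagram shows, the RibbonClientConfiguration class defines the initialization of the RibbonLoadBalancerContext bean and the ZoneAwareLoadBalancer bean, and RibbonLoadBalancerContext wraps ZoneAwareLoadBalancer.
Where the Server comes from
When LoadBalancerCommand executes its submit method, it selects a Server (the selectServer method); this Server holds the address and port that Feign will send the HTTP request to.
The selectServer() method is what obtains the Server object. Here is its source:
private Observable<Server> selectServer() {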
return Observable.create(new OnSubscribe<Server>() {
@Override
public void call(Subscriber<? super Server> next) {
try {
Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);
next.onNext(server);
next.onCompleted();
} catch (Exception e) {
next.onError(e);
}
}
});
}
The getServerFromLoadBalancer method above lives in LoadBalancerContext:
public Server getServerFromLoadBalancer(@Nullable URI original, @Nullable Object loadBalancerKey) throws ClientException {
String host = null;
int port = -1;
if (original != null) {
//host is null here, because the host part was stripped from the URI earlier
host = original.getHost();
}
if (original != null) {
Pair<String, Integer> schemeAndPort = deriveSchemeAndPortFromPartialUri(original);
//this returns -1
port = schemeAndPort.second();
}
// Various Supported Cases
// The loadbalancer to use and the instances it has is based on how it was registered
// In each of these cases, the client might come in using Full Url or Partial URL
//this is a DynamicServerListLoadBalancer
ILoadBalancer lb = getLoadBalancer();
if (host == null) {
// Partial URI or no URI Case
// well we have to just get the right instances from lb - or we fall back
//lb here is a ZoneAwareLoadBalancer
if (lb != null){
Server svc = lb.chooseServer(loadBalancerKey);
if (svc == null){
throw new ClientException(ClientException.ErrorType.GENERAL,
"Load balancer does not have available server for client: "
+ clientName);
}
host = svc.getHost();
if (host == null){
throw new ClientException(ClientException.ErrorType.GENERAL,
"Invalid Server for :" + svc);
}
logger.debug("{} using LB returned Server: {} for request {}", new Object[]{clientName, svc, original});
return svc;
} else {
//vipAddresses logic omitted
}
} else {
//this branch omitted
}
// end of creating final URL
if (host == null){
throw new ClientException(ClientException.ErrorType.GENERAL,"Request contains no HOST to talk to");
}
// just verify that at this point we have a full URL
return new Server(host, port);
}
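The LoadBalancer we use here is ZoneAwareLoadBalancer. Its inheritance hierarchy is shown in the UML class diagram below:
The chooseServer call ends up in BaseLoadBalancer, the parent class of ZoneAwareLoadBalancer (ZoneAwareLoadBalancer overrides chooseServer but delegates to its parent when only one zone is available):
public Server chooseServer(Object key) {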
if (counter == null) {
counter = createCounter();
}
//its value is BasicCounter{config=MonitorConfig{name=LoadBalancer_ChooseServer, tags=COUNTER, policy=DefaultPublishingPolicy}, count=0}
counter.increment();
if (rule == null) {
return null;
} else {
try {
//rule here is a ZoneAvoidanceRule, but choose() is inherited from its parent class PredicateBasedRule
return rule.choose(key);
} catch (Exception e) {
logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e);
return null;
}
}
}
The inheritance hierarchy of ZoneAvoidanceRule is also fairly involved; we will come back to it later.
Next, PredicateBasedRule's choose method:
public Server choose(Object key) {
ILoadBalancer lb = getLoadBalancer();
//the LoadBalancer here is ZoneAwareLoadBalancer
Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
if (server.isPresent()) {
return server.get();
} else {
return null;
}
}
This brings us to a key Ribbon component: the ServerList.
Pulling the ServerList from Eureka
As the previous section showed, servers actually come from ZoneAwareLoadBalancer's getAllServers method, which is defined in its parent class BaseLoadBalancer:
public List<Server> getAllServers() {
return Collections.unmodifiableList(allServerList);
}
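allServerList is also a field of BaseLoadBalancer, the parent class of ZoneAwareLoadBalancer. So where does it come from? Look at the following UML class diagram:
DiscoveryClient is the core here. CacheRefreshThread runs in a scheduled thread pool and uses EurekaHttpClient to pull the service list from Eureka periodically and update it. The core method is fetchRegistry, which is also called when DiscoveryClient is created; DiscoveryClient itself is initialized in the EurekaClientAutoConfiguration configuration class.
private boolean fetchRegistry(boolean forceFullRegistryFetch) {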
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
// If the delta is disabled or if it is the first time, get all
// applications
Applications applications = getApplications();
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
{
//logging removed
getAndStoreFullRegistry();
} else {
getAndUpdateDelta(applications);
}
applications.setAppsHashCode(applications.getReconcileHashCode());
logTotalInstances();
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
return false;
} finally {
if (tracer != null) {
tracer.stop();
}
}
// Notify about cache refresh before updating the instance remote status
onCacheRefreshed();
// Update remote status based on refreshed data held in the cache
updateInstanceRemoteStatus();
// registry was fetched successfully, so return true
return true;
}
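The full-versus-delta decision at the top of fetchRegistry is a plain boolean condition. The sketch below restates it as a standalone Java method, under the assumption that the fields and config values the real method reads can be reduced to simple parameters; FetchModeSketch and needsFullFetch are illustrative names, not Eureka APIs.

```java
// Hypothetical restatement of the condition at the top of DiscoveryClient.fetchRegistry.
public class FetchModeSketch {

    // Each parameter stands in for a field or config value the real method reads.
    // registeredAppCount == null models "no local Applications yet".
    static boolean needsFullFetch(boolean disableDelta, String singleVipAddress, boolean forceFull,
                                  Integer registeredAppCount, long version) {
        return disableDelta
                || (singleVipAddress != null && !singleVipAddress.isEmpty())
                || forceFull
                || registeredAppCount == null
                || registeredAppCount == 0
                || version == -1; // client library does not support deltas
    }

    public static void main(String[] args) {
        System.out.println(needsFullFetch(false, null, false, null, -1)); // true: first start, nothing cached
        System.out.println(needsFullFetch(false, null, false, 5, 42));    // false: apply a delta
        System.out.println(needsFullFetch(false, null, true, 5, 42));     // true: caller forced a full fetch
    }
}
```

Every clause only ever pushes towards a full fetch, so a delta is used only when all of them are false.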
Let's look at the getAndStoreFullRegistry method called above:
private void getAndStoreFullRegistry() throws Throwable {
long currentUpdateGeneration = fetchRegistryGeneration.get();
logger.info("Getting all instance registry info from the eureka server");
Applications apps = null;
//this calls EurekaHttpClientDecorator to send the request to Eureka
EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
: eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
apps = httpResponse.getEntity();
}
logger.info("The response status is {}", httpResponse.getStatusCode());
if (apps == null) {
logger.error("The application is null for some reason. Not storing this information");
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
/**
* The returned Applications is stored in localRegionApps,
* whose type is the atomic reference AtomicReference<Applications>.
*
* Pay close attention to filterAndShuffle: it calls Applications.shuffleInstances,
* which puts the fetched service list into virtualHostNameAppMap, where the load balancer reads it.
*/
localRegionApps.set(this.filterAndShuffle(apps));
logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
} else {
logger.warn("Not updating applications as another thread is updating it already");
}
}
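The fetchRegistryGeneration.compareAndSet call is what stops a slow fetch from overwriting a newer one. Below is a minimal sketch of that guard built on AtomicLong and AtomicReference; the registry is reduced to a list of "host:port" strings, and RegistryCache, beginFetch and publish are hypothetical names rather than Eureka's API.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the generation guard in getAndStoreFullRegistry.
public class RegistryCache {
    private final AtomicLong generation = new AtomicLong();
    private final AtomicReference<List<String>> apps = new AtomicReference<>(List.of());

    // Step 1: remember the generation before the (slow) remote call
    long beginFetch() {
        return generation.get();
    }

    // Step 2: publish only if nobody published since we started, and bump the generation
    boolean publish(long startedAt, List<String> fetched) {
        if (fetched == null) {
            return false; // mirrors "The application is null ... Not storing"
        }
        if (generation.compareAndSet(startedAt, startedAt + 1)) {
            apps.set(List.copyOf(fetched));
            return true;
        }
        return false; // another thread already stored a result
    }

    List<String> current() {
        return apps.get();
    }

    public static void main(String[] args) {
        RegistryCache cache = new RegistryCache();
        long a = cache.beginFetch();
        long b = cache.beginFetch(); // two fetches start concurrently
        System.out.println(cache.publish(a, List.of("10.0.0.1:8083"))); // true
        System.out.println(cache.publish(b, List.of("10.0.0.2:8083"))); // false
        System.out.println(cache.current()); // [10.0.0.1:8083]
    }
}
```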
EurekaHttpClientDecorator's getApplications method eventually calls the getApplicationsInternal method of AbstractJerseyEurekaHttpClient:
private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath, String[] regions) {
ClientResponse response = null;
String regionsParamValue = null;
try {
WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath);
if (regions != null && regions.length > 0) {
regionsParamValue = StringUtil.join(regions);
webResource = webResource.queryParam("regions", regionsParamValue);
}
Builder requestBuilder = webResource.getRequestBuilder();
addExtraHeaders(requestBuilder);
//send a GET request
response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class);
Applications applications = null;
if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) {
applications = response.getEntity(Applications.class);
}
//wrap applications in the response
return anEurekaHttpResponse(response.getStatus(), Applications.class)
.headers(headersOf(response))
.entity(applications)
.build();
} finally {
//cleanup code omitted
}
}
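When ZoneAwareLoadBalancer fetches its server list, it reads it from virtualHostNameAppMap in Applications, as the UML class diagram shows:
At this point the whole flow connects: DiscoveryClient acts like a producer, pulling the service list from Eureka and storing it in Applications, while ZoneAwareLoadBalancer acts like a consumer, reading the service list from Applications.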
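The producer/consumer relationship works because the producer swaps in a whole new snapshot while the consumer only reads. Here is a minimal sketch of that hand-off, assuming an immutable map from service id to "host:port" strings stands in for Applications and its virtualHostNameAppMap; SnapshotHandoff, publish and instancesOf are illustrative names.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical producer/consumer hand-off; not Eureka or Ribbon code.
public class SnapshotHandoff {
    // service id (VIP) -> "host:port" instances, replaced atomically as a unit
    private final AtomicReference<Map<String, List<String>>> snapshot =
            new AtomicReference<>(Map.of());

    // Producer side (DiscoveryClient's role): called after each registry fetch
    void publish(Map<String, List<String>> fetched) {
        snapshot.set(Map.copyOf(fetched));
    }

    // Consumer side (load balancer's role): never sees a half-updated map
    List<String> instancesOf(String vip) {
        return snapshot.get().getOrDefault(vip, List.of());
    }

    public static void main(String[] args) {
        SnapshotHandoff handoff = new SnapshotHandoff();
        System.out.println(handoff.instancesOf("springboot-mybatis")); // []
        handoff.publish(Map.of("springboot-mybatis", List.of("192.168.0.118:8083")));
        System.out.println(handoff.instancesOf("springboot-mybatis")); // [192.168.0.118:8083]
    }
}
```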
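Cache refresh
Ribbon refreshes its cache in two places. The first is controlled by this setting from our configuration; note that its value is in milliseconds, so 3 seconds is written as 3000:
springboot-mybatis.ribbon.ServerListRefreshInterval=3000
This key is defined in the CommonClientConfigKey class and is used in PollingServerListUpdater:
public synchronized void start(final UpdateAction updateAction) {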
if (isActive.compareAndSet(false, true)) {
final Runnable wrapperRunnable = new Runnable() {
@Override
public void run() {
if (!isActive.get()) {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
return;
}
try {
updateAction.doUpdate();
lastUpdated = System.currentTimeMillis();
} catch (Exception e) {
logger.warn("Failed one update cycle", e);
}
}
};
scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
wrapperRunnable,
initialDelayMs,
/**this is the ServerListRefreshInterval configured above, in milliseconds**/
refreshIntervalMs,
TimeUnit.MILLISECONDS
);
} else {
logger.info("Already active, no-op");
}
}
The updateAction above is implemented in the DynamicServerListLoadBalancer class:
protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
@Override
public void doUpdate() {
/**this is where the ServerList update happens**/
updateListOfServers();
}
};
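Putting start() and updateAction together, the pattern is an idempotent start guarded by an AtomicBoolean plus a ScheduledExecutorService.scheduleWithFixedDelay task whose failures are logged rather than propagated. The sketch below is simplified (it drops the in-task isActive check and the shared refresh executor) and uses hypothetical names; as with refreshIntervalMs, the interval is in milliseconds.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Simplified, hypothetical version of the PollingServerListUpdater pattern.
public class PollingUpdaterSketch {
    private final AtomicBoolean active = new AtomicBoolean(false);
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private volatile ScheduledFuture<?> future;

    boolean start(Runnable doUpdate, long initialDelayMs, long refreshIntervalMs) {
        if (!active.compareAndSet(false, true)) {
            return false; // "Already active, no-op"
        }
        Runnable wrapper = () -> {
            try {
                doUpdate.run();
            } catch (RuntimeException e) {
                System.err.println("Failed one update cycle: " + e); // keep the schedule alive
            }
        };
        future = executor.scheduleWithFixedDelay(
                wrapper, initialDelayMs, refreshIntervalMs, TimeUnit.MILLISECONDS);
        return true;
    }

    void stop() {
        if (active.compareAndSet(true, false) && future != null) {
            future.cancel(true);
        }
        executor.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        PollingUpdaterSketch updater = new PollingUpdaterSketch();
        CountDownLatch threeRuns = new CountDownLatch(3);
        // 3000 would mean "every 3 s"; 10 ms keeps the demo short
        System.out.println(updater.start(threeRuns::countDown, 0, 10)); // true
        System.out.println(updater.start(threeRuns::countDown, 0, 10)); // false
        System.out.println(threeRuns.await(2, TimeUnit.SECONDS));       // true once 3 runs happened
        updater.stop();
    }
}
```

Catching the exception inside the wrapper matters: with scheduleWithFixedDelay, an exception escaping the task would cancel all later runs.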
The other place that refreshes the cache is DiscoveryClient:
private void initScheduledTasks() {
if (clientConfig.shouldFetchRegistry()) {
//defaults to 30 s
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
//scheduled thread pool
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
/**the task that the scheduled thread pool runs**/
new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
}
CacheRefreshThread is defined as follows:
class CacheRefreshThread implements Runnable {
public void run() {
//this leads into the code that refreshes the cached service list
refreshRegistry();
}
}
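TimedSupervisorTask wraps CacheRefreshThread with a timeout and an exponential back-off bounded by expBackOffBound. As understood here, after a timed-out run the next delay doubles up to timeout times expBackOffBound, and a successful run resets it to the base timeout; the sketch reduces that rule to arithmetic. BackoffSketch is a hypothetical name, and the values 30 s and 10 are assumed defaults.

```java
// Hypothetical arithmetic-only model of TimedSupervisorTask's back-off.
public class BackoffSketch {
    private final long baseMillis;
    private final long maxMillis;
    private long delayMillis;

    BackoffSketch(long baseMillis, int expBackOffBound) {
        this.baseMillis = baseMillis;
        this.maxMillis = baseMillis * expBackOffBound; // upper bound for the delay
        this.delayMillis = baseMillis;
    }

    // Returns the delay before the next run, given whether the last run timed out
    long next(boolean timedOut) {
        delayMillis = timedOut ? Math.min(maxMillis, delayMillis * 2) : baseMillis;
        return delayMillis;
    }

    public static void main(String[] args) {
        // registryFetchIntervalSeconds = 30, expBackOffBound = 10 (assumed defaults)
        BackoffSketch b = new BackoffSketch(30_000, 10);
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 5; i++) {
            sb.append(b.next(true)).append(' ');
        }
        System.out.println(sb.toString().trim()); // 60000 120000 240000 300000 300000
        System.out.println(b.next(false));        // 30000
    }
}
```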
About ping
The Ribbon documentation quoted above mentions Ping, which probes whether the servers in the server list are healthy and records each server's alive status.
The ping task is set up in the BaseLoadBalancer constructor:
public BaseLoadBalancer() {
this.name = DEFAULT_NAME;
this.ping = null;
setRule(DEFAULT_RULE);
//start the ping task here
setupPingTask();
lbStats = new LoadBalancerStats(DEFAULT_NAME);
}
void setupPingTask() {
if (canSkipPing()) {
return;
}
if (lbTimer != null) {
lbTimer.cancel();
}
lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
true);
//pingIntervalSeconds defaults to 10 seconds
lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
forceQuickPing();
}
class PingTask extends TimerTask {
public void run() {
try {
new Pinger(pingStrategy).runPinger();
} catch (Exception e) {
logger.error("LoadBalancer [{}]: Error pinging", name, e);
}
}
}
Here is the core logic of the ping:
public void runPinger() throws Exception {
if (!pingInProgress.compareAndSet(false, true)) {
return; // Ping in progress - nothing to do
}
// we are "in" - we get to Ping
Server[] allServers = null;
boolean[] results = null;
Lock allLock = null;
Lock upLock = null;
try {
/*
* The readLock should be free unless an addServer operation is
* going on...
*/
allLock = allServerLock.readLock();
allLock.lock();
allServers = allServerList.toArray(new Server[allServerList.size()]);
allLock.unlock();
int numCandidates = allServers.length;
//ping every server and get back an array of booleans
results = pingerStrategy.pingServers(ping, allServers);
final List<Server> newUpList = new ArrayList<Server>();
final List<Server> changedServers = new ArrayList<Server>();
for (int i = 0; i < numCandidates; i++) {
boolean isAlive = results[i];
Server svr = allServers[i];
boolean oldIsAlive = svr.isAlive();
svr.setAlive(isAlive);
if (oldIsAlive != isAlive) {
changedServers.add(svr);
logger.debug("LoadBalancer [{}]: Server [{}] status changed to {}",
name, svr.getId(), (isAlive ? "ALIVE" : "DEAD"));
}
if (isAlive) {
//add servers that are up to the new ServerList
newUpList.add(svr);
}
}
upLock = upServerLock.writeLock();
upLock.lock();
//replace the old ServerList
upServerList = newUpList;
upLock.unlock();
//no implementation of ServerStatusChangeListener was found
notifyServerStatusChangeListener(changedServers);
} finally {
pingInProgress.set(false);
}
}
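runPinger boils down to: copy allServerList under a read lock, ping each server, then replace upServerList under a write lock. Below is a simplified Java sketch of that flow, assuming the ping can be modeled as a java.util.function.Predicate; PingSketch and its nested Server class are hypothetical stand-ins, not Ribbon's IPing or Server.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Predicate;

// Simplified, hypothetical model of Pinger.runPinger.
public class PingSketch {
    static final class Server {
        final String id;
        volatile boolean alive = true;
        Server(String id) { this.id = id; }
    }

    private final List<Server> allServers = new ArrayList<>();
    private List<Server> upServers = new ArrayList<>();
    private final ReentrantReadWriteLock allServerLock = new ReentrantReadWriteLock();
    private final ReentrantReadWriteLock upServerLock = new ReentrantReadWriteLock();
    private final AtomicBoolean pingInProgress = new AtomicBoolean(false);

    void addServer(Server s) {
        allServerLock.writeLock().lock();
        try { allServers.add(s); } finally { allServerLock.writeLock().unlock(); }
    }

    // One ping round; returns the servers whose status flipped
    List<Server> runPinger(Predicate<Server> ping) {
        if (!pingInProgress.compareAndSet(false, true)) {
            return List.of(); // a round is already running: nothing to do
        }
        try {
            List<Server> snapshot;
            allServerLock.readLock().lock();
            try { snapshot = new ArrayList<>(allServers); } finally { allServerLock.readLock().unlock(); }

            List<Server> newUpList = new ArrayList<>();
            List<Server> changed = new ArrayList<>();
            for (Server s : snapshot) {
                boolean isAlive = ping.test(s);
                if (s.alive != isAlive) {
                    changed.add(s);
                }
                s.alive = isAlive;
                if (isAlive) {
                    newUpList.add(s);
                }
            }
            upServerLock.writeLock().lock();
            try { upServers = newUpList; } finally { upServerLock.writeLock().unlock(); } // swap, not edit
            return changed;
        } finally {
            pingInProgress.set(false);
        }
    }

    List<Server> upServers() {
        upServerLock.readLock().lock();
        try { return List.copyOf(upServers); } finally { upServerLock.readLock().unlock(); }
    }

    int allServerCount() {
        allServerLock.readLock().lock();
        try { return allServers.size(); } finally { allServerLock.readLock().unlock(); }
    }

    public static void main(String[] args) {
        PingSketch lb = new PingSketch();
        lb.addServer(new Server("a:8083"));
        lb.addServer(new Server("b:8083"));
        List<Server> changed = lb.runPinger(s -> !s.id.startsWith("b"));
        System.out.println(changed.size());        // 1: b flipped from alive to dead
        System.out.println(lb.upServers().size()); // 1
        System.out.println(lb.allServerCount());   // 2: the full list is untouched
    }
}
```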
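So a Server that fails the ping is dropped from the list of up servers (upServerList); allServerList itself is left unchanged.
About rules
As we saw when discussing chooseServer, the code is in BaseLoadBalancer, the parent class of ZoneAwareLoadBalancer:
public Server chooseServer(Object key) {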
if (counter == null) {
counter = createCounter();
}
//its value is BasicCounter{config=MonitorConfig{name=LoadBalancer_ChooseServer, tags=COUNTER, policy=DefaultPublishingPolicy}, count=0}
counter.increment();
if (rule == null) {
return null;
} else {
try {
//rule here is a ZoneAvoidanceRule, but choose() is inherited from its parent class PredicateBasedRule
return rule.choose(key);
} catch (Exception e) {
logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e);
return null;
}
}
}
The Rule decides which Server from the server list is returned. The Rule inheritance hierarchy is shown in the figure below:
Let's look at PredicateBasedRule's choose method:
public Server choose(Object key) {
ILoadBalancer lb = getLoadBalancer();
//getPredicate() returns an AbstractServerPredicate
Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
if (server.isPresent()) {
return server.get();
} else {
return null;
}
}
The chooseRoundRobinAfterFiltering method of AbstractServerPredicate is as follows:
public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
//loadBalancerKey is null here
List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
if (eligible.size() == 0) {
return Optional.absent();
}
return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
}
public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
if (loadBalancerKey == null) {
return ImmutableList.copyOf(Iterables.filter(servers, this.getServerOnlyPredicate()));
} else {
List<Server> results = Lists.newArrayList();
for (Server server: servers) {
if (this.apply(new PredicateKey(loadBalancerKey, server))) {
results.add(server);
}
}
return results;
}
}
public static AbstractServerPredicate ofKeyPredicate(final Predicate<PredicateKey> p) {
return new AbstractServerPredicate() {
@Override
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP")
public boolean apply(PredicateKey input) {
return p.apply(input);
}
};
}
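chooseRoundRobinAfterFiltering means "filter, then round-robin over what is left". The sketch below reproduces that idea with java.util.stream and an AtomicInteger compare-and-set loop modeled on the one Ribbon uses in incrementAndGetModulo; it is written from scratch, works on plain strings instead of Server objects, and RoundRobinSketch is a hypothetical name.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical "filter, then round-robin" sketch.
public class RoundRobinSketch {
    private final AtomicInteger nextIndex = new AtomicInteger();

    // Lock-free "next index modulo n"; retries the CAS if another thread raced us
    private int incrementAndGetModulo(int modulo) {
        while (true) {
            int current = nextIndex.get();
            int next = (current + 1) % modulo;
            // current < modulo guards against a stale index after the list shrank
            if (nextIndex.compareAndSet(current, next) && current < modulo) {
                return current;
            }
        }
    }

    Optional<String> choose(List<String> servers, Predicate<String> eligible) {
        List<String> filtered = servers.stream().filter(eligible).collect(Collectors.toList());
        if (filtered.isEmpty()) {
            return Optional.empty(); // like Optional.absent() in the Guava-based original
        }
        return Optional.of(filtered.get(incrementAndGetModulo(filtered.size())));
    }

    public static void main(String[] args) {
        RoundRobinSketch rr = new RoundRobinSketch();
        List<String> servers = List.of("a", "b", "c");
        Predicate<String> notB = s -> !s.equals("b"); // pretend b is filtered out
        StringBuilder picks = new StringBuilder();
        for (int i = 0; i < 3; i++) {
            picks.append(rr.choose(servers, notB).orElse("-")).append(' ');
        }
        System.out.println(picks.toString().trim()); // a c a
    }
}
```

Filtering before the modulo means an unhealthy server never consumes a turn in the rotation.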
The p in the method above combines two Predicates; in the debugger it looks like this:
p = {Predicates$AndPredicate@14490} "Predicates.and(com.netflix.loadbalancer.ZoneAvoidancePredicate@70d2b344,com.netflix.loadbalancer.AvailabilityPredicate@7380394)"
components = {ArrayList@14673} size = 2
0 = {ZoneAvoidancePredicate@14675}
1 = {AvailabilityPredicate@14676}
Let's look at their apply methods, starting with AvailabilityPredicate:
public boolean apply(@Nullable PredicateKey input) {
LoadBalancerStats stats = getLBStats();
if (stats == null) {
return true;
}
return !shouldSkipServer(stats.getSingleServerStat(input.getServer()));
}
private boolean shouldSkipServer(ServerStats stats) {
//decide based on the ServerStats
if ((CIRCUIT_BREAKER_FILTERING.get() && stats.isCircuitBreakerTripped())
|| stats.getActiveRequestsCount() >= activeConnectionsLimit.get()) {
return true;
}
return false;
}
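Next, ZoneAvoidancePredicate's apply method. Its comments are clear: it returns true (the server is kept) when at most one zone is available or no stats can be obtained, and returns false when the server's zone is not among the available zones:
public boolean apply(@Nullable PredicateKey input) {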
if (!ENABLED.get()) {
return true;
}
String serverZone = input.getServer().getZone();
if (serverZone == null) {
// there is no zone information from the server, we do not want to filter
// out this server
return true;
}
LoadBalancerStats lbStats = getLBStats();
if (lbStats == null) {
// no stats available, do not filter
return true;
}
if (lbStats.getAvailableZones().size() <= 1) {
// only one zone is available, do not filter
return true;
}
Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
if (!zoneSnapshot.keySet().contains(serverZone)) {
// The server zone is unknown to the load balancer, do not filter it out
return true;
}
logger.debug("Zone snapshots: {}", zoneSnapshot);
Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
logger.debug("Available zones: {}", availableZones);
if (availableZones != null) {
return availableZones.contains(input.getServer().getZone());
} else {
return false;
}
}
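The order of the early returns in ZoneAvoidancePredicate.apply is easy to lose in the real code. This sketch keeps only that decision order; the parameters are assumed stand-ins for what the predicate reads from LoadBalancerStats, and computing availableZones (ZoneAvoidanceRule.getAvailableZones) is out of scope. ZoneFilterSketch and keep are hypothetical names.

```java
import java.util.Set;

// Hypothetical decision-order sketch of ZoneAvoidancePredicate.apply.
public class ZoneFilterSketch {
    static boolean keep(boolean enabled, String serverZone, boolean hasStats,
                        int availableZoneCount, Set<String> knownZones, Set<String> availableZones) {
        if (!enabled) return true;                         // feature switched off
        if (serverZone == null) return true;               // no zone info: do not filter
        if (!hasStats) return true;                        // no stats: do not filter
        if (availableZoneCount <= 1) return true;          // single zone: nothing to avoid
        if (!knownZones.contains(serverZone)) return true; // unknown zone: keep it
        return availableZones != null && availableZones.contains(serverZone);
    }

    public static void main(String[] args) {
        Set<String> known = Set.of("zone1", "zone2");
        Set<String> available = Set.of("zone2"); // pretend zone1 is overloaded
        System.out.println(keep(true, "zone1", true, 2, known, available)); // false
        System.out.println(keep(true, "zone2", true, 2, known, available)); // true
        System.out.println(keep(true, "zone1", true, 1, known, available)); // true: only one zone
    }
}
```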
These two Predicates are passed in through the CompositePredicate builder:
Builder(AbstractServerPredicate ...primaryPredicates) {
toBuild = new CompositePredicate();
Predicate<PredicateKey> chain = Predicates.<PredicateKey>and(primaryPredicates);
toBuild.delegate = AbstractServerPredicate.ofKeyPredicate(chain);
}
That builder is invoked when ZoneAvoidanceRule is initialized:
public ZoneAvoidanceRule() {
super();
ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this);
AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this);
compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate);
}
private CompositePredicate createCompositePredicate(ZoneAvoidancePredicate p1, AvailabilityPredicate p2) {
return CompositePredicate.withPredicates(p1, p2)
.addFallbackPredicate(p2)
.addFallbackPredicate(AbstractServerPredicate.alwaysTrue())
.build();
}
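createCompositePredicate sets up a fallback chain: zone AND availability first, then availability alone, then alwaysTrue. The sketch below shows that chain with java.util.function.Predicate, under the assumption that a fallback is used only when the previous filter leaves no server (Ribbon's CompositePredicate has configurable thresholds for this); FallbackChainSketch and eligible are hypothetical names.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical sketch of a primary predicate with ordered fallbacks.
public class FallbackChainSketch {
    // Try each predicate in order; stop at the first one that leaves any server
    static <T> List<T> eligible(List<T> servers, List<Predicate<T>> chain) {
        List<T> result = List.of();
        for (Predicate<T> p : chain) {
            result = servers.stream().filter(p).collect(Collectors.toList());
            if (!result.isEmpty()) {
                break;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Predicate<String> zone = s -> s.startsWith("zoneA");       // pretend only zone A is healthy
        Predicate<String> availability = s -> !s.endsWith("busy"); // pretend "busy" means overloaded
        Predicate<String> alwaysTrue = s -> true;
        List<Predicate<String>> chain = List.of(zone.and(availability), availability, alwaysTrue);

        System.out.println(eligible(List.of("zoneA-1", "zoneB-1"), chain));     // [zoneA-1]
        System.out.println(eligible(List.of("zoneB-1", "zoneB-2busy"), chain)); // [zoneB-1]
        System.out.println(eligible(List.of("zoneB-busy"), chain));             // [zoneB-busy]
    }
}
```

The final alwaysTrue step means the rule degrades to plain round-robin instead of returning no server at all.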
Predicates can also be customized through configuration; those interested can explore further. The code that reads the configuration is:
public void initWithNiwsConfig(IClientConfig clientConfig) {
ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this, clientConfig);
AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this, clientConfig);
compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate);
}
Ribbon retries
Ribbon's retry feature helps with graceful releases. First, note that Ribbon retries depend on Spring Retry, so the following dependency must be added to the pom:
<dependency>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
<version>1.2.5.RELEASE</version>
</dependency>
The retry configuration is as follows:
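#Ribbon's cache refresh interval defaults to 30 s; changed to 3 s (the value is in milliseconds)
springboot-mybatis.ribbon.ServerListRefreshInterval=3000
#Max retries on the same instance, not counting the first call
springboot-mybatis.ribbon.MaxAutoRetries=1
#Max retries on other instances picked by the load balancer, not counting the first call
springboot-mybatis.ribbon.MaxAutoRetriesNextServer=1
#Whether to retry all operations; if set to true, POST requests are retried too
springboot-mybatis.ribbon.OkToRetryOnAllOperations=false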
springboot-mybatis.ribbon.retryableStatusCodes=404,408,502,500
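As a worked example, assume a request that is eligible for retry (for instance a GET, given OkToRetryOnAllOperations=false). Each instance gets the first attempt plus MaxAutoRetries, and the first instance plus MaxAutoRetriesNextServer others may be tried, so the settings above bound the attempts per call at:

$$
N_{\max} = (\text{MaxAutoRetries} + 1)\times(\text{MaxAutoRetriesNextServer} + 1) = (1+1)\times(1+1) = 4
$$

That is up to two attempts on the first instance and two on a second one. With ReadTimeout=15000, a call that keeps timing out can therefore take roughly 4 × 15 s = 60 s before failing, ignoring connect time.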
Here is a UML class diagram:
The core retry code is in the submit method of the LoadBalancerCommand class; be sure to read its original Javadoc below:
/**
* Create an {@link Observable} that once subscribed execute network call asynchronously with a server chosen by load balancer.
* If there are any errors that are indicated as retriable by the {@link RetryHandler}, they will be consumed internally by the
* function and will not be observed by the {@link Observer} subscribed to the returned {@link Observable}. If number of retries has
* exceeds the maximal allowed, a final error will be emitted by the returned {@link Observable}. Otherwise, the first successful
* result during execution and retries will be emitted.
*/
public Observable<T> submit(final ServerOperation<T> operation) {
final ExecutionInfoContext context = new ExecutionInfoContext();
if (listenerInvoker != null) {
try {
listenerInvoker.onExecutionStart();
} catch (AbortExecutionException e) {
return Observable.error(e);
}
}
//MaxAutoRetries and MaxAutoRetriesNextServer configured above; both are 1 here
final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();
// Use the load balancer
Observable<T> o =
(server == null ? selectServer() : Observable.just(server))
.concatMap(new Func1<Server, Observable<T>>() {
@Override
// Called for each server being selected
public Observable<T> call(Server server) {
context.setServer(server);
final ServerStats stats = loadBalancerContext.getServerStats(server);
// Called for each attempt and retry
Observable<T> o = Observable
.just(server)
.concatMap(new Func1<Server, Observable<T>>() {
@Override
public Observable<T> call(final Server server) {
context.incAttemptCount();
loadBalancerContext.noteOpenConnection(stats);
//some code omitted
final Stopwatch tracer = loadBalancerContext.getExecuteTracer().start();
return operation.call(server).doOnEach(new Observer<T>() {
private T entity;
@Override
public void onCompleted() {
recordStats(tracer, stats, entity, null);
// TODO: What to do if onNext or onError are never called?
}
@Override
public void onError(Throwable e) {
recordStats(tracer, stats, null, e);
logger.debug("Got error {} when executed on server {}", e, server);
if (listenerInvoker != null) {
listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo());
}
}
@Override
public void onNext(T entity) {
this.entity = entity;
if (listenerInvoker != null) {
listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo());
}
}
private void recordStats(Stopwatch tracer, ServerStats stats, Object entity, Throwable exception) {
tracer.stop();
loadBalancerContext.noteRequestCompletion(stats, entity, exception, tracer.getDuration(TimeUnit.MILLISECONDS), retryHandler);
}
});
}
});
if (maxRetrysSame > 0)
o = o.retry(retryPolicy(maxRetrysSame, true));
return o;
}
});
//the initial call goes through here (server is null when no server is preset)
if (maxRetrysNext > 0 && server == null)
//retryPolicy checks the error type
o = o.retry(retryPolicy(maxRetrysNext, false));
return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
@Override
public Observable<T> call(Throwable e) {
if (context.getAttemptCount() > 0) {
//once the retry limit is exceeded, throw an exception
if (maxRetrysNext > 0 && context.getServerAttemptCount() == (maxRetrysNext + 1)) {
e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED,
"Number of retries on next server exceeded max " + maxRetrysNext
+ " retries, while making a call for: " + context.getServer(), e);
}
else if (maxRetrysSame > 0 && context.getAttemptCount() == (maxRetrysSame + 1)) {
e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED,
"Number of retries exceeded max " + maxRetrysSame
+ " retries, while making a call for: " + context.getServer(), e);
}
}
if (listenerInvoker != null) {
listenerInvoker.onExecutionFailed(e, context.toFinalExecutionInfo());
}
return Observable.error(e);
}
});
}
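The RxJava in submit builds two nested retry loops: an inner retry on the same server (maxRetrysSame) wrapped in an outer retry that selects another server (maxRetrysNext). The plain-Java sketch below shows only that nesting; it is not Ribbon's code, it treats every RuntimeException as retriable, and "server selection" just walks an iterator. NestedRetrySketch and submit are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Hypothetical plain-Java model of the retry nesting in LoadBalancerCommand.submit.
public class NestedRetrySketch {
    static <T> T submit(Iterator<String> serverChooser, int maxSame, int maxNext,
                        Function<String, T> operation, List<String> attemptLog) {
        RuntimeException last = null;
        // outer loop: first server + maxNext other servers
        for (int serverTry = 0; serverTry <= maxNext && serverChooser.hasNext(); serverTry++) {
            String server = serverChooser.next();        // plays the role of selectServer()
            // inner loop: first attempt + maxSame retries on this server
            for (int sameTry = 0; sameTry <= maxSame; sameTry++) {
                attemptLog.add(server);
                try {
                    return operation.apply(server);       // plays the role of operation.call(server)
                } catch (RuntimeException e) {
                    last = e;                             // assumed retriable: try again
                }
            }
        }
        throw new IllegalStateException("Number of retries exceeded", last);
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        Iterator<String> servers = List.of("A", "B", "C").iterator();
        // A always fails, B succeeds
        String result = submit(servers, 1, 1, s -> {
            if (s.equals("A")) throw new RuntimeException("503 from " + s);
            return "ok from " + s;
        }, log);
        System.out.println(result); // ok from B
        System.out.println(log);    // [A, A, B]
    }
}
```

With maxSame = maxNext = 1 an always-failing call makes exactly four attempts before giving up, matching the (MaxAutoRetries + 1) × (MaxAutoRetriesNextServer + 1) bound.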
The retryPolicy in this method catches the exception and decides whether it is retriable; in our setup, the retriable case corresponds to the four status codes we configured: 404,408,502,500.
Now let's look at RetryableFeignLoadBalancer's execute method:
public RibbonResponse execute(final RibbonRequest request, IClientConfig configOverride)
throws IOException {
final Request.Options options;
if (configOverride != null) {
RibbonProperties ribbon = RibbonProperties.from(configOverride);
options = new Request.Options(
ribbon.connectTimeout(this.connectTimeout),
ribbon.readTimeout(this.readTimeout));
}
else {
options = new Request.Options(this.connectTimeout, this.readTimeout);
}
final LoadBalancedRetryPolicy retryPolicy = loadBalancedRetryFactory.createRetryPolicy(this.getClientName(), this);
RetryTemplate retryTemplate = new RetryTemplate();
BackOffPolicy backOffPolicy = loadBalancedRetryFactory.createBackOffPolicy(this.getClientName());
retryTemplate.setBackOffPolicy(backOffPolicy == null ? new NoBackOffPolicy() : backOffPolicy);
RetryListener[] retryListeners = this.loadBalancedRetryFactory.createRetryListeners(this.getClientName());
if (retryListeners != null && retryListeners.length != 0) {
retryTemplate.setListeners(retryListeners);
}
//retryPolicy is a RibbonLoadBalancedRetryPolicy; its lbContext holds our settings
retryTemplate.setRetryPolicy(retryPolicy == null ? new NeverRetryPolicy()
: new FeignRetryPolicy(request.toHttpRequest(), retryPolicy, this, this.getClientName()));
//retryTemplate.execute retries right after a failure, looping with while
return retryTemplate.execute(new RetryCallback<RibbonResponse, IOException>() {
@Override
public RibbonResponse doWithRetry(RetryContext retryContext) throws IOException {
Request feignRequest = null;
//on retries the policy will choose the server and set it in the context
//extract the server and update the request being made
if (retryContext instanceof LoadBalancedRetryContext) {
ServiceInstance service = ((LoadBalancedRetryContext) retryContext).getServiceInstance();
if (service != null) {
feignRequest = ((RibbonRequest) request.replaceUri(reconstructURIWithServer(new Server(service.getHost(), service.getPort()), request.getUri()))).toRequest();
}
}
if (feignRequest == null) {
feignRequest = request.toRequest();
}
Response response = request.client().execute(feignRequest, options);
/**
* If the status code is one we configured, throw RibbonResponseStatusCodeException,
* a subclass of RetryableStatusCodeException
*/
if (retryPolicy != null && retryPolicy.retryableStatusCode(response.status())) {
byte[] byteArray = response.body() == null ? new byte[]{} : StreamUtils.copyToByteArray(response.body().asInputStream());
response.close();
throw new RibbonResponseStatusCodeException(RetryableFeignLoadBalancer.this.clientName, response,
byteArray, request.getUri());
}
return new RibbonResponse(request.getUri(), response);
}
}, new LoadBalancedRecoveryCallback<RibbonResponse, Response>() {
@Override
protected RibbonResponse createResponse(Response response, URI uri) {
return new RibbonResponse(uri, response);
}
});
}
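Stripped of Spring Retry, the execute method above has a simple shape: call the server, turn a configured status code into an exception so the loop retries, and when attempts run out let the recovery callback return the last response. The sketch below models that with a plain for loop; it does not use Spring Retry's API, and StatusRetrySketch, RetryableStatusException and execute are hypothetical names.

```java
import java.util.Set;
import java.util.function.IntSupplier;

// Hypothetical model of "retryable status code -> exception -> retry -> recover".
public class StatusRetrySketch {
    // Plays the role of RibbonResponseStatusCodeException: carries the last status
    static final class RetryableStatusException extends RuntimeException {
        final int status;
        RetryableStatusException(int status) {
            super("retryable status " + status);
            this.status = status;
        }
    }

    static int execute(IntSupplier call, Set<Integer> retryableStatusCodes, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RetryableStatusException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                int status = call.getAsInt();
                if (retryableStatusCodes.contains(status)) {
                    throw new RetryableStatusException(status); // signals "try again"
                }
                return status; // success, or a status we were not asked to retry
            } catch (RetryableStatusException e) {
                last = e;
            }
        }
        // Recovery step: hand back the last response instead of an exception
        return last.status;
    }

    public static void main(String[] args) {
        Set<Integer> retryable = Set.of(404, 408, 502, 500);
        int[] calls = {0};
        IntSupplier flaky = () -> ++calls[0] < 3 ? 502 : 200; // fails twice, then succeeds
        System.out.println(execute(flaky, retryable, 4));     // 200
        System.out.println(calls[0]);                         // 3
        System.out.println(execute(() -> 502, retryable, 2)); // 502 after 2 attempts
    }
}
```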
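Summary
As OpenFeign's load balancer, Ribbon is lazily loaded: the LoadBalancer, together with the ping and cache-refresh tasks, is only initialized the first time OpenFeign fetches the service list.
Ribbon retries help with graceful releases, so we recommend setting MaxAutoRetries to 0, i.e. when the current instance fails, the request goes straight to the next instance.
Ribbon's source code is fairly complex; some background in RxJava makes it easier to follow.
Ribbon's source code is genuinely mind-bending; questions and discussion are welcome.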