Ribbon分析

2021-07-13 11:46:44 浏览数 (1)

### Ribbon使用

ribbon在使用上非常简单,仅仅只需要在配置类上加入配置即可

```

@Bean

@LoadBalanced

public RestTemplate restTemplate(){

return new RestTemplate();

}

```

调用时,直接使用在eureka中注册的服务名进行调用,就可以由ribbon来进行负载均衡了

```

@GetMapping("/checkAndBegin/{userId}")

public Integer findResumeOpenStateEureka(@PathVariable Long userId) {

// List<ServiceInstance> list = discoveryClient.getInstances("lagou-service-resume");

// ServiceInstance serviceInstance = list.get(0);

// String host = serviceInstance.getHost();

// int port = serviceInstance.getPort();

String url = "http://zhao-service-resume/resume/openstate/" userId;

System.out.println("从eureka中获取了请求地址" url);

Integer forObject =

restTemplate.getForObject(url, Integer.class);

return forObject;

}

```

根据要求,zhao-service-resume项目开启多个,并打印请求信息,即可发现负载均衡已经实现

另外目前Ribbon的内置负载均衡策略

![file](http://cdn.kobefan.cn/FqgCxl5Ur3i22NadrVI2YrxMs9pB)

目前默认使用的是随机负载均衡RandomRule,默认全局生效,但是可以针对不同的调用服务设置不同的负载均衡策略

```

zhao-service-resume:

ribbon:

NFLoadBalancerRuleClassName:

com.netflix.loadbalancer.RandomRule #负载策略调整

```

同时,可以自定负载均衡策略并配置

### Ribbon源码分析

一般而言,自动装配类就是加载配置的入口。

```

@Configuration

@Conditional(RibbonAutoConfiguration.RibbonClassesConditions.class)

@RibbonClients

@AutoConfigureAfter(name = "org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration")

@AutoConfigureBefore({LoadBalancerAutoConfiguration.class, AsyncLoadBalancerAutoConfiguration.class})

@EnableConfigurationProperties({RibbonEagerLoadProperties.class, ServerIntrospectorProperties.class})

public class RibbonAutoConfiguration {

}

```

通过上述配置,我们可以先看看LoadBalancerAutoConfiguration的具体内容

```

@LoadBalanced

@Autowired(required = false)

private List<RestTemplate> restTemplates = Collections.emptyList();

```

此处将自动注入添加了@LoadBalanced注解的RestTemplate对象

同时还注入了一个RestTemplate的定制器RestTemplateCustomize

```

@Bean

public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(

final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {

return () -> restTemplateCustomizers.ifAvailable(customizers -> {

for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {

for (RestTemplateCustomizer customizer : customizers) {

customizer.customize(restTemplate);

}

}

});

}

```

继续进入RestTemplateCustomizer的定制器代码,我们发现在定制器中加入了一个拦截器

```

@Configuration

@ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")

static class LoadBalancerInterceptorConfig {

@Bean

public LoadBalancerInterceptor ribbonInterceptor(

LoadBalancerClient loadBalancerClient,

LoadBalancerRequestFactory requestFactory) {

return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);

}

@Bean

@ConditionalOnMissingBean

public RestTemplateCustomizer restTemplateCustomizer(

final LoadBalancerInterceptor loadBalancerInterceptor) {

return restTemplate -> {

List<ClientHttpRequestInterceptor> list = new ArrayList<>(

restTemplate.getInterceptors());

list.add(loadBalancerInterceptor);

restTemplate.setInterceptors(list);

};

}

}

```

ClientHttpRequestInterceptor的拦截具体内容为,根据获取到的请求路径和请求地址进行负载均衡

```

@Override

public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,

final ClientHttpRequestExecution execution) throws IOException {

final URI originalUri = request.getURI();

String serviceName = originalUri.getHost();

Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " originalUri);

return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));

}

```

执行负载均衡的代码

```

public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint) throws IOException {

ILoadBalancer loadBalancer = getLoadBalancer(serviceId);

Server server = getServer(loadBalancer, hint);

if (server == null) {

throw new IllegalStateException("No instances available for " serviceId);

}

RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,

serviceId), serverIntrospector(serviceId).getMetadata(server));

return execute(serviceId, ribbonServer, request);

}

```

从这段代码可以看出,第一行根据配置,选出相应的负载均衡策略。第二行就是根据相应的负载均衡策略选择一个服务端进行服务请求,达到负载均衡的目的

最后在BaseLoadBalancer中执行了根据不同的策略选择服务的操作

```

public Server chooseServer(Object key) {

if (counter == null) {

counter = createCounter();

}

counter.increment();

if (rule == null) {

return null;

} else {

try {

return rule.choose(key);

} catch (Exception e) {

logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e);

return null;

}

}

}

```

考虑完了上面的主逻辑之后,还有一个问题,就是服务列表是什么时候获取到的。

在RibbonAutoConfigration中注入了SpringClientFactory,而SpringClientFactory又注入了RibbonClientConfiguration

```

public SpringClientFactory() {

super(RibbonClientConfiguration.class, NAMESPACE, "ribbon.client.name");

}

```

RibbonClientConfiguration中进行了注入客户端操作的相关操作,包括负载均衡策略,客户端配置,服务列表等,其中最重要的就是如何获取和更新服务列表

```

@ConditionalOnMissingBean

@SuppressWarnings("unchecked")

public ServerList<Server> ribbonServerList(IClientConfig config) {

if (this.propertiesFactory.isSet(ServerList.class, name)) {

return this.propertiesFactory.get(ServerList.class, config, name);

}

ConfigurationBasedServerList serverList = new ConfigurationBasedServerList();

serverList.initWithNiwsConfig(config);

return serverList;

}

@Bean

@ConditionalOnMissingBean

public ServerListUpdater ribbonServerListUpdater(IClientConfig config) {

return new PollingServerListUpdater(config);

}

@Bean

@ConditionalOnMissingBean

public ILoadBalancer ribbonLoadBalancer(IClientConfig config,

ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,

IRule rule, IPing ping, ServerListUpdater serverListUpdater) {

if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {

return this.propertiesFactory.get(ILoadBalancer.class, config, name);

}

return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,

serverListFilter, serverListUpdater);

}

```

在ribbonList方法中并未有获取serverList的操作,在ribbonLoadBalancer中进行了使用,那么究竟怎么一回事呢?实际上是在ZoneAwareLoadBalancer的父类DynamicServerListLoadBalancer中进行了重新的赋值并且执行了定时任务进行更新。

```

void restOfInit(IClientConfig clientConfig) {

boolean primeConnection = this.isEnablePrimingConnections();

// turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()

this.setEnablePrimingConnections(false);

enableAndInitLearnNewServersFeature();

updateListOfServers();

if (primeConnection && this.getPrimeConnections() != null) {

this.getPrimeConnections()

.primeConnections(getReachableServers());

}

this.setEnablePrimingConnections(primeConnection);

LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());

}

```

首先通过updateAction.doUpdate();更新,然后通过getRefreshExecutor()进行获取

```

@Override

public synchronized void start(final UpdateAction updateAction) {

if (isActive.compareAndSet(false, true)) {

final Runnable wrapperRunnable = new Runnable() {

@Override

public void run() {

if (!isActive.get()) {

if (scheduledFuture != null) {

scheduledFuture.cancel(true);

}

return;

}

try {

updateAction.doUpdate();

lastUpdated = System.currentTimeMillis();

} catch (Exception e) {

logger.warn("Failed one update cycle", e);

}

}

};

scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(

wrapperRunnable,

initialDelayMs,

refreshIntervalMs,

TimeUnit.MILLISECONDS

);

} else {

logger.info("Already active, no-op");

}

}

```

0 人点赞