Ribbon使用分析

2021-07-15 10:30:37 浏览数 (1)

Ribbon使用

ribbon在使用上非常简单,仅仅只需要在配置类上加入配置即可

代码语言:javascript复制
    @Bean
    @LoadBalanced
    public RestTemplate restTemplate(){
        return new RestTemplate();
    }

调用时,直接使用在eureka中注册的服务名进行调用,就可以由ribbon来进行负载均衡了

代码语言:javascript复制
    @GetMapping("/checkAndBegin/{userId}")
    public Integer findResumeOpenStateEureka(@PathVariable Long userId) {
//        List<ServiceInstance> list = discoveryClient.getInstances("lagou-service-resume");
//        ServiceInstance serviceInstance = list.get(0);
//        String host = serviceInstance.getHost();
//        int port = serviceInstance.getPort();
        String url = "http://zhao-service-resume/resume/openstate/" userId;
        System.out.println("从eureka中获取了请求地址" url);
        Integer forObject =
                restTemplate.getForObject(url, Integer.class);
        return forObject;
    }

根据要求,zhao-service-resume项目开启多个,并打印请求信息,即可发现负载均衡已经实现 另外目前Ribbon的内置负载均衡策略

目前默认使用的是随机负载均衡RandomRule,默认全局生效,但是可以针对不同的调用服务设置不同的负载均衡策略

代码语言:javascript复制
zhao-service-resume:
 ribbon:
 NFLoadBalancerRuleClassName:
com.netflix.loadbalancer.RandomRule #负载策略调整

同时,可以自定负载均衡策略并配置

Ribbon源码分析

一般而言,自动装配类就是加载配置的入口。

代码语言:javascript复制
@Configuration
@Conditional(RibbonAutoConfiguration.RibbonClassesConditions.class)
@RibbonClients
@AutoConfigureAfter(name = "org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration")
@AutoConfigureBefore({LoadBalancerAutoConfiguration.class, AsyncLoadBalancerAutoConfiguration.class})
@EnableConfigurationProperties({RibbonEagerLoadProperties.class, ServerIntrospectorProperties.class})
public class RibbonAutoConfiguration {
}

通过上述配置,我们可以先看看LoadBalancerAutoConfiguration的具体内容

代码语言:javascript复制
 @LoadBalanced
 @Autowired(required = false)
 private List<RestTemplate> restTemplates = Collections.emptyList();

此处将自动注入添加了@LoadBalanced注解的RestTemplate对象 同时还注入了一个RestTemplate的定制器RestTemplateCustomizer

代码语言:javascript复制
 @Bean
 public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
   final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
  return () -> restTemplateCustomizers.ifAvailable(customizers -> {
            for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
                for (RestTemplateCustomizer customizer : customizers) {
                    customizer.customize(restTemplate);
                }
            }
        });
 }

继续进入RestTemplateCustomizer的定制器代码,我们发现在定制器中加入了一个拦截器

代码语言:javascript复制
@Configuration
 @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
 static class LoadBalancerInterceptorConfig {
  @Bean
  public LoadBalancerInterceptor ribbonInterceptor(
    LoadBalancerClient loadBalancerClient,
    LoadBalancerRequestFactory requestFactory) {
   return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
  }

  @Bean
  @ConditionalOnMissingBean
  public RestTemplateCustomizer restTemplateCustomizer(
    final LoadBalancerInterceptor loadBalancerInterceptor) {
   return restTemplate -> {
                List<ClientHttpRequestInterceptor> list = new ArrayList<>(
                        restTemplate.getInterceptors());
                list.add(loadBalancerInterceptor);
                restTemplate.setInterceptors(list);
            };
  }
 }

ClientHttpRequestInterceptor的拦截具体内容为,根据获取到的请求路径和请求地址进行负载均衡

代码语言:javascript复制
 @Override
 public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
   final ClientHttpRequestExecution execution) throws IOException {
  final URI originalUri = request.getURI();
  String serviceName = originalUri.getHost();
  Assert.state(serviceName != null, "Request URI does not contain a valid hostname: "   originalUri);
  return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));
 }

执行负载均衡的代码

代码语言:javascript复制
 public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint) throws IOException {
  ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
  Server server = getServer(loadBalancer, hint);
  if (server == null) {
   throw new IllegalStateException("No instances available for "   serviceId);
  }
  RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,
    serviceId), serverIntrospector(serviceId).getMetadata(server));

  return execute(serviceId, ribbonServer, request);
 }

从这段代码可以看出,第一行根据配置,选出相应的负载均衡策略。第二行就是根据相应的负载均衡策略选择一个服务端进行服务请求,达到负载均衡的目的 最后在BaseLoadBalancer中执行了根据不同的策略选择服务的操作

代码语言:javascript复制
    public Server chooseServer(Object key) {
        if (counter == null) {
            counter = createCounter();
        }
        counter.increment();
        if (rule == null) {
            return null;
        } else {
            try {
                return rule.choose(key);
            } catch (Exception e) {
                logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);
                return null;
            }
        }
    }

考虑完了上面的主逻辑之后,还有一个问题,就是服务列表是什么时候获取到的。在RibbonAutoConfigration中注入了SpringClientFactory,而SpringClientFactory又注入了RibbonClientConfiguration

代码语言:javascript复制
 public SpringClientFactory() {
  super(RibbonClientConfiguration.class, NAMESPACE, "ribbon.client.name");
 }

RibbonClientConfiguration中进行了注入客户端操作的相关操作,包括负载均衡策略,客户端配置,服务列表等,其中最重要的就是如何获取和更新服务列表

代码语言:javascript复制
 @ConditionalOnMissingBean
 @SuppressWarnings("unchecked")
 public ServerList<Server> ribbonServerList(IClientConfig config) {
  if (this.propertiesFactory.isSet(ServerList.class, name)) {
   return this.propertiesFactory.get(ServerList.class, config, name);
  }
  ConfigurationBasedServerList serverList = new ConfigurationBasedServerList();
  serverList.initWithNiwsConfig(config);
  return serverList;
 }

 @Bean
 @ConditionalOnMissingBean
 public ServerListUpdater ribbonServerListUpdater(IClientConfig config) {
  return new PollingServerListUpdater(config);
 }

 @Bean
 @ConditionalOnMissingBean
 public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
   ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
   IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
  if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
   return this.propertiesFactory.get(ILoadBalancer.class, config, name);
  }
  return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
    serverListFilter, serverListUpdater);
 }

在ribbonList方法中并未有获取serverList的操作,在ribbonLoadBalancer中进行了使用,那么究竟怎么一回事呢?实际上是在ZoneAwareLoadBalancer的父类DynamicServerListLoadBalancer中进行了重新的赋值并且执行了定时任务进行更新。

代码语言:javascript复制
    void restOfInit(IClientConfig clientConfig) {
        boolean primeConnection = this.isEnablePrimingConnections();
        // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
        this.setEnablePrimingConnections(false);
        enableAndInitLearnNewServersFeature();

        updateListOfServers();
        if (primeConnection && this.getPrimeConnections() != null) {
            this.getPrimeConnections()
                    .primeConnections(getReachableServers());
        }
        this.setEnablePrimingConnections(primeConnection);
        LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
    }

首先通过updateAction.doUpdate();更新,然后通过getRefreshExecutor()进行获取

代码语言:javascript复制
 @Override
    public synchronized void start(final UpdateAction updateAction) {
        if (isActive.compareAndSet(false, true)) {
            final Runnable wrapperRunnable = new Runnable() {
                @Override
                public void run() {
                    if (!isActive.get()) {
                        if (scheduledFuture != null) {
                            scheduledFuture.cancel(true);
                        }
                        return;
                    }
                    try {
                        updateAction.doUpdate();
                        lastUpdated = System.currentTimeMillis();
                    } catch (Exception e) {
                        logger.warn("Failed one update cycle", e);
                    }
                }
            };

            scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
                    wrapperRunnable,
                    initialDelayMs,
                    refreshIntervalMs,
                    TimeUnit.MILLISECONDS
            );
        } else {
            logger.info("Already active, no-op");
        }
    }

0 人点赞