ribbon源码

2020-09-27 17:01:02 浏览数 (1)

一. ribbon的使用

通过前面对微服务ribbon的学习, 知道了ribbon的基本使用方式。

比如:我的order服务,想要调用stock减库存的操作。 应该怎么实现呢?

第一步:引入ribbon

代码语言:javascript复制
@LoadBalanced
@Bean
public RestTemplate getRestTemplate() {
    return new RestTemplate();
}

这里通过@LoadBalance注解, 引入了ribbon, 自动实现ribbon的负载均衡策略

第二步:写接口

如上图, 通过restTemplate手动指定stock服务,并调用其接口. 我们看到在域名部分,我们写的是服务名. 其实调用了ribbon的负载均衡策略以后, 我们大概可以知道, 它是将

http://stock 变成了 http://ip:port/的形式. 这其实就是ribbon的原理. 那么他是如何来选择ip和port的呢? 这就是具体的实现. 是使用轮训的方式找到ip, 还是使用随机的方式找到ip

代码语言:javascript复制
学习源码的方法一
1. 以ribbon为例, 先学会使用ribbon, 知道ribbon具体有哪些功能, 知道其效果
2. 猜测ribbon是如何实现的, 也就是说, 如果是我们自己来实现ribbon的负载均衡功能, 我们要怎么做?
3. 看源码, 对比自己的思想和源码的异同. 

这是一种有自己思考的学习方式. 对于学习源码来说也会觉得更有趣

其实,如果想在项目中使用ribbon, 这两步基础就ok了, 那么, 他到底是在底层如何运转的呢? 来看看ribbon的实现.

二. ribbon源码入口

1. 就从@LoadBalanced这个注解入手

点击进入到@LoadBalanced的源码

这个源码就是定义了一个注解,没有特殊的含义. 这是一个接口, 那么注解是在哪里被实现的呢?那么,就需要查源码调用了

2. 查找loadBalanced的实现类

怎么找实现类呢?入口在哪里?

方法一: 入口通常在META-INF/spring.factories文件里.里面找到引入了LoadBalanced 类的初始化类.

通过观察, 发现和LoadBalanced有关的自动配置类有两个, 二第一个关联性更大, 因为名字基本一样. 所以, 先定位到第一个

方法二: 纯经验猜测.

首先找到LoadBalanced注解所在的包, 然后看看里面有没有和LoadBalanced有关系的AutoConfiguration配置了, 这就是靠猜了

然后, 我们可以很容易的就看到LoadBalancerAutoConfiguration类. 这个是凭经验找到了, 接下来, 验证一下是不是这个类.

三. LoadBalancerAutoConfiguration自动配置类

看一看,它里面都注入了哪些东西呢?

首先, 初始化了一个LoadBalancerInterceptor拦截器,

代码语言:javascript复制
@Bean
public LoadBalancerInterceptor ribbonInterceptor(LoadBalancerClient loadBalancerClient, LoadBalancerRequestFactory requestFactory) {
     return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
}

这是一个拦截器, 也就是负载均衡具体实现的拦截器, 他是通过哪个拦截器实现的呢? 这里面是使用LoadBalancerInterceptor实现的. 其实, 这里面是真正实现负载均衡功能的地方

先简单看一下, 下面来看看另一个初始化方法

第二. 初始化RestTemplateCustomeizer方法

代码语言:javascript复制
@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(final LoadBalancerInterceptor loadBalancerInterceptor) {
     return (restTemplate) -> {
         List<ClientHttpRequestInterceptor> list = new ArrayList(restTemplate.getInterceptors());
         list.add(loadBalancerInterceptor);
         restTemplate.setInterceptors(list);
     };
}

首先看这个方法的入参, 方法的入参是谁呢? 就是第一步初始化的LoadBalancerInterceptor. 在@Bean注解中, 入参前面省略了@AutoWired, 也就是说, 相当于自动引入了LoadBalancerInterceptor类.

然后再来看返回值, 返回值是一个RestTemplateCustomizer, 这是一个接口, 里面就定义了一个方法

代码语言:javascript复制
public interface RestTemplateCustomizer {
    void customize(RestTemplate restTemplate);
}

而下面这段代码返回的一定是一个RestTemplateCustomeizer, 那么return的内容就是customize(RestTemplate restTemplate)的具体实现了

代码语言:javascript复制
return (restTemplate) -> {
    List<ClientHttpRequestInterceptor> list = new ArrayList(restTemplate.getInterceptors());
    list.add(loadBalancerInterceptor);
    restTemplate.setInterceptors(list);
};

public RestTemplateCustomizer restTemplateCustomizer(final LoadBalancerInterceptor loadBalancerInterceptor) {....}具体实现是什么呢?

就是将我们上面定义的拦截器添加到restTemplate中. restTemplate哪里来的呢? 我们可以看到最上面定义了这个

代码语言:javascript复制
@LoadBalanced
@Autowired(
     required = false
    )
private List<RestTemplate> restTemplates = Collections.emptyList();

他的含义是: 为每一个加了@LoadBalanced注解的RestTemplate, 都将其添加到restTemplates集合中. 然后对这个集合进行处理

第三步: 初始化SmartInitializingSingleton类

代码语言:javascript复制
  @Bean
    public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
        return () -> {
            restTemplateCustomizers.ifAvailable((customizers) -> {
                Iterator var2 = this.restTemplates.iterator();

                while(var2.hasNext()) {
                    RestTemplate restTemplate = (RestTemplate)var2.next();
                    Iterator var4 = customizers.iterator();

                    while(var4.hasNext()) {
                        RestTemplateCustomizer customizer = (RestTemplateCustomizer)var4.next();
                        customizer.customize(restTemplate);
                    }
                }

            });
        };
    }

这个类的入参是final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers, 也就是RestTemplate的定制器. 是一个list列表集合

具体实现是: 循环遍历this.restTemplates.iterator(), 也就是初始化时带有@LoadBalanced的RestTemplate, 然后执行里面的定制内容customize.

大概知道了自动配置里面引入了哪些类, 其中拦截器的实现是具体ribbon逻辑实现部分, 所以, 下面我们来看LoadBalancerInterceptor

四. LoadBalancerInterceptor拦截器实现类

代码语言:javascript复制
小贴士: 看源码技巧

这里有一个规则, 如果看过滤器, 那么其他方法都不重要, 主要看filter的具体实现. 看拦截器, 其他都不重要, 主要看intercept方法的实现

直接看intercept方法的实现

代码语言:javascript复制
package org.springframework.cloud.client.loadbalancer;

public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {
    ......
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) throws IOException {
        URI originalUri = request.getURI();
        String serviceName = originalUri.getHost();
        Assert.state(serviceName != null, "Request URI does not contain a valid hostname: "   originalUri);
        return (ClientHttpResponse)this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
    }
}

具体实现: 第一步: 获取请求的uri, 我们这里的是http://stock/stock/reduct/count/1/2

第二步: 获取服务的host域名, 这里对应的就是stock.

     第三步: this.loadBalancer.execute(serviceName, ......). 这里传入了服务名, 我们猜测一下实现. 首先会根据服务名获取集群, 然后在根据负载均衡策略找到ip, 然后调用http请求, 发送到指定ip

前两步不说了, 不那么重要, 来看第三步. 具体实现

代码语言:javascript复制
  public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint) throws IOException {
        ILoadBalancer loadBalancer = this.getLoadBalancer(serviceId);
        Server server = this.getServer(loadBalancer, hint);
        if (server == null) {
            throw new IllegalStateException("No instances available for "   serviceId);
        } else {
            RibbonLoadBalancerClient.RibbonServer ribbonServer = new RibbonLoadBalancerClient.RibbonServer(serviceId, server, this.isSecure(server, serviceId), this.serverIntrospector(serviceId).getMetadata(server));
            return this.execute(serviceId, (ServiceInstance)ribbonServer, (LoadBalancerRequest)request);
        }
    }

这里代码很短, 简单看一下

  • 第一步: 获取负载均衡器,
    • ILoadBalancer loadBalancer = this.getLoadBalancer(serviceId);
    • 负载均衡器猜一下干嘛的, 一会我们选出了3个或者5个节点, 要使用负载均衡器来看看, 到底选择哪一个节点发送请求
  • Server server = this.getServer(loadBalancer, hint);
    • 将LoadBalancer作为参数传入, 最终过滤选择出一个server, 将请求发送到这个server上.

我们来看第一步: 获取负载均衡器 ILoadBalancer loadBalancer = this.getLoadBalancer(serviceId);

在这一步走到getInstance(name, ILoadBalancer.class);根据名字获取一个负载均衡器. 返回的是一个ILoadBalancer, 我们知道这是一个接口, 那么他到底是什么类型的负载均衡器呢?

我们不知道, 那么能知道的是他一定是在某个地方初始化的时候, 指定了使用哪一个实现类. 而通常这用定义实现类的地方在Configuration的@Bean中.

根据这个思路, 我们在RibbonLoadBalancerClient包下找一找有没有类似的Configuration. 根据名字猜测, 最终找到了

代码语言:javascript复制
RibbonClientConfiguration

而这里面刚好有ILoadBalancer的实现

代码语言:javascript复制
    @Bean
    @ConditionalOnMissingBean
    public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
            ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
            IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
        if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
            return this.propertiesFactory.get(ILoadBalancer.class, config, name);
        }
        return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
                serverListFilter, serverListUpdater);
    }

棕色部分的代码是去application.yml属性值取, 如果有特别指定使用哪个ILoadBalancer类, 那么优先使用配置中的, 如果配置没有, 则使用new ZoneAwareLoadBalancer<>(...)类.

好了, 我们找到这里就可以了, 先不看ZoneAwareLoadBalancer的具体实现.

接下来看这一步: Server server = this.getServer(loadBalancer, hint); 获取服务

代码语言:javascript复制
package org.springframework.cloud.netflix.ribbon;

public class RibbonLoadBalancerClient implements LoadBalancerClient {
        ......

    protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
        if (loadBalancer == null) {
            return null;
        }
        // Use 'default' on a null hint, or just pass it on?
        return loadBalancer.chooseServer(hint != null ? hint : "default");
    }
}

走到这段源码, 我们就知道了, 要低啊用LoadBalancer的chooseServer方法. 而在上一步,我们知道这个LoadBalancer的实现类是ZoneAwareLoadBalancer, 所以, 我们可以直接到ZoneAwareLoadBalancer里面找到chooseServer方法了.

这里指向的是亚马逊的区域, 在中国只有一个区, 所以, 这里的数量始终是1, 所以, 最后走的是else分支

调用return super.chooseServer(key);方法

在super的chooseServer(key)方法里, 其他都不重要, 重要的是this.rule.choose(key)方法

这里的rule是一个接口protected IRule rule; 联想一下, 现在有负载均衡器了, 那么还要有负载均衡规则, 而rule正是具体实现的负载均衡规则.

根据经验我们知道, 这个IRule接口一定是在某个地方通过@Bean被初始化了,

又是根据经验, 我们猜到这个初始化的位置, 应该和刚才ILoadBalancer在同一个地方, 因为他们是同一个功能的代码, 那么如果是我写的话, 我会把同一个功能的代码的初始化放在一个地方.

所以, 来看看初始化ILoadBalancer的类RibbonClientConfiguration有没有, 搜索一下IRule ,果然找到了

代码语言:javascript复制
    @Bean
    @ConditionalOnMissingBean
    public IRule ribbonRule(IClientConfig config) {
        if (this.propertiesFactory.isSet(IRule.class, name)) {
            return this.propertiesFactory.get(IRule.class, config, name);
        }
        ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
        rule.initWithNiwsConfig(config);
        return rule;
    }

棕色部分的代码是先从application.yml配置文件中读取用户指定的IRule实现类. 如果没有, 就是用默认的ZoneAvoidanceRule实现类.

代码语言:javascript复制
Ribbon的三大组件
1. Rule
2. Ping
3. LoadBalancer

这是ribbon三个最重要的组件, 他们三个都是在RibbonClientConfiguration被初始化的. 
代码语言:javascript复制
   @Bean
    @ConditionalOnMissingBean
    public IRule ribbonRule(IClientConfig config) {
        if (this.propertiesFactory.isSet(IRule.class, name)) {
            return this.propertiesFactory.get(IRule.class, config, name);
        }
        ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
        rule.initWithNiwsConfig(config);
        return rule;
    }

    @Bean
    @ConditionalOnMissingBean
    public IPing ribbonPing(IClientConfig config) {
        if (this.propertiesFactory.isSet(IPing.class, name)) {
            return this.propertiesFactory.get(IPing.class, config, name);
        }
        return new DummyPing();
    }

   @Bean
    @ConditionalOnMissingBean
    public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
            ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
            IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
        if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
            return this.propertiesFactory.get(ILoadBalancer.class, config, name);
        }
        return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
                serverListFilter, serverListUpdater);
    }

下面看看ZoneAvoidanceRule的实现, 为什么看看实现呢, 要了解他的父类继承关系, 因为很可能在调用的时候, 不是调用的它本身, 而是调用的父类

我们记住zoneAvoidanceRule的一个父类是PredicateBaseRule.

下面我们在回到this.rule.choose(key)这个方法上来, 这回我们就知道这里调用的choose()是哪一个类下面的了, 他是PredicateBaseRule下面的.

代码语言:javascript复制
  @Override
    public Server choose(Object key) {
        ILoadBalancer lb = getLoadBalancer();
        Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
        if (server.isPresent()) {
            return server.get();
        } else {
            return null;
        }       
    }

很显然这里采用的是轮训策略选择服务器. 具体的轮训策略是如何选择的呢? 具体来看看chooseRoundRobinAfterFiltering()方法

代码语言:javascript复制
/**
     * Choose a server in a round robin fashion after the predicate filters a given list of servers and load balancer key. 
     */
    public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
     // 获取全部可用的服务
        List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
        if (eligible.size() == 0) {
            return Optional.absent();
        }
     // 采用的服务策略.
        return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
    }
代码语言:javascript复制
  private int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = nextIndex.get();
            int next = (current   1) % modulo;
            if (nextIndex.compareAndSet(current, next) && current < modulo)
                return current;
        }
    }

有效的服务数 1取模, 为了防止并发, 这里使用了CAS的思想, 比较并赋值. 如果赋值失败, 会再次进入for循环, 知道成功为止

在选择轮训策略的时候, chooseRoundRobinAfterFiltering(lb.getAllServers(), key); 我们传进来了一个参数 ,lb.getAllServers(), 获取负载均衡中的所有服务.这都有哪些服务呢?往前推理, 应该是有某个地方传入了这个参数, 或者通过某个参数计算得到了服务列表. ---> 这时我们也不知道在哪里, 那就看看构造方法吧, 看谁的构造方法呢, 负载均衡器的构造方法

前面已经知道负载均衡器使用的是ZoneAwareLoadBalancer, 调用了父类的构造方法

代码语言:javascript复制
public ZoneAwareLoadBalancer(IClientConfig clientConfig, IRule rule,
                                 IPing ping, ServerList<T> serverList, ServerListFilter<T> filter,
                                 ServerListUpdater serverListUpdater) {
        super(clientConfig, rule, ping, serverList, filter, serverListUpdater);
    }

下面来看父类的构造方法

代码语言:javascript复制
public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
                                         ServerList<T> serverList, ServerListFilter<T> filter,
                                         ServerListUpdater serverListUpdater) {
        super(clientConfig, rule, ping);
        this.serverListImpl = serverList;
        this.filter = filter;
        this.serverListUpdater = serverListUpdater;
        if (filter instanceof AbstractServerListFilter) {
            ((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
        }
        restOfInit(clientConfig);
    }

一般情况下, 看到init方法和start方法, 都要进去看看, 这里通常都是重点, 里面有内容

代码语言:javascript复制
     void restOfInit(IClientConfig clientConfig) {
        boolean primeConnection = this.isEnablePrimingConnections();
        // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
        this.setEnablePrimingConnections(false);
        enableAndInitLearnNewServersFeature();

        updateListOfServers();
        if (primeConnection && this.getPrimeConnections() != null) {
            this.getPrimeConnections()
                    .primeConnections(getReachableServers());
        }
        this.setEnablePrimingConnections(primeConnection);
        LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
    }

启动, 初始化服务, 而且还要不断你的学习. 这里的学习是什么意思呢? 其实就是不停的问问nacos, 服务列表有更新么?有更新,我就去拉取过来, 更新本地的服务.

具体的enableAndInitLearnNewServersFeature()方法是什么呢

代码语言:javascript复制
  public void enableAndInitLearnNewServersFeature() {
        LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());
        serverListUpdater.start(updateAction);
    }

updateAction是要更新的动作, 也就是最终更新操作是在updateAction中执行的, 来看看为什么这么说呢

代码语言:javascript复制
  protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
        @Override
        public void doUpdate() {
            updateListOfServers();
        }
    };

这里updateAction这个变量是UpdateAction的一个实例对象, 这个对象有一个方法doUpdate(), 而doUpdate()方法中调用了updateListOfServers()方法.

代码语言:javascript复制
  @VisibleForTesting
    public void updateListOfServers() {
        List<T> servers = new ArrayList<T>();
        if (serverListImpl != null) {
            servers = serverListImpl.getUpdatedListOfServers();
            LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                    getIdentifier(), servers);

            if (filter != null) {
                servers = filter.getFilteredListOfServers(servers);
                LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                        getIdentifier(), servers);
            }
        }
        updateAllServerList(servers);
    }

更新服务. 这是具体的更新服务的方法

这个更新操作是什么时候被执行的呢?serverListUpdater.start(updateAction); 在这里, 一看见start, 知道是个类似于定时任务的方法, 很重要. 进去看看

代码语言:javascript复制
@Override
    public synchronized void start(final UpdateAction updateAction) {
        if (isActive.compareAndSet(false, true)) {
            final Runnable wrapperRunnable = new Runnable() {
                @Override
                public void run() {
                    if (!isActive.get()) {
                        if (scheduledFuture != null) {
                            scheduledFuture.cancel(true);
                        }
                        return;
                    }
                    try {
                        updateAction.doUpdate();
                        lastUpdated = System.currentTimeMillis();
                    } catch (Exception e) {
                        logger.warn("Failed one update cycle", e);
                    }
                }
            };

            scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
                    wrapperRunnable,
                    initialDelayMs,
                    refreshIntervalMs,
                    TimeUnit.MILLISECONDS
            );
        } else {
            logger.info("Already active, no-op");
        }
    }

updateAction.doUpdate();就是上面执行的方法 , 这个方法的返回值是一个Runnable线程, 然后继续往下看, 将runnable线程传给了一个定时任务. 定时执行更新操作.

上面的操作其实就是上图, ribbon和nacos同步服务集群数据的过程. 我们知道, 在nacos中有一个注册表用来存储服务注册过来的信息, 项目启动后, 这些元数据信息会同步回传给ribbon, ribbon会在本地维护一个注册表. 但这个注册表可能随时变化, 所以, 需要定期去同步更新服务数据. 所以.

1. 初始化的时候去nacos去服务注册表数据

2. 定时任务同步获取注册表数据

代码语言:javascript复制
@VisibleForTesting
    public void updateListOfServers() {
        List<T> servers = new ArrayList<T>();
        if (serverListImpl != null) {
            servers = serverListImpl.getUpdatedListOfServers();
            LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                    getIdentifier(), servers);

            if (filter != null) {
                servers = filter.getFilteredListOfServers(servers);
                LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                        getIdentifier(), servers);
            }
        }
        updateAllServerList(servers);
    }

更新服务的三个重要的方法

1. getUpdatedListOfServers : 查询nacos中的服务实例, 在查询之前更新, 然后返回最新的服务列表

2. getFilteredListOfServers: 过滤服务实例, 将满足条件的服务实例过滤, 不满足条件的去掉

3. updateAllServerList: 更新服务实例

0 人点赞