一. ribbon的使用
通过前面对微服务ribbon的学习, 知道了ribbon的基本使用方式。
比如:我的order服务,想要调用stock减库存的操作。 应该怎么实现呢?
第一步:引入ribbon
代码语言:javascript复制@LoadBalanced
@Bean
public RestTemplate getRestTemplate() {
return new RestTemplate();
}
这里通过@LoadBalance注解, 引入了ribbon, 自动实现ribbon的负载均衡策略
第二步:写接口
如上图, 通过restTemplate手动指定stock服务,并调用其接口. 我们看到在域名部分,我们写的是服务名. 其实调用了ribbon的负载均衡策略以后, 我们大概可以知道, 它是将
http://stock 变成了 http://ip:port/的形式. 这其实就是ribbon的原理. 那么他是如何来选择ip和port的呢? 这就是具体的实现. 是使用轮训的方式找到ip, 还是使用随机的方式找到ip
代码语言:javascript复制学习源码的方法一
1. 以ribbon为例, 先学会使用ribbon, 知道ribbon具体有哪些功能, 知道其效果
2. 猜测ribbon是如何实现的, 也就是说, 如果是我们自己来实现ribbon的负载均衡功能, 我们要怎么做?
3. 看源码, 对比自己的思想和源码的异同.
这是一种有自己思考的学习方式. 对于学习源码来说也会觉得更有趣
其实,如果想在项目中使用ribbon, 这两步基础就ok了, 那么, 他到底是在底层如何运转的呢? 来看看ribbon的实现.
二. ribbon源码入口
1. 就从@LoadBalanced这个注解入手
点击进入到@LoadBalanced的源码
这个源码就是定义了一个注解,没有特殊的含义. 这是一个接口, 那么注解是在哪里被实现的呢?那么,就需要查源码调用了
2. 查找loadBalanced的实现类
怎么找实现类呢?入口在哪里?
方法一: 入口通常在META-INF/spring.factories文件里.里面找到引入了LoadBalanced 类的初始化类.
通过观察, 发现和LoadBalanced有关的自动配置类有两个, 二第一个关联性更大, 因为名字基本一样. 所以, 先定位到第一个
方法二: 纯经验猜测.
首先找到LoadBalanced注解所在的包, 然后看看里面有没有和LoadBalanced有关系的AutoConfiguration配置了, 这就是靠猜了
然后, 我们可以很容易的就看到LoadBalancerAutoConfiguration类. 这个是凭经验找到了, 接下来, 验证一下是不是这个类.
三. LoadBalancerAutoConfiguration自动配置类
看一看,它里面都注入了哪些东西呢?
首先, 初始化了一个LoadBalancerInterceptor拦截器,
代码语言:javascript复制@Bean
public LoadBalancerInterceptor ribbonInterceptor(LoadBalancerClient loadBalancerClient, LoadBalancerRequestFactory requestFactory) {
return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
}
这是一个拦截器, 也就是负载均衡具体实现的拦截器, 他是通过哪个拦截器实现的呢? 这里面是使用LoadBalancerInterceptor实现的. 其实, 这里面是真正实现负载均衡功能的地方
先简单看一下, 下面来看看另一个初始化方法
第二. 初始化RestTemplateCustomeizer方法
代码语言:javascript复制@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(final LoadBalancerInterceptor loadBalancerInterceptor) {
return (restTemplate) -> {
List<ClientHttpRequestInterceptor> list = new ArrayList(restTemplate.getInterceptors());
list.add(loadBalancerInterceptor);
restTemplate.setInterceptors(list);
};
}
首先看这个方法的入参, 方法的入参是谁呢? 就是第一步初始化的LoadBalancerInterceptor. 在@Bean注解中, 入参前面省略了@AutoWired, 也就是说, 相当于自动引入了LoadBalancerInterceptor类.
然后再来看返回值, 返回值是一个RestTemplateCustomizer, 这是一个接口, 里面就定义了一个方法
代码语言:javascript复制public interface RestTemplateCustomizer {
void customize(RestTemplate restTemplate);
}
而下面这段代码返回的一定是一个RestTemplateCustomeizer, 那么return的内容就是customize(RestTemplate restTemplate)的具体实现了
代码语言:javascript复制return (restTemplate) -> {
List<ClientHttpRequestInterceptor> list = new ArrayList(restTemplate.getInterceptors());
list.add(loadBalancerInterceptor);
restTemplate.setInterceptors(list);
};
public RestTemplateCustomizer restTemplateCustomizer(final LoadBalancerInterceptor loadBalancerInterceptor) {....}具体实现是什么呢?
就是将我们上面定义的拦截器添加到restTemplate中. restTemplate哪里来的呢? 我们可以看到最上面定义了这个
代码语言:javascript复制@LoadBalanced
@Autowired(
required = false
)
private List<RestTemplate> restTemplates = Collections.emptyList();
他的含义是: 为每一个加了@LoadBalanced注解的RestTemplate, 都将其添加到restTemplates集合中. 然后对这个集合进行处理
第三步: 初始化SmartInitializingSingleton类
代码语言:javascript复制 @Bean
public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
return () -> {
restTemplateCustomizers.ifAvailable((customizers) -> {
Iterator var2 = this.restTemplates.iterator();
while(var2.hasNext()) {
RestTemplate restTemplate = (RestTemplate)var2.next();
Iterator var4 = customizers.iterator();
while(var4.hasNext()) {
RestTemplateCustomizer customizer = (RestTemplateCustomizer)var4.next();
customizer.customize(restTemplate);
}
}
});
};
}
这个类的入参是final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers, 也就是RestTemplate的定制器. 是一个list列表集合
具体实现是: 循环遍历this.restTemplates.iterator(), 也就是初始化时带有@LoadBalanced的RestTemplate, 然后执行里面的定制内容customize.
大概知道了自动配置里面引入了哪些类, 其中拦截器的实现是具体ribbon逻辑实现部分, 所以, 下面我们来看LoadBalancerInterceptor
四. LoadBalancerInterceptor拦截器实现类
代码语言:javascript复制小贴士: 看源码技巧
这里有一个规则, 如果看过滤器, 那么其他方法都不重要, 主要看filter的具体实现. 看拦截器, 其他都不重要, 主要看intercept方法的实现
直接看intercept方法的实现
代码语言:javascript复制package org.springframework.cloud.client.loadbalancer;
public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {
......
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) throws IOException {
URI originalUri = request.getURI();
String serviceName = originalUri.getHost();
Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " originalUri);
return (ClientHttpResponse)this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
}
}
具体实现: 第一步: 获取请求的uri, 我们这里的是http://stock/stock/reduct/count/1/2
第二步: 获取服务的host域名, 这里对应的就是stock.
第三步: this.loadBalancer.execute(serviceName, ......). 这里传入了服务名, 我们猜测一下实现. 首先会根据服务名获取集群, 然后在根据负载均衡策略找到ip, 然后调用http请求, 发送到指定ip
前两步不说了, 不那么重要, 来看第三步. 具体实现
代码语言:javascript复制 public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint) throws IOException {
ILoadBalancer loadBalancer = this.getLoadBalancer(serviceId);
Server server = this.getServer(loadBalancer, hint);
if (server == null) {
throw new IllegalStateException("No instances available for " serviceId);
} else {
RibbonLoadBalancerClient.RibbonServer ribbonServer = new RibbonLoadBalancerClient.RibbonServer(serviceId, server, this.isSecure(server, serviceId), this.serverIntrospector(serviceId).getMetadata(server));
return this.execute(serviceId, (ServiceInstance)ribbonServer, (LoadBalancerRequest)request);
}
}
这里代码很短, 简单看一下
- 第一步: 获取负载均衡器,
- ILoadBalancer loadBalancer = this.getLoadBalancer(serviceId);
- 负载均衡器猜一下干嘛的, 一会我们选出了3个或者5个节点, 要使用负载均衡器来看看, 到底选择哪一个节点发送请求
- Server server = this.getServer(loadBalancer, hint);
- 将LoadBalancer作为参数传入, 最终过滤选择出一个server, 将请求发送到这个server上.
我们来看第一步: 获取负载均衡器 ILoadBalancer loadBalancer = this.getLoadBalancer(serviceId);
在这一步走到getInstance(name, ILoadBalancer.class);根据名字获取一个负载均衡器. 返回的是一个ILoadBalancer, 我们知道这是一个接口, 那么他到底是什么类型的负载均衡器呢?
我们不知道, 那么能知道的是他一定是在某个地方初始化的时候, 指定了使用哪一个实现类. 而通常这用定义实现类的地方在Configuration的@Bean中.
根据这个思路, 我们在RibbonLoadBalancerClient包下找一找有没有类似的Configuration. 根据名字猜测, 最终找到了
代码语言:javascript复制RibbonClientConfiguration
而这里面刚好有ILoadBalancer的实现
代码语言:javascript复制 @Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
return this.propertiesFactory.get(ILoadBalancer.class, config, name);
}
return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
serverListFilter, serverListUpdater);
}
棕色部分的代码是去application.yml属性值取, 如果有特别指定使用哪个ILoadBalancer类, 那么优先使用配置中的, 如果配置没有, 则使用new ZoneAwareLoadBalancer<>(...)类.
好了, 我们找到这里就可以了, 先不看ZoneAwareLoadBalancer的具体实现.
接下来看这一步: Server server = this.getServer(loadBalancer, hint); 获取服务
代码语言:javascript复制package org.springframework.cloud.netflix.ribbon;
public class RibbonLoadBalancerClient implements LoadBalancerClient {
......
protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
if (loadBalancer == null) {
return null;
}
// Use 'default' on a null hint, or just pass it on?
return loadBalancer.chooseServer(hint != null ? hint : "default");
}
}
走到这段源码, 我们就知道了, 要低啊用LoadBalancer的chooseServer方法. 而在上一步,我们知道这个LoadBalancer的实现类是ZoneAwareLoadBalancer, 所以, 我们可以直接到ZoneAwareLoadBalancer里面找到chooseServer方法了.
这里指向的是亚马逊的区域, 在中国只有一个区, 所以, 这里的数量始终是1, 所以, 最后走的是else分支
调用return super.chooseServer(key);方法
在super的chooseServer(key)方法里, 其他都不重要, 重要的是this.rule.choose(key)方法
这里的rule是一个接口protected IRule rule; 联想一下, 现在有负载均衡器了, 那么还要有负载均衡规则, 而rule正是具体实现的负载均衡规则.
根据经验我们知道, 这个IRule接口一定是在某个地方通过@Bean被初始化了,
又是根据经验, 我们猜到这个初始化的位置, 应该和刚才ILoadBalancer在同一个地方, 因为他们是同一个功能的代码, 那么如果是我写的话, 我会把同一个功能的代码的初始化放在一个地方.
所以, 来看看初始化ILoadBalancer的类RibbonClientConfiguration有没有, 搜索一下IRule ,果然找到了
代码语言:javascript复制 @Bean
@ConditionalOnMissingBean
public IRule ribbonRule(IClientConfig config) {
if (this.propertiesFactory.isSet(IRule.class, name)) {
return this.propertiesFactory.get(IRule.class, config, name);
}
ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
rule.initWithNiwsConfig(config);
return rule;
}
棕色部分的代码是先从application.yml配置文件中读取用户指定的IRule实现类. 如果没有, 就是用默认的ZoneAvoidanceRule实现类.
代码语言:javascript复制Ribbon的三大组件
1. Rule
2. Ping
3. LoadBalancer
这是ribbon三个最重要的组件, 他们三个都是在RibbonClientConfiguration被初始化的.
代码语言:javascript复制 @Bean
@ConditionalOnMissingBean
public IRule ribbonRule(IClientConfig config) {
if (this.propertiesFactory.isSet(IRule.class, name)) {
return this.propertiesFactory.get(IRule.class, config, name);
}
ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
rule.initWithNiwsConfig(config);
return rule;
}
@Bean
@ConditionalOnMissingBean
public IPing ribbonPing(IClientConfig config) {
if (this.propertiesFactory.isSet(IPing.class, name)) {
return this.propertiesFactory.get(IPing.class, config, name);
}
return new DummyPing();
}
@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
return this.propertiesFactory.get(ILoadBalancer.class, config, name);
}
return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
serverListFilter, serverListUpdater);
}
下面看看ZoneAvoidanceRule的实现, 为什么看看实现呢, 要了解他的父类继承关系, 因为很可能在调用的时候, 不是调用的它本身, 而是调用的父类
我们记住zoneAvoidanceRule的一个父类是PredicateBaseRule.
下面我们在回到this.rule.choose(key)这个方法上来, 这回我们就知道这里调用的choose()是哪一个类下面的了, 他是PredicateBaseRule下面的.
代码语言:javascript复制 @Override
public Server choose(Object key) {
ILoadBalancer lb = getLoadBalancer();
Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
if (server.isPresent()) {
return server.get();
} else {
return null;
}
}
很显然这里采用的是轮训策略选择服务器. 具体的轮训策略是如何选择的呢? 具体来看看chooseRoundRobinAfterFiltering()方法
代码语言:javascript复制/**
* Choose a server in a round robin fashion after the predicate filters a given list of servers and load balancer key.
*/
public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
// 获取全部可用的服务
List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
if (eligible.size() == 0) {
return Optional.absent();
}
// 采用的服务策略.
return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
}
代码语言:javascript复制 private int incrementAndGetModulo(int modulo) {
for (;;) {
int current = nextIndex.get();
int next = (current 1) % modulo;
if (nextIndex.compareAndSet(current, next) && current < modulo)
return current;
}
}
有效的服务数 1取模, 为了防止并发, 这里使用了CAS的思想, 比较并赋值. 如果赋值失败, 会再次进入for循环, 知道成功为止
在选择轮训策略的时候, chooseRoundRobinAfterFiltering(lb.getAllServers(), key); 我们传进来了一个参数 ,lb.getAllServers(), 获取负载均衡中的所有服务.这都有哪些服务呢?往前推理, 应该是有某个地方传入了这个参数, 或者通过某个参数计算得到了服务列表. ---> 这时我们也不知道在哪里, 那就看看构造方法吧, 看谁的构造方法呢, 负载均衡器的构造方法
前面已经知道负载均衡器使用的是ZoneAwareLoadBalancer, 调用了父类的构造方法
代码语言:javascript复制public ZoneAwareLoadBalancer(IClientConfig clientConfig, IRule rule,
IPing ping, ServerList<T> serverList, ServerListFilter<T> filter,
ServerListUpdater serverListUpdater) {
super(clientConfig, rule, ping, serverList, filter, serverListUpdater);
}
下面来看父类的构造方法
代码语言:javascript复制public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
ServerList<T> serverList, ServerListFilter<T> filter,
ServerListUpdater serverListUpdater) {
super(clientConfig, rule, ping);
this.serverListImpl = serverList;
this.filter = filter;
this.serverListUpdater = serverListUpdater;
if (filter instanceof AbstractServerListFilter) {
((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
}
restOfInit(clientConfig);
}
一般情况下, 看到init方法和start方法, 都要进去看看, 这里通常都是重点, 里面有内容
代码语言:javascript复制 void restOfInit(IClientConfig clientConfig) {
boolean primeConnection = this.isEnablePrimingConnections();
// turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
this.setEnablePrimingConnections(false);
enableAndInitLearnNewServersFeature();
updateListOfServers();
if (primeConnection && this.getPrimeConnections() != null) {
this.getPrimeConnections()
.primeConnections(getReachableServers());
}
this.setEnablePrimingConnections(primeConnection);
LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
}
启动, 初始化服务, 而且还要不断你的学习. 这里的学习是什么意思呢? 其实就是不停的问问nacos, 服务列表有更新么?有更新,我就去拉取过来, 更新本地的服务.
具体的enableAndInitLearnNewServersFeature()方法是什么呢
代码语言:javascript复制 public void enableAndInitLearnNewServersFeature() {
LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());
serverListUpdater.start(updateAction);
}
updateAction是要更新的动作, 也就是最终更新操作是在updateAction中执行的, 来看看为什么这么说呢
代码语言:javascript复制 protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
@Override
public void doUpdate() {
updateListOfServers();
}
};
这里updateAction这个变量是UpdateAction的一个实例对象, 这个对象有一个方法doUpdate(), 而doUpdate()方法中调用了updateListOfServers()方法.
代码语言:javascript复制 @VisibleForTesting
public void updateListOfServers() {
List<T> servers = new ArrayList<T>();
if (serverListImpl != null) {
servers = serverListImpl.getUpdatedListOfServers();
LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
if (filter != null) {
servers = filter.getFilteredListOfServers(servers);
LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
}
}
updateAllServerList(servers);
}
更新服务. 这是具体的更新服务的方法
这个更新操作是什么时候被执行的呢?serverListUpdater.start(updateAction); 在这里, 一看见start, 知道是个类似于定时任务的方法, 很重要. 进去看看
代码语言:javascript复制@Override
public synchronized void start(final UpdateAction updateAction) {
if (isActive.compareAndSet(false, true)) {
final Runnable wrapperRunnable = new Runnable() {
@Override
public void run() {
if (!isActive.get()) {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
return;
}
try {
updateAction.doUpdate();
lastUpdated = System.currentTimeMillis();
} catch (Exception e) {
logger.warn("Failed one update cycle", e);
}
}
};
scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
wrapperRunnable,
initialDelayMs,
refreshIntervalMs,
TimeUnit.MILLISECONDS
);
} else {
logger.info("Already active, no-op");
}
}
updateAction.doUpdate();就是上面执行的方法 , 这个方法的返回值是一个Runnable线程, 然后继续往下看, 将runnable线程传给了一个定时任务. 定时执行更新操作.
上面的操作其实就是上图, ribbon和nacos同步服务集群数据的过程. 我们知道, 在nacos中有一个注册表用来存储服务注册过来的信息, 项目启动后, 这些元数据信息会同步回传给ribbon, ribbon会在本地维护一个注册表. 但这个注册表可能随时变化, 所以, 需要定期去同步更新服务数据. 所以.
1. 初始化的时候去nacos去服务注册表数据
2. 定时任务同步获取注册表数据
代码语言:javascript复制@VisibleForTesting
public void updateListOfServers() {
List<T> servers = new ArrayList<T>();
if (serverListImpl != null) {
servers = serverListImpl.getUpdatedListOfServers();
LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
if (filter != null) {
servers = filter.getFilteredListOfServers(servers);
LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
}
}
updateAllServerList(servers);
}
更新服务的三个重要的方法
1. getUpdatedListOfServers : 查询nacos中的服务实例, 在查询之前更新, 然后返回最新的服务列表
2. getFilteredListOfServers: 过滤服务实例, 将满足条件的服务实例过滤, 不满足条件的去掉
3. updateAllServerList: 更新服务实例