什么是灰度发布
介绍灰度发布流程之前我先一句话介绍一下什么是灰度发布。灰度发布就是,线上app无需停机就可以保证运行的是经过测试的稳定版本,且我们在冒烟测试时也不会影响到线上App的运行。
为什么我们要搞灰度发布
线上的服务每次都是我来构建,我可以非常负责任的讲,冒烟测试时不重新发布的几率很小,而且很多时候需要我去定位线上问题,这个过程我不知道你们痛不痛苦,反正我是很痛苦。换个角度分析,如果我是正在使用App的用户我会吐槽:这是什么牛马App,一天能卡个好几次,一次卡个几分钟,这还用个大锤子!为了能让用户有更好的体验,也为了我不在那么痛苦所以我们急需要一款灰度发布系统。
灰度发布系统怎么搞
一个很简单的理论,同时准备两份服务,让符合规则的请求路由到灰度接口,不符合规则的路由到之前发布的服务就好了哇~
代码实现
熟悉SpringCloudGateway的同学对于gateway的路由配置不会很陌生,以下面的基础配置为例简单的讲解一下
代码语言:javascript复制spring:
cloud:
gateway:
default-filters:
- DedupeResponseHeader=Access-Control-Allow-Credentials Access-Control-Allow-Origin
routes:
- id: web-server # 路由的唯一ID,配合业务命名不重复即可
uri: lb://web-server
predicates: # 断言
- Path=/web-api/api3/** # 路径断言,路径匹配测进行路由
filters:
- RewritePath=/web-api/api/(?<segment>/?.*),/api/${segment}
当Gateway识别到请求符合某个断言后,就会将请求路由到该组断言对应的uri下。让符合规则的请求路由到灰度接口,不符合规则的路由到之前发布的服务就好了哇~,对于这个简单的需求我们只要在path断言不变的前提下在增加一个管理规则的断言不就可以了吗,以指定请求头包含指定value的规则为例,我们就可以自定义如下的断言。
自定义Gateway断言工厂
自定义Gateway的断言工厂那是相当的easy呀,你只需要复制我下面的代码修改下apply方法中规则为你想要的规则即可,你要是不信你也可以随便点进一个Gateway官方定义的任意一个断言工厂,官方的代码就是这么写的。
代码语言:javascript复制
import com.alibaba.nacos.api.utils.StringUtils;
import lombok.*;
import org.springframework.cloud.gateway.handler.predicate.AbstractRoutePredicateFactory;
import org.springframework.cloud.gateway.handler.predicate.GatewayPredicate;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;
/**
* @author jiangtongxue
* @date 2022/3/17 16:10
*/
@Component
public class HeaderUsernameRoutePredicateFactory extends AbstractRoutePredicateFactory<HeaderUsernameRoutePredicateFactory.Config> {
public static final String USERNAME = "canaryFlag";
public HeaderUsernameRoutePredicateFactory() {
super(Config.class);
}
@Override
public ShortcutType shortcutType() {
return ShortcutType.GATHER_LIST;
}
@Override
public List<String> shortcutFieldOrder() {
return Collections.singletonList("canaryFlag");
}
@Override
public Predicate<ServerWebExchange> apply(Config config) {
List<String> usernames = Arrays.asList(config.getCanaryFlag().split(","));
return new GatewayPredicate() {
@Override
public boolean test(ServerWebExchange serverWebExchange) {
String username = serverWebExchange.getRequest().getHeaders().getFirst(USERNAME);
if (!StringUtils.isBlank(username)) {
return usernames.contains(username);
}
return false;
}
@Override
public String toString() {
return String.format("Header: canaryFlag=%s", config.canaryFlag);
}
};
}
@NoArgsConstructor
@Getter
@Setter
@ToString
public static class Config {
String canaryFlag;
}
}
使用自定义的断言工厂
注意关注我上面自定义断言的类名,把RoutePredicateFactory后缀去掉就是断言的使用方法。
代码语言:javascript复制spring:
cloud:
gateway:
default-filters:
- DedupeResponseHeader=Access-Control-Allow-Credentials Access-Control-Allow-Origin
routes:
- id: web-server
uri: lb://web-server
predicates: # 断言
- Path=/web-api/api3/**
- HeaderUsername=jiangtongxue,chuitongxue #使用自定义的断言工厂
filters:
- RewritePath=/web-api/api/(?<segment>/?.*),/api/${segment}
有个小问题
如果是单机的小服务,我们可以修改url来映射到不同的接口,但是对于微服务的集群我们要怎么搞嘞。
自定义Gateway全局过滤器借助Nacos的元数据进行负载均衡
自定义全局过滤器也是very的easy呀,我们只需要新建一个类继承GlobalFilter和Ordered接口就好啦,getOrder返回的值越大,执行的越靠后。
代码语言:javascript复制
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.DefaultResponse;
import org.springframework.cloud.client.loadbalancer.EmptyResponse;
import org.springframework.cloud.client.loadbalancer.Request;
import org.springframework.cloud.client.loadbalancer.Response;
import org.springframework.cloud.loadbalancer.core.NoopServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import org.springframework.http.HttpHeaders;
import reactor.core.publisher.Mono;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
/**
* @author jiangtongxue
* @date 2022/3/17 16:16
*/
@Slf4j
public class GrayRoundRobinLoadBalancer implements ReactorServiceInstanceLoadBalancer {
private final ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;
private final String serviceId;
private final AtomicInteger position;
public GrayRoundRobinLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider, String serviceId) {
this.serviceId = serviceId;
this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;
this.position = new AtomicInteger(new Random().nextInt());
}
@Override
public Mono<Response<ServiceInstance>> choose(Request request) {
HttpHeaders headers = (HttpHeaders) request.getContext();
ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider.getIfAvailable(NoopServiceInstanceListSupplier::new);
return supplier.get(request).next().map(list -> getInstanceResponse(list, headers));
}
private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances, HttpHeaders headers) {
List<ServiceInstance> serviceInstances = instances.stream()
.filter(instance -> {
//根据请求头中的版本号信息,选取注册中心中的相应服务实例
String version = headers.getFirst("Version");
if (version != null) {
return version.equals(instance.getMetadata().get("version"));
} else {
return true;
}
}).collect(Collectors.toList());
if (instances.isEmpty()) {
if (log.isWarnEnabled()) {
log.warn("No servers available for service: " serviceId);
}
return new EmptyResponse();
}
int pos = Math.abs(this.position.incrementAndGet());
if (serviceInstances.size() == ) {
return new EmptyResponse();
}
ServiceInstance instance = serviceInstances.get(pos % serviceInstances.size());
return new DefaultResponse(instance);
}
}
代码语言:javascript复制
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.DefaultRequest;
import org.springframework.cloud.client.loadbalancer.LoadBalancerUriTools;
import org.springframework.cloud.client.loadbalancer.Request;
import org.springframework.cloud.client.loadbalancer.Response;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.cloud.gateway.filter.ReactiveLoadBalancerClientFilter;
import org.springframework.cloud.gateway.support.DelegatingServiceInstance;
import org.springframework.cloud.gateway.support.NotFoundException;
import org.springframework.cloud.gateway.support.ServerWebExchangeUtils;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
import org.springframework.core.Ordered;
import org.springframework.http.HttpHeaders;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import java.net.URI;
/**
* @author jiangtongxue
* @date 2022/3/17 16:18
*/
@Slf4j
@Component
public class GrayReactiveLoadBalancerClientFilter implements GlobalFilter, Ordered {
private static final int LOAD_BALANCER_CLIENT_FILTER_ORDER = ;
private final LoadBalancerClientFactory clientFactory;
public GrayReactiveLoadBalancerClientFilter(LoadBalancerClientFactory clientFactory) {
this.clientFactory = clientFactory;
}
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
URI url = (URI) exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
String schemePrefix = (String) exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_SCHEME_PREFIX_ATTR);
if (url != null && ("gray-lb".equals(url.getScheme()) || "gray-lb".equals(schemePrefix))) {
ServerWebExchangeUtils.addOriginalRequestUrl(exchange, url);
if (log.isTraceEnabled()) {
log.trace(ReactiveLoadBalancerClientFilter.class.getSimpleName() " url before: " url);
}
return this.choose(exchange).doOnNext((response) -> {
if (!response.hasServer()) {
throw NotFoundException.create(true, "Unable to find instance for " url.getHost());
} else {
URI uri = exchange.getRequest().getURI();
String overrideScheme = null;
if (schemePrefix != null) {
overrideScheme = url.getScheme();
}
DelegatingServiceInstance serviceInstance = new DelegatingServiceInstance((ServiceInstance) response.getServer(), overrideScheme);
URI requestUrl = this.reconstructURI(serviceInstance, uri);
if (log.isTraceEnabled()) {
log.trace("LoadBalancerClientFilter url chosen: " requestUrl);
}
exchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR, requestUrl);
}
}).then(chain.filter(exchange));
} else {
return chain.filter(exchange);
}
}
private Mono<Response<ServiceInstance>> choose(ServerWebExchange exchange) {
URI uri = (URI) exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
assert uri != null;
GrayRoundRobinLoadBalancer loadBalancer = new GrayRoundRobinLoadBalancer(clientFactory.getLazyProvider(uri.getHost(), ServiceInstanceListSupplier.class), uri.getHost());
return loadBalancer.choose(this.createRequest(exchange));
}
@SuppressWarnings("rawtypes")
private Request createRequest(ServerWebExchange exchange) {
HttpHeaders headers = exchange.getRequest().getHeaders();
return new DefaultRequest<>(headers);
}
protected URI reconstructURI(ServiceInstance serviceInstance, URI original) {
return LoadBalancerUriTools.reconstructURI(serviceInstance, original);
}
@Override
public int getOrder() {
return LOAD_BALANCER_CLIENT_FILTER_ORDER;
}
}
最终配置
代码语言:javascript复制spring:
cloud:
gateway:
default-filters:
- DedupeResponseHeader=Access-Control-Allow-Credentials Access-Control-Allow-Origin
routes:
- id: user-route-gray
uri: gray-lb://pins-platform
predicates:
- HeaderUsername=jiangtongxue,chuitongxue #使用自定义的断言工厂
- Path=/gray/platform-api/**
filters:
- AddRequestHeader=Version,canary
- RewritePath=/gray/platform-api/(?<segment>/?.*),/platform-api/${segment}
- id: user-route
uri: gray-lb://pins-platform
predicates:
- Path=/gray/platform-api/**
filters:
- AddRequestHeader=Version,release
- RewritePath=/gray/platform-api/(?<segment>/?.*),/platform-api/${segment}
最后只要是请求中具有canaryFlag的请求头且值为jiangtongxue或者chuitongxue的请求就会路由到release版本的服务集群上~
更严峻的挑战
内部的RPC请求也要进行灰度路由,所以还要重新内部服务调用的负载均衡算法。