Spring Cloud Gateway 作为新一代网关,在性能上有很大提升,并且附加了诸如限流等实用的功能。
限流
开发高并发系统时有三把利器用来保护系统:缓存、降级和限流。API 网关作为所有请求的入口,请求量大,我们可以通过对并发访问的请求进行限速来保护系统的可用性。
常用的限流算法
常用的限流算法由:漏桶算法和令牌桶算法。
1、漏桶算法
漏桶(Leaky Bucket)算法思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水(接口有响应速率),当水流入速度过大会直接溢出(访问频率超过接口响应速率),然后就拒绝请求,可以看出漏桶算法能强行限制数据的传输速率。
可见这里有两个变量,一个是桶的大小,支持流量突发增多时可以存多少的水(burst),另一个是水桶漏洞的大小(rate)。
因为漏桶的漏出速率是固定的参数,所以,即使网络中不存在资源冲突(没有发生拥塞),漏桶算法也不能使流突发(burst)到端口速率。因此,漏桶算法对于存在突发特性的流量来说缺乏效率。
漏桶作为计量工具(The Leaky Bucket Algorithm as a Meter)时,可以用于流量整形(Traffic Shaping)和流量控制(TrafficPolicing),漏桶算法的描述如下:
- 一个固定容量的漏桶,按照常量固定速率流出水滴;
- 如果桶是空的,则不需流出水滴;
- 可以以任意速率流入水滴到漏桶;
- 如果流入水滴超出了桶的容量,则流入的水滴溢出了(被丢弃),而漏桶容量是不变的。
2、令牌桶算法
令牌桶算法(Token Bucket)和 Leaky Bucket 效果一样但方向相反的算法。随着时间流逝,系统会按恒定1/QPS时间间隔(如果QPS=100,则间隔是10ms)往桶里加入Token(想象和漏洞漏水相反,有个水龙头在不断的加水),如果桶已经满了就不再加了。新请求来临时,会各自拿走一个Token,如果没有Token可拿了就阻塞或者拒绝服务。
令牌桶的另外一个好处是可以方便的改变速度。 一旦需要提高速率,则按需提高放入桶中的令牌的速率。 一般会定时(比如100毫秒)往桶中增加一定数量的令牌,有些变种算法则实时的计算应该增加的令牌的数量。
令牌桶算法是一个存放固定容量令牌的桶,按照固定速率往桶里添加令牌。令牌桶算法的描述如下:
- 假设限制2r/s,则按照500毫秒的固定速率往桶中添加令牌;
- 桶中最多存放b个令牌,当桶满时,新添加的令牌被丢弃或拒绝;
- 当一个n个字节大小的数据包到达,将从桶中删除n个令牌,接着数据包被发送到网络上;
- 如果桶中的令牌不足n个,则不会删除令牌,且该数据包将被限流(要么丢弃,要么缓冲区等待)。
令牌桶和漏桶对比:
- 令牌桶是按照固定速率往桶中添加令牌,请求是否被处理需要看桶中令牌是否足够,当令牌数减为零时则拒绝新的请求;
- 漏桶则是按照常量固定速率流出请求,流入请求速率任意,当流入的请求数累积到漏桶容量时,则新流入的请求被拒绝;
- 令牌桶限制的是平均流入速率(允许突发请求,只要有令牌就可以处理,支持一次拿3个令牌,4个令牌),并允许一定程度突发流量;
- 漏桶限制的是常量流出速率(即流出速率是一个固定常量值,比如都是1的速率流出,而不能一次是1,下次又是2),从而平滑突发流入速率;
- 令牌桶允许一定程度的突发,而漏桶主要目的是平滑流入速率;
- 两个算法实现可以一样,但是方向是相反的,对于相同的参数得到的限流效果是一样的。
基于Redis的限流方案
Guava RateLimiter只能应用于单进程,计数器限流一般用来限制总并发数,比如数据库连接池、线程池、秒杀的并发数,这种只要全局总请求数或者一定时间段的总请求数设定的阀值则进行限流,只是简单粗暴的总数量限流,而不是平均速率限流。
spring cloud gateway默认基于redis令牌桶算法进行微服务的限流保护,采用RateLimter限流算法来实现。
引入依赖
代码语言:javascript复制<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>
配置KeyResolver——RateLimiteConfig.java(接口限流/ip限流/用户限流)
我们可以通过 KeyResolver 来指定限流的 Key
代码语言:javascript复制import org.springframework.cloud.gateway.filter.ratelimit.KeyResolver;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import reactor.core.publisher.Mono;
/**
* 限流配置KeyResolver——有三种写法(接口限流/ip限流/用户限流)
*/
@Configuration
public class RateLimiteConfig {
/**
* 接口限流:根据请求路径限流
* 如果不使用@Primary注解,会报错
* @return
*/
@Bean
@Primary
public KeyResolver pathKeyResolver() {
return exchange -> Mono.just(exchange.getRequest().getPath().toString());
/*//写法2
return new KeyResolver() {
@Override
public Mono<String> resolve(ServerWebExchange exchange) {
return Mono.just(exchange.getRequest().getPath().toString());
}
};*/
}
/**
* 根据请求IP限流
* @return
*/
@Bean
public KeyResolver ipKeyResolver() {
return exchange -> Mono.just(exchange.getRequest().getRemoteAddress().getHostName()
);
}
/**
* 根据请求参数中的userId进行限流
* 请求地址写法:http://localhost:8801/rate/123?userId=lisi
* @return
*/
@Bean
public KeyResolver userKeyResolver() {
return exchange -> Mono.just(exchange.getRequest().getQueryParams().getFirst("userId")
);
}
}
配置限流的过滤器信息
- filter 名称必须是 RequestRateLimiter。
- redis-rate-limiter.replenishRate:允许用户每秒处理多少个请求。
- redis-rate-limiter.burstCapacity:令牌桶的容量,允许在 1s 内完成的最大请求数。
- key-resolver:使用 SpEL 按名称引用 bean。
spring:
cloud:
gateway:
routes:
- id: rate-limit-demo
uri: lb://mima-cloud-producer
predicates:
#访问路径:http://localhost:8801/rate/123
- Path=/rate/**
filters:
- name: RequestRateLimiter
args:
# 令牌桶每秒填充平均速率, 允许用户每秒处理多少个请求。
redis-rate-limiter.replenishRate: 1
# 令牌桶的容量,允许在1s内完成的最大请求数。
redis-rate-limiter.burstCapacity: 2
# 使用SpEL表达式从Spring容器中获取Bean对象, 查看RateLimiteConfig实现类中的方法名
key-resolver: "#{@pathKeyResolver}"
#key-resolver: "#{@ipKeyResolver}"
#key-resolver: "#{@userKeyResolver}"
访问接口进行测试,当频繁刷新请求接口时,控制台会报429错误状态码,提示我们请求过多,如下:
代码语言:javascript复制[开始]请求路径:/rate/123
[应答]请求路径:/rate/123耗时:2ms
2020-09-08 16:23:27.253 DEBUG 18512 --- [ioEventLoop-4-1] o.s.w.s.adapter.HttpWebHandlerAdapter : [62eb90e0] Completed 429 TOO_MANY_REQUESTS
2020-09-08 16:23:27.394 DEBUG 18512 --- [ctor-http-nio-2] o.s.w.s.adapter.HttpWebHandlerAdapter : [62eb90e0] HTTP GET "/rate/123"
corsFilter... run
[开始]请求路径:/rate/123
[应答]请求路径:/rate/123耗时:2ms
2020-09-08 16:23:27.397 DEBUG 18512 --- [ioEventLoop-4-1] o.s.w.s.adapter.HttpWebHandlerAdapter : [62eb90e0] Completed 429 TOO_MANY_REQUESTS
2020-09-08 16:23:27.536 DEBUG 18512 --- [ctor-http-nio-2] o.s.w.s.adapter.HttpWebHandlerAdapter : [62eb90e0] HTTP GET "/rate/123"
corsFilter... run
当发生限流时,会向redis中存储两个数据
127.0.0.1:6379> keys * 1) "request_rate_limiter.{localhost}.timestamp" 2) "request_rate_limiter.{localhost}.tokens"
大括号中就是我们的限流 Key,这里是 IP,本地的就是 localhost。
timestamp:存储的是当前时间的秒数,也就是 System.currentTimeMillis()/1000 或者 Instant.now().getEpochSecond()。
tokens:存储的是当前这秒钟对应的可用令牌数量。
熔断回退实战
在 Spring Cloud Gateway 中使用 Hystrix 进行回退需要增加 Hystrix 的依赖,代码如下所示。
代码语言:javascript复制<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
</dependency>
内置了 HystrixGatewayFilterFactory 来实现路由级别的熔断,只需要配置即可实现熔断回退功能。配置方式如下所示。
代码语言:javascript复制- id: user-service
uri: lb://user-service
predicates:
- Path=/user-service/**
filters:
- name: Hystrix
args:
name: fallbackcmd
fallbackUri: forward:/fallback
上面配置了一个 Hystrix 过滤器,该过滤器会使用 Hystrix 熔断与回退,原理是将请求包装成 RouteHystrixCommand 执行,RouteHystrixCommand 继承于 com.netflix.hystrix.HystrixObservableCommand。 fallbackUri 是发生熔断时回退的 URI 地址,目前只支持 forward 模式的 URI。如果服务被降级,该请求会被转发到该 URI 中。 在网关中创建一个回退的接口,用于熔断时处理返回给调用方的信息,代码如下所示。
代码语言:javascript复制@RestController
public class FallbackController {
@GetMapping("/fallback")
public String fallback() {
return "fallback";
}
}
跨域实战
在 Spring Cloud Gateway 中配置跨域有两种方式,分别是代码配置方式和配置文件方式。 代码配置方式配置跨域,具体代码如下所示。
代码语言:javascript复制@Configuration
public class CorsConfig {
@Bean
public WebFilter corsFilter() {
return (ServerWebExchange ctx, WebFilterChain chain) -> {
ServerHttpRequest request = ctx.getRequest();
if (CorsUtils.isCorsRequest(request)) {
HttpHeaders requestHeaders = request.getHeaders();
ServerHttpResponse response = ctx.getResponse();
HttpMethod requestMethod = requestHeaders.getAccessControlRequestMethod();
HttpHeaders headers = response.getHeaders();
headers.add(HttpHeaders.ACCESS_CONTROL_ALLOW_ORIGIN, requestHeaders.getOrigin());
headers.addAll(HttpHeaders.ACCESS_CONTROL_ALLOW_HEADERS,
requestHeaders.getAccessControlRequestHeaders());
if (requestMethod != null) {
headers.add(HttpHeaders.ACCESS_CONTROL_ALLOW_METHODS, requestMethod.name());
}
headers.add(HttpHeaders.ACCESS_CONTROL_ALLOW_CREDENTIALS, "true");
headers.add(HttpHeaders.ACCESS_CONTROL_EXPOSE_HEADERS, "*");
if (request.getMethod() == HttpMethod.OPTIONS) {
response.setStatusCode(HttpStatus.OK);
return Mono.empty();
}
}
return chain.filter(ctx);
};
}
}
配置文件方式配置跨域:
代码语言:javascript复制spring:
cloud:
gateway:
globalcors:
corsConfigurations:
'[/**]':
allowedOrigins: "*"
exposedHeaders:
- content-type
allowedHeaders:
- content-type
allowCredentials: true
allowedMethods:
- GET
- OPTIONS
- PUT
- DELETE
- POST
统一异常处理
Spring Cloud Gateway 中的全局异常处理不能直接使用 @ControllerAdvice,可以通过跟踪异常信息的抛出,找到对应的源码,自定义一些处理逻辑来匹配业务的需求。 网关是给接口做代理转发的,后端对应的是 REST API,返回数据格式是 JSON。如果不做处理,当发生异常时,Gateway 默认给出的错误信息是页面,不方便前端进行异常处理。 所以我们需要对异常信息进行处理,并返回 JSON 格式的数据给客户端。下面先看实现的代码,后面再跟大家讲一下需要注意的地方。 自定义异常处理逻辑,代码如下所示。
代码语言:javascript复制public class JsonExceptionHandler extends DefaultErrorWebExceptionHandler {
public JsonExceptionHandler(ErrorAttributes errorAttributes, ResourceProperties resourceProperties,
ErrorProperties errorProperties, ApplicationContext applicationContext) {
super(errorAttributes, resourceProperties, errorProperties, applicationContext);
}
/**
* 获取异常属性
*/
@Override
protected Map<String, Object> getErrorAttributes(ServerRequest request, boolean includeStackTrace) {
int code = 500;
Throwable error = super.getError(request);
if (error instanceof org.springframework.cloud.gateway.support.NotFoundException) {
code = 404;
}
return response(code, this.buildMessage(request, error));
}
/**
* 指定响应处理方法为JSON处理的方法
*
* @param errorAttributes
*/
@Override
protected RouterFunction<ServerResponse> getRoutingFunction(ErrorAttributes errorAttributes) {
return RouterFunctions.route(RequestPredicates.all(), this::renderErrorResponse);
}
/**
* 根据code获取对应的HttpStatus
*
* @param errorAttributes
*/
@Override
protected HttpStatus getHttpStatus(Map<String, Object> errorAttributes) {
int statusCode = (int) errorAttributes.get("code");
return HttpStatus.valueOf(statusCode);
}
/**
* 构建异常信息
*
* @param request
* @param ex
* @return
*/
private String buildMessage(ServerRequest request, Throwable ex) {
StringBuilder message = new StringBuilder("Failed to handle request [");
message.append(request.methodName());
message.append(" ");
message.append(request.uri());
message.append("]");
if (ex != null) {
message.append(": ");
message.append(ex.getMessage());
}
return message.toString();
}
/**
* 构建返回的JSON数据格式
*
* @param status 状态码
* @param errorMessage 异常信息
* @return
*/
public static Map<String, Object> response(int status, String errorMessage) {
Map<String, Object> map = new HashMap<>();
map.put("code", status);
map.put("message", errorMessage);
map.put("data", null);
return map;
}
}
覆盖默认的配置,代码如下所示。
代码语言:javascript复制@Configuration
@EnableConfigurationProperties({ ServerProperties.class, ResourceProperties.class })
public class ErrorHandlerConfiguration {
private final ServerProperties serverProperties;
private final ApplicationContext applicationContext;
private final ResourceProperties resourceProperties;
private final List<ViewResolver> viewResolvers;
private final ServerCodecConfigurer serverCodecConfigurer;
public ErrorHandlerConfiguration(ServerProperties serverProperties, ResourceProperties resourceProperties,
ObjectProvider<List<ViewResolver>> viewResolversProvider, ServerCodecConfigurer serverCodecConfigurer,
ApplicationContext applicationContext) {
this.serverProperties = serverProperties;
this.applicationContext = applicationContext;
this.resourceProperties = resourceProperties;
this.viewResolvers = viewResolversProvider.getIfAvailable(Collections::emptyList);
this.serverCodecConfigurer = serverCodecConfigurer;
}
@Bean
@Order(Ordered.HIGHEST_PRECEDENCE)
public ErrorWebExceptionHandler errorWebExceptionHandler(ErrorAttributes errorAttributes) {
JsonExceptionHandler exceptionHandler = new JsonExceptionHandler(errorAttributes,
this.resourceProperties,this.serverProperties.getError(), this.applicationContext);
exceptionHandler.setViewResolvers(this.viewResolvers);
exceptionHandler.setMessageWriters(this.serverCodecConfigurer.getWriters());
exceptionHandler.setMessageReaders(this.serverCodecConfigurer.getReaders());
return exceptionHandler;
}
}
1. 异常时如何返回 JSON 而不是 HTML?
在 org.springframework.boot.autoconfigure.web.reactive.error.DefaultErrorWeb-Exception-Handler 中的 getRoutingFunction() 方法就是控制返回格式的,源代码如下所示。
代码语言:javascript复制@Override
protected RouterFunction<ServerResponse> getRoutingFunction(ErrorAttributes errorAttributes) {
return RouterFunctions.route(acceptsTextHtml(), this::renderErrorView).andRoute(RequestPredicates.all(), this::renderErrorResponse);
}
这里优先是用 HTML 来显示的,如果想用 JSON 显示改动就可以了,具体代码如下所示。
代码语言:javascript复制protected RouterFunction<ServerResponse> getRoutingFunction(ErrorAttributes errorAttributes) {
return RouterFunctions.route(RequestPredicates.all(),this::renderErrorResponse);
}
2. getHttpStatus 需要重写
原始的方法是通过 status 来获取对应的 HttpStatus 的,具体代码如下所示。
代码语言:javascript复制protected HttpStatus getHttpStatus(Map<String, Object> errorAttributes) {
int statusCode = (int) errorAttributes.get("status");
return HttpStatus.valueOf(statusCode);
}
如果我们定义的格式中没有 status 字段的话,就会报错,因为找不到对应的响应码。要么返回数据格式中增加 status 子段,要么重写,在笔者的操作中返回的是 code,所以要重写,代码如下所示。
代码语言:javascript复制@Override
protected HttpStatus getHttpStatus(Map<String, Object> errorAttributes) {
int statusCode = (int) errorAttributes.get("code");
return HttpStatus.valueOf(statusCode);
}
重试机制
RetryGatewayFilter 是 Spring Cloud Gateway 对请求重试提供的一个 GatewayFilter Factory。配置方式如下所示。
代码语言:javascript复制spring:
cloud:
gateway:
routes:
- id: zuul-encrypt-service
uri: lb://zuul-encrypt-service
predicates:
- Path=/data/**
filters:
- name: Retry
args:
retries: 3
series: SERVER_ERROR
上述代码中具体参数含义如下所示。
- retries:重试次数,默认值是 3 次。
- series:状态码配置(分段),符合某段状态码才会进行重试逻辑,默认值是 SERVER_ERROR,值是 5,也就是 5XX(5 开头的状态码),共有 5 个值,代码如下所示。
public enum Series {
INFORMATIONAL(1), SUCCESSFUL(2), REDIRECTION(3), CLIENT_ERROR(4), SERVER_ERROR(5);
}
上述代码中具体参数含义如下所示。
- statuses:状态码配置,和 series 不同的是这里是具体状态码的配置,取值请参考 org.springframework.http.HttpStatus。
- methods:指定哪些方法的请求需要进行重试逻辑,默认值是 GET 方法,取值代码如下所示。
public enum HttpMethod {
GET, HEAD, POST, PUT, PATCH, DELETE, OPTIONS, TRACE;
}
上述代码中具体参数含义如下所示。 exceptions:指定哪些异常需要进行重试逻辑。默认值是 java.io.IOException 和 org.springframework.cloud.gateway.support.TimeoutException。
参考:
Spring Cloud Gateway实战案例:http://m.biancheng.net/view/5447.html
分布式环境下限流方案的实现:https://www.cnblogs.com/softidea/p/6229543.html
Gateway Redis令牌桶请求限流过滤器:https://www.cnblogs.com/linjiqin/p/13633607.html