Spring Cloud升级之路 - Hoxton - 7. 后续更新(WebFlux等)

2021-04-12 14:42:10 浏览数 (1)

1. 修正实例列表乱序导致的负载均衡重试相同实例的问题

虽然之前考虑了通过每个请求的traceId隔离负载均衡的position来实现重试不会重试相同实例的问题,但是没有考虑在负载均衡过程中,实例列表的更新。

例如:

  • 请求第一次调用负载均衡,实例列表是:[实例1,实例2],position为1,对2取余=1,所以请求发送到实例2上面了
  • 请求失败,触发重试,实例列表缓存失效,更新后变成了:[实例2,实例1],position为2,对2取余=0,所以请求又发送到实例2上面了
代码语言:javascript复制
private Response getInstanceResponse(List serviceInstances) {
    if (serviceInstances.isEmpty()) {
        log.warn("No servers available for service: "   this.serviceId);
        return new EmptyResponse();
    }
    Span currentSpan = tracer.currentSpan();
    //如果没有 traceId,就生成一个新的,但是最好检查下为啥会没有
    //是不是 MQ 消费这种没有主动生成 traceId 的情况,最好主动生成下
    if (currentSpan == null) {
        currentSpan = tracer.newTrace();
    }
    long l = currentSpan.context().traceId();
    int seed = positionCache.get(l).getAndIncrement();
    //这里,serviceInstances可能与上次的内容不同
    //例如上次是实例1,实例2
    //这次是实例2,实例1
    return new DefaultResponse(serviceInstances.get(seed % serviceInstances.size()));
}

所以,在这里追加排序,保证实例有序,从而进一步不会重试相同的实例。

代码语言:javascript复制
private Response getInstanceResponse(List serviceInstances) {
    if (serviceInstances.isEmpty()) {
        log.warn("No servers available for service: "   this.serviceId);
        return new EmptyResponse();
    }
    Span currentSpan = tracer.currentSpan();
    //如果没有 traceId,就生成一个新的,但是最好检查下为啥会没有
    //是不是 MQ 消费这种没有主动生成 traceId 的情况,最好主动生成下
    if (currentSpan == null) {
        currentSpan = tracer.newTrace();
    }
    long l = currentSpan.context().traceId();
    int seed = positionCache.get(l).getAndIncrement();
    return new DefaultResponse(serviceInstances.stream().sorted(Comparator.comparing(ServiceInstance::getInstanceId)).collect(Collectors.toList()).get(seed % serviceInstances.size()));
}

2. WebFlux环境兼容与WebClient实现相同功能

maven依赖:

代码语言:javascript复制
        org.springframework.boot
        spring-boot-starter-parent
        2.2.7.RELEASE
    

    
        3.4.2
        1.1.0
    

    
        
        
        
            com.github.ben-manes.caffeine
            caffeine
        

        
        
            org.springframework.boot
            spring-boot-starter
            
                
                    org.springframework.boot
                    spring-boot-starter-logging
                
            
        
        
            org.springframework.boot
            spring-boot-starter-log4j2
        

        
        
            org.projectlombok
            lombok
        

        
        
            org.springframework.cloud
            spring-cloud-starter-netflix-eureka-client
        

        
        
            org.springframework.cloud
            spring-cloud-starter-openfeign
        
        
        
            org.springframework.cloud
            spring-cloud-starter-sleuth
        
        
        
            org.springframework.boot
            spring-boot-starter-actuator
        
        
            org.springframework.boot
            spring-boot-starter-webflux
        
        
            org.apache.httpcomponents
            httpclient
        
        
            org.springframework.cloud
            spring-cloud-starter-circuitbreaker-reactor-resilience4j
        
        
            io.github.resilience4j
            resilience4j-spring-cloud2
            ${resilience4j-spring-cloud2.version}
        
        
        
            com.lmax
            disruptor
            ${disruptor.version}
        
    

    
        
            
                org.springframework.cloud
                spring-cloud-dependencies
                Hoxton.SR4
                pom
                import

其他的配置是一样的,重点在于,如何使用WebClient调用其他微服务,并且实现针对Get请求重试或者是所有请求的网络 IO 异常,例如connect timeout等等,或者是断路器异常(因为请求还没发出)。

WebClient可以加入各种Filter,通过实现这些Filter来实现实例级别的断路器还有重试。

源码:WebClientConfig.java

实现重试:

代码语言:javascript复制
private static class RetryFilter implements ExchangeFilterFunction {
    private final String serviceName;

    private RetryFilter(String serviceName) {
        this.serviceName = serviceName;
    }

    @Override
    public Mono filter(ClientRequest clientRequest, ExchangeFunction exchangeFunction) {
        return exchangeFunction.exchange(clientRequest).retryWhen(Retry.onlyIf(retryContext -> {
            //get请求一定重试
            return clientRequest.method().equals(HttpMethod.GET)
                    //connect Timeout 是一种 IOException
                    || retryContext.exception() instanceof IOException
                    //实例级别的断路器的Exception
                    || retryContext.exception() instanceof CallNotPermittedException;
        }).retryMax(1).exponentialBackoff(Duration.ofMillis(100), Duration.ofMillis(1000)));
    }
}

实例级别的断路器:

代码语言:javascript复制
private static class InstanceCircuitBreakerFilter implements ExchangeFilterFunction {
    private final String serviceName;
    private final CircuitBreakerRegistry circuitBreakerRegistry;
    ;

    private InstanceCircuitBreakerFilter(String serviceName, CircuitBreakerRegistry circuitBreakerRegistry) {
        this.serviceName = serviceName;
        this.circuitBreakerRegistry = circuitBreakerRegistry;
    }

    @Override
    public Mono filter(ClientRequest clientRequest, ExchangeFunction exchangeFunction) {
        CircuitBreaker circuitBreaker;
        //这时候的url是经过负载均衡器的,是实例的url
        String instancId = clientRequest.url().getHost()   ":"   clientRequest.url().getPort();
        try {
            //使用实例id新建或者获取现有的CircuitBreaker,使用serviceName获取配置
            circuitBreaker = circuitBreakerRegistry.circuitBreaker(instancId, serviceName);
        } catch (ConfigurationNotFoundException e) {
            circuitBreaker = circuitBreakerRegistry.circuitBreaker(instancId);
        }

        return exchangeFunction.exchange(clientRequest).transform(CircuitBreakerOperator.of(circuitBreaker));
    }
}

组装调用某个微服务(这里是service-provider)的WebClient

代码语言:javascript复制
public static final String SERVICE_PROVIDER = "service-provider";

@Autowired
private ReactorLoadBalancerExchangeFilterFunction lbFunction;

@Bean(SERVICE_PROVIDER)
public WebClient getWebClient(CircuitBreakerRegistry circuitBreakerRegistry) {
    ConnectionProvider provider = ConnectionProvider.builder(SERVICE_PROVIDER)
            .maxConnections(50).pendingAcquireTimeout(Duration.ofSeconds(5)).build();
    HttpClient httpClient = HttpClient.create(provider)
            .tcpConfiguration(client ->
                    //链接超时
                    client.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 500)
                            .doOnConnected(conn -> conn
                                    //读取超时
                                    .addHandlerLast(new ReadTimeoutHandler(1))
                                    .addHandlerLast(new WriteTimeoutHandler(1))
                            )
            );

    return WebClient.builder()
            .clientConnector(new ReactorClientHttpConnector(httpClient))
            //Retry在负载均衡前
            .filter(new RetryFilter(SERVICE_PROVIDER))
            //负载均衡器,改写url
            .filter(lbFunction)
            //实例级别的断路器需要在负载均衡获取真正地址之后
            .filter(new InstanceCircuitBreakerFilter(SERVICE_PROVIDER, circuitBreakerRegistry))
            .baseUrl("http://"   SERVICE_PROVIDER)
            .build();
}

这样,我们就可以实现和之前feign一样的微服务调用了。

代码语言:javascript复制
@Log4j2
@RestController
public class TestController {
    @Resource(name = WebClientConfig.SERVICE_PROVIDER)
    private WebClient webClient;

    @RequestMapping("/testGetTimeOut")
    public Mono testGetTimeOut() {
        return webClient.get().uri("/test-read-time-out")
                .retrieve()
                .bodyToMono(new ParameterizedTypeReference<>() {
                });
    }

    @RequestMapping("/testPostTimeOut")
    public Mono testPostTimeOut() {
        return webClient.post().uri("/test-read-time-out")
                .retrieve()
                .bodyToMono(new ParameterizedTypeReference<>() {
                });
    }
}

0 人点赞