修复 Spring Cloud Gateway 项目中无法通过 Skywalking 追踪 WebClient 调用的问题

2022-03-05 16:54:58 浏览数 (3)

解决 Spring Cloud Gateway 项目中无法追踪 WebClient 调用的问题

问题描述

Skywalking 通过 java agent 的方式为 java 应用带来无侵入的分布式链路采集。

在微服务架构中, Spring Cloud Gateway 做为业务网关, 一般需要自定义 Filter ,调用其它服务接口验证用户身份或判断权限。 Gateway 进程配置了 Skywalking Agent(8.8.0) , 但在 Filter 中使用 WebClient 调用远程服务, 可能导致生成多个调用链路, 无法正确跟踪。

Skywalking Jave Agent 采集链路信息原理

排查问题之前, 先了解下 Skywalking Jave Agent 是如何采集链路信息的。

Plugin Development Guide

单进程内同步调用 trace 状态维护

Skywalking Jave Agent 通过 org.apache.skywalking.apm.agent.core.context.ContextManager 来管理 Trace 上下文。

通过 ContextManager#createEntrySpanContextManager#createLocalSpanContextManager#createExistSpan 等方法来创建一个 Span。

  • EntrySpan 表示一次远程被调跨度
  • LocalSpan 表示一次进程内本地跨度
  • ExistSpan 表示一次远程主调跨度

当创建 Span 时, 如果链路上下文 TraceContext 还没有创建, 会先创建 Trace , 并把 TracerContext 存到 ContextManager 管理的 ThreadLocal ContextManager.CONTEXT 中。 新创建的 Span 会使用 TracerContext 的上下文信息。

因为 TracerContext 存在 ThreadLocal 中, 所以在同一个线程中创建的多个 Span 会使用到同一个 TracerContext 串起来。

单进程内异步调用 trace 状态维护

当使用 Spring WebFlux 或 Vert.x 等异步框架时, 一次调用事务的逻辑可能调度在不同的线程中。

因为 ContextManager 使用 ThreadLocal 来维持 TracerContext, 那么在一次调用事务链中每次创建 Span , 可能对应不同的 TracerContext. 最终在 Skyawalking 控制台中出现多个链路。

比如 Spring Mvc 接收到 Http 请求时, 创建了一个 EntrySpan, 在接下来的业务逻辑中需要调用一个远程服务, 那么需要创建一个 ExitSpan , 但在创建 ExitSpan 时由于多次异步调用, 已经切到别的线程上, ContextManager 获取不到原来的 TracerContext, 便新建了一个, 此时便出现 EntrySpan 与 ExitSpan 不属于同一个 Trace 的情况。

针对异步调用, Skywalking Agent 提供了 ContextSnapshot 用于在线程间共享 TracerContext.

在实现异步框架的插件时, 当创建第一个 Span 后, 需要使用 ContextManager.capture() 获取到 ContextSnapshot, 并放置到异步框架本身的上下文来传递。

而后, 再创建后续的 Span 时, 需要从框架的上下文中获取 ContextSnapshot , 再使用 ContextManger.continued 方法把 ContextSnapshot 恢复到当前 Span 中。

跨进程调用 trace 状态传递

Skywalking 根据不同的网络协议或框架(比如 Http Header 或 Kafka Message Header), 来传递链路上下文。 实现步聚如下:

  1. 主调端创建一个 ExitSpan, 通过 ContextManger.inject(ContextCarrier carrier) 把上下文信息注入到 carrier 中, 通过 carrier 可以获取到需要传递的 Hearder 信息, 再把 Header 信息注入到对应调用框架中(比如 HttpRequest)。
  2. 被调方从框架中取得 Header 信息封装成 ContextCarrier, 再使用 carrier 调用 ContextManager#createEntrySpan 来创建 EntrySpan 便能把主调跟被调的 Trace 上下文串起来。

Spring WebFlux Webclient 插件实现逻辑及问题重现。

既然是 WebClient 调用会导致生成多个 Trace , 那么直接查看 spring-webflux-5.x-webclient-plugin 插件的代码(8.8.0 版本)。

通过 org.apache.skywalking.apm.plugin.spring.webflux.v5.webclient.define.WebFluxWebClientInstrumentation 可以看到插件通过 org.apache.skywalking.apm.plugin.spring.webflux.v5.webclient.WebFluxWebClientInterceptor 拦截了 org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction#exchange 方法。

那么继续查看 WebFluxWebClientInterceptor 的代码:

代码语言:java复制
@Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInvocationContext context) throws Throwable {
        //..

        
        ClientRequest request = (ClientRequest) allArguments[0];
        final ContextCarrier contextCarrier = new ContextCarrier();

        URI uri = request.url();
        final String requestURIString = getRequestURIString(uri);
        final String operationName = requestURIString;
        final String remotePeer = getIPAndPort(uri);
        
        // 直接创建 ExitSpan , 没用使用 ContextManager.continued 来恢复上下文
        AbstractSpan span = ContextManager.createExitSpan(operationName, contextCarrier, remotePeer);

        //...

        //user async interface
        span.prepareForAsync();
        ContextManager.stopSpan();
        context.setContext(span);
    }

    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret, MethodInvocationContext context) throws Throwable {
        // fix the problem that allArgument[0] may be null
        if (allArguments[0] == null) {
            return ret;
        }
        Mono<ClientResponse> ret1 = (Mono<ClientResponse>) ret;
        AbstractSpan span = (AbstractSpan) context.getContext();
        return ret1.doOnSuccess(clientResponse -> {
            //...
        }).doOnError(error -> {
            span.log(error);
        }).doFinally(s -> {
            span.asyncFinish();
        });
    }

可以看到 WebFluxWebClientInterceptor#beforeMethod 中直接创建 ExitSpan , 并没有使用 ContextManger.continued 来恢复上下文。 因为在Spring WebFlux 基于 Reactor 异步框架 , 那么创建当前 Span 与前置创建 EntrySpan 不在同个线程中, 两个 Span 属于两个不同的 Trace.

Bug 复现, 创建一个 Spring WebFlux 项目, 编写测试 Controller 如下

代码语言:java复制
@SpringBootApplication
@RestController
public class SpringWebfluxProjectApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringWebfluxProjectApplication.class, args);
    }

    @GetMapping("test")
    public Mono<String> hello() {
        return WebClient.create("http://localhost:8080/foo")
                .get()
                .retrieve()
                .bodyToMono(String.class)
                .flatMap(s -> {
                    return WebClient.create("http://localhost:8080/bar")
                            .get()
                            .retrieve()
                            .bodyToMono(String.class);
                });

    }

    @GetMapping("foo")
    public Mono<String> baidu(ServerWebExchange exchange) {
        return Mono.just("foo");

    }

    @GetMapping("bar")
    public Mono<String> qq(ServerWebExchange exchange) throws InterruptedException {
        return Mono.just("bar").delayElement(Duration.ofMillis(100));

    }

}

配置好skywalking agent 相关 JVM 参数, 运行项目, 请求 http://localhost:8080/test , 查看 skywalking 面板, 确实生成了多个 Span , 但每个 Span 的 TraceId 都不一样。

skywalking-webclient-bug.pngskywalking-webclient-bug.png

解决方案

基于上节分析, 根本问题在于在创建 ExitSpan 时没有恢复上下文, 那么需有找到一个方法获取到上游的 ContextSnapshot 并恢复即可。

Spring Webflux 基于 Reactor 框架 , 可以通过 Reactor Context 来传递 ContextSnapshot.

Skywalking 默认插件中包含 mvc-annotation-5.x-plugin , 查看对应代码, 发现该插件通过拦截 Spring Mvc 相关注解方法, 在注解方法前创建 EntrySpan , 使用同步的方式,且拦截方法返回值不一定是 Mono 或 Flux , 难于在这个地方把 ContextSnapshot 放入 Reactor Context 中。 在 optional-plugin 还有 spring-webflux-5.x-plugin 插件, 该插件通过拦截 org.springframework.web.reactive.DispatcherHandler#handle 来创建 EntrySpan, DispatcherHandler#handle 返回 Mono , 可以在这里插入 ContextSnapshot.

具体实现如下:

代码语言:java复制
//org.apache.skywalking.apm.plugin.spring.webflux.v5.DispatcherHandlerHandleMethodInterceptor#afterMethod
 @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                              Object ret) throws Throwable {

        ServerWebExchange exchange = (ServerWebExchange) allArguments[0];

        AbstractSpan span = (AbstractSpan) exchange.getAttributes().get("SKYWALKING_SPAN");
        Mono<Void> monoReturn = (Mono<Void>) ret;

        
        // add skywalking context snapshot to reactor context.
        EnhancedInstance instance = getInstance(allArguments[0]);
        if (instance != null && instance.getSkyWalkingDynamicField() != null) {
            monoReturn = monoReturn.subscriberContext(
                    c -> c.put("SKYWALKING_CONTEXT_SNAPSHOT", instance.getSkyWalkingDynamicField()));
        }

        return monoReturn.doFinally(s -> {

            if (span != null) {
                maybeSetPattern(span, exchange);
                try {

                    HttpStatus httpStatus = exchange.getResponse().getStatusCode();
                    // fix webflux-2.0.0-2.1.0 version have bug. httpStatus is null. not support
                    if (httpStatus != null) {
                        Tags.HTTP_RESPONSE_STATUS_CODE.set(span, httpStatus.value());
                        if (httpStatus.isError()) {
                            span.errorOccurred();
                        }
                    }
                } finally {
                    span.asyncFinish();
                }
            }
        });
    }

个性 WebFluxWebClientInterceptor 从 Reactor Context 中获取 ContextSnapshot :

代码语言:java复制
public class WebFluxWebClientInterceptor implements InstanceMethodsAroundInterceptorV2 {

    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInvocationContext context) throws Throwable {
        // before method 中无法获取 Reactor 上下文 , 原逻辑直接删除掉
        // ExchangeFunctions$DefaultExchangeFunction 中只是构建 Reactor 链条, 并末真正执行, 所以原来逻辑可以推迟到 subscriberContext 中获取上下文后再执行。
    }

    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret, MethodInvocationContext context) throws Throwable {
        // fix the problem that allArgument[0] may be null
        if (allArguments[0] == null) {
            return ret;
        }
        Mono<ClientResponse> ret1 = (Mono<ClientResponse>) ret;
        // 从 Reactor Context 中获取 ContextSnapshot 
        return Mono.subscriberContext().flatMap(ctx -> {

            ClientRequest request = (ClientRequest) allArguments[0];
            URI uri = request.url();
            final String operationName = getRequestURIString(uri);
            final String remotePeer = getIPAndPort(uri);
            AbstractSpan span = ContextManager.createExitSpan(operationName, remotePeer);

            // get ContextSnapshot from reactor context,  the snapshot is set to reactor context by any other plugin
            // such as DispatcherHandlerHandleMethodInterceptor in spring-webflux-5.x-plugin
            final Optional<Object> optional = ctx.getOrEmpty("SKYWALKING_CONTEXT_SNAPSHOT");
            optional.ifPresent(snapshot -> ContextManager.continued((ContextSnapshot) snapshot));

            //set components name
            span.setComponent(ComponentsDefine.SPRING_WEBCLIENT);
            Tags.URL.set(span, uri.toString());
            Tags.HTTP.METHOD.set(span, request.method().toString());
            SpanLayer.asHttp(span);

            final ContextCarrier contextCarrier = new ContextCarrier();
            ContextManager.inject(contextCarrier);
            if (request instanceof EnhancedInstance) {
                ((EnhancedInstance) request).setSkyWalkingDynamicField(contextCarrier);
            }

            //user async interface
            span.prepareForAsync();
            ContextManager.stopSpan();
            return ret1.doOnSuccess(clientResponse -> {
                HttpStatus httpStatus = clientResponse.statusCode();
                if (httpStatus != null) {
                    Tags.HTTP_RESPONSE_STATUS_CODE.set(span, httpStatus.value());
                    if (httpStatus.isError()) {
                        span.errorOccurred();
                    }
                }
            }).doOnError(span::log).doFinally(s -> {
                span.asyncFinish();
            });
        });
    }
    
}

重新编译插件后把 spring-webflux-5.x-plugin 及 spring-webflux-5.x-webclient-plugin 两个插件拷到 Skywalking Agent plugin 目录下, 重新运行测试代码, 可以发现问题解决, 所有调用都串起来。

webclient_trace_fixed.pngwebclient_trace_fixed.png

修复代码已合并到 skywalking-java 主干(#114), 预计将在 8.10.0 版本中发布。

注意1: 因为 spring-webflux-5.x-plugin 是在 optional-plugins 目录中, 需要手工拷到 plugins 目录。

而 Spring Cloud Gateway 工程需要手工拷 gateway-3.x-plugin。

注意2: Srping MVC 插件 apm-springmvc-annotation-5.x-plugin 默认生效, 当与 spring-webflux-5.x-plugin 同时存在时, 一次调用会生成两个 EntrySpan, 而且 mvc 插件生成 EntrySpan 虽然与 Webclient 生成的 ExitSpan 能用同个 TraceId 串起来了, 但仍然没有 Parent/Child 关系, 介意的话在 Spring Webflux 工程中把 spring-webflux-5.x-plugin 移出 ${agetn/path}/plugin 目录。


参考

  1. Plugin Development Guide
  2. Reactor Context

0 人点赞