解决 Spring Cloud Gateway 项目中无法追踪 WebClient 调用的问题
问题描述
Skywalking 通过 java agent 的方式为 java 应用带来无侵入的分布式链路采集。
在微服务架构中, Spring Cloud Gateway 做为业务网关, 一般需要自定义 Filter ,调用其它服务接口验证用户身份或判断权限。 Gateway 进程配置了 Skywalking Agent(8.8.0) , 但在 Filter 中使用 WebClient 调用远程服务, 可能导致生成多个调用链路, 无法正确跟踪。
Skywalking Jave Agent 采集链路信息原理
排查问题之前, 先了解下 Skywalking Jave Agent 是如何采集链路信息的。
Plugin Development Guide
单进程内同步调用 trace 状态维护
Skywalking Jave Agent 通过 org.apache.skywalking.apm.agent.core.context.ContextManager
来管理 Trace 上下文。
通过 ContextManager#createEntrySpan
、 ContextManager#createLocalSpan
、 ContextManager#createExistSpan
等方法来创建一个 Span。
- EntrySpan 表示一次远程被调跨度
- LocalSpan 表示一次进程内本地跨度
- ExistSpan 表示一次远程主调跨度
当创建 Span 时, 如果链路上下文 TraceContext 还没有创建, 会先创建 Trace , 并把 TracerContext 存到 ContextManager 管理的 ThreadLocal ContextManager.CONTEXT
中。 新创建的 Span 会使用 TracerContext 的上下文信息。
因为 TracerContext 存在 ThreadLocal 中, 所以在同一个线程中创建的多个 Span 会使用到同一个 TracerContext 串起来。
单进程内异步调用 trace 状态维护
当使用 Spring WebFlux 或 Vert.x 等异步框架时, 一次调用事务的逻辑可能调度在不同的线程中。
因为 ContextManager
使用 ThreadLocal
来维持 TracerContext, 那么在一次调用事务链中每次创建 Span , 可能对应不同的 TracerContext. 最终在 Skyawalking 控制台中出现多个链路。
比如 Spring Mvc 接收到 Http 请求时, 创建了一个 EntrySpan, 在接下来的业务逻辑中需要调用一个远程服务, 那么需要创建一个 ExitSpan , 但在创建 ExitSpan 时由于多次异步调用, 已经切到别的线程上, ContextManager 获取不到原来的 TracerContext, 便新建了一个, 此时便出现 EntrySpan 与 ExitSpan 不属于同一个 Trace 的情况。
针对异步调用, Skywalking Agent 提供了 ContextSnapshot
用于在线程间共享 TracerContext.
在实现异步框架的插件时, 当创建第一个 Span 后, 需要使用 ContextManager.capture()
获取到 ContextSnapshot
, 并放置到异步框架本身的上下文来传递。
而后, 再创建后续的 Span 时, 需要从框架的上下文中获取 ContextSnapshot
, 再使用 ContextManger.continued
方法把 ContextSnapshot
恢复到当前 Span 中。
跨进程调用 trace 状态传递
Skywalking 根据不同的网络协议或框架(比如 Http Header 或 Kafka Message Header), 来传递链路上下文。 实现步聚如下:
- 主调端创建一个 ExitSpan, 通过
ContextManger.inject(ContextCarrier carrier)
把上下文信息注入到 carrier 中, 通过 carrier 可以获取到需要传递的 Hearder 信息, 再把 Header 信息注入到对应调用框架中(比如 HttpRequest)。 - 被调方从框架中取得 Header 信息封装成 ContextCarrier, 再使用 carrier 调用
ContextManager#createEntrySpan
来创建 EntrySpan 便能把主调跟被调的 Trace 上下文串起来。
Spring WebFlux Webclient 插件实现逻辑及问题重现。
既然是 WebClient 调用会导致生成多个 Trace , 那么直接查看 spring-webflux-5.x-webclient-plugin 插件的代码(8.8.0 版本)。
通过 org.apache.skywalking.apm.plugin.spring.webflux.v5.webclient.define.WebFluxWebClientInstrumentation
可以看到插件通过 org.apache.skywalking.apm.plugin.spring.webflux.v5.webclient.WebFluxWebClientInterceptor
拦截了 org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction#exchange
方法。
那么继续查看 WebFluxWebClientInterceptor
的代码:
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInvocationContext context) throws Throwable {
//..
ClientRequest request = (ClientRequest) allArguments[0];
final ContextCarrier contextCarrier = new ContextCarrier();
URI uri = request.url();
final String requestURIString = getRequestURIString(uri);
final String operationName = requestURIString;
final String remotePeer = getIPAndPort(uri);
// 直接创建 ExitSpan , 没用使用 ContextManager.continued 来恢复上下文
AbstractSpan span = ContextManager.createExitSpan(operationName, contextCarrier, remotePeer);
//...
//user async interface
span.prepareForAsync();
ContextManager.stopSpan();
context.setContext(span);
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret, MethodInvocationContext context) throws Throwable {
// fix the problem that allArgument[0] may be null
if (allArguments[0] == null) {
return ret;
}
Mono<ClientResponse> ret1 = (Mono<ClientResponse>) ret;
AbstractSpan span = (AbstractSpan) context.getContext();
return ret1.doOnSuccess(clientResponse -> {
//...
}).doOnError(error -> {
span.log(error);
}).doFinally(s -> {
span.asyncFinish();
});
}
可以看到 WebFluxWebClientInterceptor#beforeMethod
中直接创建 ExitSpan , 并没有使用 ContextManger.continued
来恢复上下文。 因为在Spring WebFlux 基于 Reactor 异步框架 , 那么创建当前 Span 与前置创建 EntrySpan 不在同个线程中, 两个 Span 属于两个不同的 Trace.
Bug 复现, 创建一个 Spring WebFlux 项目, 编写测试 Controller 如下
代码语言:java复制@SpringBootApplication
@RestController
public class SpringWebfluxProjectApplication {
public static void main(String[] args) {
SpringApplication.run(SpringWebfluxProjectApplication.class, args);
}
@GetMapping("test")
public Mono<String> hello() {
return WebClient.create("http://localhost:8080/foo")
.get()
.retrieve()
.bodyToMono(String.class)
.flatMap(s -> {
return WebClient.create("http://localhost:8080/bar")
.get()
.retrieve()
.bodyToMono(String.class);
});
}
@GetMapping("foo")
public Mono<String> baidu(ServerWebExchange exchange) {
return Mono.just("foo");
}
@GetMapping("bar")
public Mono<String> qq(ServerWebExchange exchange) throws InterruptedException {
return Mono.just("bar").delayElement(Duration.ofMillis(100));
}
}
配置好skywalking agent 相关 JVM 参数, 运行项目, 请求 http://localhost:8080/test , 查看 skywalking 面板, 确实生成了多个 Span , 但每个 Span 的 TraceId 都不一样。
解决方案
基于上节分析, 根本问题在于在创建 ExitSpan 时没有恢复上下文, 那么需有找到一个方法获取到上游的 ContextSnapshot 并恢复即可。
Spring Webflux 基于 Reactor 框架 , 可以通过 Reactor Context 来传递 ContextSnapshot.
Skywalking 默认插件中包含 mvc-annotation-5.x-plugin , 查看对应代码, 发现该插件通过拦截 Spring Mvc 相关注解方法, 在注解方法前创建 EntrySpan , 使用同步的方式,且拦截方法返回值不一定是 Mono 或 Flux , 难于在这个地方把 ContextSnapshot
放入 Reactor Context 中。 在 optional-plugin 还有 spring-webflux-5.x-plugin 插件, 该插件通过拦截 org.springframework.web.reactive.DispatcherHandler#handle
来创建 EntrySpan, DispatcherHandler#handle
返回 Mono
, 可以在这里插入 ContextSnapshot
.
具体实现如下:
代码语言:java复制//org.apache.skywalking.apm.plugin.spring.webflux.v5.DispatcherHandlerHandleMethodInterceptor#afterMethod
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {
ServerWebExchange exchange = (ServerWebExchange) allArguments[0];
AbstractSpan span = (AbstractSpan) exchange.getAttributes().get("SKYWALKING_SPAN");
Mono<Void> monoReturn = (Mono<Void>) ret;
// add skywalking context snapshot to reactor context.
EnhancedInstance instance = getInstance(allArguments[0]);
if (instance != null && instance.getSkyWalkingDynamicField() != null) {
monoReturn = monoReturn.subscriberContext(
c -> c.put("SKYWALKING_CONTEXT_SNAPSHOT", instance.getSkyWalkingDynamicField()));
}
return monoReturn.doFinally(s -> {
if (span != null) {
maybeSetPattern(span, exchange);
try {
HttpStatus httpStatus = exchange.getResponse().getStatusCode();
// fix webflux-2.0.0-2.1.0 version have bug. httpStatus is null. not support
if (httpStatus != null) {
Tags.HTTP_RESPONSE_STATUS_CODE.set(span, httpStatus.value());
if (httpStatus.isError()) {
span.errorOccurred();
}
}
} finally {
span.asyncFinish();
}
}
});
}
个性 WebFluxWebClientInterceptor
从 Reactor Context 中获取 ContextSnapshot :
public class WebFluxWebClientInterceptor implements InstanceMethodsAroundInterceptorV2 {
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInvocationContext context) throws Throwable {
// before method 中无法获取 Reactor 上下文 , 原逻辑直接删除掉
// ExchangeFunctions$DefaultExchangeFunction 中只是构建 Reactor 链条, 并末真正执行, 所以原来逻辑可以推迟到 subscriberContext 中获取上下文后再执行。
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret, MethodInvocationContext context) throws Throwable {
// fix the problem that allArgument[0] may be null
if (allArguments[0] == null) {
return ret;
}
Mono<ClientResponse> ret1 = (Mono<ClientResponse>) ret;
// 从 Reactor Context 中获取 ContextSnapshot
return Mono.subscriberContext().flatMap(ctx -> {
ClientRequest request = (ClientRequest) allArguments[0];
URI uri = request.url();
final String operationName = getRequestURIString(uri);
final String remotePeer = getIPAndPort(uri);
AbstractSpan span = ContextManager.createExitSpan(operationName, remotePeer);
// get ContextSnapshot from reactor context, the snapshot is set to reactor context by any other plugin
// such as DispatcherHandlerHandleMethodInterceptor in spring-webflux-5.x-plugin
final Optional<Object> optional = ctx.getOrEmpty("SKYWALKING_CONTEXT_SNAPSHOT");
optional.ifPresent(snapshot -> ContextManager.continued((ContextSnapshot) snapshot));
//set components name
span.setComponent(ComponentsDefine.SPRING_WEBCLIENT);
Tags.URL.set(span, uri.toString());
Tags.HTTP.METHOD.set(span, request.method().toString());
SpanLayer.asHttp(span);
final ContextCarrier contextCarrier = new ContextCarrier();
ContextManager.inject(contextCarrier);
if (request instanceof EnhancedInstance) {
((EnhancedInstance) request).setSkyWalkingDynamicField(contextCarrier);
}
//user async interface
span.prepareForAsync();
ContextManager.stopSpan();
return ret1.doOnSuccess(clientResponse -> {
HttpStatus httpStatus = clientResponse.statusCode();
if (httpStatus != null) {
Tags.HTTP_RESPONSE_STATUS_CODE.set(span, httpStatus.value());
if (httpStatus.isError()) {
span.errorOccurred();
}
}
}).doOnError(span::log).doFinally(s -> {
span.asyncFinish();
});
});
}
}
重新编译插件后把 spring-webflux-5.x-plugin 及 spring-webflux-5.x-webclient-plugin 两个插件拷到 Skywalking Agent plugin 目录下, 重新运行测试代码, 可以发现问题解决, 所有调用都串起来。
修复代码已合并到 skywalking-java 主干(#114), 预计将在 8.10.0 版本中发布。
注意1: 因为 spring-webflux-5.x-plugin 是在 optional-plugins 目录中, 需要手工拷到 plugins 目录。
而 Spring Cloud Gateway 工程需要手工拷 gateway-3.x-plugin。
注意2: Srping MVC 插件 apm-springmvc-annotation-5.x-plugin 默认生效, 当与 spring-webflux-5.x-plugin 同时存在时, 一次调用会生成两个 EntrySpan, 而且 mvc 插件生成 EntrySpan 虽然与 Webclient 生成的 ExitSpan 能用同个 TraceId 串起来了, 但仍然没有 Parent/Child 关系, 介意的话在 Spring Webflux 工程中把 spring-webflux-5.x-plugin 移出 ${agetn/path}/plugin 目录。
参考
- Plugin Development Guide
- Reactor Context