赵化冰,腾讯云高级工程师,Istio Member,ServiceMesher 管理委员,Istio 项目贡献者,热衷于开源、网络和云计算。目前主要从事服务网格的开源和研发工作。
背景
在上一篇文章《Istio 最佳实践系列:如何实现方法级调用跟踪》中,我们通过一个网上商店的示例程序学习了如何使用 OpenTracing 在 Istio 服务网格中传递分布式调用跟踪的上下文,以及如何将方法级的调用信息加入到 Istio/Envoy 生成的调用链中。采用 OpenTracing 可以减少应用代码中传递HTTP header的重复代码;也可以根据需要在调用链中加入更细粒度的 Span,以用于对系统性能瓶颈进行在线分析。
在实际项目中,除了同步调用之外,异步消息也是微服务架构中常见的一种通信方式。在本篇文章中,我将继续利用 eshop demo 程序来探讨如何通过 OpenTracing 将 Kafka 异步消息也纳入到 Istio 的分布式调用跟踪中。
eshop 示例程序结构
如下图所示,demo 程序中增加了发送和接收 Kafka 消息的代码。eshop 微服务在调用 inventory,billing,delivery 服务后,发送了一个 kafka 消息通知,consumer 接收到通知后调用 notification 服务的REST接口向用户发送购买成功的邮件通知。
将Kafka消息处理加入调用链跟踪
植入 Kafka OpenTracing 代码
首先从 github 下载代码。
代码语言:javascript复制git clone git@github.com:aeraki-framework/method-level-tracing-with-istio.git
可以直接使用该代码,但建议跟随下面的步骤查看相关的代码,以了解各个步骤背后的原理。
根目录下分为了 rest-service 和 kafka-consumer 两个目录,rest-service 下包含了各个 REST 服务的代码,kafka-consumer下是Kafka消息消费者的代码。
首先需要将spring kafka和OpenTracing kafka的依赖加入到两个目录下的pom文件中。
代码语言:javascript复制<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>io.opentracing.contrib</groupId>
<artifactId>opentracing-kafka-client</artifactId>
<version>${version.opentracing.kafka-client}</version>
</dependency>
在 rest-service 目录中的 KafkaConfig.java 中配置消息 Producer 端的 OpenTracing Instrument。TracingProducerInterceptor 会在发送 Kafka 消息时生成发送端的Span。
代码语言:javascript复制@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
return new DefaultKafkaProducerFactory<>(configProps);
}
在 kafka-consumer 目录中的 KafkaConfig.java 中配置消息 Consumer 端的OpenTracing Instrument。TracingConsumerInterceptor 会在接收到 Kafka 消息是生成接收端的 Span。
代码语言:javascript复制@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
return new DefaultKafkaConsumerFactory<>(props);
}
只需要这两步即可完成Spring程序的Kafka OpenTracing代码植入。下面安装并运行示例程序查看效果。
安装Kafka集群
示例程序中使用到了Kafka消息,因此我们在 TKE 集群中部署一个简单的Kafka实例:
代码语言:javascript复制cd method-level-tracing-with-istio
kubectl apply -f k8s/kafka.yaml
部署demo应用
修改 Kubernetes yaml 部署文件 k8s/eshop.yaml,设置Kafka bootstrap server,以用于demo程序连接到Kafka集群中。
代码语言:javascript复制apiVersion: apps/v1
kind: Deployment
metadata:
name: delivery
......
spec:
containers:
- name: eshop
image: aeraki/istio-opentracing-demo:latest
ports:
- containerPort: 8080
env:
....
//在这里加入Kafka server地址
- name: KAFKA_BOOTSTRAP_SERVERS
value: "kafka-service:9092"
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: kafka-consumer
......
spec:
containers:
- name: kafka-consumer
image: aeraki/istio-opentracing-demo-kafka-consumer:latest
env:
....
//在这里加入Kafka server地址
- name: KAFKA_BOOTSTRAP_SERVERS
value: "kafka-service:9092"
然后部署应用程序,相关的镜像可以直接从dockerhub下载,也可以通过源码编译生成。
代码语言:javascript复制kubectl apply -f k8s/eshop.yaml
在浏览器中打开地址:http://{INGRESS_EXTERNAL_IP}/checkout,以触发调用eshop示例程序的REST接口。然后打开 TCM 的界面查看生成的分布式调用跟踪信息。
从图中可以看到,在调用链中增加了两个 Span,分布对应于Kafka消息发送和接收的两个操作。由于Kafka消息的处理是异步的,消息发送端不直接依赖接收端的处理。根据 OpenTracing 对引用关系的定义,From_eshop_topic Span 对 To_eshop_topic Span 的引用关系是 FOLLOWS_FROM 而不是 CHILD_OF 关系。
将调用跟踪上下文从Kafka传递到REST服务
现在 eshop 代码中已经加入了 REST 和 Kafka 的 OpenTracing Instrumentation,可以在进行 REST 调用和发送 Kafka 消息时生成调用跟踪信息。但如果需要从 Kafka 的消息消费者的处理方法中调用一个 REST 接口呢?
我们会发现在eshop示例程序中,缺省生成的调用链里面并不会把Kafka消费者的Span和其发起的调用notification服务的REST请求的Span关联在同一个Trace中。
要分析导致该问题的原因,我们首先需要了解“Active Span”[1]的概念。在OpenTracing中,一个线程可以有一个Active Span,该Active Span代表了目前该线程正在执行的工作。在调用Tracer.buildSpan()方法创建新的Span时,如果Tracer目前存在一个Active Span,则会将该Active Span缺省作为新创建的Span的Parent Span。
Tracer.buildSpan 方法的说明如下:
代码语言:javascript复制Tracer.SpanBuilder buildSpan(String operationName)
Return a new SpanBuilder for a Span with the given `operationName`.
You can override the operationName later via BaseSpan.setOperationName(String).
A contrived example:
Tracer tracer = ...
// Note: if there is a `tracer.activeSpan()`, it will be used as the target of an implicit CHILD_OF
// Reference for "workSpan" when `startActive()` is invoked.
// 如果存在active span,则其创建的新Span会隐式地创建一个 CHILD_OF 引用到该active span
try (ActiveSpan workSpan = tracer.buildSpan("DoWork").startActive()) {
workSpan.setTag("...", "...");
// etc, etc
}
// 也可以通过asChildOf方法指定新创建的Span的Parent Span
// It's also possible to create Spans manually, bypassing the ActiveSpanSource activation.
Span http = tracer.buildSpan("HandleHTTPRequest")
.asChildOf(rpcSpanContext) // an explicit parent
.withTag("user_agent", req.UserAgent)
.withTag("lucky_number", 42)
.startManual();
分析 Kafka OpenTracing Instrumentation 的代码,会发现 TracingConsumerInterceptor 在调用 Kafka 消费者的处理方法之前已经把消费者的 Span 结束了,因此发起 REST 调用时 tracer 没有 active span,不会将Kafka 消费者的Span作为后面 REST 调用的 parent span。
代码语言:javascript复制public static <K, V> void buildAndFinishChildSpan(ConsumerRecord<K, V> record, Tracer tracer,
BiFunction<String, ConsumerRecord, String> consumerSpanNameProvider) {
SpanContext parentContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);
String consumerOper =
FROM_PREFIX record.topic(); // <====== It provides better readability in the UI
Tracer.SpanBuilder spanBuilder = tracer
.buildSpan(consumerSpanNameProvider.apply(consumerOper, record))
.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CONSUMER);
if (parentContext != null) {
spanBuilder.addReference(References.FOLLOWS_FROM, parentContext);
}
Span span = spanBuilder.start();
SpanDecorator.onResponse(record, span);
//在调用消费者的处理方法之前,该Span已经被结束。
span.finish();
// Inject created span context into record headers for extraction by client to continue span chain
//这个Span被放到了Kafka消息的header中
TracingKafkaUtils.inject(span.context(), record.headers(), tracer);
}
此时 TracingConsumerInterceptor 已经将 Kafka 消费者的 Span 放到了Kafka 消息的 header 中,因此从 Kafka 消息头中取出该Span,显示地将 Kafka消费者的 Span 作为 REST 调用的 Parent Span 即可。
为MessageConsumer.java 使用的 RestTemplate 设置一个TracingKafka2RestTemplateInterceptor。
代码语言:javascript复制@KafkaListener(topics = "eshop-topic")
public void receiveMessage(ConsumerRecord<String, String> record) {
restTemplate
.setInterceptors(Collections.singletonList(new TracingKafka2RestTemplateInterceptor(record.headers())));
restTemplate.getForEntity("http://notification:8080/sendEmail", String.class);
}
TracingKafka2RestTemplateInterceptor是基于Spring OpenTracing Instrumentation的TracingRestTemplateInterceptor修改的,将从Kafka header中取出的Span设置为出向请求的Span的Parent Span。
代码语言:javascript复制@Override
public ClientHttpResponse intercept(HttpRequest httpRequest, byte[] body, ClientHttpRequestExecution xecution)
throws IOException {
ClientHttpResponse httpResponse;
SpanContext parentSpanContext = TracingKafkaUtils.extractSpanContext(headers, tracer);
Span span = tracer.buildSpan(httpRequest.getMethod().toString()).asChildOf(parentSpanContext)
.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CLIENT).start();
......
}
在浏览器中打开地址:http://{INGRESS_EXTERNAL_IP}/checkout,以触发调用eshop示例程序的REST接口。然后打开 TCM 的界面查看生成的分布式调用跟踪信息。
从上图可以看到,调用链中出现了 Kafka 消费者调用 notification 服务的 sendEmail REST 接口的 Span。从图中可以看到,由于调用链经过了 Kafka 消息,sendEmail Span 的时间没有包含在 checkout Span 中。
总结
Istio 服务网格通过分布式调用跟踪来提高微服务应用的可见性,这需要在应用程序中通过HTTP header传递调用跟踪的上下文。对于 JAVA 应用程序,我们可以使用 OpenTracing Instrumentation 来代替应用编码传递分布式跟踪的相关http header,以减少对业务代码的影响;我们还可以将方法级的调用跟踪和 Kafka 消息的调用跟踪加入到 Istio 生成的调用跟踪链中,以为应用程序的故障定位提供更为丰富详细的调用跟踪信息。
参考资料
[1]
“Active Span”: https://opentracing.io/docs/overview/scopes-and-threading/
[2]
本文中eshop示例程序的源代码: https://github.com/aeraki-framework/method-level-tracing-with-istio
往期精选推荐
- 开工必备!50 篇超实用云原生技术干货合集
- Istio最佳实践系列:如何实现方法级调用跟踪?
- 如何在 Istio 中支持 Dubbo、Thrift、Redis 以及任何七层协议?
- 在 Istio 中实现 Redis 集群的数据分片、读写分离和流量镜像
- Istio 运维实战系列(1):应用容器对 Envoy Sidecar 的启动依赖问题