如何使用 OpenTracing 在 TCM 中实现异步消息调用跟踪

2021-04-19 10:11:11 浏览数 (1)

赵化冰,腾讯云高级工程师,Istio Member,ServiceMesher 管理委员,Istio 项目贡献者,热衷于开源、网络和云计算。目前主要从事服务网格的开源和研发工作。

背景

在上一篇文章《Istio 最佳实践系列:如何实现方法级调用跟踪》中,我们通过一个网上商店的示例程序学习了如何使用 OpenTracing 在 Istio 服务网格中传递分布式调用跟踪的上下文,以及如何将方法级的调用信息加入到 Istio/Envoy 生成的调用链中。采用 OpenTracing 可以减少应用代码中传递HTTP header的重复代码;也可以根据需要在调用链中加入更细粒度的 Span,以用于对系统性能瓶颈进行在线分析。

在实际项目中,除了同步调用之外,异步消息也是微服务架构中常见的一种通信方式。在本篇文章中,我将继续利用 eshop demo 程序来探讨如何通过 OpenTracing 将 Kafka 异步消息也纳入到 Istio 的分布式调用跟踪中。

eshop 示例程序结构

如下图所示,demo 程序中增加了发送和接收 Kafka 消息的代码。eshop 微服务在调用 inventory,billing,delivery 服务后,发送了一个 kafka 消息通知,consumer 接收到通知后调用 notification 服务的REST接口向用户发送购买成功的邮件通知。

将Kafka消息处理加入调用链跟踪

植入 Kafka OpenTracing 代码

首先从 github 下载代码。

代码语言:javascript复制
git clone git@github.com:aeraki-framework/method-level-tracing-with-istio.git

可以直接使用该代码,但建议跟随下面的步骤查看相关的代码,以了解各个步骤背后的原理。

根目录下分为了 rest-service 和 kafka-consumer 两个目录,rest-service 下包含了各个 REST 服务的代码,kafka-consumer下是Kafka消息消费者的代码。

首先需要将spring kafka和OpenTracing kafka的依赖加入到两个目录下的pom文件中。

代码语言:javascript复制
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
 <dependency>
    <groupId>io.opentracing.contrib</groupId>
    <artifactId>opentracing-kafka-client</artifactId>
    <version>${version.opentracing.kafka-client}</version>
</dependency>

在 rest-service 目录中的 KafkaConfig.java 中配置消息 Producer 端的 OpenTracing Instrument。TracingProducerInterceptor 会在发送 Kafka 消息时生成发送端的Span。

代码语言:javascript复制
@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
    return new DefaultKafkaProducerFactory<>(configProps);
}

在 kafka-consumer 目录中的 KafkaConfig.java 中配置消息 Consumer 端的OpenTracing Instrument。TracingConsumerInterceptor 会在接收到 Kafka 消息是生成接收端的 Span。

代码语言:javascript复制
@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
    return new DefaultKafkaConsumerFactory<>(props);
}

只需要这两步即可完成Spring程序的Kafka OpenTracing代码植入。下面安装并运行示例程序查看效果。

安装Kafka集群

示例程序中使用到了Kafka消息,因此我们在 TKE 集群中部署一个简单的Kafka实例:

代码语言:javascript复制
cd method-level-tracing-with-istio
kubectl apply -f k8s/kafka.yaml

部署demo应用

修改 Kubernetes yaml 部署文件 k8s/eshop.yaml,设置Kafka bootstrap server,以用于demo程序连接到Kafka集群中。

代码语言:javascript复制
apiVersion: apps/v1
kind: Deployment
metadata:
  name: delivery
  ......
    spec:
      containers:
      - name: eshop
        image: aeraki/istio-opentracing-demo:latest
        ports:
        - containerPort: 8080
        env:
          ....
          //在这里加入Kafka server地址
          - name: KAFKA_BOOTSTRAP_SERVERS
            value: "kafka-service:9092"
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-consumer
  ......
    spec:
      containers:
      - name: kafka-consumer
        image: aeraki/istio-opentracing-demo-kafka-consumer:latest
        env:
          ....
          //在这里加入Kafka server地址
          - name: KAFKA_BOOTSTRAP_SERVERS
            value: "kafka-service:9092"

然后部署应用程序,相关的镜像可以直接从dockerhub下载,也可以通过源码编译生成。

代码语言:javascript复制
kubectl apply -f k8s/eshop.yaml

在浏览器中打开地址:http://{INGRESS_EXTERNAL_IP}/checkout,以触发调用eshop示例程序的REST接口。然后打开 TCM 的界面查看生成的分布式调用跟踪信息。

从图中可以看到,在调用链中增加了两个 Span,分布对应于Kafka消息发送和接收的两个操作。由于Kafka消息的处理是异步的,消息发送端不直接依赖接收端的处理。根据 OpenTracing 对引用关系的定义,From_eshop_topic Span 对 To_eshop_topic Span 的引用关系是 FOLLOWS_FROM 而不是 CHILD_OF 关系。

将调用跟踪上下文从Kafka传递到REST服务

现在 eshop 代码中已经加入了 REST 和 Kafka 的 OpenTracing Instrumentation,可以在进行 REST 调用和发送 Kafka 消息时生成调用跟踪信息。但如果需要从 Kafka 的消息消费者的处理方法中调用一个 REST 接口呢?

我们会发现在eshop示例程序中,缺省生成的调用链里面并不会把Kafka消费者的Span和其发起的调用notification服务的REST请求的Span关联在同一个Trace中。

要分析导致该问题的原因,我们首先需要了解“Active Span”[1]的概念。在OpenTracing中,一个线程可以有一个Active Span,该Active Span代表了目前该线程正在执行的工作。在调用Tracer.buildSpan()方法创建新的Span时,如果Tracer目前存在一个Active Span,则会将该Active Span缺省作为新创建的Span的Parent Span。

Tracer.buildSpan 方法的说明如下:

代码语言:javascript复制
Tracer.SpanBuilder buildSpan(String operationName)
Return a new SpanBuilder for a Span with the given `operationName`.
You can override the operationName later via BaseSpan.setOperationName(String).

A contrived example:


   Tracer tracer = ...
   
   // Note: if there is a `tracer.activeSpan()`, it will be used as the target of an implicit CHILD_OF
   // Reference for "workSpan" when `startActive()` is invoked.
   // 如果存在active span,则其创建的新Span会隐式地创建一个 CHILD_OF 引用到该active span
   try (ActiveSpan workSpan = tracer.buildSpan("DoWork").startActive()) {
       workSpan.setTag("...", "...");
       // etc, etc
   }
   // 也可以通过asChildOf方法指定新创建的Span的Parent Span
   // It's also possible to create Spans manually, bypassing the ActiveSpanSource activation.
   Span http = tracer.buildSpan("HandleHTTPRequest")
                     .asChildOf(rpcSpanContext)  // an explicit parent
                     .withTag("user_agent", req.UserAgent)
                     .withTag("lucky_number", 42)
                     .startManual();

分析 Kafka OpenTracing Instrumentation 的代码,会发现 TracingConsumerInterceptor 在调用 Kafka 消费者的处理方法之前已经把消费者的 Span 结束了,因此发起 REST 调用时 tracer 没有 active span,不会将Kafka 消费者的Span作为后面 REST 调用的 parent span。

代码语言:javascript复制
public static <K, V> void buildAndFinishChildSpan(ConsumerRecord<K, V> record, Tracer tracer,
      BiFunction<String, ConsumerRecord, String> consumerSpanNameProvider) {
    SpanContext parentContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);
    String consumerOper =
        FROM_PREFIX   record.topic(); // <====== It provides better readability in the UI
    Tracer.SpanBuilder spanBuilder = tracer
        .buildSpan(consumerSpanNameProvider.apply(consumerOper, record))
        .withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CONSUMER);
    if (parentContext != null) {
      spanBuilder.addReference(References.FOLLOWS_FROM, parentContext);
    }
    Span span = spanBuilder.start();
    SpanDecorator.onResponse(record, span);
    //在调用消费者的处理方法之前,该Span已经被结束。
    span.finish();
    // Inject created span context into record headers for extraction by client to continue span chain
    //这个Span被放到了Kafka消息的header中
    TracingKafkaUtils.inject(span.context(), record.headers(), tracer);
  }

此时 TracingConsumerInterceptor 已经将 Kafka 消费者的 Span 放到了Kafka 消息的 header 中,因此从 Kafka 消息头中取出该Span,显示地将 Kafka消费者的 Span 作为 REST 调用的 Parent Span 即可。

为MessageConsumer.java 使用的 RestTemplate 设置一个TracingKafka2RestTemplateInterceptor。

代码语言:javascript复制
@KafkaListener(topics = "eshop-topic")
public void receiveMessage(ConsumerRecord<String, String> record) {
    restTemplate
            .setInterceptors(Collections.singletonList(new TracingKafka2RestTemplateInterceptor(record.headers())));
    restTemplate.getForEntity("http://notification:8080/sendEmail", String.class);
}

TracingKafka2RestTemplateInterceptor是基于Spring OpenTracing Instrumentation的TracingRestTemplateInterceptor修改的,将从Kafka header中取出的Span设置为出向请求的Span的Parent Span。

代码语言:javascript复制
@Override
public ClientHttpResponse intercept(HttpRequest httpRequest, byte[] body, ClientHttpRequestExecution xecution)
        throws IOException {
    ClientHttpResponse httpResponse;
    SpanContext parentSpanContext = TracingKafkaUtils.extractSpanContext(headers, tracer);
    Span span = tracer.buildSpan(httpRequest.getMethod().toString()).asChildOf(parentSpanContext)
            .withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CLIENT).start();
    ......
}

在浏览器中打开地址:http://{INGRESS_EXTERNAL_IP}/checkout,以触发调用eshop示例程序的REST接口。然后打开 TCM 的界面查看生成的分布式调用跟踪信息。

从上图可以看到,调用链中出现了 Kafka 消费者调用 notification 服务的 sendEmail REST 接口的 Span。从图中可以看到,由于调用链经过了 Kafka 消息,sendEmail Span 的时间没有包含在 checkout Span 中。

总结

Istio 服务网格通过分布式调用跟踪来提高微服务应用的可见性,这需要在应用程序中通过HTTP header传递调用跟踪的上下文。对于 JAVA 应用程序,我们可以使用 OpenTracing Instrumentation 来代替应用编码传递分布式跟踪的相关http header,以减少对业务代码的影响;我们还可以将方法级的调用跟踪和 Kafka 消息的调用跟踪加入到 Istio 生成的调用跟踪链中,以为应用程序的故障定位提供更为丰富详细的调用跟踪信息。

参考资料

[1]

“Active Span”: https://opentracing.io/docs/overview/scopes-and-threading/

[2]

本文中eshop示例程序的源代码: https://github.com/aeraki-framework/method-level-tracing-with-istio

  往期精选推荐  

  • 开工必备!50 篇超实用云原生技术干货合集
  • Istio最佳实践系列:如何实现方法级调用跟踪?
  • 如何在 Istio 中支持 Dubbo、Thrift、Redis 以及任何七层协议?
  • 在 Istio 中实现 Redis 集群的数据分片、读写分离和流量镜像
  • Istio 运维实战系列(1):应用容器对 Envoy Sidecar 的启动依赖问题

0 人点赞