CAT中实现异步请求的调用链查看

2022-04-22 11:37:19 浏览数 (1)

CAT简介

CAT(Central Application Tracking),是美团点评基于 Java 开发的一套开源的分布式实时监控系统。美团点评基础架构部希望在基础存储、高性能通信、大规模在线访问、服务治理、实时监控、容器化及集群智能调度等领域提供业界领先的、统一的解决方案,CAT 目前在美团点评的产品定位是应用层的统一监控组件,在中间件(RPC、数据库、缓存、MQ 等)框架中得到广泛应用,为各业务线提供系统的性能指标、健康状况、实时告警等服务。

准备工作

对于同步请求API,CAT服务端自然是可以看到的。同步请求API的实例可以参考之前的文章《SpringBoot集成CAT调用链实例》。但对于异步请求API,因为不在同一线程中,在子线程中无法获取到父线程消息树,所以在CAT服务端是无法看到的对应请求。

首先,写一个类实现Cat.Context接口,用于存放消息树的上下文信息:

代码语言:javascript复制
public class CatContext implements Cat.Context {

    private Map<String, String> properties = new HashMap<>();

    @Override
    public void addProperty(String key, String value) {
        properties.put(key, value);
    }

    @Override
    public String getProperty(String key) {
        return properties.get(key);
    }

    @Override
    public String toString() {
        return "CatContext{"
                  "properties="   properties   '}';
    }
}

我们可以先父线程消息树的上下文信息保存下来,然后在子线程使用。先写一个存放上下文信息的地方:

代码语言:javascript复制
public class ContextWarehouse {
    private static ThreadLocal<CatContext> contextThreadLocal = new ThreadLocal();

    public static void setContext(final CatContext context) {
        contextThreadLocal.set(context);
    }

    public static CatContext getContext() {
        //xian 从ContextWarehouse中获取上下文信息
        CatContext context = contextThreadLocal.get();
        if (context == null) {
            context = new CatContext();
            Cat.logRemoteCallClient(context);
        }
        return context;
    }
}

实现Callable接口,创建一个自定义的类,实现了在子线程中存放父线程的上下文信息的功能:

代码语言:javascript复制
public class OneMoreCallable<V> implements Callable<V> {

    private CatContext catContext;

    private Callable<V> callable;

    public DdCallable(final Callable<V> callable) {
        this.callable = callable;
        this.catContext = new CatContext();
        //获取父线程消息树的上下文信息
        Cat.logRemoteCallClient(this.catContext);
    }

    @Override
    public V call() throws Exception {
        //保存父线程消息树的上下文信息到子线程
        ContextWarehouse.setContext(this.catContext);
        return callable.call();
    }
}

定义一些常量,在调用API时作为header中的key:

代码语言:javascript复制
public class CatHttpConstants {
    public static final String CAT_HTTP_HEADER_CHILD_MESSAGE_ID = "DD-CAT-CHILD-MESSAGE-ID";
    public static final String CAT_HTTP_HEADER_PARENT_MESSAGE_ID = "DD-CAT-PARENT-MESSAGE-ID";
    public static final String CAT_HTTP_HEADER_ROOT_MESSAGE_ID = "DD-CAT-ROOT-MESSAGE-ID";
}

埋点时,在调用API的HttpClient工具类中统一增加代码,以GET方式为例:

代码语言:javascript复制
public class HttpClientUtil {
    public static String doGet(String url) throws IOException {
        HttpGet httpGet = new HttpGet(url);
        CloseableHttpResponse response = null;
        CloseableHttpClient httpClient = HttpClientBuilder.create().build();
        String content = null;
        Transaction t = Cat.newTransaction(CatConstants.TYPE_CALL, url);
        try {
            CatContext context = ContextWarehouse.getContext();
            httpGet.setHeader(CatHttpConstants.CAT_HTTP_HEADER_ROOT_MESSAGE_ID, context.getProperty(Cat.Context.ROOT));
            httpGet.setHeader(CatHttpConstants.CAT_HTTP_HEADER_PARENT_MESSAGE_ID, context.getProperty(Cat.Context.PARENT));
            httpGet.setHeader(CatHttpConstants.CAT_HTTP_HEADER_CHILD_MESSAGE_ID, context.getProperty(Cat.Context.CHILD));

            response = httpClient.execute(httpGet);
            if (response.getStatusLine().getStatusCode() == 200) {
                content = EntityUtils.toString(response.getEntity(), "UTF-8");
                t.setStatus(Transaction.SUCCESS);
            }
        } catch (Exception e) {
            Cat.logError(e);
            t.setStatus(e);
            throw e;
        } finally {
            if (response != null) {
                response.close();
            }
            if (httpClient != null) {
                httpClient.close();
            }
            t.complete();
        }
        return content;
    }
}

异步请求实例

下面写一个异步请求的实例,通过多个商品ID异步获取对应的商品详细信息:

代码语言:javascript复制
public class ProductService {

    /**
     * 声明一个大小固定为10的线程池
     */
    private static ExecutorService executor = Executors.newFixedThreadPool(10);

    /**
     * 通过商品ID列表异步获取对应的商品详细信息
     *
     * @param productIds 商品ID列表
     * @return 对应的商品详细信息
     */
    public List<String> findProductInfo(List<Long> productIds) {
        List<Future<String>> futures = new ArrayList<>();
        for (Long productId : productIds) {
            futures.add(executor.submit(new DdCallable(() -> {
                try {
                    //调用获取商品详细信息的API
                    return HttpClientUtil.doGet("http://api.product/get?id="   productId);
                } catch (Exception e) {
                    return "";
                }
            })));
        }

        List<String> productInfos = new ArrayList<>();
        for (Future<String> future : futures) {
            try {
                //异步获取对应商品详细信息
                productInfos.add(future.get());
            } catch (Exception e) {
                productInfos.add("");
            }
        }
        return productInfos;
    }
}

这样写以后,在CAT服务端的Transaction报表中就可以查看到异步请求了。

0 人点赞