基于FastAPI/Celery/loguru实现全链路日志追踪功能

2023-08-26 15:08:08 浏览数 (3)

背景

在我们的系统里,已经记录了很多的日志,但是问题是这些日志很鸡肋,当需要定位问题的时候,根本很难区分,哪些日志是一起的,而且因为我们的系统大都是一些耗时的任务,不同请求的任务日志都交叉混在一起,更加加剧了这个问题。因此生产系统上,这些日志很难利用起来。

目标

目标主要有三个:
  1. 能够实现日志的全链路跟踪,这样出了问题之后,才能根据任务ID或者请求ID之类的快速找到相关的日志,提升定位问题的效率;
  2. 尽量是非侵入式的,尽量少改动系统原有的代码,不然就会变得很复杂;
  3. 能同时追踪FastAPI接口逻辑及Celery任务的日志。

实现

在网络上找了一些Python全链路日志实现方式,有一个和我想要的是比较接近的:https://zhuanlan.zhihu.com/p/432010113

总体实现logger.py

代码语言:javascript复制
import os
from uuid import uuid4
from loguru import logger
from contextvars import ContextVar
# 使用任务request_id来实现全链路日志追踪
_trace_id: ContextVar[str] = ContextVar('x_trace_id', default="")           # 任务ID
_x_request_id: ContextVar[str] = ContextVar('x_request_id', default="")     # 请求ID


class TraceID:
    """全链路追踪ID"""

    @staticmethod
    def set(req_id: str) -> ContextVar[str]:
        """设置请求ID,外部需要的时候,可以调用该方法设置
        Returns:
            ContextVar[str]: _description_
        """
        if req_id:
            req_id = uuid4().hex
        _x_request_id.set(req_id)
        return _x_request_id

    @staticmethod
    def set_trace(id: str, title: str = "task") -> ContextVar[str]:
        """设置全链路追踪ID
        Returns:
            ContextVar[str]: _description_
        """
        id = f"{title}:{id}"
        _trace_id.set(id)
        return _trace_id


def _logger_filter(record):
    record['trace_msg'] = f"{_x_request_id.get()} | {_trace_id.get()}"
    return record['trace_msg']


# 日志配置  公共参数

params = {
    "rotation": "500 MB", "encoding": 'utf-8', "enqueue": True, "backtrace": True, 
    "filter": _logger_filter,
    "format": "{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | {trace_msg} | {name}:{function}:{line} - {message}",
}
# 去除默认控制台输出
logger.remove()
logger.add("inf.log", level='INFO', retention="90 days", **params)    # info日志只保留90天

__all__ = ["logger"]

(以上只是示例代码)

实现要点:

  1. 基于contextvars这个包实现上下文功能;
  2. 利用loguru包的filter功能,实现非侵入式的动态参数的注入;
  3. 同时追踪web请求ID及业务ID。

不过这也不是完全非侵入式的,还是要做一些简单的改动,如在FastAPI接口入口处,增加中间件:

代码语言:javascript复制
@app.middleware("http")
async def set_logger(request: Request, call_next):
    # 设置日志的全链路追踪
    REQUEST_ID_KEY = "X-Request-Id"
    _req_id_val = request.headers.get(REQUEST_ID_KEY, "")
    req_id = TraceID.set(_req_id_val)
    response: Response = await call_next(request)
    response.headers[REQUEST_ID_KEY] = req_id.get()
    return response

而在业务代码执行的源头(或者接近源头的位置)加上一句:

代码语言:javascript复制
TraceID.set_trace(task_id)

可以根据业务的特征设置对应的追踪ID。

效果


这样使用logger记录日志的时候,就会自动将相应的ID带上,记录日志的时候也原来一样,基本上实现了非侵入式的全链路日志追踪,对定位问题是大为有利。

使用最简单的grep就能直接把相关日志找出来:

高级一点的,可以统一收集到ES进行跨服务的日志检索。

0 人点赞