github:https://github.com/prometheus/client_python
安装 prometheus_client
使用 pip 工具可以非常方便地安装 prometheus_client:
代码语言:javascript复制pip install prometheus-client
基本使用介绍
prometheus_client 提供了丰富的 API,可以用于定义和注册 metrics,并根据需要暴露这些 metrics 的接口。
代码语言:javascript复制from prometheus_client import Counter, Gauge, Summary, Histogram, start_http_server
# 定义和注册 metric
c = Counter('test_counter', '测试计数器')
g = Gauge('test_gauge', '测试仪表盘')
s = Summary('test_summary', '测试摘要')
h = Histogram('test_histogram', '测试直方图', buckets=(1, 2, 3))
# 计数器自增
c.inc()
# 仪表盘设置值
g.set(42)
# 摘要和直方图设置数值
s.observe(1.2)
h.observe(4.2)
# 启动 HTTP 服务器,暴露 metrics 接口
start_http_server(8080)
以上代码中,我们首先定义了四个不同类型的 metric(计数器、仪表盘、摘要和直方图),然后分别对它们进行了操作,比如计数器进行了自增操作,仪表盘设置了值,摘要和直方图设置了观察值。最后,我们调用了 start_http_server()
函数,将 metrics 接口暴露出来,以便外部程序可以访问。
应用实例
收集 CPU 使用率指标
下面的示例代码可以用来收集 CPU 的使用率指标:
代码语言:javascript复制from prometheus_client import Counter, Gauge, Summary, Histogram, start_http_server
import psutil
import time
# 定义和注册指标
cpu_percent = Gauge('cpu_percent', 'CPU 使用率百分比')
cpu_freq_current = Gauge('cpu_freq_current', 'CPU 当前频率')
cpu_freq_min = Gauge('cpu_freq_min', 'CPU 最小频率')
cpu_freq_max = Gauge('cpu_freq_max', 'CPU 最大频率')
# 获取 CPU 频率信息
cpu_freq = psutil.cpu_freq()
# 设置初始值
cpu_freq_current.set(cpu_freq.current)
cpu_freq_min.set(cpu_freq.min)
cpu_freq_max.set(cpu_freq.max)
# 启动 HTTP 服务器,暴露 metrics 接口
start_http_server(8080)
while True:
# 收集 CPU 使用率指标
cpu_percent.set(psutil.cpu_percent())
# 收集 CPU 频率指标
cpu_freq = psutil.cpu_freq()
cpu_freq_current.set(cpu_freq.current)
# 等待 1 秒钟,再次进行收集
time.sleep(1)
在以上代码中,我们使用 psutil 库获取 CPU 使用率和频率等信息,并将它们作为指标进行收集。每次循环执行时,我们都会将当前的指标值设置到相应的 metric 中,并等待一秒钟之后再次收集。
收集自定义指标
prometheus_client 不仅可以收集系统级别的指标,还可以方便地收集自定义的指标。
下面的示例代码演示了如何收集一个随机数指标:
代码语言:javascript复制from prometheus_client import Gauge, start_http_server
import random
# 定义和注册指标
random_value = Gauge('random_value', '随机数指标')
# 启动 HTTP 服务器,暴露 metrics 接口
start_http_server(8080)
while True:
# 生成 0 到 100 的随机数,并设置到指标中
random_value.set(random.randint(0, 100))
# 等待 5 秒钟,再次进行收集
time.sleep(5)
以上代码中,我们定义了一个随机数指标,每隔 5 秒钟会生成一个随机数,并将其设置到指标中。然后,我们又将这个指标通过 start_http_server()
接口暴露出来,以便其他程序可以方便地访问。
通过以上两个示例,我们可以看到 prometheus_client 灵活的 API,可以轻松地实现各种不同类型和不同维度的指标收集和暴露。
Python封装
- monitor.py
# encoding: utf-8
from prometheus_client import Counter, Gauge, Summary
from prometheus_client.core import CollectorRegistry
from prometheus_client.exposition import choose_encoder
class Monitor:
def __init__(self):
# 注册收集器&最大耗时map
self.collector_registry = CollectorRegistry(auto_describe=False)
self.request_time_max_map = {}
# 接口调用summary统计
self.http_request_summary = Summary(name="http_server_requests_seconds",
documentation="Num of request time summary",
labelnames=("method", "code", "uri"),
registry=self.collector_registry)
# 接口最大耗时统计
self.http_request_max_cost = Gauge(name="http_server_requests_seconds_max",
documentation="Number of request max cost",
labelnames=("method", "code", "uri"),
registry=self.collector_registry)
# 请求失败次数统计
self.http_request_fail_count = Counter(name="http_server_requests_error",
documentation="Times of request fail in total",
labelnames=("method", "code", "uri"),
registry=self.collector_registry)
# 模型预测耗时统计
self.http_request_predict_cost = Counter(name="http_server_requests_seconds_predict",
documentation="Seconds of prediction cost in total",
labelnames=("method", "code", "uri"),
registry=self.collector_registry)
# 图片下载耗时统计
self.http_request_download_cost = Counter(name="http_server_requests_seconds_download",
documentation="Seconds of download cost in total",
labelnames=("method", "code", "uri"),
registry=self.collector_registry)
# 获取/metrics结果
def get_prometheus_metrics_info(self, handler):
encoder, content_type = choose_encoder(handler.request.headers.get('accept'))
handler.set_header("Content-Type", content_type)
handler.write(encoder(self.collector_registry))
self.reset_request_time_max_map()
# summary统计
def set_prometheus_request_summary(self, handler):
self.http_request_summary.labels(handler.request.method, handler.get_status(), handler.request.path).observe(handler.request.request_time())
self.set_prometheus_request_max_cost(handler)
# 自定义summary统计
def set_prometheus_request_summary_customize(self, method, status, path, cost_time):
self.http_request_summary.labels(method, status, path).observe(cost_time)
self.set_prometheus_request_max_cost_customize(method, status, path, cost_time)
# 失败统计
def set_prometheus_request_fail_count(self, handler, amount=1.0):
self.http_request_fail_count.labels(handler.request.method, handler.get_status(), handler.request.path).inc(amount)
# 自定义失败统计
def set_prometheus_request_fail_count_customize(self, method, status, path, amount=1.0):
self.http_request_fail_count.labels(method, status, path).inc(amount)
# 最大耗时统计
def set_prometheus_request_max_cost(self, handler):
requset_cost = handler.request.request_time()
if self.check_request_time_max_map(handler.request.path, requset_cost):
self.http_request_max_cost.labels(handler.request.method, handler.get_status(), handler.request.path).set(requset_cost)
self.request_time_max_map[handler.request.path] = requset_cost
# 自定义最大耗时统计
def set_prometheus_request_max_cost_customize(self, method, status, path, cost_time):
if self.check_request_time_max_map(path, cost_time):
self.http_request_max_cost.labels(method, status, path).set(cost_time)
self.request_time_max_map[path] = cost_time
# 预测耗时统计
def set_prometheus_request_predict_cost(self, handler, amount=1.0):
self.http_request_predict_cost.labels(handler.request.method, handler.get_status(), handler.request.path).inc(amount)
# 自定义预测耗时统计
def set_prometheus_request_predict_cost_customize(self, method, status, path, cost_time):
self.http_request_predict_cost.labels(method, status, path).inc(cost_time)
# 下载耗时统计
def set_prometheus_request_download_cost(self, handler, amount=1.0):
self.http_request_download_cost.labels(handler.request.method, handler.get_status(), handler.request.path).inc(amount)
# 自定义下载耗时统计
def set_prometheus_request_download_cost_customize(self, method, status, path, cost_time):
self.http_request_download_cost.labels(method, status, path).inc(cost_time)
# 校验是否赋值最大耗时map
def check_request_time_max_map(self, uri, cost):
if uri not in self.request_time_max_map:
return True
if self.request_time_max_map[uri] < cost:
return True
return False
# 重置最大耗时map
def reset_request_time_max_map(self):
for key in self.request_time_max_map:
self.request_time_max_map[key] = 0.0
调用
代码语言:javascript复制import tornado
import tornado.ioloop
import tornado.web
import tornado.gen
from datetime import datetime
from tools.monitor import Monitor
global g_monitor
class ClassifierHandler(tornado.web.RequestHandler):
def post(self):
# TODO Something you need
# work....
# 统计Summary,包括请求次数和每次耗时
g_monitor.set_prometheus_request_summary(self)
self.write("OK")
class PingHandler(tornado.web.RequestHandler):
def head(self):
print('INFO', datetime.now(), "/ping Head.")
g_monitor.set_prometheus_request_summary(self)
self.write("OK")
def get(self):
print('INFO', datetime.now(), "/ping Get.")
g_monitor.set_prometheus_request_summary(self)
self.write("OK")
class MetricsHandler(tornado.web.RequestHandler):
def get(self):
print('INFO', datetime.now(), "/metrics Get.")
g_monitor.set_prometheus_request_summary(self)
# 通过Metrics接口返回统计结果
g_monitor.get_prometheus_metrics_info(self)
def make_app():
return tornado.web.Application([
(r"/ping?", PingHandler),
(r"/metrics?", MetricsHandler),
(r"/work?", ClassifierHandler)
])
if __name__ == "__main__":
g_monitor = Monitor()
app = make_app()
app.listen(port)
tornado.ioloop.IOLoop.current().start()
Metrics返回结果实例
参考:https://pythonjishu.com/ldkilffcsapzipi/ https://www.cnblogs.com/alioth01/p/14363574.html