架构
这篇文章会着重分析 其中的 discovery => scrap => storage 的流程
配置
配置有两部分,一部分来自命令行的启动参数,一部分来自 prometheus.yml
来自命令行的配置参数
配置核心的部分分成几块,其中以 web.Options 为重点,比如 notifier.Options
等其他配置在初始化的过程中最终会被转换为 web.Options
中的一部分。
- configFile: prometheus.yml 文件路径
- storage: 本地存储配置,可配置的有
- localStoragePath: 数据存储位置
- WALSegmentSize
- newFlagRetentionDuration 存储时间
- NoLockfile/ AllowOverlappingBlocks/ WALCompression: lock 压缩等
- RemoteFlushDeadline: shutdown或者reload等时候需要 flush,flush超时时间
- RemoteReadSampleLimit/ RemoteReadConcurrencyLimit/ RemoteReadBytesInFrame: Remote其他配置 sample数量、并发、大小等
- alert
- outageTolerance/ forGracePeriod/ resendDelay: alert 触发相关
- notifier.QueueCapacity/notifierTimeout: notifier相关
- query
- lookbackDelta: maximum lookback duratio
- queryTimeout/queryConcurrency/queryMaxSamples: 超时/并发/最大数量
- web: prometheus 作为一个 web server的配置
- web.ListenAddress/ webTimeout/ MaxConnections/ prometheusURL/ web.RoutePrefix/ web.UserAssetsPath 基本 web参数
- web.EnableLifecycle/ web.EnableAdminAPI/ web.ConsoleTemplatesPath/ web.ConsoleLibrariesPath
- web.PageTitle/ corsRegexString/
// prometheus/prometheus/cmd/prometheus/main.go
来自 prometheus.yml 的配置参数
来自 prometheus.yml 的参数对大部分组件是 reloadable
的,实现方式为 这些组件都实现了一个 ApplyConfig(*config.Config) error
的函数, 比如:
- remoteStorage: 远程存储。支持比如 influx db/ open tsdb作为存储后端的核心
- webHandler: web 核心
- scrapeManager: 指标抓取
- discoveryManagerScrape:发现抓取目标
- discoveryManagerNotify:发现 alert 推送目标
- ruleManager: 抓取规则(recording / alerting rules)管理
- notifierManager: 分发 alert notifications 到 alert manager
也就是 当 prometheus.yml 本更新之后,这些组件无需重启(SIGHUP,web触发)就可以得到更新
配置的格式参考这里, 这里我们做简要的解释。
代码语言:txt复制global:
# 这两个是全局的刮取间隔以及超时设置
# How frequently to scrape targets by default.
[ scrape_interval: <duration> | default = 1m ]
# How long until a scrape request times out.
[ scrape_timeout: <duration> | default = 10s ]
# 多久evaluate rules一次
[ evaluation_interval: <duration> | default = 1m ]
# 全局的 external label, 当 prometheus (federation, remote storage, Alertmanager)
# 和外部交互的时候很有用 . 举个例子:当多个prometheus数据聚合到同一个federation prometheus
# 或者 remote storage 的时候,可以加一个 id/cluster label作为区分
external_labels:
[ <labelname>: <labelvalue> ... ]
# 规则文件路径,规则分为两种,一种叫 recoding rule, 另一种叫 alter rule
# 他们都是 隔一段时间内部 evaluate 一次,生成新的 metrics 或者 产生 alter notification
rule_files:
[ - <filepath_glob> ... ]
# 抓取配置,这个是配置文件的大头,各种抓取规则都在这里,下面会分解细讲
scrape_configs:
[ - <scrape_config> ... ]
# Alertmanager相关的配置
alerting:
alert_relabel_configs:
[ - <relabel_config> ... ]
alertmanagers:
[ - <alertmanager_config> ... ]
# remote_write/read 相关的配置
remote_write:
[ - <remote_write> ... ]
remote_read:
[ - <remote_read> ... ]
scrape_configs
代码语言:txt复制# 1. 每一个抓取任务叫一个 job,通常配置里面会有多个抓取任务,比如抓取node监控,server监控等
job_name: <job_name>
# 2. 对于这个job设置的抓取间隔和超时时间
[ scrape_interval: <duration> | default = <global_config.scrape_interval> ]
[ scrape_timeout: <duration> | default = <global_config.scrape_timeout> ]
# 3. 这个job的抓取路径
[ metrics_path: <path> | default = /metrics ]
[ scheme: <scheme> | default = http ]
params:
[ <string>: [<string>, ...] ]
# 4. 其他抓取相关的配置略,主要是抓取鉴权/代理相关,配置 prometheus 如何发送抓取请求
basic_auth/ bearer_token/ bearer_token_file/ tls_config/ proxy_url
# 5. 当 抓取到的指标 label 和 服务端生成的 label产生冲突的时候如何处理,服务端生成的 label
# 包括 job / instance/ 配置 labels, labels 和 service discovery (sd) implementations 生成的 label;
# 最后一种很重要也很容易让人迷惑,后面会具体的讲一下 sd 是如何生成和配置的
[ honor_labels: <boolean> | default = false ]
# 是否完全使用 抓取到的指标中的时间戳
[ honor_timestamps: <boolean> | default = true ]
# 6. sd 相关的配置,由于 prometheus 是主动抓取,而抓取目标往往是快速变化的,比如一个容器,他的生命周期可能很短
# 那么就存在一个如何自动发现抓取目标,已经在抓取数据上添加各种 [元Label] 的问题
# sd 就是为这个产生,根据抓取目标、服务发现机制的不同,sd 有多种实现,下面 discovery 会我们把用的比较多的
# kubernetes_sd_configs 细讲一下,其他(azure_sd_configs/consul_sd_configs/dns_sd_configs等等)略
kubernetes_sd_configs:
[ - <kubernetes_sd_config> ... ]
# 这里面的主要参数就是 api_server/role/namespaces,其他鉴权相关略
# kubernetes api server 地址,不填就是 incluster config
[ api_server: <host> ]
# 角色 见discover一节,支持endpoints, service, pod, node, or ingress
role: <role>
# 抓取对象的 namespace,不填就是所有空间
namespaces:
names:
[ - <string> ]
# 7. 固定的静态配置的 label
static_configs:
[ - <static_config> ... ]
# 8. 这部分和 如何 抓取/保存 metrics 数据有关系,也比较令人困惑,下面的 scrape 一节会细讲
# 注意这里 relabel_configs,metric_relabel_configs 都是用的 relabel_config,不同的是 relabel_configs
# 还会影响如何抓取(在 scrape 之前)的动作,而 metric_relabel_configs 只会影响 抓取之后的 存储
relabel_configs:
[ - <relabel_config> ... ]
# 静态配置的地址
targets:
[ - '<host>' ]
# Labels assigned to all metrics scraped from the targets.
labels:
[ <labelname>: <labelvalue> ... ]
# List of metric relabel configurations.
metric_relabel_configs:
[ - <relabel_config> ... ]
# Per-scrape limit on number of scraped samples that will be accepted.
# If more than this number of samples are present after metric relabelling
# the entire scrape will be treated as failed. 0 means no limit.
[ sample_limit: <int> | default = 0 ]
Discovery
一个sd config对应一个 provider, 下面等代码以 kubernetes sd 为例. 从 discover.Manager 里面启动,读取 sd config的部分
代码语言:txt复制// Run implements the discoverer interface.
func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group)
discover 更新流程
- 根据 Role 的不同,启动不同的封装, 比如 Role endpoint会启动 endpointsInformer; serviceInformer; podInformer; 也就是说对于这个 namespace的 endpoint; service; pod 都会 watch
- 从下面的代码可以看出,对于每个 provider 都要run 一次,也就是要 watch k8s api一次,而NewSharedInformer 其实什么也没 share,会不会造成对 k8s api的压力,和重复对缓存 ?答案是不会,因为如果 provider的配置相同在上一步就被过滤了。
// prometheus/prometheus/discovery/targetgroup/targetgroup.go
type Group struct {
// Targets is a list of targets identified by a label set. Each target is
// uniquely identifiable in the group by its address label.
Targets []model.LabelSet
// Labels is a set of labels that is common across all targets in the group.
Labels model.LabelSet
// Source is an identifier that describes a group of targets.
Source string
}
// prometheus/prometheus/discovery/kubernetes/kubernetes.go
func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
switch d.role {
case RoleEndpoint:
for _, namespace := range namespaces {
// ....
eps := NewEndpoints(
log.With(d.logger, "role", "endpoint"),
cache.NewSharedInformer(slw, &apiv1.Service{}, resyncPeriod),
cache.NewSharedInformer(elw, &apiv1.Endpoints{}, resyncPeriod),
cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncPeriod),
)
d.discoverers = append(d.discoverers, eps)
go eps.endpointsInf.Run(ctx.Done())
go eps.serviceInf.Run(ctx.Done())
go eps.podInf.Run(ctx.Done())
- watch 之后的变化根据 role 类型会做转化,但是最终转化为的东西叫
Group
,Group
的定义非常简单,就是一组label
; 那么怎么体现出发现
这个过程呢。核心就在一些特殊的 label
里面。
下面以 service 为例子
Label | 含义 |
---|---|
| service 的地址, 比如 aa.default.svc:80; 这个 label 更为特殊,决定了具体 scrape 的目标地址 |
| service 端口名 |
| service 协议 |
| serivce 名字 |
| serivce cluster ip |
| serivce external name |
| namespace 名字 |
| 其他来自 service 的 labels 的 label |
| true |
| 其他来自 service 的 annotation 的 label |
| true |
- 除了
__address__
其实还有__scheme__
和__metrics_path__
会影响抓取目标,但是因为 relabel 的灵活性,可以在上面的 任何一个 label 里面(可能是 pod 的label)得到 相关的信息,在relabel的过程中写入到__scheme__
和__metrics_path__
就可以了 - 对于 role 为endpoint,
__address__
是 endpoint 地址,同时还会把 关联的 pod 和 service label都加上去。 - role 为 node 的
__address__
是各个node对应的 kubelet 地址 - role 为 pod 的
__address__
是 podip container port,同时会被其 contoller的 name/id 也加上去 - 所以可以看出 role 的不同,影响是抓取对象地址 以及对应数据上的
meta_label
; 区别并不大,所以在很多配置上 可以看到 抓取的 role 全都是endpoint
, 因为这种 role 已经能把大部分目标都囊括了 (endpoint 接近 pod,同时 可以体现 service 和 node, 比如用 kubelet 的 endpoint 就抓取了 node 目标,只是有部分 label的差异,比如 pod 有 pod ready的相关label) - 官方介绍
Scrape
Scrape 的核心是 scrapePool
- scrapePools => scrapePool, 这部分由 discovery 发现的
targetSets map[string][]*targetgroup.Group
触发生成并 定期 reload, 每一个scrape_configs
对应了一个 scrapePool, key 都是job_name
- 生成 target 的过程会 populateLabels(/scrape/target.go),这个过程 就会做 relabel 操作, 同时一些特殊的配置也变成label了 方便后面统一处理
- scrapePool.sync 中 对 targets中的每一个 target 都生成了一个 loop
- scrapeloop 中由几个缓存,一种
pool(buffer)
是缓存各种大小的 []byte, 避免频繁内存申请;另一种是scrapeCache
scrapeCache
保存 metric string和 label sets的关系,同时跟踪 scrape 之间的 staleness of series
relabel
- label 在 prometheus 被广泛的使用,在prometheus甚至任何数据都看看成是一组 label.
- relabel 的动作不仅会影响数据的抓取 (target address / filtering targets)也会影响数据的保存 (filtering data / label-modification)
// prometheus/scrape/scrape.go
// 抓取数据
func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error)
=>
// 通过 append 保存
func (sl *scrapeLoop) append(b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error)
=>
// 修改 label
// 1. lset是数据中的 label
// 2. target 里面有 配置的, discover出来的 label,经过了 relabel_configs 的修改
// 3. rc 是 metric_relabel_configs,这里再次 relabel process
// 这里有个潜规则,label 修改为空了,那么表示 metrics 都不要了,整体丢弃
func mutateSampleLabels(lset labels.Labels, target *Target, honor bool, rc []*relabel.Config) labels.Labels
relabel 部分的配置格式
代码语言:txt复制# 来源 labels (label key),多个用, 分隔 => 用于 replace, keep, and drop
[ source_labels: '[' <labelname> [, ...] ']' ]
# label value 连接的分隔符
[ separator: <string> | default = ; ]
# 生成的 label 名,用于 replace 必填
[ target_label: <labelname> ]
# 用于提取 source label 内容的正则表达式,默认 (.*)
[ regex: <regex> | default = (.*) ]
# hash 使用
[ modulus: <uint64> ]
# 选择 regex 使用,用于 replace
[ replacement: <string> | default = $1 ]
# Action,可用 replace;keep;drop;hashmod;labelmap;labeldrop;labelkeep
[ action: <relabel_action> | default = replace ]
relabel 的处理功能强大,但是代码却十分简洁
action | 类别 | 作用 |
---|---|---|
Drop | 抓取动作 | Regex 匹配 label value, 匹配则丢弃数据 |
Keep | 抓取动作 | Regex 匹配 label value, 不匹配则丢弃数据 |
Replace | 修改 label: 增删改 | Regex 匹配 label value,根据TargetLabel和Replacement模板生成新 label 对修改 |
HashMod | 修改 label: 增 | target label: hash(source label value) |
LabelMap | 修改 label: 改 | 匹配 label key, 修改 label key, |
LabelDrop | 修改 label: 删 | 匹配 label key 则丢弃这个 label |
LabelKeep | 修改 label: 删 | 不匹配 label key 则丢弃这个 label |
// prometheus/pkg/relabel/relabel.go
func Process(labels labels.Labels, cfgs ...*Config) labels.Labels {
for _, cfg := range cfgs {
labels = relabel(labels, cfg)
if labels == nil {
return nil
}
}
return labels
}
func relabel(lset labels.Labels, cfg *Config) labels.Labels {
// 省略部分代码,val 是 label key 对应的 label value 连接
val := strings.Join(values, cfg.Separator)
lb := labels.NewBuilder(lset)
switch cfg.Action {
case Drop: if cfg.Regex.MatchString(val) return nil
case Keep: if !cfg.Regex.MatchString(val) return nil
case Replace: lb[TargetLabel(val)] = Replacement(val)
case HashMod: lb.Set(cfg.TargetLabel, fmt.Sprintf("%d", mod := sum64(md5.Sum([]byte(val))) % cfg.Modulus))
case LabelMap: for l in lset { if cfg.Regex.MatchString(l.Name) { lb.Set(cfg.Regex.ReplaceAllString(l.Name, cfg.Replacement), l.Value) } }
case LabelDrop: for l in lset { if cfg.Regex.MatchString(l.Name) { lb.Del(l.Name) } }
case LabelKeep: for l in lset { if !cfg.Regex.MatchString(l.Name) { lb.Del(l.Name) } }
}
return lb.Labels()
}
- 举个使用得比较多的 keep 和 replace 的例子, 下面这个例子表示: 满足条件
endpoint.service.labels["k8s_app"] == "kube-state-metrics" && endpoint.port_name == "http-main"
的 target 才会被抓取, 并且会设置 labelnamespace=endpoint.namespace
relabel_configs:
- source_labels: [__meta_kubernetes_service_label_k8s_app]
separator: ;
regex: kube-state-metrics
replacement: $1
action: keep
- source_labels: [__meta_kubernetes_endpoint_port_name]
separator: ;
regex: http-main
replacement: $1
action: keep
- source_labels: [__meta_kubernetes_namespace]
separator: ;
regex: (.*)
target_label: namespace
replacement: $1
action: replace
Storage
参考
- fanout storage 通过一个
Appendable
的interface 暴露给 scrape,Appendable
返回Appender
, fanout storage 实现fanoutAppender
,底层委托给local storage
和remote storage
, 分别为 primary, 和 secondaries (secondary可以有多个) - fanoutAppender 的逻辑很简单,就是依次对 primary, secondaries storage 执行对应 interface的操作
- 查询的时候 也是有个
mergeQuerier
对primaryQuerier
和 其他queriers
进行包装
type Appender interface {
Add(l labels.Labels, t int64, v float64) (uint64, error)
AddFast(l labels.Labels, ref uint64, t int64, v float64) error
// Commit submits the collected samples and purges the batch.
Commit() error
Rollback() error
}
// Querier provides reading access to time series data.
type Querier interface {
// Select returns a set of series that matches the given label matchers.
Select(*SelectParams, ...*labels.Matcher) (SeriesSet, Warnings, error)
// LabelValues returns all potential values for a label name.
LabelValues(name string) ([]string, Warnings, error)
// LabelNames returns all the unique label names present in the block in sorted order.
LabelNames() ([]string, Warnings, error)
// Close releases the resources of the Querier.
Close() error
}