什么是Rule
Prometheus支持用户自定义Rule规则。Rule分为两类,一类是Recording Rule,另一类是Alerting Rule。Recording Rule的主要目的是通过PromQL可以实时对Prometheus中采集到的样本数据进行查询,聚合以及其它各种运算操作。而在某些PromQL较为复杂且计算量较大时,直接使用PromQL可能会导致Prometheus响应超时的情况。这时需要一种能够类似于后台批处理的机制能够在后台完成这些复杂运算的计算,对于使用者而言只需要查询这些运算结果即可。Prometheus通过Recoding Rule规则支持这种后台计算的方式,可以实现对复杂查询的性能优化,提高查询效率。
今天主要带来告警规则的分析。Prometheus中的告警规则允许你基于PromQL表达式定义告警触发条件,Prometheus后端对这些触发规则进行周期性计算,当满足触发条件后则会触发告警通知。
什么是告警Rule
告警是prometheus的一个重要功能,接下来从源码的角度来分析下告警的执行流程。
怎么定义告警Rule
一条典型的告警规则如下所示:
代码语言:javascript复制groups:
- name: example
rules:
- alert: HighErrorRate
#指标需要在触发告警之前的10分钟内大于0.5。
expr: job:request_latency_seconds:mean5m{job="myjob"} > 0.5
for: 10m
labels:
severity: page
annotations:
summary: High request latency
description: description info
在告警规则文件中,我们可以将一组相关的规则设置定义在一个group下。在每一个group中我们可以定义多个告警规则(rule)。一条告警规则主要由以下几部分组成:
- alert:告警规则的名称。
- expr:基于PromQL表达式告警触发条件,用于计算是否有时间序列满足该条件。
- for:评估等待时间,可选参数。用于表示只有当触发条件持续一段时间后才发送告警。在等待期间新产生告警的状态为pending。
- labels:自定义标签,允许用户指定要附加到告警上的一组附加标签。
- annotations:用于指定一组附加信息,比如用于描述告警详细信息的文字等,annotations的内容在告警产生时会一同作为参数发送到Alertmanager。
Rule管理器
规则管理器会根据配置的规则,基于规则PromQL表达式告警的触发条件,用于计算是否有时间序列满足该条件。在满足该条件时,将告警信息发送给告警服务。
代码语言:javascript复制type Manager struct {
opts *ManagerOptions //外部的依赖
groups map[string]*Group //当前的规则组
mtx sync.RWMutex //规则管理器读写锁
block chan struct{}
done chan struct{}
restored bool
logger log.Logger
}
- opts(*ManagerOptions类型):记录了Manager实例使用到的其他模块,例如storage模块、notify模块等。
- groups(map[string]*Group类型):记录了所有的rules.Group实例,其中key由rules.Group的名称及其所在的配置文件构成。
- mtx(sync.RWMutex类型):在读写groups字段时都需要获取该锁进行同步。
读取Rule组配置
在Prometheus Server启动的过程中,首先会调用Manager.Update()方法加载Rule配置文件并进行解析,其大致流程如下。
- 调用Manager.LoadGroups()方法加载并解析Rule配置文件,最终得到rules.Group实例集合。
- 停止原有的rules.Group实例,启动新的rules.Group实例。其中会为每个rules.Group实例启动一个goroutine,它会关联rules.Group实例下的全部PromQL查询。
func (m *Manager) Update(interval time.Duration, files []string, externalLabels labels.Labels, externalURL string) error {
m.mtx.Lock()
defer m.mtx.Unlock()
// 从当前文件中加载规则
groups, errs := m.LoadGroups(interval, externalLabels, externalURL, files...)
if errs != nil {
for _, e := range errs {
level.Error(m.logger).Log("msg", "loading groups failed", "err", e)
}
return errors.New("error loading rules, previous rule set restored")
}
m.restored = true
var wg sync.WaitGroup
//循环遍历规则组
for _, newg := range groups {
// If there is an old group with the same identifier,
// check if new group equals with the old group, if yes then skip it.
// If not equals, stop it and wait for it to finish the current iteration.
// Then copy it into the new group.
//根据新的rules.Group的信息获取规则组名
gn := GroupKey(newg.file, newg.name)
//根据规则组名获取到老的规则组并删除原有的rules.Group实例
oldg, ok := m.groups[gn]
delete(m.groups, gn)
if ok && oldg.Equals(newg) {
groups[gn] = oldg
continue
}
wg.Add(1)
//为每一个rules.Group实例启动一个goroutine
go func(newg *Group) {
if ok {
oldg.stop()
//将老的规则组中的状态信息复制到新的规则组
newg.CopyState(oldg)
}
wg.Done()
// Wait with starting evaluation until the rule manager
// is told to run. This is necessary to avoid running
// queries against a bootstrapping storage.
<-m.block
//调用rules.Group.run()方法,开始周期性的执行PromQl语句
newg.run(m.opts.Context)
}(newg)
}
// Stop remaining old groups.
//停止所有老规则组的服务
wg.Add(len(m.groups))
for n, oldg := range m.groups {
go func(n string, g *Group) {
g.markStale = true
g.stop()
if m := g.metrics; m != nil {
m.IterationsMissed.DeleteLabelValues(n)
m.IterationsScheduled.DeleteLabelValues(n)
m.EvalTotal.DeleteLabelValues(n)
m.EvalFailures.DeleteLabelValues(n)
m.GroupInterval.DeleteLabelValues(n)
m.GroupLastEvalTime.DeleteLabelValues(n)
m.GroupLastDuration.DeleteLabelValues(n)
m.GroupRules.DeleteLabelValues(n)
m.GroupSamples.DeleteLabelValues((n))
}
wg.Done()
}(n, oldg)
}
wg.Wait()
//更新规则管理器中的规则组
m.groups = groups
return nil
}
运行Rule组调度方法
规则组启动流程(Group.run):进入Group.run方法后先进行初始化等待,以使规则的运算时间在同一时刻,周期为g.interval;然后定义规则运算调度方法:iter,调度周期为g.interval;在iter方法中调用g.Eval方法执行下一层次的规则运算调度。
规则运算的调度周期g.interval,由prometheus.yml配置文件中global中的 [ evaluation_interval:| default = 1m ]指定。实现如下:
代码语言:javascript复制func (g *Group) run(ctx context.Context) {
defer close(g.terminated)
// Wait an initial amount to have consistently slotted intervals.
evalTimestamp := g.EvalTimestamp(time.Now().UnixNano()).Add(g.interval)
select {
case <-time.After(time.Until(evalTimestamp))://初始化等待
case <-g.done:
return
}
ctx = promql.NewOriginContext(ctx, map[string]interface{}{
"ruleGroup": map[string]string{
"file": g.File(),
"name": g.Name(),
},
})
//定义规则组规则运算调度算法
iter := func() {
g.metrics.IterationsScheduled.WithLabelValues(GroupKey(g.file, g.name)).Inc()
start := time.Now()
//规则运算的入口
g.Eval(ctx, evalTimestamp)
timeSinceStart := time.Since(start)
g.metrics.IterationDuration.Observe(timeSinceStart.Seconds())
g.setEvaluationTime(timeSinceStart)
g.setLastEvaluation(start)
}
// The assumption here is that since the ticker was started after having
// waited for `evalTimestamp` to pass, the ticks will trigger soon
// after each `evalTimestamp N * g.interval` occurrence.
tick := time.NewTicker(g.interval) //设置规则运算定时器
defer tick.Stop()
defer func() {
if !g.markStale {
return
}
go func(now time.Time) {
for _, rule := range g.seriesInPreviousEval {
for _, r := range rule {
g.staleSeries = append(g.staleSeries, r)
}
}
// That can be garbage collected at this point.
g.seriesInPreviousEval = nil
// Wait for 2 intervals to give the opportunity to renamed rules
// to insert new series in the tsdb. At this point if there is a
// renamed rule, it should already be started.
select {
case <-g.managerDone:
case <-time.After(2 * g.interval):
g.cleanupStaleSeries(ctx, now)
}
}(time.Now())
}()
//调用规则组规则运算的调度方法
iter()
if g.shouldRestore {
// If we have to restore, we wait for another Eval to finish.
// The reason behind this is, during first eval (or before it)
// we might not have enough data scraped, and recording rules would not
// have updated the latest values, on which some alerts might depend.
select {
case <-g.done:
return
case <-tick.C:
missed := (time.Since(evalTimestamp) / g.interval) - 1
if missed > 0 {
g.metrics.IterationsMissed.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed))
g.metrics.IterationsScheduled.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed))
}
evalTimestamp = evalTimestamp.Add((missed 1) * g.interval)
iter()
}
g.RestoreForState(time.Now())
g.shouldRestore = false
}
for {
select {
case <-g.done:
return
default:
select {
case <-g.done:
return
case <-tick.C:
missed := (time.Since(evalTimestamp) / g.interval) - 1
if missed > 0 {
g.metrics.IterationsMissed.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed))
g.metrics.IterationsScheduled.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed))
}
evalTimestamp = evalTimestamp.Add((missed 1) * g.interval)
//调用规则组规则运算的调度方法
iter()
}
}
}
}
运行Rule调度方法
规则组对具体规则的调度在Group.Eval中实现,在Group.Eval方法中会将规则组下的每条规则通过QueryFunc将(promQL)放到查询引擎(queryEngine)中执行,如果被执行的是AlertingRule类型,那么执行结果指标会被NotifyFunc组件发送给告警服务;如果是RecordingRule类型,最后将改结果指标存储到Prometheus的储存管理器中,并对过期指标进行存储标记处理。
代码语言:javascript复制// Eval runs a single evaluation cycle in which all rules are evaluated sequentially.
func (g *Group) Eval(ctx context.Context, ts time.Time) {
var samplesTotal float64
遍历当前规则组下的所有规则
for i, rule := range g.rules {
select {
case <-g.done:
return
default:
}
func(i int, rule Rule) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "rule")
sp.SetTag("name", rule.Name())
defer func(t time.Time) {
sp.Finish()
//更新服务指标-规则的执行时间
since := time.Since(t)
g.metrics.EvalDuration.Observe(since.Seconds())
rule.SetEvaluationDuration(since)
//记录本次规则执行的耗时
rule.SetEvaluationTimestamp(t)
}(time.Now())
//记录规则运算的次数
g.metrics.EvalTotal.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()
//运算规则
vector, err := rule.Eval(ctx, ts, g.opts.QueryFunc, g.opts.ExternalURL)
if err != nil {
//规则出现错误后,终止查询
rule.SetHealth(HealthBad)
rule.SetLastError(err)
//记录查询失败的次数
g.metrics.EvalFailures.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()
// Canceled queries are intentional termination of queries. This normally
// happens on shutdown and thus we skip logging of any errors here.
if _, ok := err.(promql.ErrQueryCanceled); !ok {
level.Warn(g.logger).Log("msg", "Evaluating rule failed", "rule", rule, "err", err)
}
return
}
samplesTotal = float64(len(vector))
//判断是否是告警类型规则
if ar, ok := rule.(*AlertingRule); ok {
发送告警
ar.sendAlerts(ctx, ts, g.opts.ResendDelay, g.interval, g.opts.NotifyFunc)
}
var (
numOutOfOrder = 0
numDuplicates = 0
)
//此处为Recording获取存储器指标
app := g.opts.Appendable.Appender(ctx)
seriesReturned := make(map[string]labels.Labels, len(g.seriesInPreviousEval[i]))
defer func() {
if err := app.Commit(); err != nil {
rule.SetHealth(HealthBad)
rule.SetLastError(err)
g.metrics.EvalFailures.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()
level.Warn(g.logger).Log("msg", "Rule sample appending failed", "err", err)
return
}
g.seriesInPreviousEval[i] = seriesReturned
}()
for _, s := range vector {
if _, err := app.Append(0, s.Metric, s.T, s.V); err != nil {
rule.SetHealth(HealthBad)
rule.SetLastError(err)
switch errors.Cause(err) {
储存指标返回的各种错误码处理
case storage.ErrOutOfOrderSample:
numOutOfOrder
level.Debug(g.logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s)
case storage.ErrDuplicateSampleForTimestamp:
numDuplicates
level.Debug(g.logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s)
default:
level.Warn(g.logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s)
}
} else {
//缓存规则运算后的结果指标
seriesReturned[s.Metric.String()] = s.Metric
}
}
if numOutOfOrder > 0 {
level.Warn(g.logger).Log("msg", "Error on ingesting out-of-order result from rule evaluation", "numDropped", numOutOfOrder)
}
if numDuplicates > 0 {
level.Warn(g.logger).Log("msg", "Error on ingesting results from rule evaluation with different value but same timestamp", "numDropped", numDuplicates)
}
for metric, lset := range g.seriesInPreviousEval[i] {
if _, ok := seriesReturned[metric]; !ok {
//设置过期指标的指标值
// Series no longer exposed, mark it stale.
_, err = app.Append(0, lset, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN))
switch errors.Cause(err) {
case nil:
case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
// Do not count these in logging, as this is expected if series
// is exposed from a different rule.
default:
level.Warn(g.logger).Log("msg", "Adding stale sample failed", "sample", metric, "err", err)
}
}
}
}(i, rule)
}
if g.metrics != nil {
g.metrics.GroupSamples.WithLabelValues(GroupKey(g.File(), g.Name())).Set(samplesTotal)
}
g.cleanupStaleSeries(ctx, ts)
}
然后就是规则的具体执行了,我们这里先只看AlertingRule的流程。首先看下AlertingRule的结构:
代码语言:javascript复制// An AlertingRule generates alerts from its vector expression.
type AlertingRule struct {
// The name of the alert.
name string
// The vector expression from which to generate alerts.
vector parser.Expr
// The duration for which a labelset needs to persist in the expression
// output vector before an alert transitions from Pending to Firing state.
holdDuration time.Duration
// Extra labels to attach to the resulting alert sample vectors.
labels labels.Labels
// Non-identifying key/value pairs.
annotations labels.Labels
// External labels from the global config.
externalLabels map[string]string
// true if old state has been restored. We start persisting samples for ALERT_FOR_STATE
// only after the restoration.
restored bool
// Protects the below.
mtx sync.Mutex
// Time in seconds taken to evaluate rule.
evaluationDuration time.Duration
// Timestamp of last evaluation of rule.
evaluationTimestamp time.Time
// The health of the alerting rule.
health RuleHealth
// The last error seen by the alerting rule.
lastError error
// A map of alerts which are currently active (Pending or Firing), keyed by
// the fingerprint of the labelset they correspond to.
active map[uint64]*Alert
logger log.Logger
}
这里比较重要的就是active字段了,它保存了执行规则后需要进行告警的资源,具体是否告警还要执行一系列的逻辑来判断是否满足告警条件。具体执行的逻辑如下:
代码语言:javascript复制func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, externalURL *url.URL) (promql.Vector, error) {
res, err := query(ctx, r.vector.String(), ts)
if err != nil {
r.SetHealth(HealthBad)
r.SetLastError(err)
return nil, err
}
// ......
}
这一步通过创建Manager时传入的QueryFunc函数执行规则配置中的expr表达式,然后得到返回的结果,这里的结果是满足表达式的指标的集合。比如配置的规则为:
代码语言:javascript复制cpu_usage > 90
那么查出来的结果可能是
代码语言:javascript复制cpu_usage{instance="192.168.0.11"} 91
cpu_usage{instance="192.168.0.12"} 92
然后遍历查询到的结果,根据指标的标签生成一个hash值,然后判断这个hash值是否之前已经存在(即之前是否已经有相同的指标数据返回),如果是,则更新上次的value及annotations,如果不是,则创建一个新的alert并保存至该规则下的active alert列表中。然后遍历规则的active alert列表,根据规则的持续时长配置、alert的上次触发时间、alert的当前状态、本次查询alert是否依然存在等信息来修改alert的状态。具体规则如下:
- 如果alert之前存在,但本次执行时不存在
- 状态是StatePending或者本次检查时间距离上次触发时间超过15分钟(15分钟为写死的常量),则将该alert从active列表中删除
- 状态不为StateInactive的alert修改为StateInactive
- 如果alert之前存在并且本次执行仍然存在
- alert的状态是StatePending并且本次检查距离上次触发时间超过配置的for持续时长,那么状态修改为StateFiring
- 其余情况修改alert的状态为StatePending
上面那一步只是修改了alert的状态,但是并没有真正执行发送告警操作。下面才是真正要执行告警操作:
代码语言:javascript复制// 判断规则是否是alert规则,如果是则发送告警信息(具体是否真正发送由ar.sendAlerts中的逻辑判断)
if ar, ok := rule.(*AlertingRule); ok {
ar.sendAlerts(ctx, ts, g.opts.ResendDelay, g.interval, g.opts.NotifyFunc)
}
// .......
func (r *AlertingRule) sendAlerts(ctx context.Context, ts time.Time, resendDelay time.Duration, interval time.Duration, notifyFunc NotifyFunc) {
alerts := []*Alert{}
r.ForEachActiveAlert(func(alert *Alert) {
if alert.needsSending(ts, resendDelay) {
alert.LastSentAt = ts
// Allow for two Eval or Alertmanager send failures.
delta := resendDelay
if interval > resendDelay {
delta = interval
}
alert.ValidUntil = ts.Add(4 * delta)
anew := *alert
alerts = append(alerts, &anew)
}
})
notifyFunc(ctx, r.vector.String(), alerts...)
}
func (a *Alert) needsSending(ts time.Time, resendDelay time.Duration) bool {
if a.State == StatePending {
return false
}
// if an alert has been resolved since the last send, resend it
if a.ResolvedAt.After(a.LastSentAt) {
return true
}
return a.LastSentAt.Add(resendDelay).Before(ts)
}
概括一下以上逻辑就是:
- 如果alert的状态是StatePending,则不发送告警
- 如果alert的已经被解决,那么再次发送告警标记该条信息已经被解决
- 如果当前时间距离上次发送告警的时间大于配置的重新发送延时时间(ResendDelay),则发送告警,否则不发送
以上就是prometheus的告警流程。学习这个流程主要是问了能够对prometheus的rules相关的做二次开发。我们可以修改LoadGroups()方法,让其可以动态侧加载定义在mysql中定义的规则,动态实现告警规则更新。
参考:
《深入浅出prometheus原理、应用、源码与拓展详解》