解释器模式实战:实现自定义的告警规则功能

2021-07-01 16:40:51 浏览数 (1)

大家好,我是征哥,今天分享一种设计模式,解释器模式。

先来看一个需求:

在告警系统中,有很多规则的配置,如果配置的规则被触发,监控系统就通过短信、微信、邮件等方式发送告警给开发者。比如,每分钟 API 总出错数超过 100 或者每分钟 API 总调用数超过 10000 就触发告警。配置的规则如下:

代码语言:javascript复制
api_error_per_minute > 9 || api_count_per_minute > 10000

在监控系统中,告警模块只判断是否触发告警。至于每分钟 API 接口出错数、每分钟接口调用数等统计数据的计算,是由其他模块来负责的。其他模块将统计数据放到一个 dict 中,数据的格式如下所示:

代码语言:javascript复制
apiStat = {}
apiStat["api_error_per_minute"] = 10
apiStat["api_count_per_minute"] = 987

接下来,编写程序,输入是一个字典,代表统计数据 apiStat,和一个字符串,代表告警规则 "api_error_per_minute > 9 || api_count_per_minute > 10000",输出:True 或 False,True 表述满足告警规则,False 表示不满足。

为了简化代码实现,我们假设自定义的告警规则只包含“||、&&、>、<、==”这五个运算符,其中,“>、<、==”运算符的优先级高于“||、&&”运算符,“&&”运算符优先级高于“||”。在表达式中,任意元素之间需要通过空格来分隔。

除此之外,用户可以自定义要监控的 key,比如前面的 api_error_per_minute、api_count_per_minute。

那么如何写代码实现呢?

更具体一点,请将以下 pass 语句替换成可以执行的代码,其中 rule 字符串是可以自由变化的:

代码语言:javascript复制
class AlertRuleInterpreter:
    def __init__(self, ruleExpression: str):
        pass

    def interpret(self, stats: dict) -> bool:
        pass


if __name__ == "__main__":
    rule = "key1 > 100 && key2 < 30 || key3 < 100 || key4 == 88"
    interpreter = AlertRuleInterpreter(rule)
    stats = {}
    stats["key1"] = 101
    stats["key3"] = 121
    stats["key4"] = 88
    alert = interpreter.interpret(stats)
    print(alert)

你可以先暂停,思考下怎么写,然后和我这里对比一下:

基础版本

思路:先把字符串按照 || 分割成子串,每一个字串内部的逻辑关系就是 &&,再将字串按照 && 分割成子子串,每一个子子串的内部逻辑关系就三种:>、<、==,这是不是很像一个树?

编码实现每一个字串、子子串对应处理逻辑 Expression 类,这里为统一格式,用到了抽象基类,每一种 Expression 类必须包含 interpret 方法。具体代码如下:

代码语言:javascript复制
from abc import ABCMeta
from numbers import Real
import re

class Expression(metaclass=ABCMeta):
    def interpret(self, stats: dict) -> bool:
        pass


class GreaterExpression(Expression):
    def __init__(self, express: str = None, key: str = None, value: Real = None):
        if express:
            elements = re.split(r"s ", express)
            if len(elements) == 3 and elements[1] == ">":
                self.key = elements[0]
                self.value = float(elements[2])
            else:
                raise Exception("Invalid GreaterExpression")
        elif key and value:
            self.key = key
            self.value = value
        else:
            raise Exception("GreaterExpression init error")

    def interpret(self, stats: dict) -> bool:
        if self.key in stats:
            return stats[self.key] > self.value
        return False


class LessExpression(Expression):
    def __init__(self, express: str = None, key: str = None, value: Real = None):
        if express:
            elements = re.split(r"s ", express)
            if len(elements) == 3 and elements[1] == "<":
                self.key = elements[0]
                self.value = float(elements[2])
            else:
                raise Exception("Invalid LessExpression")
        elif key and value:
            self.key = key
            self.value = value
        else:
            raise Exception("LessExpression init error")

    def interpret(self, stats: dict) -> bool:
        if self.key in stats:
            return stats[self.key] < self.value
        return False


class EqualExpression(Expression):
    def __init__(self, express: str = None, key: str = None, value: Real = None):
        if express:
            elements = re.split(r"s ", express)
            if len(elements) == 3 and elements[1] == "==":
                self.key = elements[0]
                self.value = float(elements[2])
            else:
                raise Exception("Invalid EqualExpression")
        elif key and value:
            self.key = key
            self.value = value
        else:
            raise Exception("EqualExpression init error")

    def interpret(self, stats: dict) -> bool:
        if self.key in stats:
            return stats[self.key] == self.value
        return False


class AndExpression(Expression):
    def __init__(self, express: str):
        self.express_list = []
        strExpressions = re.split(r"s &&s ", express)
        for express in strExpressions:
            if ">" in express:
                self.express_list.append(GreaterExpression(express))
            elif "<" in express:
                self.express_list.append(LessExpression(express))
            elif "==" in express:
                self.express_list.append(EqualExpression(express))
            elif "True" == express or "False" == express:
                self.express_list.append(BoolExpression(express))

    def interpret(self, stats: dict) -> bool:
        for expression in self.express_list:
            if expression.interpret(stats) == False:
                return False
        return True


class OrExpression(Expression):
    def __init__(self, express: str):
        self.express_list = []
        strExpressions = re.split(r"s ||s ", express)
        for express in strExpressions:
            self.express_list.append(AndExpression(express))

    def interpret(self, stats: dict) -> bool:
        for expression in self.express_list:
            if expression.interpret(stats) == True:
                return True
        return False


class AlertRuleInterpreter:
    def __init__(self, ruleExpression: str):
        self.expression = OrExpression(ruleExpression)

    def interpret(self, stats: dict) -> bool:
        return self.expression.interpret(stats)


if __name__ == "__main__":
    rule = "key1 > 100 && key2 < 30 || key3 < 100 || key4 == 88"

    interpreter = AlertRuleInterpreter(rule)
    stats = {}
    stats["key1"] = 120
    stats["key3"] = 121
    stats["key4"] = 88
    alert = interpreter.interpret(stats)
    print(alert)

这样的设计代码的模式,就叫做解释器模式,英文翻译是 Interpreter Design Pattern。在 GoF 的《设计模式》一书中,它是这样定义的:

Interpreter pattern is used to defines a grammatical representation for a language and provides an interpreter to deal with this grammar.

解释器模式为某个语言定义它的语法(或者叫文法)表示,并定义一个解释器用来处理这个语法。它属于行为型模式。这种模式被用在 SQL 解析、符号处理引擎等。这里的语言并不是我们说的中文和英文,而是任意一个信息的载体,比如本文中的告警规则。

加点难度

现在,我们给他增加点难度,比如说支持小括号,小括号内的表达式会作为一个整体,与 || 或 && 平级,小括号内还嵌套小括号。

与基础版本相比,只需要增加一个 ComplexAlertRuleInterpreter 类,利用栈来先计算括号内的值,要么是 True,要么是 False,然后写回表达式,已达到去除括号的目的,最后在用基础版本的套路来实现即可。

代码语言:javascript复制
class ComplexAlertRuleInterpreter:
    def __init__(self, ruleExpression: str):
        self.expression = ruleExpression

    def interpret(self, stats: dict) -> bool:

        stack = deque()
        for express in re.split(r"s ", self.expression):
            if express == "(":
                stack.append(express)
                continue
            elif express == ")":
                # 取出括号内容,并计算
                tmp_express = deque()
                while len(stack) > 0:
                    tmp = stack.pop()
                    if tmp == "(":
                        break
                    else:
                        tmp_express.appendleft(tmp)
                # 计算结果
                result = AlertRuleInterpreter(" ".join(tmp_express)).interpret(stats)
                stack.append(str(result))
                continue

            else:
                stack.append(express)

        return AlertRuleInterpreter(" ".join(stack)).interpret(stats)

括号去除后,表达式多了'True' 或者 'False' 这样的文本,因此就需要一个 BoolExpression 来处理 'True' 或者 'False' :

代码语言:javascript复制
class BoolExpression(Expression):
    def __init__(self, express: str):
        self.express = express

    def interpret(self, stats: dict) -> bool:
        if self.express == "True":
            return True
        return False

最后修改 AndExpression 的初始化部分,增加对 'True' 或者 'False' 表达式的处理:

代码语言:javascript复制
class AndExpression(Expression):
    def __init__(self, express: str):
        self.express_list = []
        strExpressions = re.split(r"s &&s ", express)
        for express in strExpressions:
            if ">" in express:
                self.express_list.append(GreaterExpression(express))
            elif "<" in express:
                self.express_list.append(LessExpression(express))
            elif "==" in express:
                self.express_list.append(EqualExpression(express))
            elif "True" == express or "False" == express:
                self.express_list.append(BoolExpression(express))

    def interpret(self, stats: dict) -> bool:
        for expression in self.express_list:
            if expression.interpret(stats) == False:
                return False
        return True

最后, rule 可以增加任意多的括号,main 函数如下:

代码语言:javascript复制
if __name__ == "__main__":
    # rule = "key1 > 100 && key2 < 30"
    rule = "key1 > 101 && ( ( key2 < 30 || ( key3 < 100 || key4 == 88 ) ) || False ) && True"

    interpreter = ComplexAlertRuleInterpreter(rule)
    stats = {}
    stats["key1"] = 120
    stats["key3"] = 121
    stats["key4"] = 88
    alert = interpreter.interpret(stats)
    print(alert)

完整代码

https://github.com/somenzz/geekbang/blob/master/design-pattern/72-interpreter-demo.py

解释器模式应用场景:

简单来说,程序需要对字符串进行解释的,就像编程语言对代码的解释一样,这种情况下,就需要用到解释器模式。比如说:

  • 需要解释的字符串可以表示为一个抽象的语法树
  • 一个重复出现的问题可以用一种简单的语言来表达
  • 现在比较流行的规则引擎系统

0 人点赞