Python 实现序列与词法解析

2022-12-28 13:18:52 浏览数 (1)

实现序列命令解析: 该功能实现了对序列任务的解析,我们可以将需要执行的命令写成序列,当需要使用时可直接执行任务.

首先需要定义两个配置文件,配置文件config.json用于存放主机组信息,apache.json则用于存放需要执行的命令序列.

代码语言:javascript复制
config.json
{
  "MyWebServer":
    [
      ["192.168.191.1","root","1233"],
      ["192.168.191.2","root","1233"],
      ["192.168.191.3","root","1233"]
    ]
}
apache.json
{
  "Group": "MyWebServer",
  "task_sequence":
  [
    ["iptables -F"],
    ["yum -y install gcc make pcre-devel openssl-devel expat-devel bzip2"],
    ["PUT","./put_file/httpd-2.4.46.tar.gz","/tmp/apache.tar.gz"],
    ["tar -xzf /tmp/apache.tar.gz -C /tmp/"],
    ["mv /tmp/apr-util-1.6.1 /tmp/httpd-2.4.46/srclib/apr-util"]
  ]
}

主代码如下,在两个表中解析并循环执行任务.

代码语言:javascript复制
import os,json,sys

# 获取特定目录下所有的剧本
def DisplayAllRule():
    rootdir = os.getcwd()   "\rule\"
    all_files = [f for f in os.listdir(rootdir)]
    print("{0:15} t {1:15} t {2:5}".format("文件名","主机组","命令数"))

    for switch in all_files:
        # 首先判断文件结尾是否为Json
        if( switch.endswith(".json") == True):
            all_switch_dir = rootdir   switch
            try:
                # 判断文件内部是否符合JSON规范
                with open(all_switch_dir , "r" ,encoding="utf-8") as read_file:
                    # 判断是否存在指定字段来识别规范
                    load = json.loads(read_file.read())
                    if load.get("Group") != None and load.get("task_sequence") != None:
                        print("{0:15} t {1:15} t {2:5}".format(switch,load.get("Group"),len(load.get("task_sequence"))))
            except ValueError:
                pass

# 执行命令行
def RunRule(rule_name):
    rule_dir = os.getcwd()   "\rule\"   rule_name
    config_dir = os.getcwd()   "\config.json"

    # 先打开配置建并读取到数据
    with open(config_dir , "r" ,encoding="utf-8") as read_config_ptr:
        config_load = json.loads(read_config_ptr.read())
        # 接着读取选中剧本
        with open(rule_dir,"r",encoding="utf-8") as read_rule_ptr:
            rule_load = json.loads(read_rule_ptr.read())
            # 先找到组名称,并以组名称查询组内主机数
            ref = config_load.get(rule_load.get("Group"))
            if ref != None:
                # 加载剧本中的任务命令
                task_sequence = rule_load.get("task_sequence")

                # 循环执行针对组内主机
                for addr in ref:
                    print("-" * 130 , "n针对地址执行: {}n".format(addr[0]),"-" * 130)
                    # 每个主机需要执行的命令
                    for cmd in task_sequence:
                        if cmd[0] == "PUT" and len(cmd) >= 3:
                            print("上传文件 从 {0} -> 到 {1}".format(cmd[1],cmd[2]))
                        else:
                            print("执行命令: {0}".format(cmd[0]))
            else:
                print("[-] 主机组不存在,无法继续执行.")
                exit(0)

if __name__ == "__main__":
    arg = sys.argv
    if arg[1] == "display":
        DisplayAllRule()
    elif arg[1] == "run":
        RunRule(arg[2])

实现简易词法分析: 通过使用Eval函数,并配合正则语句我们可以实现简易的词法分析器.

代码语言:javascript复制
import re

# 内置方法,获取大小
def GetFileSize(size):
    return size   10

# 内置上传文件
def UploadFile(file):
    #print("上传: {}".format(file))
    return True

# IF 句式解析器
def if_judge(string):
    # 寻找IF{} 找到后拼接并提取出左值与右值
    ptr = re.compile(r'IF [{] (.*?) [}]',re.S)
    subject = re.findall(ptr, string)[0]
    tree_ = subject.split(" ")
    #print("左: {} 运算符: {} 右: {}".format(tree_[0],tree_[1],tree_[2]))

    # 如果是左值是GetFileSize则需要解析左边数值并判断
    if re.findall(r'GetFileSize', tree_[0], re.M) != []:
        left_value = eval(tree_[0])
        #print("左侧函数执行: {}".format(left_value))
        compile_ = eval( str(left_value)    tree_[1]   tree_[2] )
        if compile_:
            return True
        else:
            return False

    elif re.findall(r'UploadFile', tree_[0], re.M) != []:
        left_value = eval(tree_[0])
        #print("左侧函数执行: {}".format(left_value))
        compile_ = eval( str(left_value)    tree_[1]   tree_[2] )
        if compile_:
            return True
        else:
            return False

    # 纯数字之间的比较走该分支
    else:
        compile_ = eval( tree_[0]   tree_[1]   tree_[2] )
        if compile_:
            return True
        else:
            return False

# 变量解析器
def variable_judge(string):
    ptr = re.compile(r'AUTO [{{] (.*?) [}}]',re.S)
    subject = re.findall(ptr, string)[0]
    ref = eval(subject)
    return ref

if __name__ == "__main__":
    a = 10
    b = 20

    _str = "IF { 10 > 10 }"
    ref = if_judge(_str)
    print("数值对比: ", ref)

    u_str = "IF { a <= b }"
    ref = if_judge(u_str)
    print("变量对比: ", ref)

    func_str = "IF { GetFileSize(10) >= 20 }"
    ref = if_judge(func_str)
    print("函数执行后对比: ", ref)

    upload_str = "IF { UploadFile('/etc/passwd') == False }"
    ref = if_judge(upload_str)
    print("函数状态对比: ", ref)

    string_str = "IF { 'lyshark' == 'lyshark' }"
    ref = if_judge(string_str)
    print("字符串对比: ", ref)

    ids = "AUTO { {'uname': 'lyshark'} }"
    di = variable_judge(ids)
    print("定义变量: " , di)

提取字符串左右侧文本: 分别通过两种方式提取字符串左右两侧字符串,并实现复杂的过滤需求.

代码语言:javascript复制
import re
import MySSH

# 通过正则匹配左右侧数据
def splite_re():
    string = "1.24 & MySQL"

    left = re.findall("^(.*)&",string)[0].replace(" ","")
    right = re.findall("&(.*)",string)[0].replace(" ","")
    print("匹配左侧记录: {} --> 匹配右侧记录: {}".format(left,right))

# 通过函数方式过滤左右侧
def split_func():
    string = "275 [0.000000] ACPI"

    # 取出左右两侧结果集
    splitted = ["]".join(line.split("]")[:1]) for line in string.split("n") if len(line) > 1]
    print("中间结果: " , splitted)
    left_string = ["[".join(line.split("[")[:1]) for line in splitted[0].split("n") if len(line) > 1]
    right_string = ["[".join(line.split("[")[1:]) for line in splitted[0].split("n") if len(line) > 1]
    print("左侧结果: {} 右侧结果: {}".format(left_string,right_string))

    # 只取出右侧结果
    string = "0.000000*ACPI"
    splitted = ["*".join(line.split("*")[1:]) for line in string.split("n") if len(line) > 1]
    print("右侧结果: {}".format(splitted))

# 提取特定进程CPU内存利用率
def Get_CPU_Memory_load():
    dic = {"PID":0,"CPU":0,"MEM":0,"Name":0}
    ssh = MySSH.MySSH("192.168.81.129","root","1233","22")
    ssh.Init()
    ref = ssh.BatchCMD("ps -aux | sort -k4nr | head -n 10 | awk '{print $2 ":" $3 ":" $4 ":" $11 "@"}'")
    string = ref.replace("n","").split("@")
    for each in string:
        if each != "":
            ptr = each.split(":")
            dic['PID'] = ptr[0]
            dic['CPU'] = ptr[1]
            dic['MEM'] = ptr[2]
            dic['Name'] = ptr[3]
            print(dic)

# 提取IO最重的前20个进程
def Get_Dmesg_Load():
    dic = { "left": 0 , "right":0, "center":0 }
    ssh = MySSH.MySSH("192.168.81.129","root","1233","22")
    ssh.Init()
    ssh.BatchCMD("echo 1 >/proc/sys/vm/block_dump")
    ref = ssh.BatchCMD("dmesg | awk -F: '{print $1}' | sort | uniq -c | sort -rn | head -n 20")
    for each in ref.split("n"):
        if each != "":
            string = each.replace(" ","")
            # 提取左右侧
            splitted = ["]".join(line.split("]")[:1]) for line in string.split("n") if len(line) > 1]
            left_string = ["[".join(line.split("[")[:1]) for line in splitted[0].split("n") if len(line) > 1]
            center_string = string.split("]")[1]
            right_string = ["[".join(line.split("[")[1:]) for line in splitted[0].split("n") if len(line) > 1]
            # print("左侧结果: {} 中间: {} 右侧结果: {}".format(left_string[0], center_string, right_string[0]))
            dic['left'] = left_string[0]
            dic['right'] = right_string[0]
            dic['center'] = center_string
            print(dic)

if __name__ == "__main__":
    splite_re()
    split_func()

0 人点赞