实现序列命令解析: 该功能实现了对序列任务的解析,我们可以将需要执行的命令写成序列,当需要使用时可直接执行任务.
首先需要定义两个配置文件,配置文件config.json
用于存放主机组信息,apache.json
则用于存放需要执行的命令序列.
config.json
{
"MyWebServer":
[
["192.168.191.1","root","1233"],
["192.168.191.2","root","1233"],
["192.168.191.3","root","1233"]
]
}
apache.json
{
"Group": "MyWebServer",
"task_sequence":
[
["iptables -F"],
["yum -y install gcc make pcre-devel openssl-devel expat-devel bzip2"],
["PUT","./put_file/httpd-2.4.46.tar.gz","/tmp/apache.tar.gz"],
["tar -xzf /tmp/apache.tar.gz -C /tmp/"],
["mv /tmp/apr-util-1.6.1 /tmp/httpd-2.4.46/srclib/apr-util"]
]
}
主代码如下,在两个表中解析并循环执行任务.
代码语言:javascript复制import os,json,sys
# 获取特定目录下所有的剧本
def DisplayAllRule():
rootdir = os.getcwd() "\rule\"
all_files = [f for f in os.listdir(rootdir)]
print("{0:15} t {1:15} t {2:5}".format("文件名","主机组","命令数"))
for switch in all_files:
# 首先判断文件结尾是否为Json
if( switch.endswith(".json") == True):
all_switch_dir = rootdir switch
try:
# 判断文件内部是否符合JSON规范
with open(all_switch_dir , "r" ,encoding="utf-8") as read_file:
# 判断是否存在指定字段来识别规范
load = json.loads(read_file.read())
if load.get("Group") != None and load.get("task_sequence") != None:
print("{0:15} t {1:15} t {2:5}".format(switch,load.get("Group"),len(load.get("task_sequence"))))
except ValueError:
pass
# 执行命令行
def RunRule(rule_name):
rule_dir = os.getcwd() "\rule\" rule_name
config_dir = os.getcwd() "\config.json"
# 先打开配置建并读取到数据
with open(config_dir , "r" ,encoding="utf-8") as read_config_ptr:
config_load = json.loads(read_config_ptr.read())
# 接着读取选中剧本
with open(rule_dir,"r",encoding="utf-8") as read_rule_ptr:
rule_load = json.loads(read_rule_ptr.read())
# 先找到组名称,并以组名称查询组内主机数
ref = config_load.get(rule_load.get("Group"))
if ref != None:
# 加载剧本中的任务命令
task_sequence = rule_load.get("task_sequence")
# 循环执行针对组内主机
for addr in ref:
print("-" * 130 , "n针对地址执行: {}n".format(addr[0]),"-" * 130)
# 每个主机需要执行的命令
for cmd in task_sequence:
if cmd[0] == "PUT" and len(cmd) >= 3:
print("上传文件 从 {0} -> 到 {1}".format(cmd[1],cmd[2]))
else:
print("执行命令: {0}".format(cmd[0]))
else:
print("[-] 主机组不存在,无法继续执行.")
exit(0)
if __name__ == "__main__":
arg = sys.argv
if arg[1] == "display":
DisplayAllRule()
elif arg[1] == "run":
RunRule(arg[2])
实现简易词法分析: 通过使用Eval函数,并配合正则语句我们可以实现简易的词法分析器.
代码语言:javascript复制import re
# 内置方法,获取大小
def GetFileSize(size):
return size 10
# 内置上传文件
def UploadFile(file):
#print("上传: {}".format(file))
return True
# IF 句式解析器
def if_judge(string):
# 寻找IF{} 找到后拼接并提取出左值与右值
ptr = re.compile(r'IF [{] (.*?) [}]',re.S)
subject = re.findall(ptr, string)[0]
tree_ = subject.split(" ")
#print("左: {} 运算符: {} 右: {}".format(tree_[0],tree_[1],tree_[2]))
# 如果是左值是GetFileSize则需要解析左边数值并判断
if re.findall(r'GetFileSize', tree_[0], re.M) != []:
left_value = eval(tree_[0])
#print("左侧函数执行: {}".format(left_value))
compile_ = eval( str(left_value) tree_[1] tree_[2] )
if compile_:
return True
else:
return False
elif re.findall(r'UploadFile', tree_[0], re.M) != []:
left_value = eval(tree_[0])
#print("左侧函数执行: {}".format(left_value))
compile_ = eval( str(left_value) tree_[1] tree_[2] )
if compile_:
return True
else:
return False
# 纯数字之间的比较走该分支
else:
compile_ = eval( tree_[0] tree_[1] tree_[2] )
if compile_:
return True
else:
return False
# 变量解析器
def variable_judge(string):
ptr = re.compile(r'AUTO [{{] (.*?) [}}]',re.S)
subject = re.findall(ptr, string)[0]
ref = eval(subject)
return ref
if __name__ == "__main__":
a = 10
b = 20
_str = "IF { 10 > 10 }"
ref = if_judge(_str)
print("数值对比: ", ref)
u_str = "IF { a <= b }"
ref = if_judge(u_str)
print("变量对比: ", ref)
func_str = "IF { GetFileSize(10) >= 20 }"
ref = if_judge(func_str)
print("函数执行后对比: ", ref)
upload_str = "IF { UploadFile('/etc/passwd') == False }"
ref = if_judge(upload_str)
print("函数状态对比: ", ref)
string_str = "IF { 'lyshark' == 'lyshark' }"
ref = if_judge(string_str)
print("字符串对比: ", ref)
ids = "AUTO { {'uname': 'lyshark'} }"
di = variable_judge(ids)
print("定义变量: " , di)
提取字符串左右侧文本: 分别通过两种方式提取字符串左右两侧字符串,并实现复杂的过滤需求.
代码语言:javascript复制import re
import MySSH
# 通过正则匹配左右侧数据
def splite_re():
string = "1.24 & MySQL"
left = re.findall("^(.*)&",string)[0].replace(" ","")
right = re.findall("&(.*)",string)[0].replace(" ","")
print("匹配左侧记录: {} --> 匹配右侧记录: {}".format(left,right))
# 通过函数方式过滤左右侧
def split_func():
string = "275 [0.000000] ACPI"
# 取出左右两侧结果集
splitted = ["]".join(line.split("]")[:1]) for line in string.split("n") if len(line) > 1]
print("中间结果: " , splitted)
left_string = ["[".join(line.split("[")[:1]) for line in splitted[0].split("n") if len(line) > 1]
right_string = ["[".join(line.split("[")[1:]) for line in splitted[0].split("n") if len(line) > 1]
print("左侧结果: {} 右侧结果: {}".format(left_string,right_string))
# 只取出右侧结果
string = "0.000000*ACPI"
splitted = ["*".join(line.split("*")[1:]) for line in string.split("n") if len(line) > 1]
print("右侧结果: {}".format(splitted))
# 提取特定进程CPU内存利用率
def Get_CPU_Memory_load():
dic = {"PID":0,"CPU":0,"MEM":0,"Name":0}
ssh = MySSH.MySSH("192.168.81.129","root","1233","22")
ssh.Init()
ref = ssh.BatchCMD("ps -aux | sort -k4nr | head -n 10 | awk '{print $2 ":" $3 ":" $4 ":" $11 "@"}'")
string = ref.replace("n","").split("@")
for each in string:
if each != "":
ptr = each.split(":")
dic['PID'] = ptr[0]
dic['CPU'] = ptr[1]
dic['MEM'] = ptr[2]
dic['Name'] = ptr[3]
print(dic)
# 提取IO最重的前20个进程
def Get_Dmesg_Load():
dic = { "left": 0 , "right":0, "center":0 }
ssh = MySSH.MySSH("192.168.81.129","root","1233","22")
ssh.Init()
ssh.BatchCMD("echo 1 >/proc/sys/vm/block_dump")
ref = ssh.BatchCMD("dmesg | awk -F: '{print $1}' | sort | uniq -c | sort -rn | head -n 20")
for each in ref.split("n"):
if each != "":
string = each.replace(" ","")
# 提取左右侧
splitted = ["]".join(line.split("]")[:1]) for line in string.split("n") if len(line) > 1]
left_string = ["[".join(line.split("[")[:1]) for line in splitted[0].split("n") if len(line) > 1]
center_string = string.split("]")[1]
right_string = ["[".join(line.split("[")[1:]) for line in splitted[0].split("n") if len(line) > 1]
# print("左侧结果: {} 中间: {} 右侧结果: {}".format(left_string[0], center_string, right_string[0]))
dic['left'] = left_string[0]
dic['right'] = right_string[0]
dic['center'] = center_string
print(dic)
if __name__ == "__main__":
splite_re()
split_func()