Python实现Paramiko二次封装

2022-12-28 17:36:09 浏览数 (1)

Paramiko是一个用于执行SSH命令的Python第三方库,使用该库可实现自动化运维的所有任务,如下是一些常用代码的封装方式,多数代码为半成品,只是敲代码时的备份副本防止丢失,仅供参考,目前本人巡检百台设备完全无压力。

实现命令执行: 直接使用过程化封装,执行CMD命令.

代码语言:javascript复制
import paramiko

ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

def BatchCMD(address,username,password,port,command):
    try:
        ssh.connect(hostname=address,username=username,password=password,port=port,timeout=2)
        stdin , stdout , stderr = ssh.exec_command(command)
        result = stdout.read()
        if len(result) != 0:
            result = str(result).replace("\n", "n")
            result = result.replace("b'", "").replace("'", "")
            return result
        else:
            return None
    except Exception:
        return None

实现磁盘巡检: 获取磁盘空间并返回字典格式

代码语言:javascript复制
def GetAllDiskSpace(address,username,password,port):
    ref_dict = {}
    cmd_dict = {"Linuxn" : "df | grep -v 'Filesystem' | awk '{print $5 ":" $6}'",
                "AIXn" : "df | grep -v 'Filesystem' | awk '{print $4 ":" $7}'"
                }
    # 首先检测系统版本
    os_version = BatchCMD(address,username,password,port,"uname")
    for version,run_cmd in cmd_dict.items():
        if(version == os_version):
            # 根据不同版本选择不同的命令
            os_ref = BatchCMD(address,username,password,port,run_cmd)
            ref_list= os_ref.split("n")
            # 循环将其转换为字典
            for each in ref_list:
                # 判断最后是否为空,过滤最后一项
                if each != "":
                    ref_dict[str(each.split(":")[1])] = str(each.split(":")[0])
    return ref_dict

# 磁盘巡检总函数
def DiskMain():
    with open("db.json", "r", encoding="utf-8") as read_fp:
        load_json = read_fp.read()
        js = json.loads(load_json)
        base = js.get("base")
        count = len(base)

        for each in range(0,count):
            print("33[37m-33[0m" * 80)
            print("33[35m 检测地址: {0:10} t 用户名: {1:10} t 密码: {2:10} t 端口: {3:4}33[0m".
                format(base[each][1],base[each][2],base[each][3],base[each][4]))
            print("33[37m-33[0m" * 80)

            ref = GetAllDiskSpace(base[each][1],base[each][2],base[each][3],base[each][4])
            for k,v in ref.items():
                # 判断是否存在空盘
                if( v.split("%")[0] != "-"):
                    # 将占用百分比转换为整数
                    space_ret  = int(v.split("%")[0])
                    if space_ret >= 70:
                        print("33[31m 磁盘分区: {0:30} t 磁盘占用: {1:5} 33[0m".format(k,v))
                        continue
                    if space_ret >= 50:
                        print("33[33m 磁盘分区: {0:30} t 磁盘占用: {1:5} 33[0m".format(k, v))
                        continue
                    else:
                        print("33[34m 磁盘分区: {0:30} t 磁盘占用: {1:5} 33[0m".format(k, v))
                        continue
            print()

# 组内传递用户名密码时调用此方法
def GroupDiskMain(address,username,password,port):
    ref = GetAllDiskSpace(address,username,password,port)
    for k, v in ref.items():
        if (v.split("%")[0] != "-"):
            space_ret = int(v.split("%")[0])
            if space_ret >= 70:
                print("磁盘分区: {0:30} t 磁盘占用: {1:5} -> [警告]".format(k, v))
                continue
            if space_ret >= 50:
                print("磁盘分区: {0:30} t 磁盘占用: {1:5} -> [警惕]".format(k, v))
                continue
            else:
                print("磁盘分区: {0:30} t 磁盘占用: {1:5} -> [正常]".format(k, v))
                continue
    print()

获取系统内存利用率: 获取系统内存利用率

代码语言:javascript复制
def GetAllMemSpace(address,username,password,port):
    cmd_dict = {"Linuxn" : "cat /proc/meminfo | head -n 2 | awk '{print $2}' | xargs | awk '{print $1 ":" $2}'",
                "AIXn" : "df | grep -v 'Filesystem' | awk '{print $4 ":" $7}'"
                }
    # 首先检测系统版本
    os_version = BatchCMD(address,username,password,port,"uname")
    for version,run_cmd in cmd_dict.items():
        if(version == os_version):
            # 根据不同版本选择不同的命令
            os_ref = BatchCMD(address,username,password,port,run_cmd)

            # 首先现将KB转化为MB
            mem_total = math.ceil( int(os_ref.split(":")[0].replace("n","")) / 1024)
            mem_free = math.ceil(int(os_ref.split(":")[1].replace("n","")) / 1024)
            mem_used = str( int(mem_total) - int(mem_free))

            # 计算占用空间百分比
            percentage = 100 - int(mem_free / int(mem_total / 100))

            print("内存总计空间: {}".format(str(mem_total)   " MB"))
            print("内存剩余空间: {}".format(str(mem_free)   " MB"))
            print("内存已用空间: {}".format(str(mem_used)   " MB"))
            print("计算百分比: {}".format(str(percentage)   " %"))

获取系统进程信息: 获取系统进程信息,并返回字典格式

代码语言:javascript复制
def GetAllProcessSpace(address,username,password,port):
    ref_dict = {}
    cmd_dict = {"Linuxn" : "ps aux | grep -v 'USER' | awk '{print $2 ":" $11}' | uniq",
                "AIXn" : "ps aux | grep -v 'USER' | awk '{print $2 ":" $12}' | uniq"
                }
    os_version = BatchCMD(address,username,password,port,"uname")
    for version,run_cmd in cmd_dict.items():
        if(version == os_version):
            os_ref = BatchCMD(address, username, password, port, run_cmd)
            ref_list = os_ref.split("n")
            for each in ref_list:
                if each != "":
                    ref_dict[str(each.split(":")[0])] = str(each.split(":")[1])
    return ref_dict

# 巡检进程是否存在
def ProcessMain():
    with open("db.json", "r", encoding="utf-8") as read_fp:
        load_json = read_fp.read()
        js = json.loads(load_json)

        process = js.get("process")
        process_count = len(process)
        for x in range(0,process_count):
            # 根据process中的值查询base中的账号密码
            base = js.get("base")
            if( list(process[x].keys())[0] == base[x][0] ):
                # 拿到账号密码之后再提取出他们的进程ID于进程名
                print("33[37m-33[0m" * 80)
                print("33[35m 检测地址: {0:10} t 用户名: {1:10} t 密码: {2:10} t 端口: {3:4}33[0m".
                      format(base[x][1], base[x][2], base[x][3], base[x][4]))
                print("33[37m-33[0m" * 80)

                ref_dic = GetAllProcessSpace(base[x][1],base[x][2],base[x][3],base[x][4])
                # ref_val = 全部进程列表 proc_val = 需要检测的进程列表
                ref_val = list(ref_dic.values())
                proc_val = list(process[x].values())[0]
                # 循环比较是否在列表中
                for each in proc_val:
                    flag = each in ref_val
                    if(flag == True):
                        print("33[34m 进程: {0:50} 状态: √ 33[0m".format(each))
                    else:
                        print("33[31m 进程: {0:50} 状态: × 33[0m".format(each))

实现剧本运行功能: 针对特定一台主机运行剧本功能,随便写的一个版本,仅供参考

代码语言:javascript复制
def RunRule(address,username,password,port,playbook):
    os_version = BatchCMD(address,username,password,port,"uname")
    if(os_version == list(playbook.keys())[0]):
        play = list(playbook.values())[0]
        print()
        print("33[37m-33[0m" * 130)
        print("33[35m 系统类型: {0:4} t 地址: {1:10} t 用户名: {2:10} t 密码: {3:15} t 端口: {4:4}33[0m"
              .format(os_version.replace("n",""),address,username,password,port))
        print("33[37m-33[0m" * 130)

        for each in range(0,len(play)):
            RunCmd = play[each]   " > /dev/null 2>&1 && echo $?"
            print("33[30m [>] 派发命令: {0:100} t 状态: {1:5} 33[0m".format(
                RunCmd.replace(" > /dev/null 2>&1 && echo $?", ""),"正在派发"))

            os_ref = BatchCMD(address, username, password, port, RunCmd)
            if(os_ref == "0n"):
                print("33[34m [√] 运行命令: {0:100} t 状态: {1:5} 33[0m".format(
                    RunCmd.replace(" > /dev/null 2>&1 && echo $?",""),"派发完成"))
            else:
                print("33[31m [×] 运行命令: {0:100} t 状态: {1:5} 33[0m".format(
                    RunCmd.replace(" > /dev/null 2>&1 && echo $?",""),"派发失败"))
                # 既然失败了,就把剩下的也打出来吧,按照失败处理
                for x in range(each 1,len(play)):
                    print("33[31m [×] 运行命令: {0:100} t 状态: {1:5} 33[0m".format(
                        play[x].replace(" > /dev/null 2>&1 && echo $?", ""), "终止执行"))
                break
    else:
        return 0

# 批量: 传入主机组不同主机执行不同剧本
def RunPlayBook(HostList,PlayBook):
    count = len(HostList)
    error = []
    success = []
    for each in range(0,count):
        ref = RunRule(HostList[each][0],HostList[each][1],HostList[each][2],HostList[each][3],PlayBook)
        if ref == 0:
            error.append(HostList[each][0])
        else:
            success.append(HostList[each][0])
    print("nn")
    print("-" * 130)
    print("执行清单")
    print("-" * 130)
    for each in success:
        print("成功主机: {}".format(each))
    for each in error:
        print("失败主机: {}".format(each))

# 运行测试
def PlayBookRun():
    playbook = 
        {
            "Linuxn":
                [
                    "ifconfig",
                    "vmstat",
                    "ls",
                    "netstat -an",
                    "ifconfis",
                    "cat /etc/passwd | grep 'root' | awk '{print $2}'"
                ]
        }

    addr_list = 
        [
            ["192.168.1.127", "root", "1233", "22"],
            ["192.168.1.126", "root", "1203", "22"]
        ]

    # 指定addr_list这几台机器执行playbook剧本
    RunPlayBook(addr_list,playbook)

过程化实现文件上传下载: 文件传输功能 PUT上传 GET下载

代码语言:javascript复制
def BatchSFTP(address,username,password,port,soruce,target,flag):
    transport = paramiko.Transport((address, int(port)))
    transport.connect(username=username, password=password)
    sftp = paramiko.SFTPClient.from_transport(transport)
    if flag == "PUT":
        try:
            ret = sftp.put(soruce, target)
            if ret !="":
                transport.close()
                return 1
            else:
                transport.close()
                return 0
            transport.close()
        except Exception:
            transport.close()
            return 0
    elif flag == "GET":
        try:
            target = str(address   "_"   target)
            os.chdir("./recv_file")
            ret = sftp.get(soruce, target)
            if ret != "":
                transport.close()
                return 1
            else:
                transport.close()
                return 0
            transport.close()
        except Exception:
            transport.close()
            return 0

# 批量将本地文件 source 上传到目标 target 中
def PutRemoteFile(source,target):
    with open("db.json", "r", encoding="utf-8") as fp:
        load_json = fp.read()
        js = json.loads(load_json)
        base = js.get("base")

        print("-" * 130)
        print("接收主机 tt 登录用户 t 登录密码 t 登录端口 t 本地文件 tt 传输到 ttt 传输状态")
        print("-" * 130)

        for each in range(0,len(base)):
            # 先判断主机是否可通信
            ref = BatchCMD(base[each][1], base[each][2], base[each][3], base[each][4],"uname")
            if ref == None:
                print("33[31m{0:15} t {1:6} t {2:10} t {3:3} t {4:10} t {5:10} t 未连通33[0m".format(
                    base[each][1],base[each][2],base[each][3],base[each][4],source,target))
                continue

            ref = BatchSFTP(base[each][1],base[each][2],base[each][3],base[each][4],source,target,"PUT")
            if(ref == 1):
                print("33[34m{0:15} t {1:6} t {2:10} t {3:3} t {4:10} t {5:10} t 传输成功33[0m".format(
                    base[each][1],base[each][2],base[each][3],base[each][4],source,target))
            else:
                print("33[31m{0:15} t {1:6} t {2:10} t {3:3} t {4:10} t {5:10} t 传输失败33[0m".format(
                    base[each][1], base[each][2], base[each][3], base[each][4], source, target))

# 批量将目标文件拉取到本地特定目录(存在缺陷)
def GetRemoteFile(source,target):
    with open("db.json", "r", encoding="utf-8") as fp:
        load_json = fp.read()
        js = json.loads(load_json)
        base = js.get("base")

        print("-" * 130)
        print("发送主机 tt 登录用户 t 登录密码 t 登录端口 tt 远程文件 tt 拉取到 ttt 传输状态")
        print("-" * 130)

        for each in range(0,len(base)):
            ref = BatchCMD(base[each][1], base[each][2], base[each][3], base[each][4], "uname")
            if ref == None:
                print("33[31m{0:15} t {1:6} t {2:10} t {3:3} t {4:10} t {5:10} t 未连通33[0m".format(
                    base[each][1], base[each][2], base[each][3], base[each][4], source, target))
                continue

            ref = BatchSFTP(base[each][1],base[each][2],base[each][3],base[each][4],source,target,"GET")
            if(ref == 1):
                print("33[34m{0:15} t {1:6} t {2:10} t {3:3} t {4:10} t {5:10} t 传输成功33[0m".format(
                    base[each][1],base[each][2],base[each][3],base[each][4],source,target))
            else:
                print("33[31m{0:15} t {1:6} t {2:10} t {3:3} t {4:10} t {5:10} t 传输失败33[0m".format(
                    base[each][1], base[each][2], base[each][3], base[each][4], source, target))

简单实现批量执行SSH命令

代码语言:javascript复制
import os,paramiko

ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

def BatchCMD(address,username,password,port,command):
    try:
        ssh.connect(hostname=address,username=username,password=password,port=port,timeout=2)
        stdin , stdout , stderr = ssh.exec_command(command)
        result = stdout.read()
        if len(result) != 0:
            print('33[0mIP: {} UserName:{} Port: {} Status: OK'.format(address,username,port))
            return 1
        else:
            print('33[45mIP: {} UserName:{} Port: {} Status: Error'.format(address,username,port))
            return 0
    except Exception:
        print('33[45mIP: {} UserName:{} Port: {} Status: Error'.format(address, username, port))
        return 0

if __name__ == "__main__":
    fp = open("ip.log","r ")
    for temp in fp.readlines():
        ip = temp.split("n")[0]
        BatchCMD(ip, "root", "1233", "22", "ls && echo $?")

简单实现批量SFTP远程传输

代码语言:javascript复制
import paramiko

def BatchSFTP(address,username,password,port,soruce,target,flag):
    transport = paramiko.Transport((address, int(port)))
    transport.connect(username=username, password=password)
    sftp = paramiko.SFTPClient.from_transport(transport)
    if flag == "PUT":
        try:
            ret = sftp.put(soruce, target)
            if ret !="":
                print("Addr:{}  UserName:{}  Source:{}  Target:{} Success".format(address,username,soruce,target))
                return 1
            else:
                print("Addr:{}  UserName:{}  Source:{}  Target:{} Error".format(address, username, soruce, target))
                return 0
            transport.close()
        except Exception:
            return 0
            transport.close()
    elif flag == "GET":
        try:
            target = str(target   "_"   address)
            ret = sftp.get(soruce, target)
            if ret != "":
                print("Addr:{}  UserName:{}  Source:{}  Target:{} Success".format(address, username, soruce, target))
                return 1
            else:
                print("Addr:{}  UserName:{}  Source:{}  Target:{} Error".format(address, username, soruce, target))
                return 0
            transport.close()
        except Exception:
            return 0

if __name__ == "__main__":
    # 将本地文件./main.py上传到/tmp/main.py
    BatchSFTP("192.168.1.20","root","1233","22","./main.py","/tmp/main.py","PUT")

    # 将目标主机下的/tmp/main.py拷贝到本地文件./get/test.py
    BatchSFTP("192.168.1.20","root","1233","22","/tmp/main.py","./get/test.py","GET")

通过SSH模块获取系统内存数据 这里我写了一个简单的获取内存数据的脚本,当然获取CPU磁盘等,同样可以这样来搞.

代码语言:javascript复制
import os,paramiko,re

ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

def SSH_Get_Mem():
    dict ={}
    list = []
    head =["MemTotal","MemFree","Cached","SwapTotal","SwapFree"]

    ssh.connect(hostname="192.168.1.20", username="root", password="1233", port=22, timeout=2)
    stdin, stdout, stderr = ssh.exec_command('cat /proc/meminfo')
    string = str(stdout.read())

    for i in [0,1,4,14,15]:  # 取出列表中的这几行
        Total = string.split("\n")[i].split(":")[1].replace(" kB","").strip()
        list.append(Total)
    for (head,list) in zip(head,list):
        dict[head]=int(list);  # 组合成一个字典
    return dict

if __name__ == "__main__":
    for i in range(10):
        dic = SSH_Get_Mem()
        print(dic)

fabric的使用 fabric工具也是自动化运维利器,其默认依赖于paramiko的二次封装.

代码语言:javascript复制
# 简单实现命令执行
from fabric import Connection
conn = Connection(host="192.168.1.10",user="root",port="22",connect_kwargs={"password":"123"})
try:
    with conn.cd("/var/www/html/"):
        ret = conn.run("ls -lh",hide=True)
        print("主机:"   conn.host   "端口:"   conn.port   "完成")
except Exception:
    print("主机:"   conn.host   "端口:"   conn.port   "失败")

# 读取数据到本地
from fabric import Connection
conn = Connection(host="192.168.1.20",user="root",port="22",connect_kwargs={"password":"123"})

uname = conn.run('uname -s', hide=True)
if 'Linux' in uname.stdout:
    command = "df -h / | tail -n1 | awk '{print $5}'"
    print(conn.run(command,hide=True).stdout.strip())

# 文件上传与下载
from fabric import Connection
conn = Connection(host="192.168.1.20",user="root",port="22",connect_kwargs={"password":"123"})
conn.put("D://zabbix_get.exe","/tmp/zabbix.exe")  # 文件上传
conn.get("/tmp/zabbix.exe","./zab.exe")           # 下载文件

另一种命令执行方法:

代码语言:javascript复制
import paramiko

ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
def BatchCMD(address,username,password,port,command):
    try:
        ssh.connect(hostname=address,username=username,password=password,port=port,timeout=2)
        stdin , stdout , stderr = ssh.exec_command(command)
        result = stdout.read()
        if len(result) != 0:
            return result
        else:
            return -1
    except Exception:
        return -1

# 通过获取主机Ping状态
def GetPing():
    fp = open("unix_base.db", "r", encoding="utf-8")
    count = len(open("unix_base.db", "r", encoding="utf-8").readlines())
    print("-" * 100)
    print("{0:20} t {1:10} t {2:13} t {3:5} t {4:9} t {5:40}".format("IP地址","机器系统","设备SN","机房位置","存活状态","主机作用"))
    print("-" * 100)
    for each in range(count):
        ref = eval(fp.readline())
        ret = BatchCMD(ref[0],ref[5],ref[6],22,"pwd | echo $?")
        if(int(ret)==0):
            print("{0:20} t {1:10} t {2:11} t {3:5} t {4:9} t {5:40}".
                  format(ref[0],ref[1],ref[2],ref[3],"正常",ref[4]))
        else:
            print("{0:20} t {1:10} t {2:13} t {3:5} t {4:9} t {5:40}".
                  format(ref[0],ref[1],ref[2],ref[3],"异常",ref[4]))
    fp.close()

# ps aux | grep "usbCfgDev" | grep -v "grep" | awk {'print $2'}
def GetProcessStatus():
    fp = open("unix_process.db", "r", encoding="utf-8")
    count = len(open("unix_process.db", "r", encoding="utf-8").readlines())
    for each in range(count):
        proc = eval(fp.readline())
        proc_len = len(proc)
        print("-" * 70)
        print("---> 巡检地址: {0:10} t 登录用户: {1:7} t 登录密码: {2:10}".format(proc[0],proc[1],proc[2]))
        print("-" * 70)
        for process in range(3, proc_len):
            command = "ps aux | grep '{}' | grep -v 'grep' | awk '{}' | head -1".format(proc[process],"{print $2}")
            try:
                ref = BatchCMD(proc[0],proc[1],proc[2],22,command)
                if(int(ref)!=-1):
                    print("进程: {0:18}  t PID: {1:10} t 状态: {2}".format(proc[process], int(ref),"√"))
                else:
                    print("进程: {0:18} t  PID:{1:10} t 状态: {2}".format(proc[process], 0,"×"))
            except Exception:
                print("进程: {0:18} t  PID:{1:10} t 状态: {2}".format(proc[process], 0,"×"))
        print()
    fp.close()


def GetDiskStatus():
    fp = open("unix_disk.db", "r", encoding="utf-8")
    count = len(open("unix_disk.db", "r", encoding="utf-8").readlines())
    for each in range(count):
        proc = eval(fp.readline())
        proc_len = len(proc)
        print("-" * 100)
        print("---> 巡检地址: {0:10} t 登录系统: {1:7} t 登录账号: {2:10} 登录密码: {3:10}".
              format(proc[0],proc[1],proc[2],proc[3]))
        print("-" * 100)
        try:
            ref = BatchCMD(proc[0], proc[2], proc[3], 22, "df | grep -v 'Filesystem'")
            st = str(ref).replace("\n", "n")
            print(st.replace("b'", "").replace("'", ""))
        except Exception:
            pass
        print()
    fp.close()

# 运行命令
def RunCmd(command,system):
    fp = open("unix_disk.db", "r", encoding="utf-8")
    count = len(open("unix_disk.db", "r", encoding="utf-8").readlines())
    for each in range(count):
        proc = eval(fp.readline())
        proc_len = len(proc)

        if proc[1] == system:
            print("-" * 100)
            print("---> 巡检地址: {0:10} t 登录系统: {1:7} t 登录账号: {2:10} 登录密码: {3:10}".
                  format(proc[0],proc[1],proc[2],proc[3]))
            print("-" * 100)
            try:
                ref = BatchCMD(proc[0], proc[2], proc[3], 22, command)
                st = str(ref).replace("\n", "n")
                print(st.replace("b'", "").replace("'", ""))
            except Exception:
                pass
    fp.close()

面向对象的封装方法: 使用面向对象封装,可极大的提高复用性。

代码语言:javascript复制
import paramiko

class MySSH:
    def __init__(self,address,username,password,default_port = 22):
        self.address = address
        self.default_port = default_port
        self.username = username
        self.password = password

        self.obj = paramiko.SSHClient()
        self.obj.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.obj.connect(self.address,self.default_port,self.username,self.password)
        self.objsftp = self.obj.open_sftp()

    def BatchCMD(self,command):
        stdin , stdout , stderr = self.obj.exec_command(command)
        result = stdout.read()
        if len(result) != 0:
            result = str(result).replace("\n", "n")
            result = result.replace("b'", "").replace("'", "")
            return result
        else:
            return None

    def GetRemoteFile(self,remotepath,localpath):
        self.objsftp.get(remotepath,localpath)

    def PutLocalFile(self,localpath,remotepath):
        self.objsftp.put(localpath,remotepath)


    def GetFileSize(self,file_path):
        ref = self.BatchCMD("du -s "   file_path   " | awk '{print $1}'")
        return ref

    def CloseSSH(self):
        self.objsftp.close()
        self.obj.close()

if __name__ == '__main__':
    ssh = MySSH('192.168.191.3','root','1233',22)

    ref = ssh.BatchCMD("ifconfig")
    print(ref)

    sz = ssh.GetFileSize("/etc/passwd")
    print(sz)
    ssh.CloseSSH()

第二次封装完善。

代码语言:javascript复制
import paramiko,os,json,re

class MySSH:
    def __init__(self,address,username,password,default_port = 22):
        self.address = address
        self.default_port = default_port
        self.username = username
        self.password = password
        try:
            self.obj = paramiko.SSHClient()
            self.obj.set_missing_host_key_policy(paramiko.AutoAddPolicy())

            self.obj.connect(self.address,self.default_port,self.username,self.password,timeout=3,allow_agent=False,look_for_keys=False)
            self.objsftp = self.obj.open_sftp()
        except Exception:
            pass

    def BatchCMD(self,command):
        try:
            stdin , stdout , stderr = self.obj.exec_command(command,timeout=3)
            result = stdout.read()
            if len(result) != 0:
                result = str(result).replace("\n", "n")
                result = result.replace("b'", "").replace("'", "")
                return result
            else:
                return None
        except Exception:
            return None

    def GetRemoteFile(self,remote_path,local_path):
        try:
            self.objsftp.get(remote_path,local_path)
            return True
        except Exception:
            return False

    def PutLocalFile(self,localpath,remotepath):
        try:
            self.objsftp.put(localpath,remotepath)
            return True
        except Exception:
            return False

    def CloseSSH(self):
        self.objsftp.close()
        self.obj.close()

    # 获取文件大小
    def GetFileSize(self,file_path):
        ref = self.BatchCMD("du -s "   file_path   " | awk '{print $1}'")
        return ref.replace("n","")
    # 判断文件是否存在
    def IsFile(self,file_path):
        return self.BatchCMD("[ -e {} ] && echo 'True' || echo 'False'".format(file_path))

通过Eval函数解析执行: 自定义语法规则与函数,通过Eval函数实现解析执行. 没写完,仅供参考。

代码语言:javascript复制
import json,os,sys,math
import argparse,time,re
import paramiko

ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

def BatchCMD(address,username,password,port,command):
    try:
        ssh.connect(hostname=address,username=username,password=password,port=port,timeout=2)
        stdin , stdout , stderr = ssh.exec_command(command)
        result = stdout.read()
        if len(result) != 0:
            result = str(result).replace("\n", "n")
            result = result.replace("b'", "").replace("'", "")
            return result
        else:
            return None
    except Exception:
        return None


# ------------------------------------------------------------------------
# 内置解析方法

def GetDisk(x):
    return str(x)

def GetCPULoad():
    return str(10)

# 句式解析器,解析句子并执行
def judge(string):
    # 如果匹配到IF则执行判断条件解析
    if re.findall(r'IF{ (.*?) }', string, re.M) != []:
        # 则继续提取出他的表达式
        ptr = re.compile(r'IF[{] (.*?) [}]',re.S)
        subject = re.findall(ptr,string)[0]
        subject_list = subject.split(" ")
        # 拼接语句并执行
        sentence = eval(subject_list[0])   subject_list[1]   subject_list[2]
        # 组合后执行,返回结果
        if eval(sentence):
            return "IF"
        else:
            return False

    # 如果匹配到put则执行上传解析
    elif re.findall(r'PUT{ (.*?) }', string, re.M) != []:
        print("put")
    return False

# 获取特定目录下所有的剧本
def GetAllRule():
    rootdir = os.getcwd()   "\rule\"
    all_files = [f for f in os.listdir(rootdir)]
    print("-" * 90)
    print("{0:15} t {1:10} t {2:10} t {3:5} t {4:5}".format("剧本名称","应用平台","应用端口","执行主机数","命令条数"))
    print("-" * 90)
    for switch in all_files:
        # 首先判断文件结尾是否为Json
        if( switch.endswith(".json") == True):
            all_switch_dir = rootdir   switch
            try:
                # 判断文件内部是否符合JSON规范
                with open(all_switch_dir , "r" ,encoding="utf-8") as read_file:
                    # 判断是否存在指定字段来识别规范
                    load = json.loads(read_file.read())
                    if load.get("framework") != None and load.get("task_sequence") != None:
                        run_addr_count = len(load.get("address_list"))
                        command_count = len(load.get("task_sequence"))
                        print("{0:15} t {1:10} {2:10} tt {3:5} tt {4:5}".
                              format(switch,load.get("framework"),load.get("default_port"),run_addr_count,command_count))
            except ValueError:
                pass

# 指定一个剧本并运行
def RunPlayBook(rule_name):
    rootdir = os.getcwd()   "\rule\"
    all_files = [f for f in os.listdir(rootdir)]
    for switch in all_files:
        if( switch.endswith(".json") == True):
            all_switch_dir = rootdir   switch
            # 寻找到需要加载的剧本地址
            if( switch == rule_name):
                with open(all_switch_dir , "r" ,encoding="utf-8") as read_file:
                    data = json.loads(read_file.read())
                    address_list = data.get("address_list")
                    # 循环每个主机任务
                    for each in address_list:
                        # 得到剧本内容
                        task_sequence = data.get("task_sequence")
                        default_port = data.get("default_port")
                        print("-" * 90)
                        print("地址: {0:15} 用户名: {1:10} 密码: {2:10}".format(each[0],each[1],each[2]))
                        print("-" * 90)
                        for task in task_sequence:
                            flag = judge(task[0])
                            if flag == "IF":
                                ref = BatchCMD(each[0],each[1],each[2],default_port,task[1])
                                print(ref)
                            elif flag == False:
                                ref = BatchCMD(each[0],each[1],each[2],default_port,task[0])
                                print(ref)

if __name__ == "__main__":
    RunPlayBook("get_log.json")

定义剧本规范如下。

代码语言:javascript复制
{
  "framework": "Centos",
  "version": "7.0",
  "address_list":
  [
    ["192.168.191.3","root","1233"]
  ],
  "default_port": "22",
  "task_sequence":
  [
    ["ifconfig"],
    ["IF{ GetLastCmdFlag() == True }","uname"]
  ]
}

词法分析: 词法分析解析剧本内容。

代码语言:javascript复制
# 获取特定目录下所有的剧本
def GetAllRule():
    rootdir = os.getcwd()   "\rule\"
    all_files = [f for f in os.listdir(rootdir)]
    print("-" * 90)
    print("{0:15} t {1:10} t {2:10} t {3:5} t {4:5}".format("剧本名称","应用平台","应用端口","执行主机数","命令条数"))
    print("-" * 90)
    for switch in all_files:
        # 首先判断文件结尾是否为Json
        if( switch.endswith(".json") == True):
            all_switch_dir = rootdir   switch
            try:
                # 判断文件内部是否符合JSON规范
                with open(all_switch_dir , "r" ,encoding="utf-8") as read_file:
                    # 判断是否存在指定字段来识别规范
                    load = json.loads(read_file.read())
                    if load.get("framework") != None and load.get("task_sequence") != None:
                        run_addr_count = len(load.get("address_list"))
                        command_count = len(load.get("task_sequence"))
                        print("{0:15} t {1:10} {2:10} tt {3:5} tt {4:5}".
                              format(switch,load.get("framework"),load.get("default_port"),run_addr_count,command_count))
            except ValueError:
                pass

# 句式解析器,解析句子并执行
def judge(string):
    # 如果匹配到IF则执行判断条件解析
    if re.findall(r'IF{ (.*?) }', string, re.M) != []:
        # 则继续提取出他的表达式
        ptr = re.compile(r'IF[{] (.*?) [}]',re.S)
        subject = re.findall(ptr,string)[0]
        subject_list = subject.split(" ")

        # 公开接口,执行命令
        ssh = MySSH("192.168.191.3","root","1233","22")

        # 组合命令并执行
        sentence = str(eval(subject_list[0])   subject_list[1]   subject_list[2])
        if eval(sentence):
            return "IF",ssh
        else:
            return False

    # 如果匹配到put则执行上传解析
    elif re.findall(r'PUT{ (.*?) }', string, re.M) != []:
        print("put")
    return False

# 指定一个剧本并运行
def RunPlayBook(rule_name):
    rootdir = os.getcwd()   "\rule\"
    all_files = [f for f in os.listdir(rootdir)]
    for switch in all_files:
        if( switch.endswith(".json") == True):
            all_switch_dir = rootdir   switch
            # 寻找到需要加载的剧本地址
            if( switch == rule_name):
                with open(all_switch_dir , "r" ,encoding="utf-8") as read_file:
                    data = json.loads(read_file.read())
                    address_list = data.get("address_list")
                    # 循环每个主机任务
                    for each in address_list:
                        # 得到剧本内容
                        task_sequence = data.get("task_sequence")
                        default_port = data.get("default_port")
                        print("-" * 90)
                        print("地址: {0:15} 用户名: {1:10} 密码: {2:10}".format(each[0],each[1],each[2]))
                        print("-" * 90)
                        for task in task_sequence:

                            flag,obj = judge(task[0])

                            if flag == "IF":
                                ret = obj.BatchCMD(task[1])
                                print(ret)
if __name__ == '__main__':
    ret = judge("IF{ ssh.GetFileSize('/etc/passwd') >= 4 }")
    print(ret)

MySSH类最终封装: 通过面向对象对其进行封装,实现了查询CPU,负载,内存利用率,磁盘容量,等通用数据的获取。

代码语言:javascript复制
import paramiko, math,json

class MySSH:
    def __init__(self, address, username, password, default_port):
        self.address = address
        self.default_port = default_port
        self.username = username
        self.password = password
    # 初始化,远程模块
    def Init(self):
        try:
            self.ssh_obj = paramiko.SSHClient()
            self.ssh_obj.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            self.ssh_obj.connect(self.address, self.default_port, self.username, self.password, timeout=3,
                                 allow_agent=False, look_for_keys=False)
            self.sftp_obj = self.ssh_obj.open_sftp()
        except Exception:
            return False
    # 执行非交互命令
    def BatchCMD(self, command):
        try:
            stdin, stdout, stderr = self.ssh_obj.exec_command(command, timeout=3)
            result = stdout.read()
            if len(result) != 0:
                result = str(result).replace("\n", "n")
                result = result.replace("b'", "").replace("'", "")
                return result
            else:
                return None
        except Exception:
            return None
    # 将远程文件下载到本地
    def GetRemoteFile(self, remote_path, local_path):
        try:
            self.sftp_obj.get(remote_path, local_path)
            return True
        except Exception:
            return False
    # 将本地文件上传到远程
    def PutLocalFile(self, localpath, remotepath):
        try:
            self.sftp_obj.put(localpath, remotepath)
            return True
        except Exception:
            return False
    # 关闭接口
    def CloseSSH(self):
        try:
            self.sftp_obj.close()
            self.ssh_obj.close()
        except Exception:
            pass
    # 获取文件大小
    def GetFileSize(self, file_path):
        ref = self.BatchCMD("du -s "   file_path   " | awk '{print $1}'")
        return ref.replace("n", "")
    # 判断文件是否存在
    def IsFile(self, file_path):
        return self.BatchCMD("[ -e {} ] && echo 'True' || echo 'False'".format(file_path))
    # 获取系统型号
    def GetSystemVersion(self):
        return self.BatchCMD("uname")
    # 检测目标主机存活状态
    def GetPing(self):
        try:
            if self.GetSystemVersion() != None:
                return True
            else:
                return False
        except Exception:
            return False
    # 获取文件列表,并得到大小
    def GetFileList(self, path):
        try:
            ref_list = []
            self.sftp_obj.chdir(path)
            file_list = self.sftp_obj.listdir("./")
            for sub_path in file_list:
                dict = {}
                file_size = self.GetFileSize(path   sub_path)
                dict[path   sub_path] = file_size
                ref_list.append(dict)
            return ref_list
        except Exception:
            return False
    # 将远程文件全部打包后拉取到本地
    def GetTarPackageAll(self, path):
        try:
            file_list = self.sftp_obj.listdir(path)
            self.sftp_obj.chdir(path)
            for packageName in file_list:
                self.ssh_obj.exec_command("tar -czf /tmp/{0}.tar.gz {0}".format(packageName))
                self.sftp_obj.get("/tmp/{}.tar.gz".format(packageName), "./file/{}.tar.gz".format(packageName))
                self.sftp_obj.remove("/tmp/{}.tar.gz".format(packageName))
                return True
        except Exception:
            return True
    # 获取磁盘空间并返回字典
    def GetAllDiskSpace(self):
        ref_dict = {}
        cmd_dict = {"Linuxn": "df | grep -v 'Filesystem' | awk '{print $5 ":" $6}'",
                    "AIXn": "df | grep -v 'Filesystem' | awk '{print $4 ":" $7}'"
                    }
        try:
            os_version = self.GetSystemVersion()
            for version, run_cmd in cmd_dict.items():
                if (version == os_version):
                    # 根据不同版本选择不同的命令
                    os_ref = self.BatchCMD(run_cmd)
                    ref_list = os_ref.split("n")
                    # 循环将其转换为字典
                    for each in ref_list:
                        # 判断最后是否为空,过滤最后一项
                        if each != "":
                            ref_dict[str(each.split(":")[1])] = str(each.split(":")[0])
            return ref_dict
        except Exception:
            return False
    # 获取系统内存利用率
    def GetAllMemSpace(self):
        cmd_dict = {"Linuxn": "cat /proc/meminfo | head -n 2 | awk '{print $2}' | xargs | awk '{print $1 ":" $2}'",
                    "AIXn": "svmon -G | grep -v 'virtual' | head -n 1 | awk '{print $2 ":" $4}'"
                    }
        try:
            os_version = self.GetSystemVersion()
            for version, run_cmd in cmd_dict.items():
                if (version == os_version):
                    # 根据不同版本选择不同的命令
                    os_ref = self.BatchCMD(run_cmd)
                    # 首先现将KB转化为MB
                    mem_total = math.ceil(int(os_ref.split(":")[0].replace("n", "")) / 1024)
                    mem_free = math.ceil(int(os_ref.split(":")[1].replace("n", "")) / 1024)

                    # 计算占用空间百分比
                    percentage = 100 - int(mem_free / int(mem_total / 100))
                    # 拼接字典数据
                    return {"Total": str(mem_total), "Free": str(mem_free), "Percentage": str(percentage)}
        except Exception:
            return False
    # 获取系统进程信息,并返回字典格式
    def GetAllProcessSpace(self):
        ref_dict = {}
        cmd_dict = {"Linuxn": "ps aux | grep -v 'USER' | awk '{print $2 ":" $11}' | uniq",
                    "AIXn": "ps aux | grep -v 'USER' | awk '{print $2 ":" $12}' | uniq"
                    }
        try:
            os_version = self.GetSystemVersion()
            for version, run_cmd in cmd_dict.items():
                if (version == os_version):
                    os_ref = self.BatchCMD(run_cmd)
                    ref_list = os_ref.split("n")
                    for each in ref_list:
                        if each != "":
                            ref_dict[str(each.split(":")[0])] = str(each.split(":")[1])
            return ref_dict
        except Exception:
            return False
    # 获取CPU利用率
    def GetCPUPercentage(self):
        ref_dict = {}
        cmd_dict = {"Linuxn": "vmstat | tail -n 1 | awk '{print $13 ":" $14 ":" $15}'",
                    "AIXn": "vmstat | tail -n 1 | awk '{print $14 ":" $15 ":" $16}'"
                    }
        try:
            os_version = self.GetSystemVersion()
            for version, run_cmd in cmd_dict.items():
                if (version == os_version):
                    os_ref = self.BatchCMD(run_cmd)
                    ref_list = os_ref.split("n")
                    for each in ref_list:
                        if each != "":
                            each = each.split(":")
                            ref_dict = {"us": each[0],"sys":each[1],"idea":each[2]}
            return ref_dict
        except Exception:
            return False
    # 获取机器的负载情况
    def GetLoadAVG(self):
        ref_dict = {}
        cmd_dict = {"Linuxn": "uptime | awk '{print $10 ":" $11 ":" $12}'",
                    "AIXn": "uptime | awk '{print $10 ":" $11 ":" $12}'"
                    }
        try:
            os_version = self.GetSystemVersion()
            for version, run_cmd in cmd_dict.items():
                if (version == os_version):
                    os_ref = self.BatchCMD(run_cmd)
                    ref_list = os_ref.split("n")
                    for each in ref_list:
                        if each != "":
                            each = each.replace(",","").split(":")
                            ref_dict = {"1avg": each[0],"5avg": each[1],"15avg": each[2]}
                            return ref_dict
            return False
        except Exception:
            return False

    # 获取当前系统端口状态数据,只支持Linux
    def GetAllPort(self):
        ref_port_list = []
        cmd_dict = {"Linuxn": "netstat -antp | grep -vE '^[^tcp]' | grep -v '::' | awk '{print $4 ":" $6}'",
                    "AIXn": "uptime"
                    }
        try:
            os_version = self.GetSystemVersion()
            for version, run_cmd in cmd_dict.items():
                if (version == os_version):
                    os_ref = self.BatchCMD(run_cmd)
                    ref_list = os_ref.split("n")

                    for each in ref_list:
                        if each != "":
                            dic = { "Address": each.split(":")[0],"Port": each.split(":")[1],"Status": each.split(":")[2] }
                            ref_port_list.append(dic)
                return ref_port_list
            return False
        except Exception:
            return False

    # 修改当前用户密码
    def SetPasswd(self,username,password):
        try:
            os_id = self.BatchCMD("id | awk {'print $1'}")
            print(os_id)
            if(os_id == "uid=0(root)n"):
                self.BatchCMD("echo '{}' | passwd --stdin '{}' > /dev/null".format(password,username))
                return True
        except Exception:
            return False

# 定义超类,集成基类MySSH
class SuperSSH(MySSH):
    def __init__(self,address, username, password, default_port):
        super(SuperSSH, self).__init__(address, username, password, default_port)

我们继续为上面的代码加上命令行,让其可以直接使用,这里需要遵循一定的格式规范,我们使用JSON解析数据,JSON格式如下.

代码语言:javascript复制
{
  "aix":
  [
    ["192.168.1.1","root","123123"],
    ["192.168.1.1","root","2019"],
  ],
  "suse":
  [
    ["192.168.1.1","root","123123"],
  ],
  "centos":
    [
    ["192.168.1.1","root","123123"],
    ]
}

接着是主程序代码,如下所示.

代码语言:javascript复制
# -*- coding: utf-8 -*-
from MySSH import MySSH
import json,os,sys,argparse

class InitJson():
    def __init__(self,db):
        self.db_name = db
    def GetPlatform(self,plat):
        with open(self.db_name, "r", encoding="utf-8") as Read_Pointer:
            load_json = json.loads(Read_Pointer.read())
            for k,v in load_json.items():
                try:
                    if k == plat:
                        return v
                except Exception:
                    return None
        return None

if __name__ == "__main__":
    ptr = InitJson("database.json")
    parser = argparse.ArgumentParser()

    parser.add_argument("-G","--group",dest="group",help="指定主机组")
    parser.add_argument("-C","--cmd",dest="cmd",help="指定CMD命令")
    parser.add_argument("--get",dest="get",help="指定获取数据类型<ping>")
    parser.add_argument("--dst", dest="dst_file",help="目标位置")
    parser.add_argument("--src",dest="src_file",help="原文件路径")
    args = parser.parse_args()

    # 批量CMD --group=aix --cmd=ls
    if args.group and args.cmd:
        platform = ptr.GetPlatform(args.group)
        success,error = [],[]
        for each in platform:
            ssh = MySSH(each[0], each[1], each[2], 22)
            if ssh.Init() != False:
                print("-" * 140)
                print("主机: {0:15} t 账号: {1:10} t 密码: {2:10} t 命令: {3:30}".
                      format(each[0], each[1], each[2], args.cmd))
                print("-" * 140)
                print(ssh.BatchCMD(args.cmd))
                ssh.CloseSSH()
                success.append(each[0])
            else:
                error.append(each[0])
                ssh.CloseSSH()
        print("nn","-" * 140,
              "n 执行报告 n",
              "-" * 140,
              "n失败主机: {}n".format(error),
              "-" * 140)

    # 批量获取主机其他数据 --group=centos --get=ping
    if args.group and args.get:
            platform = ptr.GetPlatform(args.group)
            success, error = [], []
            for each in platform:
                ssh = MySSH(each[0], each[1], each[2], 22)
                # 判断是否为ping
                if ssh.Init() != False:
                    if args.get == "ping":
                        ret = ssh.GetPing()
                        if ret == True:
                            print("[*] 主机: {} 存活中.".format(each[0]))

                    # 收集磁盘数据
                    elif args.get == "disk":
                        print("-" * 140)
                        print("主机: {0:15} t 账号: {1:10} t 密码: {2:10}".
                              format(each[0], each[1], each[2]))
                        print("-" * 140)

                        ret = ssh.GetAllDiskSpace()
                        for k, v in ret.items():
                            if (v.split("%")[0] != "-"):
                                space_ret = int(v.split("%")[0])
                                if space_ret >= 70:
                                    print("磁盘分区: {0:30} t 磁盘占用: {1:5} -> [警告]".format(k, v))
                                    continue
                                if space_ret >= 50:
                                    print("磁盘分区: {0:30} t 磁盘占用: {1:5} -> [警惕]".format(k, v))
                                    continue
                                else:
                                    print("磁盘分区: {0:30} t 磁盘占用: {1:5}".format(k, v))
                                    continue
                        print()
                else:
                    error.append(each[0])
                    ssh.CloseSSH()
            print("nn", "-" * 140,
                  "n 执行报告 n",
                  "-" * 140,
                  "n失败主机: {}n".format(error),
                  "-" * 140)

    # 实现文件上传过程 --group=centos --src=./a.txt --dst=/tmp/test.txt
    if args.group and args.src_file and args.dst_file:
        platform = ptr.GetPlatform(args.group)
        success, error = [], []
        for each in platform:
            ssh = MySSH(each[0], each[1], each[2], 22)
            if ssh.Init() != False:
                ret = ssh.PutLocalFile(args.src_file,args.dst_file)
                if ret == True:
                    print("主机: {} t 本地文件: {} t ---> 传到: {}".format(each[0], args.src_file,args.dst_file))
            else:
                error.append(each[0])
                ssh.CloseSSH()
        print("nn", "-" * 140,
              "n 执行报告 n",
              "-" * 140,
              "n失败主机: {}n".format(error),
              "-" * 140)

0 人点赞