ssh操作及接受响应,自动获取test密码

2022-08-24 11:48:38 浏览数 (1)

一、ssh操作及接受响应

common.py

代码语言:javascript复制
# coding=utf-8
"""
    @Project :pachong-master 
    @File    :common.py
    @Author  :gaojs
    @Date    :2022/7/9 14:01
    @Blogs   : https://www.gaojs.com.cn
"""
import sys

import logging
import time

import paramiko
from time import sleep
import re

import requests
from faker import Factory


sys.setrecursionlimit(5000)


class CLI:

    def ssh_ag(self, hostname='192.168.120.209', port=22, username='array', password='admin'):
        """

        :param self:
        :return:
        """
        # 创建ssh对象
        self.fin = open('log.txt', 'w')
        self.ssh = paramiko.SSHClient()
        self.logging = logging
        # 允许连接不在know_hosts文件中的主机
        self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy)
        # 连接AG
        self.ssh.connect(hostname=hostname, port=port, username=username, password=password)
        sleep(5)
        channel = self.ssh.invoke_shell()
        self.channel = channel
        channel.settimeout(5)
        output = channel.recv(2048).decode('utf-8')
        time.sleep(1)
        self.fin.write(output)
        self.cli_cmd('enable')
        self.cli_cmd('')
        self.cli_cmd('config ter')
        self.cli_cmd('no page')

    def print_step(self):
        """

        :return:
        """
        result = self.channel.recv(2048)
        print(result.decode())

    def cli_cmd(self, cli, prompt="[#$]", timeout=3):
        """

        :param cli:
        :return:
        """
        output = ''
        self.logging.info("AG execute command in enable mode:  "   cli)
        self.channel.send(cli   'n')
        output = self.read_until(prompt, timeout)
        return output

    def quit_enable(self):
        """

        :return:
        """
        self.cli_cmd('switch global')
        self.cli_cmd('exit')

    def clear_log(self):
        """

        :return:
        """
        sleep(2)
        self.cli_cmd('clear log b')

    def read_until(self, expected, timeout=10):
        """
        等待回显:这个方法是从同事那里偷来的哈哈哈 
        :return:
        """
        output = ''
        regexp = re.compile(expected)
        max_time = time.time()   timeout
        i = 0
        while time.time() < max_time:
            i = i   1
            tmp = ""
            try:
                tmp = self.channel.recv(1024).decode('utf-8')
            except:
                pass
            self.fin.write(tmp)
            self.fin.flush()
            output  = tmp
            if regexp.search(output):
                return output
        return output

    def switch_vsite(self, vsite):
        """
        切换虚拟站点
        :param vsite:
        :return:
        """
        self.cli_cmd('sw %s' % vsite)

    def get_sn(self):
        """
        获取sn码
        :return:
        """
        # with open('log.txt', mode='r') as fin:
        #     resp = fin.read()
        resp = self.cli_cmd('show version')
        result = re.compile('Serial Number : [A-Z0-9]{31}')
        sn = result.findall(resp)[0]
        sn_number = sn.split(':')[1]
        # print(sn_number)
        return sn_number

    def create_test_password(self, sn):
        """
        生成test用户密码
        """
        url = 'http://10.3.0.50/cgi-bin/passwd_res'
        f = Factory.create()
        ua = f.user_agent()
        headers = {
            'User-Agent': ua
        }
        data = {
            'serial': sn
        }
        rsp = requests.post(url=url, headers=headers, data=data)
        passwd = re.findall('password: (.*?)</pre>', rsp.text)[0]
        print(passwd)
        return passwd


if __name__ == '__main__':
    """
    自动获取test密码
    """
    ag_ip = input('请输入您AG管理IP:')
    test = CLI()
    test.ssh_ag(hostname=ag_ip)
    sn_number = test.get_sn()
    passwd = test.create_test_password(sn_number)
    print(f'*********************** 您的test账户密码是 {passwd} ************************n')

2.云服务器操作

代码语言:javascript复制
# coding=utf-8
"""
    @Project :pachong-master 
    @File    :test01.py
    @Author  :gaojs
    @Date    :2022/7/9 15:40
    @Blogs   : https://www.gaojs.com.cn
"""

import sys

import logging
import time

import paramiko
from time import sleep
import re


sys.setrecursionlimit(5000)


class CLI:

    def ssh_ag(self, hostname='101.4x.39.xxx', port=22, username='root', password='xxxxxx'):
        """

        :param self:
        :return:
        """
        # 创建ssh对象
        self.fin = open('log.txt', 'w ')
        self.ssh = paramiko.SSHClient()
        self.logging = logging
        # 允许连接不在know_hosts文件中的主机
        self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy)
        # 连接AG
        self.ssh.connect(hostname=hostname, port=port, username=username, password=password)
        sleep(5)
        channel = self.ssh.invoke_shell()
        self.channel = channel
        channel.settimeout(5)
        output = channel.recv(2048).decode('utf-8')
        time.sleep(1)
        self.fin.write(output)

    def print_step(self):
        """

        :return:
        """
        result = self.channel.recv(2048)
        print(result.decode())

    def cli_cmd(self, cli, prompt="[#$]", timeout=3):
        """

        :param cli:
        :return:
        """
        output = ''
        self.logging.info("cloud server execute command in enable mode:  "   cli)
        self.channel.send(cli   'n')
        output = self.read_until(prompt, timeout)
        return output

    def read_until(self, expected, timeout=10):
        """
        等待回显
        :return:
        """
        output = ''
        regexp = re.compile(expected)
        max_time = time.time()   timeout
        i = 0
        while time.time() < max_time:
            i = i   1
            tmp = ""
            try:
                tmp = self.channel.recv(1024).decode('utf-8')
            except:
                pass
            self.fin.write(tmp)
            self.fin.flush()
            output  = tmp
            if regexp.search(output):
                return output
        return output


if __name__ == '__main__':
    """
    自动获取test密码
    """
    test = CLI()
    test.ssh_ag()
    test.cli_cmd('ll /opt')
    test.cli_cmd('cd /var/www/html/')
    test.cli_cmd('date')
    test.cli_cmd('ls')

0 人点赞