一、ssh操作及接收响应
common.py
代码语言:python
# coding=utf-8
"""
@Project :pachong-master
@File :common.py
@Author :gaojs
@Date :2022/7/9 14:01
@Blogs : https://www.gaojs.com.cn
"""
import sys
import logging
import time
import paramiko
from time import sleep
import re
import requests
from faker import Factory
# Sets the interpreter recursion limit to 5000 at import time.
# NOTE(review): nothing visible in this listing recurses — confirm this is still needed.
sys.setrecursionlimit(5000)
class CLI:
    """Interactive SSH shell session to an AG device.

    Everything received from the device is appended to ``log.txt`` in the
    current working directory. Call :meth:`ssh_ag` first; the other methods
    rely on the ``channel``, ``fin`` and ``logging`` attributes it sets.
    """

    def ssh_ag(self, hostname='192.168.120.209', port=22, username='array', password='admin'):
        """Connect over SSH, open an interactive shell and enter config mode.

        :param hostname: address of the device to connect to.
        :param port: SSH port.
        :param username: SSH login name.
        :param password: SSH login password.
        :return: None
        """
        # Log file that receives every chunk read from the device.  UTF-8 is
        # used because the device output is decoded as UTF-8 before writing.
        self.fin = open('log.txt', 'w', encoding='utf-8')
        self.ssh = paramiko.SSHClient()
        self.logging = logging
        # Allow connecting to hosts that are not in known_hosts.
        # NOTE(review): AutoAddPolicy trusts any host key; acceptable only on
        # a trusted management network.
        self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        # Connect to the AG.
        self.ssh.connect(hostname=hostname, port=port, username=username, password=password)
        sleep(5)
        channel = self.ssh.invoke_shell()
        self.channel = channel
        channel.settimeout(5)
        # Consume and log the login banner / first prompt.
        output = channel.recv(2048).decode('utf-8', errors='replace')
        time.sleep(1)
        self.fin.write(output)
        self.cli_cmd('enable')
        # presumably answers the enable-password prompt with an empty
        # password — TODO confirm against the device.
        self.cli_cmd('')
        self.cli_cmd('config ter')
        self.cli_cmd('no page')

    def close(self):
        """Close the shell channel, the SSH client and the log file if open."""
        for name in ('channel', 'ssh', 'fin'):
            resource = getattr(self, name, None)
            if resource is not None:
                resource.close()

    def print_step(self):
        """Read up to 2048 bytes from the channel and print them decoded.

        :return: None
        """
        result = self.channel.recv(2048)
        print(result.decode())

    def cli_cmd(self, cli, prompt="[#$]", timeout=3):
        """Send one command line and return the output read up to the prompt.

        :param cli: command text; a newline is appended before sending.
        :param prompt: regular expression that marks the end of the output.
        :param timeout: seconds to wait for ``prompt`` (see :meth:`read_until`).
        :return: the text received after sending the command.
        """
        self.logging.info("AG execute command in enable mode: %s", cli)
        self.channel.send(cli + '\n')
        return self.read_until(prompt, timeout)

    def quit_enable(self):
        """Send ``switch global`` followed by ``exit``.

        :return: None
        """
        self.cli_cmd('switch global')
        self.cli_cmd('exit')

    def clear_log(self):
        """Wait two seconds, then send ``clear log b``.

        :return: None
        """
        sleep(2)
        self.cli_cmd('clear log b')

    def read_until(self, expected, timeout=10):
        """Collect channel output until ``expected`` matches or time runs out.

        (Adapted from a colleague's implementation.)

        :param expected: regular expression searched in the accumulated output.
        :param timeout: overall deadline in seconds.  A single ``recv`` may
            still block for the channel's own timeout before the deadline is
            re-checked.
        :return: all text received so far; it may not contain ``expected`` if
            the deadline passed or the channel was closed.
        """
        import codecs
        import socket

        regexp = re.compile(expected)
        # An incremental decoder keeps a multi-byte character that is split
        # across two recv() chunks intact instead of losing the chunk.
        decoder = codecs.getincrementaldecoder('utf-8')(errors='replace')
        output = ''
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            try:
                data = self.channel.recv(1024)
            except socket.timeout:
                # Nothing arrived within the channel timeout; keep waiting
                # until the overall deadline.
                continue
            if not data:
                # Zero-length read: the channel stream has closed.
                break
            tmp = decoder.decode(data)
            self.fin.write(tmp)
            self.fin.flush()
            output += tmp
            if regexp.search(output):
                return output
        return output

    def switch_vsite(self, vsite):
        """Switch to a virtual site by sending ``sw <vsite>``.

        :param vsite: name of the virtual site.
        :return: None
        """
        self.cli_cmd('sw %s' % vsite)

    def get_sn(self):
        """Return the serial number reported by ``show version``.

        :return: the text after the colon of the ``Serial Number : ...`` line,
            including its leading space.
        :raises IndexError: if no serial number is found in the output.
        """
        resp = self.cli_cmd('show version')
        found = re.search(r'Serial Number : [A-Z0-9]{31}', resp)
        if found is None:
            # IndexError is kept so callers see the same exception type the
            # original ``findall(...)[0]`` produced, now with a message.
            raise IndexError("serial number not found in 'show version' output")
        return found.group(0).split(':')[1]

    def create_test_password(self, sn):
        """Request the password of the ``test`` user for a serial number.

        Posts ``sn`` to the password service, prints the password found in
        the response and returns it.

        :param sn: serial number sent as the ``serial`` form field.
        :return: the password extracted from the response body.
        :raises IndexError: if the response contains no password.
        """
        url = 'http://10.3.0.50/cgi-bin/passwd_res'
        headers = {
            'User-Agent': Factory.create().user_agent()
        }
        data = {
            'serial': sn
        }
        # A timeout keeps the script from hanging forever if the service is
        # unreachable.
        rsp = requests.post(url=url, headers=headers, data=data, timeout=10)
        found = re.search('password: (.*?)</pre>', rsp.text)
        if found is None:
            raise IndexError('password not found in the service response')
        passwd = found.group(1)
        print(passwd)
        return passwd
if __name__ == '__main__':
    # Fetch the password of the "test" account automatically: ask for the
    # AG management IP, read the device serial number over SSH and request
    # the matching password.
    ag_ip = input('请输入您AG管理IP:')
    test = CLI()
    test.ssh_ag(hostname=ag_ip)
    try:
        sn_number = test.get_sn()
        passwd = test.create_test_password(sn_number)
    finally:
        # Release the SSH connection and the log file opened by ssh_ag().
        test.ssh.close()
        test.fin.close()
    print(f'*********************** 您的test账户密码是 {passwd} ************************\n')
二、云服务器操作
代码语言:python
# coding=utf-8
"""
@Project :pachong-master
@File :test01.py
@Author :gaojs
@Date :2022/7/9 15:40
@Blogs : https://www.gaojs.com.cn
"""
import sys
import logging
import time
import paramiko
from time import sleep
import re
# Sets the interpreter recursion limit to 5000 at import time.
# NOTE(review): nothing visible in this listing recurses — confirm this is still needed.
sys.setrecursionlimit(5000)
class CLI:
    """Interactive SSH shell session to a cloud server.

    Everything received from the server is appended to ``log.txt`` in the
    current working directory. Call :meth:`ssh_ag` first; the other methods
    rely on the ``channel``, ``fin`` and ``logging`` attributes it sets.
    """

    def ssh_ag(self, hostname='101.4x.39.xxx', port=22, username='root', password='xxxxxx'):
        """Connect over SSH and open an interactive shell.

        The default host and password are redacted placeholders; pass real
        values when calling.

        :param hostname: address of the server to connect to.
        :param port: SSH port.
        :param username: SSH login name.
        :param password: SSH login password.
        :return: None
        """
        # Log file that receives every chunk read from the server.  UTF-8 is
        # used because the output is decoded as UTF-8 before writing.
        self.fin = open('log.txt', 'w+', encoding='utf-8')
        self.ssh = paramiko.SSHClient()
        self.logging = logging
        # Allow connecting to hosts that are not in known_hosts.
        # NOTE(review): AutoAddPolicy trusts any host key; on a public cloud
        # address this permits man-in-the-middle attacks.
        self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh.connect(hostname=hostname, port=port, username=username, password=password)
        sleep(5)
        channel = self.ssh.invoke_shell()
        self.channel = channel
        channel.settimeout(5)
        # Consume and log the login banner / first prompt.
        output = channel.recv(2048).decode('utf-8', errors='replace')
        time.sleep(1)
        self.fin.write(output)

    def close(self):
        """Close the shell channel, the SSH client and the log file if open."""
        for name in ('channel', 'ssh', 'fin'):
            resource = getattr(self, name, None)
            if resource is not None:
                resource.close()

    def print_step(self):
        """Read up to 2048 bytes from the channel and print them decoded.

        :return: None
        """
        result = self.channel.recv(2048)
        print(result.decode())

    def cli_cmd(self, cli, prompt="[#$]", timeout=3):
        """Send one command line and return the output read up to the prompt.

        :param cli: command text; a newline is appended before sending.
        :param prompt: regular expression that marks the end of the output.
        :param timeout: seconds to wait for ``prompt`` (see :meth:`read_until`).
        :return: the text received after sending the command.
        """
        self.logging.info("cloud server execute command in enable mode: %s", cli)
        self.channel.send(cli + '\n')
        return self.read_until(prompt, timeout)

    def read_until(self, expected, timeout=10):
        """Collect channel output until ``expected`` matches or time runs out.

        :param expected: regular expression searched in the accumulated output.
        :param timeout: overall deadline in seconds.  A single ``recv`` may
            still block for the channel's own timeout before the deadline is
            re-checked.
        :return: all text received so far; it may not contain ``expected`` if
            the deadline passed or the channel was closed.
        """
        import codecs
        import socket

        regexp = re.compile(expected)
        # An incremental decoder keeps a multi-byte character that is split
        # across two recv() chunks intact instead of losing the chunk.
        decoder = codecs.getincrementaldecoder('utf-8')(errors='replace')
        output = ''
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            try:
                data = self.channel.recv(1024)
            except socket.timeout:
                # Nothing arrived within the channel timeout; keep waiting
                # until the overall deadline.
                continue
            if not data:
                # Zero-length read: the channel stream has closed.
                break
            tmp = decoder.decode(data)
            self.fin.write(tmp)
            self.fin.flush()
            output += tmp
            if regexp.search(output):
                return output
        return output
if __name__ == '__main__':
    # Connect to the cloud server with the default credentials and run a few
    # shell commands; their output is written to log.txt by the CLI class.
    test = CLI()
    test.ssh_ag()
    try:
        test.cli_cmd('ll /opt')
        test.cli_cmd('cd /var/www/html/')
        test.cli_cmd('date')
        test.cli_cmd('ls')
    finally:
        # Release the SSH connection and the log file opened by ssh_ag().
        test.ssh.close()
        test.fin.close()