根据 Wireshark 抓包分析可知,QQ 语音通话采用的是 UDP 协议,只需检测数据包负载开头的标志
020048
即可。需要 pypcap 这个库;但在 Python 3 环境下,推荐使用 pcap-ct。运行时缺少哪个库就安装哪个。
import psutil,netaddr,pcap,dpkt
import time
import platform
import binascii
# winreg only exists on Windows; it is needed there to translate an adapter's
# friendly name into the device name that pcap expects.
if 'Windows' in platform.platform():
    import winreg as wr

# Per-process traffic fingerprint: process name (lower case) ->
# (transport protocol used as the capture filter,
#  hex prefix that the transport payload must start with).
FINGERPRINT = {
    'qq.exe': ('udp', '020048'),
    'tim.exe': ('udp', '020048'),
}

# Registry key (under HKEY_LOCAL_MACHINE) whose subkeys are the GUIDs of the
# network adapters; each adapter keeps its friendly name in
# <GUID>\Connection\Name. The backslash separators are required -- without
# them the key cannot be opened.
IF_REG = r'SYSTEM\CurrentControlSet\Control\Network\{4d36e972-e325-11ce-bfc1-08002be10318}'
def get_interface_name(name):
    """Return the capture device name of the Windows adapter called *name*.

    Walks the subkeys of ``IF_REG`` under HKEY_LOCAL_MACHINE, reads the
    ``Name`` value of each ``<subkey>\\Connection`` key and compares it with
    *name*.

    Args:
        name: Friendly connection name to look for (compared exactly).

    Returns:
        ``r'\\Device\\NPF_' + <subkey name>`` for the first matching adapter,
        or ``None`` when no adapter matches or the ``IF_REG`` key itself
        cannot be opened.
    """
    with wr.ConnectRegistry(None, wr.HKEY_LOCAL_MACHINE) as reg:
        try:
            adapters_key = wr.OpenKey(reg, IF_REG)
        except FileNotFoundError:
            # The adapter list key does not exist: nothing can match.
            return None
        # Registry handles are context managers, so every key opened here is
        # closed again even on an early return or an exception.
        with adapters_key:
            subkey_count = wr.QueryInfoKey(adapters_key)[0]
            for index in range(subkey_count):
                subkey_name = wr.EnumKey(adapters_key, index)
                try:
                    with wr.OpenKey(adapters_key, subkey_name + r'\Connection') as conn_key:
                        friendly_name = wr.QueryValueEx(conn_key, 'Name')[0]
                except FileNotFoundError:
                    # Subkey without a Connection key or Name value: skip it.
                    continue
                if friendly_name == name:
                    return r'\Device\NPF_' + subkey_name
    return None
# Capture interface used by sniffer(). On Windows the adapter is looked up by
# its friendly connection name (hard-coded to 'Router'; the original note
# suggests 'WLAN' as an alternative) and may be None if no adapter has that
# name. Elsewhere a fixed interface name is used -- adjust both to the
# machine this runs on.
if 'Windows' in platform.platform():
    iface = get_interface_name('Router')  # WLAN
else:
    iface = 'enp2s0'
def get_pid_sessions(is_show=True):
    """Collect the current connections to public addresses, grouped by process.

    A connection is recorded when it has both a local and a remote address,
    the remote IP is not private, is unicast and is not link-local, and the
    local IP is not loopback.

    Args:
        is_show: When true, print one line per recorded connection.

    Returns:
        A nested dict of the form
        ``{process_name: {pid: {'src_ip:sport': {'dst_ip:dport':
        {'create_time': str, 'status': str}}}}}`` where ``process_name`` is
        lower-cased and ``create_time`` is the process creation time formatted
        as ``%Y-%m-%d %H:%M:%S`` in local time.
    """
    sessions = {}
    # pid -> (lower-cased name, formatted creation time), or None when the
    # process could not be inspected. Avoids querying a process repeatedly.
    process_info = {}

    # "inet" limits the scan to IPv4/IPv6 sockets; UNIX-socket entries carry
    # path strings instead of (ip, port) addresses and cannot be used below.
    for session in psutil.net_connections(kind="inet"):
        if not (session.laddr and session.raddr):
            continue

        pid = session.pid
        if pid is None:
            # The owning process is unknown (typically missing privileges).
            continue

        if pid not in process_info:
            try:
                proc = psutil.Process(pid)
                process_info[pid] = (
                    proc.name().lower(),
                    time.strftime("%Y-%m-%d %H:%M:%S",
                                  time.localtime(proc.create_time())),
                )
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                # The process exited during the scan or is not accessible.
                process_info[pid] = None
        if process_info[pid] is None:
            continue
        pid_name, pid_create_time = process_info[pid]

        src_ip = session.laddr.ip
        sport = session.laddr.port
        des_ip = session.raddr.ip
        dport = session.raddr.port
        status = session.status

        filter_des_ip = netaddr.IPAddress(des_ip)
        filter_src_ip = netaddr.IPAddress(src_ip)
        # NOTE(review): is_private() is reported as deprecated in recent
        # netaddr releases -- confirm the installed version still provides it.
        is_public_session = (
            not filter_des_ip.is_private()
            and filter_des_ip.is_unicast()
            and not filter_des_ip.is_link_local()
            and not filter_src_ip.is_loopback()
        )
        if not is_public_session:
            continue

        local_endpoint = f'{src_ip}:{sport}'
        remote_endpoint = f'{des_ip}:{dport}'
        entry = (
            sessions.setdefault(pid_name, {})
            .setdefault(pid, {})
            .setdefault(local_endpoint, {})
            .setdefault(remote_endpoint, {})
        )
        entry['create_time'] = pid_create_time
        entry['status'] = status

        if is_show:
            print(f'[{pid_create_time}] [{pid}] [{pid_name}] [{src_ip}:{sport}] ---> [{des_ip}:{dport}] ---- [{status}]')
    return sessions
def sniffer(pname):
    """Capture traffic on ``iface`` and print packets matching *pname*'s fingerprint.

    The transport protocol and hex payload prefix are taken from
    ``FINGERPRINT[pname]``. Every IPv4 packet of that transport with a
    non-empty payload is checked: if the hex-encoded payload starts with the
    fingerprint, its fields are printed one per line; otherwise a single
    "No capture!" status line is rewritten in place.

    Args:
        pname: Key into ``FINGERPRINT``.

    Raises:
        KeyError: If *pname* is not in ``FINGERPRINT``.

    The function only returns when the capture iterator is exhausted.
    """
    transport, fingerprint = FINGERPRINT[pname]
    pc = pcap.pcap(iface, promisc=True, immediate=True)
    pc.setfilter(transport)
    check_transport = dpkt.tcp.TCP if transport.lower() == 'tcp' else dpkt.udp.UDP

    for timestamp, raw_buf in pc:
        try:
            eth = dpkt.ethernet.Ethernet(raw_buf)
        except dpkt.UnpackError:
            # Truncated or malformed frame: skip it instead of ending the capture.
            continue

        ip = eth.data
        if not isinstance(ip, dpkt.ip.IP):
            continue
        traffic = ip.data
        if not isinstance(traffic, check_transport):
            continue
        if not len(traffic.data):
            continue

        payload_hex = binascii.b2a_hex(traffic.data).decode()
        if not payload_hex.startswith(fingerprint):
            # '\r' returns to the start of the line so the status line is
            # overwritten instead of scrolling.
            print('\r[%s] No capture!' % time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())), end='')
            continue

        # The packet description is only built for matching packets.
        packet = {
            'time': time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(timestamp)),
            'src': '%d.%d.%d.%d' % tuple(ip.src),
            'dst': '%d.%d.%d.%d' % tuple(ip.dst),
            'protocol': ip.p,
            'len': ip.len,
            'ttl': ip.ttl,
            'df': bool(ip.off & dpkt.ip.IP_DF),
            'mf': bool(ip.off & dpkt.ip.IP_MF),
            'offset': ip.off & dpkt.ip.IP_OFFMASK,
            'checksum': ip.sum,
            'traffic': payload_hex,
        }
        print()
        for key, value in packet.items():
            # Centre the field name in 8 columns; with an odd amount of
            # padding the extra space goes on the right.
            print(f'[ {pname} ] [ {key:^8} ] ----> {value}')
if __name__ == "__main__":
    # Snapshot the current public connections without printing them, then
    # start sniffing for the first fingerprinted process that has any.
    sessions = get_pid_sessions(False)
    for pname in FINGERPRINT:
        if pname in sessions:
            sniffer(pname=pname)
            break
    else:
        # None of the fingerprinted processes has a public connection.
        print('No target process with public connections found.')