通常, TCP一次接收为 $2^{10}$ 字节(1024 bytes, 即 $2^{13}$ bits), 包括了4字节的消息头和1020字节的消息. 那如果超出这个范围呢? 分多次发, Python提供了一个接口 socket.socket.sendall(bytes)。
在套接字中, 由于TCP的优化(Nagle算法机制), 或者接收最大值(MSS) < 应接收的值, 会出现粘包、分包现象。
Nagle算法会将多次间隔较小、数据量较小的数据, 合并成一个数据量大的数据块, 然后进行封包。那么这样一来, 接收端就必须使用高效科学的拆包机制来分辨这些数据。
(如图1)
解决此问题, 可以将发送的内容用repr转换 ('something\n' -> 'something\\n'), 并添加分隔符.
解析的时候, 通过分隔符进行分割, 组成队列Queue, 先进先出. 如果其中有分隔符, 那么分隔符之前的完整内容进入ReadyQueue; 如果分割后最后一段没有分隔符, 则进入等待区waitKey, 等与下次分割出的第一段相结合后, 再进入ReadyQueue. (如图2)
情况一
接收 "data1\n"
这是一个完整的数据包, 分割出来["data1", ""]。会将"data1"与前面的waitKey(初始化为"")结合进入队列, 并将最后面的""设为waitKey。
情况二
接收 "ata2"
这是分包导致的, 分割得出["ata2"]。将其与前面的waitKey结合, 不进入队列, 等待分隔符; 下一次收到含有分隔符的数据时, 再将分隔符之前的部分与它结合进入队列。
情况三
接收 "\ndata3\ndata4\nd"
这是粘包导致的, 后面还附带了一点点数据。分割得出["", "data3", "data4", "d"]。
前面的waitKey与第一个空字符串结合, 进入队列; "data3"与"data4"直接进入队列; waitKey设为第四个元素"d"。
怎么用python解决呢?
导入库 socket, threading
文章目录
- 导入
- | SocketHandler类
- ·初始化
- ·接收以及异常处理
- △ 解析(重点)
- | 测试
- | 封装
导入
代码语言:javascript复制import socket
from threading import Thread
| SocketHandler类
class SocketHandler(object):  # accept() hands over sockets passively, so wrap one instead of inheriting socket.socket
    """Wrapper around a socket that frames messages with a delimiter."""

    split_text = "\n"  # class variable: the default delimiter is a newline
·初始化
def to_thread(target, Daemon=True) -> callable:
    """Turn ``target`` into a factory of not-yet-started threads.

    Calling the returned function with positional arguments builds a
    ``Thread`` that will run ``target(*args)``. The thread is returned
    unstarted, so the caller decides when to call ``start()``.

    Args:
        target: Callable the thread will execute.
        Daemon: Value assigned to the thread's ``daemon`` flag.

    Returns:
        A function ``run(*args, name="")`` producing the configured thread.
    """
    from functools import wraps

    @wraps(target)
    def run(*args, name=str()) -> Thread:
        thread = Thread(target=target, args=args)
        # Attribute assignment replaces the deprecated setDaemon()/setName().
        thread.daemon = Daemon
        if name:
            thread.name = name
        return thread

    return run
class SocketHandler(object):
    # ...
    def __init__(self, socket=None, bufsize=1024, codec="utf8", run=True):
        """Store the connection settings and optionally start receiving.

        Args:
            socket: Socket object to wrap. When omitted, a fresh socket is
                created for this instance (a default built in the signature
                would be created once and shared by every handler).
            bufsize: Maximum number of bytes requested per receive call.
            codec: Text encoding used for sending and decoding.
            run: When true, start the background receiver immediately.
        """
        if socket is None:
            # The parameter shadows the module name, hence the aliased import.
            import socket as socket_module

            socket = socket_module.socket()
        self.socket, self.bufsize, self.codec = socket, bufsize, codec
        self.waitKey = str()  # text received so far that has no delimiter yet
        self.ReadyQueue = []  # complete messages, oldest first
        self._closed = False
        if run:
            self.run()

    def run(self):
        """Start the receiver thread, named after the wrapped socket."""
        self.forever_receive(name=f"客户端{self.socket}").start()

    @to_thread
    def forever_receive(self) -> None:
        """Receive and parse chunks until the handler closes or receiving fails."""
        while self.isOpen():
            data = self.__recv()
            if data is ConnectionError:
                # The receive helper signals failure with this sentinel.
                return
            if isinstance(data, bytes) and data:
                self.parse_data(self.handle(data))
·接收以及异常处理
def ignore(function):
    """Decorate ``function`` so ordinary exceptions yield ``None`` instead.

    Only ``Exception`` subclasses are suppressed. ``KeyboardInterrupt`` and
    ``SystemExit`` still propagate, so a wrapped call cannot block an
    interrupt or an interpreter exit.
    """
    from functools import wraps

    @wraps(function)
    def func(*args, **kwargs):
        try:
            return function(*args, **kwargs)
        except Exception:
            # Deliberate best effort: the caller receives None on failure.
            return None

    return func
class SocketHandler(object):
    # ...
    def __del__(self):
        """Close the connection when the handler is garbage-collected."""
        self.quit()

    def isOpen(self) -> bool:
        """Return True while neither the handler nor its socket is closed."""
        return not (self._closed or getattr(self.socket, "_closed", False))

    def quitEvent(self) -> None:
        """Hook called once while closing; does nothing by default."""
        pass

    def quit(self) -> None:
        """Close the handler; repeated calls are no-ops."""
        if getattr(self, "_closed", False):
            return
        self._closed = True
        try:
            self.quitEvent()
        finally:
            # Release the socket even when the hook raises. getattr keeps
            # __del__ quiet on an instance whose __init__ never finished.
            sock = getattr(self, "socket", None)
            if sock is not None:
                sock.close()

    def __recv(self) -> "bytes | type[ConnectionError]":
        """Read one chunk, or return the ConnectionError class on failure.

        The chunk is returned untouched; trimming raw chunks could remove
        spaces that belong to a message split across two reads.
        """
        try:
            chunk = self.socket.recv(self.bufsize)
        except OSError:  # also covers the Connection*Error subclasses
            self.quit()
            return ConnectionError
        if not chunk:
            # An empty read means the peer closed the connection; treating
            # it as "no data yet" would make the receive loop spin forever.
            self.quit()
            return ConnectionError
        return chunk

    def __send(self, data: bytes) -> bool:
        """Send all of ``data``; close the handler and return False on error."""
        try:
            self.socket.sendall(data)
        except OSError:  # also covers the Connection*Error subclasses
            self.quit()
            return False
        return True

    def send(self, data) -> bool:
        """Encode ``data``, append the delimiter and send it.

        ``str`` and numbers are sent as text, sets/lists/tuples as their
        ``repr``, ``bytes`` unchanged, and anything else through
        ``bytes(data)``.

        Returns:
            True when everything was sent, False when sending failed.
        """
        if isinstance(data, bytes):
            payload = data
        elif isinstance(data, str):
            payload = data.encode(self.codec)
        elif isinstance(data, (set, list, tuple)):
            # repr() returns str; it must be encoded before joining bytes.
            payload = repr(data).encode(self.codec)
        elif isinstance(data, (int, float)):
            payload = str(data).encode(self.codec)
        else:
            payload = bytes(data)
        return self.__send(payload + self.split_text.encode(self.codec))

    def connect(self, host: str, port: int) -> bool:
        """Connect the wrapped socket without raising on failure.

        Returns:
            True when connected, False for an out-of-range port or when the
            connection attempt raised ``OSError``.
        """
        if not 0 <= port <= (2 ** 16) - 1:
            return False
        try:
            self.socket.connect((host, port))
        except OSError:
            # Best effort, as before; the result now reports the failure.
            return False
        return True
△ 解析(重点)
class SocketHandler(object):
    # ...
    def handle(self, data: bytes):
        """Decode one received chunk and split it on the delimiter.

        An incremental decoder is kept on the instance so that a multi-byte
        character cut in half by a chunk boundary is completed by the next
        chunk instead of raising. Segments are returned untrimmed, because
        the first and last ones may be only parts of a message.
        """
        import codecs

        decoder = getattr(self, "_decoder", None)
        if decoder is None:
            decoder = self._decoder = codecs.getincrementaldecoder(self.codec)()
        return decoder.decode(data).split(self.split_text)

    def parse_data(self, generator) -> None:
        """Merge split segments into ``ReadyQueue`` and ``waitKey``.

        Args:
            generator: Ordered segments from one chunk, as produced by
                ``handle``. An empty sequence is ignored.
        """
        segments = list(generator)
        if not segments:
            return
        if len(segments) == 1:
            # No delimiter in this chunk: keep accumulating the partial text.
            self.waitKey += segments[0]
            return
        first, *complete, tail = segments
        # The pending text plus the first segment finishes one message.
        self.ReadyQueue.append((self.waitKey + first).strip())
        self.ReadyQueue.extend(segment.strip() for segment in complete)
        # Whatever follows the last delimiter waits for the next chunk.
        self.waitKey = tail

    def recv(self) -> str:
        """Return the oldest complete message, waiting until one exists.

        Raises:
            ConnectionError: The handler is closed and nothing is queued.
        """
        import time

        while not self.ReadyQueue:
            if not self.isOpen():
                if self.ReadyQueue:
                    break
                raise ConnectionError("connection closed before a complete message arrived")
            time.sleep(0.01)  # yield instead of spinning at full speed
        return self.ReadyQueue.pop(0)

    def recv_list(self) -> list:
        """Return every queued message and leave the queue empty."""
        # One swap instead of copy-then-reset, so an item appended in
        # between cannot be dropped.
        queue, self.ReadyQueue = self.ReadyQueue, []
        return queue
| 测试
import time


class Debugger:
    """Loopback demo: a server-side handler sends timestamps to a client one."""

    # NOTE: ports below 1024 usually need elevated privileges on Unix-like systems.
    addr = ("127.0.0.1", 429)

    def __init__(self):
        """Open the listening socket and connect the two handlers.

        Setup happens per instance so that defining the class has no network
        side effects. The demo loop is left to ``IO()``; starting it here
        would keep the constructor from ever returning.
        """
        self.s = socket.socket()
        self.s.bind(self.addr)
        self.s.listen(10)
        self.user = SocketHandler(bufsize=20, run=False)
        self.user.connect(*self.addr)
        self.server = SocketHandler(self.s.accept()[0], run=False)
        self.user.run()

    def IO(self):
        """Send two timestamps every three seconds and print what arrived."""
        while True:
            self.server.send(time.time())
            self.server.send(time.time())
            print(self.user.recv_list())
            time.sleep(3)


debug = Debugger()
debug.IO()
| 封装
代码语言:javascript复制import socket
def ignore(function):
    """Decorate ``function`` so ordinary exceptions yield ``None`` instead.

    Only ``Exception`` subclasses are suppressed. ``KeyboardInterrupt`` and
    ``SystemExit`` still propagate, so a wrapped call cannot block an
    interrupt or an interpreter exit.
    """
    from functools import wraps

    @wraps(function)
    def func(*args, **kwargs):
        try:
            return function(*args, **kwargs)
        except Exception:
            # Deliberate best effort: the caller receives None on failure.
            return None

    return func
class SocketHandler(object):
    """Exchange delimiter-framed text messages over a wrapped socket.

    Outgoing payloads get ``split_text`` appended. Incoming chunks are
    decoded, split on ``split_text`` and reassembled: complete messages go
    to ``ReadyQueue``, and text still waiting for its delimiter is kept in
    ``waitKey`` until a later chunk completes it.
    """

    split_text = "\n"  # class variable: the default delimiter is a newline

    def __init__(self, socket=None, bufsize=1024, codec="utf8", run=True):
        """Store the connection settings and optionally start receiving.

        Args:
            socket: Socket object to wrap. When omitted, a fresh socket is
                created for this instance (a default built in the signature
                would be created once and shared by every handler).
            bufsize: Maximum number of bytes requested per receive call.
            codec: Text encoding used for sending and decoding.
            run: When true, call ``forever_receive()`` right away; that call
                only returns once the handler is closed.
        """
        if socket is None:
            # The parameter shadows the module name, hence the aliased import.
            import socket as socket_module

            socket = socket_module.socket()
        self.socket, self.bufsize, self.codec = socket, bufsize, codec
        self.waitKey = str()  # text received so far that has no delimiter yet
        self.ReadyQueue = []  # complete messages, oldest first
        self._closed = False
        self._decoder = None  # incremental decoder, created by handle()
        if run:
            self.forever_receive()

    def __del__(self):
        """Close the connection when the handler is garbage-collected."""
        self.quit()

    def isOpen(self) -> bool:
        """Return True while neither the handler nor its socket is closed."""
        return not (self._closed or getattr(self.socket, "_closed", False))

    def quitEvent(self) -> None:
        """Hook called once while closing; does nothing by default."""
        pass

    def quit(self) -> None:
        """Close the handler; repeated calls are no-ops."""
        if getattr(self, "_closed", False):
            return
        self._closed = True
        try:
            self.quitEvent()
        finally:
            # Release the socket even when the hook raises. getattr keeps
            # __del__ quiet on an instance whose __init__ never finished.
            sock = getattr(self, "socket", None)
            if sock is not None:
                sock.close()

    def __recv(self) -> "bytes | type[ConnectionError]":
        """Read one chunk, or return the ConnectionError class on failure.

        The chunk is returned untouched; trimming raw chunks could remove
        spaces that belong to a message split across two reads.
        """
        try:
            chunk = self.socket.recv(self.bufsize)
        except OSError:  # also covers the Connection*Error subclasses
            self.quit()
            return ConnectionError
        if not chunk:
            # An empty read means the peer closed the connection; treating
            # it as "no data yet" would make the receive loop spin forever.
            self.quit()
            return ConnectionError
        return chunk

    def __send(self, data: bytes) -> bool:
        """Send all of ``data``; close the handler and return False on error."""
        try:
            self.socket.sendall(data)
        except OSError:  # also covers the Connection*Error subclasses
            self.quit()
            return False
        return True

    def send(self, data) -> bool:
        """Encode ``data``, append the delimiter and send it.

        ``str`` and numbers are sent as text, sets/lists/tuples as their
        ``repr``, ``bytes`` unchanged, and anything else through
        ``bytes(data)``.

        Returns:
            True when everything was sent, False when sending failed.
        """
        if isinstance(data, bytes):
            payload = data
        elif isinstance(data, str):
            payload = data.encode(self.codec)
        elif isinstance(data, (set, list, tuple)):
            # repr() returns str; it must be encoded before joining bytes.
            payload = repr(data).encode(self.codec)
        elif isinstance(data, (int, float)):
            payload = str(data).encode(self.codec)
        else:
            payload = bytes(data)
        return self.__send(payload + self.split_text.encode(self.codec))

    def forever_receive(self) -> None:
        """Keep receiving and parsing chunks until the handler is closed."""
        while self.isOpen():
            self.receive_datas()

    def receive_datas(self) -> bool:
        """Receive and parse one chunk.

        Returns:
            True when a chunk was processed, False when receiving failed or
            the peer closed the connection.
        """
        data = self.__recv()
        if data is ConnectionError:
            return False
        self.parse_data(self.handle(data))
        return True

    def handle(self, data: bytes):
        """Decode one received chunk and split it on the delimiter.

        An incremental decoder is kept on the instance so that a multi-byte
        character cut in half by a chunk boundary is completed by the next
        chunk instead of raising. Segments are returned untrimmed, because
        the first and last ones may be only parts of a message.
        """
        import codecs

        decoder = getattr(self, "_decoder", None)
        if decoder is None:
            decoder = self._decoder = codecs.getincrementaldecoder(self.codec)()
        return decoder.decode(data).split(self.split_text)

    def parse_data(self, generator) -> None:
        """Merge split segments into ``ReadyQueue`` and ``waitKey``.

        Args:
            generator: Ordered segments from one chunk, as produced by
                ``handle``. An empty sequence is ignored.
        """
        segments = list(generator)
        if not segments:
            return
        if len(segments) == 1:
            # No delimiter in this chunk: keep accumulating the partial text.
            self.waitKey += segments[0]
            return
        first, *complete, tail = segments
        # The pending text plus the first segment finishes one message.
        self.ReadyQueue.append((self.waitKey + first).strip())
        self.ReadyQueue.extend(segment.strip() for segment in complete)
        # Whatever follows the last delimiter waits for the next chunk.
        self.waitKey = tail

    def recv(self) -> str:
        """Return the oldest complete message, receiving more data if needed.

        Raises:
            ConnectionError: The connection ended before a complete message
                became available.
        """
        while not self.ReadyQueue:
            if not self.receive_datas():
                raise ConnectionError("connection closed before a complete message arrived")
        return self.ReadyQueue.pop(0)

    def recv_list(self) -> list:
        """Return every queued message and leave the queue empty."""
        queue, self.ReadyQueue = self.ReadyQueue, []
        return queue

    def connect(self, host: str, port: int) -> bool:
        """Connect the wrapped socket without raising on failure.

        Returns:
            True when connected, False for an out-of-range port or when the
            connection attempt raised ``OSError``.
        """
        if not 0 <= port <= (2 ** 16) - 1:
            return False
        try:
            self.socket.connect((host, port))
        except OSError:
            # Best effort, as before; the result now reports the failure.
            return False
        return True