python 分隔列队机制完美解决TCP粘包分包问题

2023-02-06 10:00:40 浏览数 (2)

通常, TCP接收为210字节(1024 bytes, 213bits), 包括了4 字节的消息头和 1020字节的消息. 那如果超出这个范围呢? 分多次发, python提供了一个接口python.socket.sendall(bytes)。 在套接字中, 由于TCP的优化 Nagle算法机制或者接受最大值(MSS) < 应接收的值,出现粘包,分包现象 将多次间隔较小、数据量较小的数据,合并成一个数据量大的数据块,然后进行封包。那么这样一来,接收端就必须使用高效科学的拆包机制来分辨这些数据。 (如图1)

解决此问题, 可以将发送的内容转换repr (‘somethingn’ -> ‘somethingn"’), 并添加分隔符. 解析的时候, 通过分割分隔符, 组成列队Queue, 先出后进. 如果其中有分隔符, 那么其中的内容进入ReadyQueue,如果分割最后无分隔符, 则进入等待WaitKey, 等下次分割出的第一个相结合, 进入ReadyQueue. (如图2)

情况一 接收 "data1n" 这是一个完整的数据包, 分割出来["data1", ""] 会将"data1"与前面waitKey将(初始化为"")结合进入列队. 并将最后面的""设为waitKey

情况二 接收ata2 这是分包导致的,分割得出["ata2"]将其与前面waitKey结合,不进入列队,等待分隔符。将下一次含有分隔符前端的数据结合进入列队。

情况三 接收ndata3ndata4nd 这是粘包导致的后面还加带了一点点数据 分割得出["", "data3", "data4", "d"]

前面的waitKey与第一个空字符结合,进入列队 带data3与data4进入列队 waitKey设为第四个"d" .

怎么用python解决呢?

导入库 socket, threading

文章目录

  • 导入
  • | SocketHandler类
        • ·初始化
        • ·接收以及异常处理
    • △ 解析(重点)
  • | 测试
  • | 封装

导入

代码语言:javascript复制
import socket
from threading import Thread

| SocketHandler类

代码语言:javascript复制
class SocketHandler(object): #由于 accept为被动接受, 所以不继承 socket.socket
	split_text = "n"  # 类变量, 默认分隔符为回车(n)
·初始化
代码语言:javascript复制
def to_thread(target, Daemon=True) -> callable:
    def run(*args, name=str()) -> Thread:
        thread = Thread(target=target, args=args)
        thread.setDaemon(Daemon)
        if name:
            thread.setName(name)
        return thread

    return run

class SocketHandler(object):
# ...
    def __init__(self, socket=socket.socket(), bufsize=1024, codec="utf8", run=True):
        self.socket, self.bufsize, self.codec = socket, bufsize, codec
        self.waitKey = str()
        self.ReadyQueue = []
        self._closed = False
        if run:
            self.run()

    def run(self):
        self.forever_receive(name=f"客户端{socket}").start()

    @to_thread
    def forever_receive(self) -> (str, None):
        while self.isOpen():
            data = self.__recv()
            if isinstance(data, bytes) and data:
                self.parse_data(self.handle(data))
                continue
            elif data is ConnectionError:
                return
·接收以及异常处理
代码语言:javascript复制
def ignore(function):
    def func(*args, **kwargs):
        try:
            return function(*args, **kwargs)
        except:
            pass

    return func

class SocketHandler(object):
# ...
    def __del__(self):
        self.quit()

    def isOpen(self) -> bool:
        return not (self._closed and getattr(self.socket, "_closed", False))

    def quitEvent(self) -> None:
        pass

    def quit(self) -> None:
        self._closed = True
        self.quitEvent()
        self.socket.close()

    def __recv(self) -> (bytes, ConnectionError):
        try:
            return self.socket.recv(self.bufsize).strip(b" ")  # str.strip()不可用! 会将n省略
        except (ConnectionAbortedError, ConnectionRefusedError, ConnectionResetError, OSError) as e:
            self.quit()
            return ConnectionError

    def __send(self, data: bytes) -> bool:
        try:
            self.socket.sendall(data)
            return True
        except (ConnectionAbortedError, ConnectionRefusedError, ConnectionResetError, OSError) as e:
            self.quit()
            return False
    def send(self, data) -> bool:
        if isinstance(data, str):
            data = data.encode(self.codec)
        elif isinstance(data, (set, list, tuple)):
            data = repr(data)
        elif isinstance(data, (int, float)):
            data = str(data).encode(self.codec)
        elif isinstance(data, bytes):
            pass
        else:
            data = bytes(data)
        return self.__send(data   self.split_text.encode(self.codec))

    @ignore
    def connect(self, host: str, port: int):
        assert 0 <= port <= (2 ** 16) - 1
        self.socket.connect((host, port))

△ 解析(重点)

代码语言:javascript复制
    def handle(self, data: bytes):
        return [d.strip() for d in data.decode(self.codec).split(self.split_text)]

    @ignore  # assert bool(list)
    def parse_data(self, generator: (tuple, list, set)) -> None:
        generator = list(generator)
        if len(generator) == 1:  # 列表为1, 表明无间隔符, 则在等待中添加. 
            self.waitKey  = generator[0]
            return
        self.ReadyQueue.append(self.waitKey   generator.pop(0)) #将原先的等待值
        self.waitKey = generator.pop()
        self.ReadyQueue.extend(generator)

    def recv(self) -> str:
        while not self.ReadyQueue:
            pass
        return self.ReadyQueue.pop(0)

    def recv_list(self) -> list:
        queue = self.ReadyQueue[:]
        self.ReadyQueue = []
        return queue

| 测试

代码语言:javascript复制
import time


class Debugger:
    addr = ("127.0.0.1", 429)

    s = socket.socket()
    s.bind(addr)
    s.listen(10)
    user = SocketHandler(bufsize=20, run=False)
    user.connect(*addr)
    server = SocketHandler(s.accept()[0], run=False)
    user.run()

    def __init__(self):
        self.IO()

    def IO(self):
        while True:
            self.server.send(time.time())
            self.server.send(time.time())
            print(self.user.recv_list())
            time.sleep(3)


debug = Debugger()
debug.IO()

| 封装

代码语言:javascript复制
import socket


def ignore(function):
    def func(*args, **kwargs):
        try:
            return function(*args, **kwargs)
        except:
            pass

    return func


class SocketHandler(object):
    split_text = "n"  # 类变量, 默认分隔符为回车(n)

    def __init__(self, socket=socket.socket(), bufsize=1024, codec="utf8", run=True):
        self.socket, self.bufsize, self.codec = socket, bufsize, codec
        self.waitKey = str()
        self.ReadyQueue = []
        self._closed = False
        if run:
            self.forever_receive()

    def __del__(self):
        self.quit()

    def isOpen(self) -> bool:
        return not (self._closed and getattr(self.socket, "_closed", False))

    def quitEvent(self) -> None:
        pass

    def quit(self) -> None:
        self._closed = True
        self.quitEvent()
        self.socket.close()

    def __recv(self) -> (bytes, ConnectionError):
        try:
            return self.socket.recv(self.bufsize).strip(b" ")  # str.strip()不可用! 会将n省略
        except (ConnectionAbortedError, ConnectionRefusedError, ConnectionResetError, OSError) as e:
            self.quit()
            return ConnectionError

    def __send(self, data: bytes) -> bool:
        try:
            self.socket.sendall(data)
            return True
        except (ConnectionAbortedError, ConnectionRefusedError, ConnectionResetError, OSError) as e:
            self.quit()
            return False

    def send(self, data) -> bool:
        if isinstance(data, str):
            data = data.encode(self.codec)
        elif isinstance(data, (set, list, tuple)):
            data = repr(data)
        elif isinstance(data, (int, float)):
            data = str(data).encode(self.codec)
        elif isinstance(data, bytes):
            pass
        else:
            data = bytes(data)
        return self.__send(data   self.split_text.encode(self.codec))

    def forever_receive(self) -> (str, None):
        while self.isOpen():
            self.receive_datas()
            
    def receive_datas(self) -> bool:
        data = self.__recv()
        if isinstance(data, bytes) and data:
            self.parse_data(self.handle(data))
            return True
        elif data is ConnectionError:
            return False

    def handle(self, data: bytes):
        return [d.strip() for d in data.decode(self.codec).split(self.split_text)]

    @ignore  
    def parse_data(self, generator: (tuple, list, set)) -> None:
        generator = list(generator)
        if len(generator) == 1:  # 列表为1, 表明无间隔符, 则在等待中添加.
            self.waitKey  = generator[0]
            return
        self.ReadyQueue.append(self.waitKey   generator.pop(0))
        self.waitKey = generator.pop()
        self.ReadyQueue.extend(generator)

    def recv(self) -> str:
        while not self.ReadyQueue:
            self.receive_datas()
        return self.ReadyQueue.pop(0)

    def recv_list(self) -> list:
        queue = self.ReadyQueue[:]
        self.ReadyQueue = []
        return queue

    @ignore
    def connect(self, host: str, port: int):
        assert 0 <= port <= (2 ** 16) - 1
        self.socket.connect((host, port)) 

0 人点赞