python套接字多线程文件传输(大文件切割) - 协程async动态添加任务

2023-02-06 09:58:29 浏览数 (1)

在类似 QQ 的文件/聊天服务器中, 既要传输文件, 又要同时传递消息; 如果文件太大, 直接整体发送会造成阻塞。可以将文件切分为若干小块, 再通过协程逐块传递。

文章目录

  • 字节(可以二进制读取文件)切分
  • 实现
          • 想必大家都知道TCP服务器 v1.9更啥了吧!

字节(可以二进制读取文件)切分

代码语言:python
def cut(self, byte:bytes, seg=segment) -> list:
    """Split ``byte`` into consecutive slices of at most ``seg`` items.

    The final slice may be shorter than ``seg``; empty input gives ``[]``.
    """
    # Slice [x, x + seg) for every offset x that is a multiple of seg.
    return [byte[x:x + seg] for x in range(0, len(byte), seg)]

十分简洁

实现

默认每 2 KB 切分一次, 修改 segment 即可调整切分大小; 默认每 0.2 秒发送一块, 修改 delay 即可调整发送间隔。

代码语言:python
import ast
import asyncio
import os
import sys
from threading import Thread

# Chunking parameters.
base = 1024
segment = base*2  # default chunk size used when cutting a payload
delay = 0.2       # seconds slept between two consecutive chunks

# Message type tags: the first element of every (tag, arguments) message.
new_file = 0
update_file = 1
request_file = 2
normal_text = 3

# Create the loop explicitly: asyncio.get_event_loop() without a running loop
# is deprecated and raises RuntimeError on recent Python versions.
loop = asyncio.new_event_loop()
# Keep it registered as the current loop of this thread, as the implicit
# creation in get_event_loop() used to do.
asyncio.set_event_loop(loop)
# The loop runs in this thread once main_thread.start() is called;
# coroutines are handed to it with asyncio.run_coroutine_threadsafe().
main_thread = Thread(target=loop.run_forever)

def save_bytes(file, byte:bytes):
    """Write ``byte`` to the path ``file`` in binary mode.

    The file is opened with mode ``"wb"``, so existing content is replaced.
    """
    with open(file, mode="wb") as sink:
        sink.write(byte)
def get_eval(str, defined=None):
    """Parse ``str`` as a Python literal and return it if its type fits.

    Args:
        str: Text holding a Python literal (tuple, list, dict, str, bytes,
            number, ``None``, ...). The name shadows the builtin but is kept
            so keyword callers keep working.
        defined: Fallback value; its type is also the type the parsed
            result must be an instance of.

    Returns:
        The parsed value when it is an instance of ``type(defined)``,
        otherwise ``defined`` (also on any parse failure).
    """
    # SECURITY: this used to be eval(), which executes arbitrary expressions.
    # The text parsed here comes from decoded network data, so only literals
    # are accepted now; anything else falls back to ``defined``.
    try:
        res = ast.literal_eval(str)
    except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
        return defined
    if isinstance(res, type(defined)):
        return res
    return defined
class SEND():
    """One outgoing transfer: a payload cut into chunks and sent one by one.

    Constructing an instance immediately schedules :meth:`update` on the
    module-level event loop ``loop``.
    """

    def __init__(self, index, name, fbytes:bytes, conn:callable, encode='utf8'):
        """Prepare the chunks and schedule the transfer.

        Args:
            index: Identifier of this transfer, included in every message.
            name: File name announced in the header message.
            fbytes: Payload to send.
            conn: Callable invoked with each encoded message.
            encode: Codec used to encode the textual messages.
        """
        self.segs = self.cut(fbytes)
        self.encode = encode
        self.total = len(self.segs)
        self.index = index
        self.conn = conn
        self.finish = False
        self.name = name
        # Hand the coroutine to the loop; this call does not wait for it.
        asyncio.run_coroutine_threadsafe(self.update(), loop)

    async def update(self):
        """Send the header, then every chunk, sleeping ``delay`` after each."""
        self.conn(self.header().encode(self.encode))
        for n, data in enumerate(self.segs):
            self.conn(self.format(n, data).encode(self.encode))
            # Yield to the event loop between chunks.
            await asyncio.sleep(delay)
        self.finish = True

    def cut(self, byte:bytes, seg=segment) -> list:
        """Split ``byte`` into consecutive slices of at most ``seg`` items."""
        return [byte[x:x + seg] for x in range(0, len(byte), seg)]

    def format(self, process, data) -> str:
        """Return the text of an ``update_file`` message for chunk ``process``."""
        return repr( (update_file, (self.index, process, data)) )

    def header(self) -> str:
        """Return the text of the ``new_file`` message announcing the transfer."""
        return repr((new_file, (self.index,
                              self.name,
                              self.total)
                     ))

class RECV():
    """Reassembles one incoming file from chunks that arrive in order."""

    def __init__(self, index: int, name: str, total: int,):
        """Store the announced ``index``, file ``name`` and chunk ``total``."""
        self.index,self.name,self.total = index, name, total
        self.progress = -1   # number of the last accepted chunk
        self.file = []       # accepted chunks, in order
        self.finish = False
        # Path of the written file; stays None until save() succeeds so that
        # savepath() never hits a missing attribute.
        self.__save = None

    def update(self, p, data):
        """Accept chunk number ``p`` if it directly follows the previous one.

        Returns:
            ``True`` when this chunk completes the file, otherwise ``None``
            (chunk stored but more expected, or chunk ignored).
        """
        if self.finish:
            # Late or duplicate chunks must not alter a completed file.
            return None
        if isinstance(p, int) and p - 1 == self.progress:
            self.progress = p
            self.file.append(data)
            if len(self.file) == self.total:
                self.finish = True
                return True
        return None

    def mix(self):
        """Return the chunks currently held, joined into one bytes object."""
        return b''.join(self.file)

    def save(self, path, ):
        """Write the completed file into directory ``path``.

        Returns:
            ``True`` if the file is (or already was) written, ``False`` if
            the transfer is not finished yet.
        """
        if not self.finish:
            return False
        if self.__save is not None:
            # Already written; the chunks were released, so do not rewrite.
            return True
        b = self.mix()
        # The name is supplied by the sender: keep only its last component so
        # the file cannot be written outside ``path``.
        target = os.path.join(path, os.path.basename(self.name))
        save_bytes(target, b)
        self.__save = target
        print(f"Save {self.name} at {path}, size {len(b)} b.")
        self.file = []  # release the chunk memory
        return True

    def savepath(self) -> str:
        """Return the path the file was written to, or ``False`` if unsaved."""
        if self.finish and self.__save is not None:
            return self.__save
        return False

class send_files():
    """Keeps the list of outgoing transfers and creates new ones."""

    def __init__(self, encode='utf8'):
        """Start with no transfers; ``encode`` is the codec used for text."""
        self.sends = []
        self.encode = encode

    def new_sends(self, name, file, conn):
        """Start sending ``file`` (bytes or str) under ``name`` through ``conn``.

        A str payload is encoded with ``self.encode`` first. The new transfer
        gets the next free position in ``self.sends`` as its index.
        """
        if isinstance(file, str):
            file = file.encode(self.encode)
        # Forward the configured codec so the transfer encodes its messages
        # with it instead of silently falling back to SEND's default.
        self.sends.append(SEND(len(self.sends), name, file, conn, self.encode))

    def localfile(self, file, conn):
        """Read the file at path ``file`` and send it through ``conn``.

        Does nothing when ``file`` is not an existing regular file.
        """
        if os.path.isfile(file):
            name = os.path.basename(file)
            with open(file, "rb") as f:
                content = f.read()
            self.new_sends(name, content, conn)

class recv_files():
    """Keeps the list of incoming transfers and routes chunks to them."""

    def __init__(self, decode='utf8', path=None):
        """Start with no transfers.

        Args:
            decode: Codec name, stored on the instance.
            path: Directory files are saved into; defaults to ``sys.path[0]``.
        """
        self.recvs = []
        self.decode = decode
        if path is None:
            path = sys.path[0]
        self.path = path

    def _known(self, index):
        """Return True when ``index`` is a valid position in ``self.recvs``."""
        # Negative and non-int values are rejected explicitly: a bare
        # ``len - 1 >= index`` check lets -1 through and fails on strings.
        return isinstance(index, int) and 0 <= index < len(self.recvs)

    def new_files(self, index, name, total):
        """Register a new incoming transfer."""
        self.recvs.append(RECV(index, name, total))

    def apply(self, index, progess, data):
        """Feed chunk ``progess`` to transfer ``index``; save it once complete.

        Returns the result of :meth:`save` when the chunk completed the file,
        otherwise ``None`` (including for an unknown ``index``).
        """
        if self._known(index):
            if self.recvs[index].update(progess, data):
                return self.save(index)

    def save(self, index):
        """Save transfer ``index`` into ``self.path``; ``None`` if unknown."""
        if self._known(index):
            return self.recvs[index].save(self.path)

class message_handle():
    """Decodes incoming messages and dispatches them by their type tag."""

    codec = "utf8"

    def __init__(self, path,):
        """Create the sender and the receiver; ``path`` is the save directory."""
        self.Sender = send_files(self.codec, )
        self.Receiver = recv_files(self.codec, path)

    def handle(self, data, client):
        """Process one raw message ``data`` received from ``client``.

        The decoded text must evaluate to a ``(tag, arguments)`` pair;
        anything else is ignored.

        Returns:
            The message arguments for a ``normal_text`` message, otherwise
            ``None``.
        """
        try:
            text = data.decode(self.codec)
        except UnicodeDecodeError:
            # Undecodable input cannot be a valid message.
            return None
        _res = get_eval(text, (None, ) )
        if len(_res) != 2:
            return None
        kind, arguments = _res
        if kind == new_file or kind == update_file:
            # Both messages carry exactly three fields; drop malformed ones
            # instead of failing on the argument unpacking.
            if not (isinstance(arguments, (tuple, list)) and len(arguments) == 3):
                return None
            if kind == new_file:
                self.Receiver.new_files(*arguments)
            else:
                self.Receiver.apply(*arguments)
        elif kind == request_file:
            # The requested index comes from the peer: look it up with a
            # bounds check rather than indexing the list directly.
            receiver = self.get_index(arguments)
            if receiver is None:
                return None
            path = receiver.savepath()
            if not path:
                # savepath() gives False when the file is not available.
                return None
            self.Sender.localfile(path, client._send)
        elif kind == normal_text:
            return arguments
        return None

    def send(self, sendpath, conn):
        """Send the local file at ``sendpath`` through ``conn``."""
        return self.Sender.localfile(sendpath, conn)

    def get_index(self, index):
        """Return the receiver at position ``index``, or ``None``.

        ``None`` is returned for out-of-range, negative and non-int values.
        """
        if isinstance(index, int) and 0 <= index < len(self.Receiver.recvs):
            return self.Receiver.recvs[index]
        return None

# Received files go into a "resource" directory next to this script.
save_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "resource")
# exist_ok avoids the check-then-create race of a separate isdir() test.
os.makedirs(save_path, exist_ok=True)
_handle = message_handle(save_path, )
# Start running the event loop in its background thread.
main_thread.start()
想必大家都知道TCP服务器 v1.9更啥了吧!

0 人点赞