TCP聊天+传输文件服务器服务器套接字v2.2
整个图当封面吧
所有版本记录:
v1.0: TCP聊天服务器套接字|PyQt5 socket(TCP端口映射 端口放行) logging Thread(含日志,html) anaconda打包32位exe(3.4万字)|python高阶
v1.1: python TCP套接字服务器v1.1-新增服务端命令功能及修改bug(socket PyQt5)
v1.2: python TCP服务器v1.2 - 服务端新增用户登录注册(json, md5加密)
v1.3: python TCP服务器v1.3 - 服务器抗压测试及关闭套接字处理
v1.4: python TCP服务器v1.4 - 客户端连接服务器异常(异常情况分类)处理
v1.5: PyQt5可编辑下拉框(comboBox):editable - python TCP服务器v1.5 - 客户端连接界面增加自定义参数(设置超时, 连接地址可选)
v1.6: Python TCP服务器v1.6 - multiprocessing多进程及Ctrl-c(SIGINT)退出
v1.7: Python TCP服务器v1.7 - PyQt5 server服务端来临
v1.8: python TCP服务器v1.8 - PyQt5登录界面美化 淡入淡出
v1.9: socketTCP协程文件 信息传递 - TCP聊天文件服务器v1.9 - 划时代的版本更新(4.6万字)
v2.0: TCP聊天文件服务器v2.0 - 重大bug修复 PyQt5文件传输可视化
v2.1: TCP聊天文件服务器v2.1 - 服务端线程管理(threading.enumerate)
文章目录
- SocketQueue
-
- Server服务端
-
- 原先:
- 现在:
-
-
-
- socket_queue.py
- server.py Client类的继承
-
- User客户端
-
- 原先
- 现在
-
- 继承关系 SocketQueue -> SocketClient -> Socket
-
-
- socket_queue.py
- user.pyw
- 图片搬家
- 全部代码
-
- 服务端
-
- data.py
- server.pyw
- socket_queue.py
- 客户端
-
- user.pyw
- socket_queue.py
- file.py
- ProgressBar.py
- 资源(0积分)
SocketQueue
《python 分隔列队机制完美解决TCP粘包分包问题》(csdn - zmh_program的博文): 我的这篇文章已经讲述了大概算法, 在此就不再赘述, 这里主要讲继承问题。
Server服务端
原先:
class Client(object):
    """Per-connection handler of the previous server version (the article's "before").

    Wraps one accepted socket, runs the login/registration dialogue and then
    relays chat lines and ``/`` commands to the owning ``Server``.
    """

    class QuitError(Exception):
        """Sentinel returned (not raised) by the wrapped socket helpers on failure."""

        def __init__(self, *args):
            super().__init__(*args)

    def __init__(self, socket, addr, server: Server):
        self.socket = socket
        self.addr = addr
        if not isinstance(server, Server):
            raise ValueError
        self.server = server
        self.encode = self.server.encode
        self.max_count = self.server.max_count
        self.com = Command_Handler(self)

        # The raw I/O helpers are wrapped by ``self.error`` so that socket
        # failures close the client instead of raising into the worker thread.
        @self.error
        def _recv(self) -> bytes:
            return self.socket.recv(self.max_count).decode(encoding=self.encode).strip()

        self._recv = lambda: _recv(self)

        @self.error
        def _send(self, message=None) -> None:
            if message:
                if isinstance(message, str):
                    message = repr((normal_text, message)).encode(self.encode)
                message += b'\n'  # frame delimiter: prevents TCP packets from sticking together
                self.socket.sendall(message)

        self._send = lambda m: _send(self, m)

    def __del__(self):
        self.socket.close()

    def isLogin(self) -> bool:
        """Return True once the login dialogue has completed."""
        return hasattr(self, "_login") and self._login

    def isOpen(self) -> bool:
        """Return True while the underlying socket has not been closed locally."""
        return not getattr(self.socket, "_closed", True)

    def __filter__(self) -> bool:
        """Return whether the client is online and ready to receive messages."""
        return self.isLogin() and self.isOpen()

    def recv(self) -> str:
        """Return the next parsed message, reading from the socket when the queue is empty."""
        if not hasattr(self, "queue"):
            self.queue = []  # queue of already-parsed messages
        if self.queue:
            return self.queue.pop(0)
        while True:
            result = list(self.server.user_handle.handle(self._recv(), self))
            if result:
                self.queue.extend(result)
                return self.queue.pop(0)

    def input(self, text):
        """Send a prompt and return the client's next message."""
        self._send(text)
        return self.recv()

    @ignore
    def login(self):
        """Run the login (known user) or registration (new user) dialogue."""
        self.username = self.recv()[:15]
        if self.server.user_record.__in__(self.username):
            if self.server.user_record.handler(self.username, self.input("<font color='red'>请输入您的密码: (右下[send]键发送)</font>")):
                self._send(f'<font color="green">欢迎回来, {self.username}.</font>')
            else:
                self._send('<font color="red">密码错误,请重试.</font>')
                self.__del__()  # = del self
                # Do not fall through: a rejected client must not be marked as logged in.
                return
        else:
            def normal(string):
                return (4 <= len(string) <= 10) and not ('\n' in string)

            while True:
                p1 = self.input("<font color='blue'>[i]提示: 密码需在4 ~ 10位之间, 且不能换行.</font>\n<font color='red'>请输入您的密码: (右下[send]键发送)</font>")
                if normal(p1):
                    break
            while True:
                p2 = self.input("<font color='red'>再次输入您的密码: (右下[send]键发送)</font>")
                if p1 == p2:
                    break
                else:
                    self._send("<font color='red'>密码与前次不符!</font>")
            self.server.user_record.handler(self.username, p1)
            self._send(f'初来乍到, {self.username}')
        self._login = True
        self.server.login(self.username, self.addr)

    def quit(self) -> None:
        """Close the socket once and notify the server."""
        if hasattr(self, 'Quitted'):
            return
        self.Quitted = True
        if self.isOpen() is True:
            self.socket.close()
        self.server.quit(self.username, self.addr)

    @ignore
    def forever_receive(self):
        """Thread body: log in, then dispatch every received line until the client leaves."""
        self.login()
        while self.__filter__():
            string = self.recv()
            if string is None:
                continue
            elif string == Client.QuitError:
                return
            elif self.com.iscommand(string):
                self._send(self.com.handler(string))
            else:
                self.server.UserMessage(self.addr, self.username, string)

    def error(self, func):
        """Decorator: run *func*; on any failure return ``Client.QuitError`` instead of raising."""
        def function(*args, **kwargs):
            try:
                res = func(*args, **kwargs)
                return res
            except (ConnectionAbortedError, ConnectionRefusedError, ConnectionResetError, OSError) as e:
                self.quit()
            except Exception:
                logger.exception("error")
            return Client.QuitError
        return function

    def run(self):
        """Start the daemon thread that serves this client."""
        self.thread = threading(True, name=f"客户端{self.addr}", target=self.forever_receive)
现在:
socket_queue.py
import socket
def ignore(function):
    """Decorator: call *function* and swallow any ``Exception`` it raises.

    The wrapper returns the wrapped function's result, or ``None`` when an
    exception was suppressed.  ``KeyboardInterrupt``/``SystemExit`` are no
    longer swallowed (the original used a bare ``except``).
    """
    from functools import wraps

    @wraps(function)
    def func(*args, **kwargs):
        try:
            return function(*args, **kwargs)
        except Exception:
            return None
    return func
class SocketQueue:
    """Delimiter-framed message queue on top of a TCP socket.

    Received bytes are decoded, split on ``split_text`` and pushed to
    ``ReadyQueue``; the trailing incomplete fragment is kept in ``waitKey``
    until the rest of the frame arrives.
    """

    split_text = "\n"  # class variable: frame delimiter, newline by default
    # Bound here because the ``socket`` parameter of __init__ shadows the module.
    _socket_factory = staticmethod(socket.socket)

    class QuitError(ConnectionError):
        """Sentinel returned (not raised) once the connection is closed."""
        pass

    def __init__(self, socket=None, bufsize=1024, codec="utf8"):
        # A default evaluated in the signature would be one socket shared by
        # every instance; create a fresh one per instance instead.
        if socket is None:
            socket = self._socket_factory()
        self.socket, self.bufsize, self.codec = socket, bufsize, codec
        self.waitKey = str()
        self.ReadyQueue = []
        self._closed = False

    def __close__(self):
        self.quit()

    def __del__(self):
        self.quit()

    def isOpen(self) -> bool:
        """Return False only when both this object and its socket are marked closed."""
        return not (self._closed and getattr(self.socket, "_closed", False))

    def quitEvent(self) -> None:
        """Hook called once from ``quit`` before the socket is closed; override in subclasses."""
        pass

    def quit(self) -> None:
        """Close the connection exactly once."""
        if not self._closed:
            self._closed = True
            self.quitEvent()
            self.socket.close()

    def normal_text(self, string: str):
        """Encode a plain string for sending; subclasses may wrap it in a protocol tuple."""
        return string.encode(self.codec)

    def __recv(self):
        """Read one chunk from the socket and feed it to the parser.

        Returns ``QuitError`` when the connection failed or was closed by the
        peer, otherwise ``None``.
        """
        try:
            raw = self.socket.recv(self.bufsize)
        except OSError:  # Connection*Error are all OSError subclasses
            self.quit()
            return self.QuitError
        if not raw:
            # recv() returning b"" means the peer closed the connection;
            # without this the caller would spin forever on an empty queue.
            self.quit()
            return self.QuitError
        data = raw.strip(b" ")  # bytes.strip() without argument would also drop the "\n" delimiter
        if data:
            self.parse_data(self.handle(data))

    def __send(self, data: bytes) -> bool:
        try:
            self.socket.sendall(data)
            return True
        except OSError:
            self.quit()
            return False

    def send(self, data) -> bool:
        """Serialise *data*, append the delimiter and send it. Returns True on success."""
        if isinstance(data, str):
            data = self.normal_text(data)
        elif isinstance(data, (set, list, tuple)):
            data = repr(data).encode(self.codec)
        elif isinstance(data, (int, float)):
            data = str(data).encode(self.codec)
        elif not isinstance(data, bytes):
            data = bytes(data)
        return self.__send(data + self.split_text.encode(self.codec))

    def input(self, text):
        """Send *text* and return the next received message."""
        self.send(text)
        return self.recv()

    def forever_receive(self) -> None:
        while self.isOpen():
            self.recv()

    def handle(self, data: bytes):
        """Decode a chunk and split it on the delimiter into stripped fragments."""
        return [d.strip() for d in data.decode(self.codec).split(self.split_text)]

    @ignore
    def parse_data(self, generator) -> None:
        """Merge the fragments of one chunk into the ready queue / pending buffer."""
        generator = list(generator)
        if len(generator) == 1:
            # A single fragment means the chunk held no delimiter: keep buffering.
            self.waitKey += generator[0]
            return
        self.ReadyQueue.append(self.waitKey + generator.pop(0))
        self.waitKey = generator.pop()
        self.ReadyQueue.extend(generator)

    def recv(self):
        """Block until a non-empty message is available; return ``QuitError`` when closed."""
        while True:
            while not self.ReadyQueue:
                self.__recv()
                if not self.isOpen():
                    return self.QuitError
            data = self.parse_argument(self.ReadyQueue.pop(0))
            if isinstance(data, str) and data:
                return data

    def parse_argument(self, arg: str) -> str:
        """Post-process one complete frame; subclasses override to decode the protocol."""
        return arg.strip()

    def recv_list(self) -> list:
        """Return and clear every message that is currently ready."""
        queue = self.ReadyQueue[:]
        self.ReadyQueue = []
        return queue

    @ignore
    def connect(self, host: str, port: int):
        assert 0 <= port <= (2 ** 16) - 1
        self.socket.connect((host, port))
server.py Client类的继承
class Client(SocketQueue):
    """Server-side handler for one connected user, built on ``SocketQueue`` framing."""

    def __init__(self, socket, addr, server: Server):
        if not isinstance(server, Server):
            raise ValueError
        super(Client, self).__init__(socket, server.max_count, server.encode)
        self.addr = addr
        self.server = server
        self.username = str()
        self.com = Command_Handler(self)
        # Initialise all state before the worker thread starts reading it.
        self._login = False
        self.thread = threading(True, name=f"客户端{self.addr}", target=self.forever_receive)

    def normal_text(self, s):
        """Wrap plain text in the ``(normal_text, s)`` protocol tuple and encode it."""
        return repr((normal_text, s)).encode(self.codec)

    def isLogin(self) -> bool:
        return getattr(self, "_login", False)

    def __filter__(self) -> bool:
        """Return whether the client is online and ready to receive messages."""
        return self.isLogin() and self.isOpen()

    @ignore
    def login(self):
        """Run the login (known user) or registration (new user) dialogue."""
        self.username = self.recv()[:15]
        if self.server.user_record.__in__(self.username):
            if self.server.user_record.handler(self.username,
                                               self.input("<font color='red'>请输入您的密码: (右下[send]键发送)</font>")):
                self.send(f'<font color="green">欢迎回来, {self.username}.</font>')
            else:
                self.send('<font color="red">密码错误,请重试.</font>')
                self.quit()  # same effect as the former self.__del__()
                # Do not fall through: a rejected client must not be announced as logged in.
                return
        else:
            while True:
                p1 = self.input(
                    "<font color='blue'>[i]提示: 密码需在4 ~ 10位之间, 且不能换行.</font>\n<font color='red'>请输入您的密码: (右下[send]键发送)</font>")
                if (4 <= len(p1) <= 10) and not ('\n' in p1):
                    break
            while True:
                p2 = self.input("<font color='red'>再次输入您的密码: (右下[send]键发送)</font>")
                if p1 == p2:
                    break
                else:
                    self.send("<font color='red'>密码与前次不符!</font>")
            self.server.user_record.handler(self.username, p1)
            self.send(f'初来乍到, {self.username}')
        self._login = True
        self.server.login(self.username, self.addr)

    def quitEvent(self) -> None:
        self.server.quit(self.username, self.addr)

    def parse_argument(self, arg: str) -> str:
        """Let the server's message handler decode one complete frame."""
        return self.server.user_handle.handle(arg.strip(), self)

    @ignore
    def forever_receive(self):
        """Thread body: log in, then dispatch commands and chat lines until offline."""
        self.login()
        while self.__filter__():
            string = self.recv()
            if string is None:
                continue
            elif self.com.iscommand(string):
                self.send(self.com.handler(string))
            else:
                self.server.UserMessage(self.addr, self.username, string)
是不是非常简练?
User客户端
原先
class Socket:
    """Client-side connection of the previous version (the article's "before")."""

    def __init__(self, Function=lambda i: None, code='utf-8'):
        self.socket = socket.socket()
        self.code = code
        self._logger = Function
        self.socket.settimeout(TIMEOUT)
        self._connect = False

    def set_func(self, f):
        """Replace the callback that receives displayable messages."""
        self._logger = f

    def retry(self):
        """Discard the current socket and create a fresh one with the same timeout."""
        del self.socket
        self.socket = socket.socket()
        self.socket.settimeout(TIMEOUT)

    def socket_connect(self):
        """Connect to ``self.addr``; return True, a human-readable reason, or None."""
        if hasattr(self, "_socc"):
            # Reusing one socket for several connect attempts fails with
            # `OSError: [WinError 10022]`, so start from a new socket.
            self.retry()
        else:
            self._socc = True
        try:
            self.socket.connect(self.addr)
        except socket.gaierror:
            return f"获取地址信息失败.请确保{self.addr[0]}是有效地址或ipv4/ipv6"
        except socket.timeout:
            return f"连接超时({TIMEOUT}s).服务器[{self.addr[0]}:{self.addr[1]}]连接失败."
        except OverflowError:
            return f"输入的端口号为{self.addr[1]},端口号必须在0~65535间."
        except ConnectionResetError:
            pass
        except OSError as e:
            if int(self.addr[1]) == 0:
                return "[WinError 10049] 在其上下文中,该请求的地址无效"
            # e.args[1] does not exist for single-argument OSErrors.
            return str(e.strerror or e).rstrip('。')
        except TypeError:
            return f"网络地址格式错误: 格式[ip:port] port必须为整型变量0~65535间."
        except Exception:
            logging.exception(msg=str())
        else:
            return True

    def connect(self, data=None, ip=None, port: int = 0, show=lambda _: None):
        """Remember user data / address (when given) and try to connect.

        Returns True on success; otherwise reports the reason through *show*
        and returns False.
        """
        if data is None:
            assert hasattr(self, "dt"), "user data is empty."
        else:
            self.dt = data
        if ip is None:
            assert hasattr(self, "addr"), "ip address is empty."
        else:
            self.addr = (ip, port)
        result = self.socket_connect()
        if result is True:
            # The original wrote to an undefined name ``s`` here.
            self._connect = True
            return True
        else:
            show(f"[{get_time()}]: {result}")
            self._connect = False
            return False

    def info(self):
        return repr(self.dt)

    @to_logging
    def _recv(self):
        result = self.handle.handle(self.socket.recv(base ** 2))
        for data in result:
            if data and isinstance(data, str):
                kb = len(data) / base
                self._logger(f'{data} {"<font size=1>%0.2f kb</font>" % kb}')

    def _handler(self):
        self.handle.send_text(self.dt)
        while True:
            if self._recv() == "CLOSE":
                return

    def send(self, b: (str, bytes)):
        """Send one newline-terminated message; empty messages are skipped."""
        b = b.strip()
        if isinstance(b, str):
            b = b.encode(self.code)
        if b:
            # sendall: plain send() may transmit only part of the buffer.
            self.socket.sendall(b + b'\n')

    def run(self):  # thread entry
        self.handle = message_handle(self.send)
        self._connect = True
        self.thread = threading(True, target=self._handler)
现在
继承关系 SocketQueue -> SocketClient -> Socket
socket_queue.py
import socket
from datetime import datetime
def get_time():
    """Return the current local time formatted as ``HH:MM:SS``."""
    now = datetime.now()
    return now.strftime('%H:%M:%S')


# Socket timeout in seconds, used for both connecting and receiving.
TIMEOUT = 2
def ignore(function):
    """Decorator: call *function* and swallow any ``Exception`` it raises.

    The wrapper returns the wrapped function's result, or ``None`` when an
    exception was suppressed.  ``KeyboardInterrupt``/``SystemExit`` are no
    longer swallowed (the original used a bare ``except``).
    """
    from functools import wraps

    @wraps(function)
    def func(*args, **kwargs):
        try:
            return function(*args, **kwargs)
        except Exception:
            return None
    return func
class SocketQueue:
    """Delimiter-framed message queue on top of a TCP socket.

    Received bytes are decoded, split on ``split_text`` and pushed to
    ``ReadyQueue``; the trailing incomplete fragment is kept in ``waitKey``
    until the rest of the frame arrives.
    """

    split_text = "\n"  # class variable: frame delimiter, newline by default

    class QuitError(ConnectionError):
        """Sentinel returned (not raised) once the connection is closed."""
        pass

    def __init__(self, sock=None, bufsize=1024, codec="utf8"):
        # A default evaluated in the signature would be one socket shared by
        # every instance; create a fresh one per instance instead.
        if sock is None:
            sock = socket.socket()
        self.socket, self.bufsize, self.codec = sock, bufsize, codec
        self.waitKey = str()
        self.ReadyQueue = []
        self._closed = False

    def re_setup(self):
        """Reset the parser state so the object can be used for a new connection."""
        self.waitKey = str()
        self.ReadyQueue = []
        self._closed = False

    def __close__(self):
        self.quit()

    def __del__(self):
        self.quit()

    def isOpen(self) -> bool:
        """Return False only when both this object and its socket are marked closed."""
        return not (self._closed and getattr(self.socket, "_closed", False))

    def quitEvent(self) -> None:
        """Hook called once from ``quit`` before the socket is closed; override in subclasses."""
        pass

    def quit(self) -> None:
        """Close the connection exactly once."""
        if not self._closed:
            self._closed = True
            self.quitEvent()
            self.socket.close()

    def normal_text(self, string: str):
        """Encode a plain string for sending."""
        return string.encode(self.codec)

    def __recv(self):
        """Read one chunk from the socket and feed it to the parser.

        A receive timeout is not an error and returns ``None``.  Returns
        ``QuitError`` when the connection failed or was closed by the peer.
        """
        try:
            raw = self.socket.recv(self.bufsize)
        except socket.timeout:
            return None
        except OSError:  # Connection*Error are all OSError subclasses
            self.quit()
            return self.QuitError
        if not raw:
            # recv() returning b"" means the peer closed the connection;
            # without this the caller would spin forever on an empty queue.
            self.quit()
            return self.QuitError
        data = raw.strip(b" ")  # bytes.strip() without argument would also drop the "\n" delimiter
        if data:
            self.parse_data(self.handle(data))

    def __send(self, data: bytes) -> bool:
        try:
            self.socket.sendall(data)
            return True
        except OSError:
            self.quit()
            return False

    def send(self, data) -> bool:
        """Serialise *data*, append the delimiter and send it. Returns True on success."""
        if isinstance(data, str):
            data = self.normal_text(data)
        elif isinstance(data, (set, list, tuple)):
            data = repr(data).encode(self.codec)
        elif isinstance(data, (int, float)):
            data = str(data).encode(self.codec)
        elif not isinstance(data, bytes):
            data = bytes(data)
        return self.__send(data + self.split_text.encode(self.codec))

    def input(self, text):
        """Send *text* and return the next received message."""
        self.send(text)
        return self.recv()

    def forever_receive(self) -> None:
        while self.isOpen():
            self.recv()

    def handle(self, data: bytes):
        """Decode a chunk and split it on the delimiter into stripped fragments."""
        return [d.strip() for d in data.decode(self.codec).split(self.split_text)]

    @ignore
    def parse_data(self, generator) -> None:
        """Merge the fragments of one chunk into the ready queue / pending buffer."""
        generator = list(generator)
        if len(generator) == 1:
            # A single fragment means the chunk held no delimiter: keep buffering.
            self.waitKey += generator[0]
            return
        self.ReadyQueue.append(self.waitKey + generator.pop(0))
        self.waitKey = generator.pop()
        self.ReadyQueue.extend(generator)

    def recv(self):
        """Block until a non-empty message is available; return ``QuitError`` when closed."""
        while True:
            while (not self.ReadyQueue) and self.isOpen():
                self.__recv()
            if not self.isOpen():
                return self.QuitError
            data = self.parse_argument(self.ReadyQueue.pop(0))
            if isinstance(data, str) and data:
                return data

    def parse_argument(self, arg: str) -> str:
        """Post-process one complete frame; subclasses override to decode the protocol."""
        return arg.strip()

    def recv_list(self) -> list:
        """Return and clear every message that is currently ready."""
        queue = self.ReadyQueue[:]
        self.ReadyQueue = []
        return queue

    @ignore
    def connect(self, host: str, port: int):
        assert 0 <= port <= (2 ** 16) - 1
        self.socket.connect((host, port))
def err_connect(sock, addr: tuple) -> (str, bool):
    """Connect *sock* to *addr* and translate failures into a message.

    Returns ``True`` on success, otherwise a human-readable reason string.
    """
    try:
        sock.connect(addr)
    except socket.gaierror:
        return f"获取地址信息失败.请确保{addr[0]}是有效地址或ipv4/ipv6"
    except socket.timeout:
        return f"连接超时({TIMEOUT}s).服务器[{addr[0]}:{addr[1]}]连接失败."
    except OverflowError:
        return f"输入的端口号为{addr[1]},端口号必须在0~65535间."
    except ConnectionError:  # covers Reset/Refused/Aborted subclasses
        return "请求拒绝."
    except OSError as e:
        if addr[1] in (0, "0"):
            return "[WinError 10049] 在其上下文中,该请求的地址无效"
        # e.args[1] does not exist for single-argument OSErrors; strerror is
        # the same value when present.
        return str(e.strerror or e).rstrip('。')
    except TypeError:
        return f"网络地址格式错误: 格式[ip:port] port必须为整型变量0~65535间."
    except Exception:
        return "连接错误"
    else:
        return True
class SocketClient(SocketQueue):
    """Client-side ``SocketQueue`` that can (re)connect and report failures."""

    addr = "localhost", 429  # default (host, port); override with change_address
    header = ""

    def __init__(self, addr, *args, **kwargs):
        super(SocketClient, self).__init__(*args, **kwargs)
        # The original ignored *addr*. Honour it only when it looks like a
        # (host, port) pair, because existing callers pass other values here.
        if isinstance(addr, tuple) and len(addr) == 2:
            self.addr = addr
        self.socket.settimeout(TIMEOUT)
        self.connected = False  # True once connect() has been attempted
        self.__is_connect = False
        self._traceback = lambda *_: None

    def is_connect(self) -> bool:
        """Return whether the last connect attempt succeeded and is still active."""
        return self.__is_connect

    def change_address(self, host: str, port: int):
        self.addr = host, port

    def change_header(self, header: str):
        self.header = header

    def set_failEvent(self, function):
        """Register the callback that receives timestamped failure messages."""
        self._traceback = function

    def failEvent(self, reason):
        self._traceback(f"[{get_time()}]: {reason}")

    def connect(self):
        """Connect to ``self.addr``; return True on success, False otherwise."""
        if self.connected:
            # A socket cannot be reused for a second connect: release the old
            # one and start from a clean socket and parser state.
            try:
                self.socket.close()
            except OSError:
                pass
            self.socket = socket.socket()
            self.socket.settimeout(TIMEOUT)
            self.re_setup()
        self.connected = True
        _res = err_connect(self.socket, self.addr)
        if _res is True:
            self.__is_connect = True
        else:
            self.failEvent(_res)
            self.__is_connect = False
        return self.__is_connect

    def quitEvent(self) -> None:
        self.__is_connect = False
user.pyw
class Socket(SocketClient):
    """GUI-facing client socket: decodes frames and forwards text to the display callback."""

    def __init__(self, Function=lambda i: None, code='utf-8'):
        # The original passed ``code`` positionally, where the parent expects
        # ``addr``, so the codec was never applied.
        # NOTE(review): ``Function`` is unused here; the display callback is
        # installed through set_failEvent() - confirm this is intended.
        super(Socket, self).__init__(self.addr, codec=code)
        self.handler = message_handle(self.send)

    def parse_argument(self, arg: str) -> str:
        return self.handler.handle(arg.strip())

    def recv(self):
        """Receive one message, display it, and return whether the socket is still open."""
        result = super(Socket, self).recv()
        if isinstance(result, str) and result:
            self._traceback(f'{result} <font size=1>{covert(len(result))}</font>')
        return self.isOpen()

    def forever_receive(self) -> None:
        self.handler.send_text(self.header)
        while True:
            if not self.recv():
                return

    def run(self):  # thread entry
        threading(True, target=self.forever_receive)

    def quitEvent(self):
        # ``self.__is_connect`` written here would be name-mangled to
        # ``_Socket__is_connect`` and never seen by is_connect(); let the
        # parent reset its own flag.
        super(Socket, self).quitEvent()
        if main.is_setup:
            main.ConnectionError_signal.emit()
图片搬家
全部代码
这回我觉得一个一个粘贴太麻烦, 尤其是python文件的增加, 所以我同时增加了资源,你要是有vip就解压, 没下载机会也可以直接复制
服务端
data.py
from json import load, dump
from os import path, mkdir
from hashlib import md5
from time import time
def encode(data: str):
    """Return the hexadecimal MD5 digest of *data* (UTF-8 encoded).

    NOTE(review): unsalted MD5 is weak for password storage; it is kept
    because existing records were written with it.
    """
    return md5(data.encode('utf8')).hexdigest()
# The extracted text lost its backslashes ('.\clients\data.json'); build the
# paths portably so the JSON file lives inside the folder that is created.
folder = path.join('.', 'clients')
file = path.join(folder, 'data.json')
if not path.exists(folder):
    mkdir(folder)
class user():
    """JSON-backed user table mapping ``username -> (md5(password), registration time)``.

    NOTE: the dunder-style method names (``__get__``, ``__in__`` ...) are kept
    for compatibility with callers; ``__get__`` is also Python's descriptor
    hook, so instances should not be stored as class attributes.
    """

    def __init__(self):
        if path.exists(file):
            with open(file, 'r', encoding='utf8') as f:
                self.data = load(f)
        else:
            self.data = {}

    def __get__(self, username, default=None) -> tuple:
        """Return the stored record for *username*, or *default*."""
        return self.data.get(username, default)

    def __in__(self, username) -> bool:
        """Return whether *username* is registered."""
        return username in self.data

    def __write__(self) -> None:
        """Persist the whole table to disk."""
        with open(file, 'w', encoding='utf8') as f:
            dump(self.data, f, indent=4)

    def __register__(self, username, password, time: (int, float)) -> None:
        """Store a new user with hashed password and integer timestamp, then save."""
        self.data[username] = (encode(password), int(time))
        self.__write__()

    def __login__(self, username, password) -> bool:
        """Return whether *password* matches the stored hash."""
        return self.data[username][0] == encode(password)

    def handler(self, username, password) -> bool:
        """Log an existing user in, or register a new one (always True)."""
        if self.__in__(username):
            return self.__login__(username, password)
        self.__register__(username, password, time())
        return True

    def get_time(self, username):
        """Return the registration timestamp of *username*."""
        return self.data[username][1]
server.pyw
import asyncio
import logging
import os
import socket
import sys
import time
from threading import Thread
from threading import enumerate as _enumerate
from PyQt5 import QtCore, QtGui, QtWidgets
from psutil import cpu_percent
import data # data.py
from socket_queue import SocketQueue # socket_queue.py
__version__ = 2.2
base = 1024
segment = base * 2 # 防止切断
delay = 0.04
new_file = 0
update_file = 1
request_file = 2
normal_text = 3
loop = asyncio.get_event_loop()
class QLogger(logging.Handler):
    """Logging handler that formats records as HTML and passes them to a callback."""

    def __init__(self, *args, **kwargs):
        logging.Handler.__init__(self, *args, **kwargs)
        self.output = lambda *_: None  # no-op until connect() installs a sink
        self.setFormatter(logging.Formatter(
            "[<font color='darkgreen'>%(asctime)s</font>(<font color='blue'>%(levelname)s</font>)]: <font color='brown'>%(message)s</font>"))

    def emit(self, record):
        """Format *record* and hand the text to the connected callback."""
        text = self.format(record)
        if hasattr(self, "output"):
            self.output(text)

    def connect(self, func):
        """Install *func* as the output callback; non-callables are ignored."""
        if callable(func):
            self.output = func
def threading(Daemon, name=None, **kwargs):
    """Create, configure and start a ``Thread``; return it.

    *Daemon* sets the daemon flag, *name* (optional) the thread name, and the
    remaining keyword arguments are passed to ``Thread``.
    """
    thread = Thread(**kwargs)
    # setDaemon()/setName() are deprecated; assign the attributes directly.
    thread.daemon = Daemon
    if name:
        thread.name = name
    thread.start()
    return thread
# Run the asyncio event loop forever on a daemon thread named "文件传输"
# (file transfer); presumably coroutines are scheduled onto it from elsewhere.
file_thread = threading(True, "文件传输", target=loop.run_forever)
def thread_info(thread: Thread):
    """Return one display line: name padded to 12 columns, ident, and class name."""
    # Use the public properties instead of the private _name/_ident fields.
    return f"{str(thread.name).ljust(12)}{thread.ident}({thread.__class__.__name__})"
def ignore(function):
    """Decorator: call *function* and swallow any ``Exception`` it raises.

    The wrapper now returns the wrapped function's result (the original
    dropped it), or ``None`` when an exception was suppressed.
    """
    def i(*args, **kwargs):
        try:
            return function(*args, **kwargs)
        except Exception:
            return None
    return i
# Module logger: records go to the Qt log view (Qlog) and to log.txt.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

Qlog = QLogger()
logger.addHandler(Qlog)

filehandle = logging.FileHandler("log.txt")
_file_format = logging.Formatter("[%(asctime)s(%(levelname)s)]: %(message)s")
filehandle.setFormatter(_file_format)
logger.addHandler(filehandle)
def to_logging(command):
    """Decorator: run *command*, logging any exception instead of raising.

    The wrapper returns the command's result, ``True`` when that result is
    ``None``, and ``False`` when an exception was logged.
    """
    def logs(*args, **kwargs):
        try:
            _result = command(*args, **kwargs)
        except Exception:
            logger.exception(str())
            return False
        return True if _result is None else _result
    return logs
class Command_Handler(object):
    """Interpreter for the ``/``-prefixed commands a client can send."""

    def __init__(self, bind):
        """Bind Client class"""
        assert isinstance(bind, Client)
        self.client = bind

    def _function(self, _list):
        """Resolve the command tokens in *_list* to a reply string."""
        data = {"/info": {"-v": self.get_version(),
                          "-id": self.get_id(),
                          "-i": self.info(),
                          "-h": self.help(),
                          "-name": self.name()},
                }
        _dict = data
        # Walk down the nested table one token at a time until a reply string is reached.
        for token in _list:
            if not isinstance(_dict, dict):
                break
            _dict = _dict.get(token, self.unknown(" ".join(_list)))
        if isinstance(_dict, dict):
            _dict = "Error:\n<font color='blue'>This command must take more arguments. Such as %s.</font>" % list(
                _dict.keys())
        return _dict

    @staticmethod
    def help():
        return """/info [-v] [-id] [-i]
-v : get version of program.
-id : get your id.
-i : get information.
-h : help.
-name : get your name
For example, <font color=red>/info -id</font>"""

    @staticmethod
    def get_version():
        return "version : " + str(__version__)

    def get_id(self):
        return "Your id is {}.".format(id(self.client))

    def name(self):
        return "Your name is {}.".format(self.client.username)

    def info(self):
        return f"Socket Server[version {self.get_version()}] By zmh."

    def unknown(self, s):
        return """Error:
No command named "%s". Please search [/info -h] to help.
%s""" % (s, self.help())

    def cut(self, string):
        """Split a command line into whitespace-separated tokens."""
        return string.strip().split()

    def handler(self, c):
        """Return the HTML reply for command line *c* (echo plus result)."""
        return "<font color='gray'>[command]</font><font color='brown'>%s</font>\n%s" % (
            c, str(self._function(self.cut(c))))

    def iscommand(self, i):
        """Return whether *i* starts with ``/`` after leading whitespace."""
        return i.strip().startswith("/")
class Server:
    """Listening TCP chat server: accepts clients and broadcasts messages."""

    join_message = "<font color='red'>Server></font> <font color='blue'>%s(%s)</font> 连接服务器. 当前在线人数: <font color='red'>%s</font>"
    user_message = "<font color='%s'>%s(%s)%s></font> %s"
    quit_message = "%s(%s) 下线了, %s"

    def __init__(self, usernumUpdate=lambda _: None):
        self.user_num_change = usernumUpdate  # callback receiving the online-user count

    def Setup(self, addr, port, backlog=10, max_count=base ** 2, encode='utf8'):
        """Bind and listen on ``(addr, port)``, then start the worker threads."""
        self.user_handle = message_handle(self)
        self.address = addr, port
        self.backlog = backlog
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.bind(self.address)
        self.socket.listen(backlog)
        self.max_count = max_count
        self.connect = []  # every Client accepted so far
        self.encode = encode
        self.user_record = data.user()
        return self.run()

    def clear_socket(self, clear_ms=500):
        """Thread body: periodically drop closed clients from ``self.connect``."""
        logger.info(f"Clear the closed socket once every {clear_ms} ms.")
        while True:
            del_list = list(filter(lambda c: hasattr(c, 'Quitted') or (not c.isOpen()), self.connect))
            for user in del_list:
                self.connect.remove(user)
            time.sleep(clear_ms / 1000)

    def run(self):
        """Log the configuration and start the cleanup and accept threads."""
        logger.debug(f"Server [{':'.join(map(str, self.address))}] on.")
        logger.info(f"Server pid {os.getpid()}.")
        logger.info(f"Max receive length {covert(self.max_count, fine=True)}.")
        logger.info(
            f"Single file transfer speed: <font color='blue'>{covert(segment * (1 // delay))}/s<font>({covert(segment)} × {int(1 // delay)})")
        gui.Database_signal.emit("<font color='gray'>[Transfer speed[-SEGMENT]] = [Maximum load] ÷ 2.</font>")
        logger.info("Backlog number: " + str(self.backlog))
        logger.info('The CODEC is sent as ' + self.encode)
        logger.info("The server is listening on the port.")
        threading(Daemon=True, name="离线清理", target=self.clear_socket)
        return threading(Daemon=True, name="监听端口", target=self.accept_client)

    def _get_Clients(self) -> list:
        """Return the clients that are logged in and still open."""
        return [c for c in self.connect if c.__filter__()]

    def _get_sockets(self):  # return int
        """Return the online-user count and push it to the count callback."""
        i = len(self._get_Clients())
        self.user_num_change(i)
        return i

    def _str_sockets(self):
        return f"当前人数 {self._get_sockets()}"

    def ServerMessage(self, mes, inc=True):
        """Send *mes* to every online client."""
        for user in self._get_Clients():
            if user.__filter__():
                user.send(mes)

    def UserMessage(self, address, _user, mes, inc=True):
        """Broadcast a chat line from *_user*, marking the sender's own copy."""
        if not mes:
            return
        for user in self.connect:
            if user.__filter__():
                username = user.username
                send_message = Server.user_message % ("brown" if _user == username else "red",
                                                      _user,
                                                      address,
                                                      "(我自己)" if _user == username else "",
                                                      mes)
                user.send(send_message)
        logger.info(f"{address}[{_user}] : {mes}")

    def error_handle(self):
        """Drop every closed client from ``self.connect``."""
        # Rebuild in place: removing items while iterating the same list skips entries.
        self.connect[:] = [user for user in self.connect if user.isOpen()]

    def accept_client(self):
        """Thread body: accept connections forever and wrap each in a ``Client``."""
        while True:
            sock, (address, _) = self.socket.accept()  # blocks until a client connects
            self.connect.append(Client(sock, address, self))
            logger.info(f'The address {address} is connected to the server.')

    def quit(self, username, address):
        """Log and broadcast that a user went offline."""
        QuitMessage = Server.quit_message % (username, address, self._str_sockets())
        logger.info(QuitMessage)
        self.ServerMessage(QuitMessage, False)

    def login(self, username, address):
        """Log and broadcast that a user logged in."""
        logger.info(f"{address}[{username}] 登录服务器 , " + self._str_sockets())
        self.ServerMessage(Server.join_message % (username, address, self._get_sockets()))
class Client(SocketQueue):
    """Server-side handler for one connected user, built on ``SocketQueue`` framing."""

    def __init__(self, socket, addr, server: Server):
        if not isinstance(server, Server):
            raise ValueError
        super(Client, self).__init__(socket, server.max_count, server.encode)
        self.addr = addr
        self.server = server
        self.username = str()
        self.com = Command_Handler(self)
        # Initialise all state before the worker thread starts reading it.
        self._login = False
        self.thread = threading(True, name=f"客户端{self.addr}", target=self.forever_receive)

    def normal_text(self, s):
        """Wrap plain text in the ``(normal_text, s)`` protocol tuple and encode it."""
        return repr((normal_text, s)).encode(self.codec)

    def isLogin(self) -> bool:
        return getattr(self, "_login", False)

    def __filter__(self) -> bool:
        """Return whether the client is online and ready to receive messages."""
        return self.isLogin() and self.isOpen()

    @ignore
    def login(self):
        """Run the login (known user) or registration (new user) dialogue."""
        self.username = self.recv()[:15]
        if self.server.user_record.__in__(self.username):
            if self.server.user_record.handler(self.username,
                                               self.input("<font color='red'>请输入您的密码: (右下[send]键发送)</font>")):
                self.send(f'<font color="green">欢迎回来, {self.username}.</font>')
            else:
                self.send('<font color="red">密码错误,请重试.</font>')
                self.quit()  # same effect as the former self.__del__()
                # Do not fall through: a rejected client must not be announced as logged in.
                return
        else:
            while True:
                p1 = self.input(
                    "<font color='blue'>[i]提示: 密码需在4 ~ 10位之间, 且不能换行.</font>\n<font color='red'>请输入您的密码: (右下[send]键发送)</font>")
                if (4 <= len(p1) <= 10) and not ('\n' in p1):
                    break
            while True:
                p2 = self.input("<font color='red'>再次输入您的密码: (右下[send]键发送)</font>")
                if p1 == p2:
                    break
                else:
                    self.send("<font color='red'>密码与前次不符!</font>")
            self.server.user_record.handler(self.username, p1)
            self.send(f'初来乍到, {self.username}')
        self._login = True
        self.server.login(self.username, self.addr)

    def quitEvent(self) -> None:
        self.server.quit(self.username, self.addr)

    def parse_argument(self, arg: str) -> str:
        """Let the server's message handler decode one complete frame."""
        return self.server.user_handle.handle(arg.strip(), self)

    @ignore
    def forever_receive(self):
        """Thread body: log in, then dispatch commands and chat lines until offline."""
        self.login()
        while self.__filter__():
            string = self.recv()
            if string is None:
                continue
            elif self.com.iscommand(string):
                self.send(self.com.handler(string))
            else:
                self.server.UserMessage(self.addr, self.username, string)
def get_host_ip() -> str:
    """get current IP address

    Connects a UDP socket towards 8.8.8.8:80 and returns the local address
    the OS selected for it. Any ``OSError`` from the socket calls propagates.
    """
    # The context manager closes the socket on every path, like the former try/finally.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.connect(('8.8.8.8', 80))
        return s.getsockname()[0]
class Interface(QtWidgets.QMainWindow):
Database_signal = QtCore.pyqtSignal(str)
Usernum_signal = QtCore.pyqtSignal(int)
def __init__(self):
    """Build the widgets and show the window immediately."""
    super(Interface, self).__init__()
    self.setupUi()
    self.show()
def setupUi(self):
    """Create all widgets and layouts, then wire signals and the thread-list timer."""
    self.setObjectName("MainWindow")
    self.resize(1088, 685)
    font = QtGui.QFont()
    font.setFamily("Consolas")
    font.setPointSize(11)
    self.setFont(font)
    self.setStyleSheet("")
    self.centralwidget = QtWidgets.QWidget(self)
    self.centralwidget.setObjectName("centralwidget")
    self.gridLayout = QtWidgets.QGridLayout(self.centralwidget)
    self.gridLayout.setObjectName("gridLayout")
    # Log view (left, bottom).
    self.label_6 = QtWidgets.QLabel(self.centralwidget)
    self.label_6.setObjectName("label_6")
    self.gridLayout.addWidget(self.label_6, 4, 0, 1, 1)
    self.textEdit_2 = QtWidgets.QTextEdit(self.centralwidget)
    self.textEdit_2.setObjectName("textEdit_2")
    self.gridLayout.addWidget(self.textEdit_2, 5, 0, 1, 1)
    # "Server Setup" form (left, top).
    self.groupBox = QtWidgets.QGroupBox(self.centralwidget)
    self.groupBox.setObjectName("groupBox")
    self.formLayout_2 = QtWidgets.QFormLayout(self.groupBox)
    self.formLayout_2.setObjectName("formLayout_2")
    self.label_2 = QtWidgets.QLabel(self.groupBox)
    self.label_2.setObjectName("label_2")
    self.formLayout_2.setWidget(0, QtWidgets.QFormLayout.LabelRole, self.label_2)
    self.lineEdit = QtWidgets.QLineEdit(self.groupBox)
    self.lineEdit.setObjectName("lineEdit")
    self.formLayout_2.setWidget(0, QtWidgets.QFormLayout.FieldRole, self.lineEdit)
    self.label_8 = QtWidgets.QLabel(self.groupBox)
    self.label_8.setObjectName("label_8")
    self.formLayout_2.setWidget(1, QtWidgets.QFormLayout.LabelRole, self.label_8)
    self.lineEdit_3 = QtWidgets.QLineEdit(self.groupBox)
    self.lineEdit_3.setObjectName("lineEdit_3")
    self.formLayout_2.setWidget(1, QtWidgets.QFormLayout.FieldRole, self.lineEdit_3)
    self.label_7 = QtWidgets.QLabel(self.groupBox)
    self.label_7.setObjectName("label_7")
    self.formLayout_2.setWidget(2, QtWidgets.QFormLayout.LabelRole, self.label_7)
    self.lineEdit_2 = QtWidgets.QLineEdit(self.groupBox)
    self.lineEdit_2.setReadOnly(True)
    self.lineEdit_2.setObjectName("lineEdit_2")
    self.formLayout_2.setWidget(2, QtWidgets.QFormLayout.FieldRole, self.lineEdit_2)
    spacerItem = QtWidgets.QSpacerItem(40, 20, QtWidgets.QSizePolicy.Expanding, QtWidgets.QSizePolicy.Minimum)
    self.formLayout_2.setItem(5, QtWidgets.QFormLayout.LabelRole, spacerItem)
    self.pushButton = QtWidgets.QPushButton(self.groupBox)
    self.pushButton.setObjectName("pushButton")
    self.formLayout_2.setWidget(5, QtWidgets.QFormLayout.FieldRole, self.pushButton)
    self.lineEdit_4 = QtWidgets.QLineEdit(self.groupBox)
    self.lineEdit_4.setObjectName("lineEdit_4")
    self.formLayout_2.setWidget(3, QtWidgets.QFormLayout.FieldRole, self.lineEdit_4)
    self.label_9 = QtWidgets.QLabel(self.groupBox)
    self.label_9.setObjectName("label_9")
    self.formLayout_2.setWidget(3, QtWidgets.QFormLayout.LabelRole, self.label_9)
    self.label_10 = QtWidgets.QLabel(self.groupBox)
    self.label_10.setObjectName("label_10")
    self.formLayout_2.setWidget(4, QtWidgets.QFormLayout.LabelRole, self.label_10)
    self.lineEdit_5 = QtWidgets.QLineEdit(self.groupBox)
    self.lineEdit_5.setObjectName("lineEdit_5")
    self.formLayout_2.setWidget(4, QtWidgets.QFormLayout.FieldRole, self.lineEdit_5)
    self.gridLayout.addWidget(self.groupBox, 0, 0, 4, 1)
    self.line = QtWidgets.QFrame(self.centralwidget)
    self.line.setFrameShape(QtWidgets.QFrame.VLine)
    self.line.setFrameShadow(QtWidgets.QFrame.Sunken)
    self.line.setObjectName("line")
    self.gridLayout.addWidget(self.line, 2, 1, 4, 1)
    # Title banner.
    self.label_5 = QtWidgets.QLabel(self.centralwidget)
    font = QtGui.QFont()
    font.setFamily("Comic Sans MS")
    font.setPointSize(14)
    font.setBold(False)
    font.setWeight(50)
    self.label_5.setFont(font)
    self.label_5.setStyleSheet(
        "background-color:qlineargradient(spread:pad, x1:0, y1:0, x2:1, y2:1, stop:0 rgba(0, 255, 241, 255), stop:0.930348 rgba(0, 158, 255, 255));\n"
        "color:rgb(85, 0, 255)")
    self.label_5.setObjectName("label_5")
    self.gridLayout.addWidget(self.label_5, 0, 1, 1, 3)
    # "Run" status panel (right); disabled until the server is started.
    self.groupBox_2 = QtWidgets.QGroupBox(self.centralwidget)
    self.groupBox_2.setEnabled(False)
    self.groupBox_2.setObjectName("groupBox_2")
    self.gridLayout_2 = QtWidgets.QGridLayout(self.groupBox_2)
    self.gridLayout_2.setObjectName("gridLayout_2")
    self.lcdNumber = QtWidgets.QLCDNumber(self.groupBox_2)
    self.lcdNumber.setObjectName("lcdNumber")
    self.gridLayout_2.addWidget(self.lcdNumber, 1, 1, 1, 1)
    self.line_3 = QtWidgets.QFrame(self.groupBox_2)
    self.line_3.setFrameShape(QtWidgets.QFrame.HLine)
    self.line_3.setFrameShadow(QtWidgets.QFrame.Sunken)
    self.line_3.setObjectName("line_3")
    self.gridLayout_2.addWidget(self.line_3, 3, 0, 1, 2)
    self.listView_2 = QtWidgets.QListWidget(self.groupBox_2)
    self.listView_2.setObjectName("listView_2")
    self.gridLayout_2.addWidget(self.listView_2, 5, 0, 1, 2)
    self.label_4 = QtWidgets.QLabel(self.groupBox_2)
    self.label_4.setObjectName("label_4")
    self.gridLayout_2.addWidget(self.label_4, 4, 0, 1, 1)
    self.label_3 = QtWidgets.QLabel(self.groupBox_2)
    self.label_3.setObjectName("label_3")
    self.gridLayout_2.addWidget(self.label_3, 1, 0, 1, 1)
    self.label = QtWidgets.QLabel(self.groupBox_2)
    self.label.setObjectName("label")
    self.gridLayout_2.addWidget(self.label, 0, 0, 1, 1)
    self.progressBar = QtWidgets.QProgressBar(self.groupBox_2)
    self.progressBar.setProperty("value", 0)
    self.progressBar.setObjectName("progressBar")
    self.gridLayout_2.addWidget(self.progressBar, 0, 1, 1, 1)
    self.gridLayout.addWidget(self.groupBox_2, 1, 2, 5, 2)
    self.textEdit_2.setReadOnly(True)
    self.setCentralWidget(self.centralwidget)
    self.statusbar = QtWidgets.QStatusBar(self)
    self.statusbar.setObjectName("statusbar")
    self.setStatusBar(self.statusbar)
    self.groupBox_2.setEnabled(False)
    self.retranslateUi()
    # Signal wiring.
    self.pushButton.clicked.connect(self.run)
    QtCore.QMetaObject.connectSlotsByName(self)
    self.lcdNumber.display(0)
    self.cpu = cpuThread()
    self.cpu.signal.connect(self.progressUpdate)
    self.Database_signal.connect(self.databaseUpdate)
    self.Usernum_signal.connect(self.usernumUpdate)
    Qlog.connect(self.Database_signal.emit)
    # Refresh the thread list every 500 ms.
    self.timer = QtCore.QTimer(self)
    self.timer.timeout.connect(self.get_threadings)
    self.timer.start(500)
def progressUpdate(self, v):
    """Show the latest CPU reading on the progress bar.

    ``v`` is the value delivered by the CPU signal; it is truncated to an
    integer before being handed to the bar.
    """
    percent = int(v)
    self.progressBar.setValue(percent)
@to_logging
def handle(self):
    """Read the setup form and configure the global ``server`` with it.

    Stores the parsed values on ``self`` (``max_recv``, ``backlog``,
    ``addr``, ``port``) and rebinds the module-level ``segment`` to half of
    the receive limit before calling ``server.Setup``.
    """
    global segment
    # The "Maximum load" field is entered in KB; convert it to bytes.
    limit_bytes = int(float(self.lineEdit.text()) * 1024)
    self.max_recv = limit_bytes
    segment = limit_bytes // 2
    self.backlog = int(self.lineEdit_3.text())
    self.addr = self.lineEdit_4.text()
    self.port = int(self.lineEdit_5.text())
    server.Setup(self.addr, self.port, self.backlog, self.max_recv)
def run(self, _):
    """Slot for the "Run" button: apply the form and switch to the running view.

    Nothing changes when ``self.handle()`` reports a falsy result.
    """
    if not self.handle():
        return
    # Lock the setup form, unlock the runtime panel, start CPU sampling.
    self.groupBox.setEnabled(False)
    self.groupBox_2.setEnabled(True)
    self.cpu.start()
def get_threadings(self):
    """Refresh the thread list widget, but only when the snapshot changed.

    The previous snapshot is cached on ``self.data`` so an unchanged list
    does not cause the widget to be cleared and refilled on every timer tick.
    """
    snapshot = _enumerate()
    unchanged = hasattr(self, "data") and self.data == snapshot
    if unchanged:
        return
    self.data = snapshot
    self.listView_2.clear()
    self.listView_2.addItems(map(thread_info, snapshot))
def databaseUpdate(self, data: str):
    """Append one log entry to the log view and scroll to the end.

    Empty entries are ignored.
    """
    if not data:
        return
    # Brief pause: painting too quickly froze the UI (per the original note).
    time.sleep(0.01)
    entry = data.strip()
    self.textEdit_2.append(entry)
    self.textEdit_2.moveCursor(QtGui.QTextCursor.End)
def usernumUpdate(self, i):
    """Show ``i`` on the LCD counter unless it already displays that value."""
    currently_shown = int(self.lcdNumber.value())
    if currently_shown == i:
        return
    self.lcdNumber.display(i)
def retranslateUi(self, ):
    """Set the window title, group titles, captions and default form values."""
    tr = QtCore.QCoreApplication.translate
    context = "MainWindow"

    self.setWindowTitle(tr(context, "Socket Server"))
    self.groupBox.setTitle(tr(context, "Server Setup"))
    self.groupBox_2.setTitle(tr(context, "Run"))

    # (widget, text) pairs; every one of them is applied through setText().
    captions = (
        (self.label_6, "Database(Logging and traceback):"),
        (self.label_2, "Maximum load(kb):"),
        (self.lineEdit, str(1024)),
        (self.label_8, "backlog:"),
        (self.lineEdit_3, "10"),
        (self.label_7, "CODEC(Unalterable):"),
        (self.lineEdit_2, "utf8"),
        (self.pushButton, "Run"),
        (self.lineEdit_4, "127.0.0.1"),
        (self.label_9, "Address:"),
        (self.label_10, "Port:"),
        (self.lineEdit_5, "429"),
        (self.label_5, f"TCP Server v{__version__}"),
        (self.label_4, "Threadings:"),
        (self.label_3, "Online user(s):"),
        (self.label, "Running memory with CPU"),
    )
    for widget, text in captions:
        widget.setText(tr(context, text))
class cpuThread(QtCore.QThread):
    """Qt thread that keeps emitting the CPU usage as an integer via ``signal``."""

    signal = QtCore.pyqtSignal(int)

    def run(self) -> None:
        """Sample ``cpu_percent(interval=1)`` forever and emit each reading."""
        while True:
            usage = cpu_percent(interval=1)
            self.signal.emit(int(usage))
def save_bytes(file, byte: bytes):
    """Write ``byte`` to the path ``file`` in binary mode, replacing old content."""
    with open(file, "wb") as sink:
        sink.write(byte)
def get_eval(str, defined=None):
    """Evaluate the text ``str`` and return the value if it has ``defined``'s type.

    Returns ``defined`` when evaluation fails for any reason or when the
    result is not an instance of ``type(defined)``.
    """
    # NOTE(review): SECURITY - eval() executes arbitrary expressions. If the
    # text comes from a network peer this is remote code execution;
    # ast.literal_eval would cover literal tuples/ints/str/bytes. Confirm and
    # replace deliberately.
    try:
        value = eval(str)
    except BaseException:  # same reach as the original bare ``except:``
        return defined
    if isinstance(value, type(defined)):
        return value
    return defined
class SEND:
    """One outgoing file transfer, sent as a header plus numbered segments.

    Constructing an instance cuts the payload into segments and schedules
    ``update()`` on the module-level event loop ``loop``.
    """

    def __init__(self, index, name, fbytes: bytes, conn: callable, encode='utf8'):
        """
        :param index: transfer index placed in every message.
        :param name: file name announced in the header.
        :param fbytes: complete file content.
        :param conn: callable that receives each encoded message.
        :param encode: codec used to encode the repr() of each message.
        """
        self.segs = self.cut(fbytes)
        self.encode = encode
        self.total = len(self.segs)
        self.index = index
        self.conn = conn
        self.finish = False
        self.name = name
        self.size = len(fbytes)
        asyncio.run_coroutine_threadsafe(self.update(), loop)

    async def update(self):
        """Send the header, then every segment, pausing ``delay`` after each."""
        self.conn(self.header().encode(self.encode))
        for n, data in enumerate(self.segs):
            self.conn(self.format(n, data).encode(self.encode))
            await asyncio.sleep(delay)
        self.finish = True

    def cut(self, byte: bytes, seg=None) -> list:
        """Split ``byte`` into consecutive chunks of at most ``seg`` bytes.

        ``seg`` defaults to the *current* module-level ``segment``. The old
        ``seg=segment`` default froze the value at class-definition time, so a
        later change to ``segment`` (the setup form rebinds it) was ignored.
        """
        if seg is None:
            seg = segment
        return [byte[x:x + seg] for x in range(0, len(byte), seg)]

    def format(self, process, data) -> str:
        """Return the repr of an ``update_file`` message for segment ``process``."""
        return repr((update_file, (self.index, process, data)))

    def header(self) -> str:
        """Return the repr of the ``new_file`` message announcing this transfer."""
        return repr((new_file, (self.index,
                                self.name,
                                self.total,
                                self.size)
                     ))
class RECV:
    """One incoming file transfer, assembled from in-order segments."""

    def __init__(self, index: int, name: str, total: int, size: int):
        self.index, self.name, self.total, self.size = index, name, total, size
        self.progress = -1
        self.file = []
        self.finish = False
        # Initialised here so savepath() can never hit a missing attribute
        # when the transfer finished but save() did not complete.
        self.__save = ""

    @staticmethod
    def _safe_name(name) -> str:
        """Reduce a peer-supplied name to a bare file name (no directories)."""
        candidate = os.path.basename(str(name).replace("\\", "/"))
        if candidate in ("", ".", ".."):
            return "unnamed"
        return candidate

    def update(self, p, data):
        """Accept segment ``p`` if it is the next one expected.

        Returns True once the number of stored segments equals ``total``;
        otherwise returns None. Out-of-order segments are ignored.
        """
        if isinstance(p, int) and p - 1 == self.progress:
            self.progress = p
            self.file.append(data)
            if len(self.file) == self.total:
                self.finish = True
                return True

    def mix(self):
        """Return all stored segments joined into one bytes object."""
        return b''.join(self.file)

    def save(self, path, ):
        """Write the finished file into directory ``path``.

        The name is sanitised first so a hostile name such as ``../../x``
        cannot escape ``path``. Returns True when written, False when the
        transfer is not finished yet.
        """
        if not self.finish:
            return False
        b = self.mix()
        target = os.path.join(path, self._safe_name(self.name))
        save_bytes(target, b)
        self.__save = target
        logger.info(f"Save {self.name} at {path}, size {len(b)} b.")
        self.file = []  # release memory; the attribute must stay (do not ``del``)
        return True

    def savepath(self) -> str:
        """Return the saved file's path, or "" if nothing has been saved."""
        if self.finish:
            return self.__save
        return ""
class send_files:
    """Registry of outgoing transfers; each new one gets the next index."""

    def __init__(self, encode='utf8'):
        self.sends = []
        self.encode = encode

    def new_sends(self, name, file, conn):
        """Start sending ``file`` (bytes, or str encoded with ``self.encode``)."""
        if isinstance(file, str):
            file = file.encode(self.encode)
        # The new transfer's index is the current length of the list. The
        # configured codec is passed on so SEND does not fall back to its own
        # default.
        self.sends.append(SEND(len(self.sends), name, file, conn, self.encode))

    def localfile(self, file, conn):
        """Read the file at path ``file`` and send it; do nothing if it is not a file."""
        if not os.path.isfile(file):
            return
        _, name = os.path.split(file)
        with open(file, "rb") as f:
            payload = f.read()
        self.new_sends(name, payload, conn)
class recv_files:
    """Registry of incoming transfers, addressed by position in ``recvs``."""

    def __init__(self, decode='utf8', path=None):
        self.recvs = []
        self.decode = decode
        if path is None:
            path = sys.path[0]
        self.path = path

    def _has(self, index) -> bool:
        """True when ``index`` is a valid, non-negative position in ``recvs``."""
        return isinstance(index, int) and 0 <= index < len(self.recvs)

    def new_files(self, index, name, total, size):
        """Register a new incoming transfer and log its name and size."""
        self.recvs.append(RECV(index, name, total, size))
        logger.info(f"New file - {name} - {covert(size, fine=True)}.")

    def apply(self, index, progress, data):
        """Feed one segment to transfer ``index``.

        Returns ``(index, name)`` when this segment completed the file and it
        was saved; returns False in every other case. Negative or otherwise
        invalid indexes are rejected instead of wrapping to the last entry.
        """
        if not self._has(index):
            return False
        entry = self.recvs[index]
        if entry.update(progress, data) and self.save(index):
            return index, entry.name
        return False

    def save(self, index):
        """Save transfer ``index`` into ``self.path``; False if the index is invalid."""
        if not self._has(index):
            return False
        return self.recvs[index].save(self.path)
class message_handle:
    """Server-side dispatcher for the tuple-encoded messages sent by clients."""

    codec = "utf8"

    def __init__(self, server: Server, path=None):
        """
        :param server: server object used to broadcast finished uploads.
        :param path: directory for received files; defaults to ``resource``
            next to this source file. The directory is created if missing.
        """
        if path is None:
            path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "resource")
        if not os.path.isdir(path):
            os.makedirs(path)
        self.Sender = send_files(self.codec, )
        self.Receiver = recv_files(self.codec, path)
        # username -> positions in self.Receiver.recvs, in upload order
        self.files_record = {}
        self.server = server

    @to_logging
    def handle(self, data, client: Client):
        """Decode one message from ``client`` and act on it.

        Returns the text for ``normal_text`` messages; other message types
        return None.
        """
        _res = get_eval(data, tuple())
        if len(_res) != 2:
            return
        kind, arguments = _res
        if kind == new_file:
            index, name, total, size = arguments
            slot = len(self.Receiver.recvs)
            # NOTE(review): nesting reconstructed from flattened source - a
            # user without a record only gets one when the client index is 0.
            if client.username not in self.files_record:
                if index == 0:
                    self.files_record[client.username] = [slot, ]
            else:
                self.files_record[client.username].append(slot)
            self.Receiver.new_files(slot, name, total, size)
        elif kind == update_file:
            index, progress, data = arguments
            if client.username in self.files_record:
                owned = self.files_record[client.username]
                # An index past the end falls back to the newest transfer.
                if not len(owned) >= index + 1:
                    index = len(owned) - 1
                _res = self.Receiver.apply(owned[index], progress, data)
                if _res:
                    INDEX, NAME = _res
                    # NOTE(review): NAME is peer-supplied and inserted into
                    # HTML without escaping - confirm whether that is acceptable.
                    self.server.UserMessage(client.addr, client.username, f'<a href="{INDEX}">{NAME}</a>')
        elif kind == request_file:
            path = self.Receiver.recvs[arguments].savepath()
            if path:
                self.Sender.localfile(path, client.send)
        elif kind == normal_text:
            return arguments

    def send(self, sendpath, conn):
        """Send the local file at ``sendpath`` through ``conn``."""
        return self.Sender.localfile(sendpath, conn)

    def get_index(self, index):
        """Return the received-file entry at ``index`` or None if past the end."""
        if index + 1 <= len(self.Receiver.recvs):
            return self.Receiver.recvs[index]
def covert(byte, fine=False):
    """Format a byte count as a short human-readable size string.

    Units step by 1024: b, Kb, Mb, Gb, Tb, Pb, Eb (values beyond Eb stay in
    Eb). Background from the original note: a bit is one binary digit, a
    byte is 8 bits, 1 KB = 1024 B, 1 MB = 1024 KB, and so on.
    (Unit table credited to 「山木兮」 in the original.)

    :param byte: a number of bytes, or any sized object whose ``len()`` is used.
    :param fine: when True, append the exact byte count in parentheses.
    :return: the exact-count string for values below 1024, otherwise a value
        with one decimal place and a unit suffix.
    """
    if not isinstance(byte, (int, float)):
        byte = len(byte)
    DEI = f"{byte} 字节"
    base = 1024
    units = ["b",
             "Kb",
             "Mb",
             "Gb",
             "Tb",
             "Pb",
             "Eb"]
    index = 0
    # Move up one unit per division; stop at the last known unit.
    while byte >= base and index + 1 < len(units):
        byte /= base
        index += 1
    if index == 0:
        return DEI
    if fine:
        return "%0.1f%s(%s)" % (byte, units[index], DEI)
    return "%0.1f%s" % (byte, units[index])
# Script entry point: create the Qt application and the GUI, then the server.
if __name__ == "__main__":
    app = QtWidgets.QApplication(sys.argv)
    app.setStyle("fusion")
    gui = Interface()
    # ``server`` must be a module-level name: the GUI's handle() method calls
    # server.Setup(...) on it. It reports user counts via the GUI signal.
    server = Server(gui.Usernum_signal.emit)
    # Run the Qt event loop and exit with its return code.
    sys.exit(app.exec_())
socket_queue.py
import socket
def ignore(function):
    """Decorator: call ``function`` and swallow any ordinary exception.

    The wrapper returns ``function``'s result, or None when it raised.
    ``KeyboardInterrupt`` and ``SystemExit`` are no longer swallowed.
    """
    import functools

    @functools.wraps(function)
    def func(*args, **kwargs):
        try:
            return function(*args, **kwargs)
        except Exception:
            # Deliberate best-effort: callers treat failure as "no result".
            return None

    return func


class SocketQueue:
    """Message queue on top of a stream socket.

    Outgoing messages are terminated with ``split_text``. Incoming bytes are
    decoded, split on ``split_text`` and completed messages are queued in
    ``ReadyQueue``; an unfinished tail is kept in ``waitKey`` until the rest
    arrives.
    """

    split_text = "\n"  # class variable: the default separator is a newline

    class QuitError(ConnectionError):
        """Marker returned (not raised) by recv() once the connection is gone."""
        pass

    def __init__(self, socket=None, bufsize=1024, codec="utf8"):
        """
        :param socket: connected socket-like object; a fresh ``socket.socket()``
            is created when omitted (the old default was a single socket
            object created at import time and shared by every instance).
        :param bufsize: maximum number of bytes requested per ``recv``.
        :param codec: text codec for encoding and decoding messages.
        """
        if socket is None:
            import socket as _socket_module  # the parameter shadows the module
            socket = _socket_module.socket()
        self.socket, self.bufsize, self.codec = socket, bufsize, codec
        self.waitKey = str()
        self.ReadyQueue = []
        self._closed = False

    def __close__(self):
        self.quit()

    def __del__(self):
        # Guard against instances whose __init__ never completed.
        if hasattr(self, "_closed"):
            self.quit()

    def isOpen(self) -> bool:
        """Return False only when both this object and its socket are closed."""
        # NOTE(review): ``and`` means a socket closed from outside still counts
        # as open until quit() runs - confirm whether ``or`` was intended.
        return not (self._closed and getattr(self.socket, "_closed", False))

    def quitEvent(self) -> None:
        """Hook called once from quit(); subclasses may override it."""
        pass

    def quit(self) -> None:
        """Close the connection once: set the flag, fire quitEvent, close the socket."""
        if not self._closed:
            self._closed = True
            self.quitEvent()
            self.socket.close()

    def normal_text(self, string: str):
        """Encode ``string`` with the configured codec."""
        return string.encode(self.codec)

    def __recv(self):
        """Read one chunk and feed it to the parser.

        Returns ``QuitError`` (after calling quit()) on a socket error or when
        the peer closed the connection; otherwise returns None.
        """
        try:
            data = self.socket.recv(self.bufsize)
        except OSError:  # covers the Connection*Error subclasses as well
            self.quit()
            return self.QuitError
        if not data:
            # An empty read means the peer closed the stream. Without this,
            # recv() kept polling a dead socket forever.
            self.quit()
            return self.QuitError
        # Chunks are no longer stripped: a chunk boundary can fall inside a
        # message, and stripping there silently deleted payload spaces.
        self.parse_data(self.handle(data))

    def __send(self, data: bytes) -> bool:
        """Send ``data`` completely; on a socket error quit and return False."""
        try:
            self.socket.sendall(data)
            return True
        except OSError:
            self.quit()
            return False

    def send(self, data) -> bool:
        """Convert ``data`` to bytes, append the separator and send it.

        str is encoded; set/list/tuple are sent as their repr; int/float as
        their str; bytes unchanged; anything else goes through ``bytes()``.
        """
        if isinstance(data, str):
            data = self.normal_text(data)
        elif isinstance(data, (set, list, tuple)):
            # repr() yields str; it must be encoded before bytes concatenation.
            data = repr(data).encode(self.codec)
        elif isinstance(data, (int, float)):
            data = str(data).encode(self.codec)
        elif not isinstance(data, bytes):
            data = bytes(data)
        return self.__send(data + self.split_text.encode(self.codec))

    def input(self, text):
        """Send ``text`` and return the next received message."""
        self.send(text)
        return self.recv()

    def forever_receive(self) -> (str, None):
        """Keep receiving messages until the connection is closed."""
        while self.isOpen():
            self.recv()

    def handle(self, data: bytes):
        """Decode one chunk and split it on the separator.

        An incremental decoder is kept per instance so a multi-byte character
        cut in half by a chunk boundary is completed by the next chunk
        instead of raising UnicodeDecodeError.
        """
        import codecs
        state = getattr(self, "_decoder_state", None)
        if state is None or state[0] != self.codec:
            state = (self.codec, codecs.getincrementaldecoder(self.codec)())
            self._decoder_state = state
        return state[1].decode(data).split(self.split_text)

    @ignore
    def parse_data(self, generator: (tuple, list, set)) -> None:
        """Merge the pieces of one chunk into ``waitKey`` / ``ReadyQueue``."""
        pieces = list(generator)
        if not pieces:
            return
        if len(pieces) == 1:
            # No separator in this chunk: extend the pending partial message.
            self.waitKey += pieces[0]
            return
        # First piece completes the pending message, last piece starts a new
        # partial one, everything in between is already complete.
        self.ReadyQueue.append((self.waitKey + pieces.pop(0)).strip())
        self.waitKey = pieces.pop()
        self.ReadyQueue.extend(piece.strip() for piece in pieces)

    def recv(self) -> (str, Exception):
        """Return the next non-empty message, or ``QuitError`` once closed."""
        while True:
            while not self.ReadyQueue:
                self.__recv()
                if not self.isOpen():
                    return self.QuitError
            data = self.parse_argument(self.ReadyQueue.pop(0))
            if isinstance(data, str) and data:
                return data

    def parse_argument(self, arg: str) -> str:
        """Post-process one queued message; subclasses may override."""
        return arg.strip()

    def recv_list(self) -> list:
        """Return all queued messages and empty the queue."""
        queue = self.ReadyQueue[:]
        self.ReadyQueue = []
        return queue

    @ignore
    def connect(self, host: str, port: int):
        """Connect to ``(host, port)``; any failure is swallowed by ``@ignore``."""
        if not 0 <= port <= (2 ** 16) - 1:
            # Explicit check instead of ``assert`` (asserts vanish under -O).
            raise ValueError("port out of range")
        self.socket.connect((host, port))
客户端
user.pyw
# -*- coding: utf-8 -*-
# Form implementation generated from reading ui files 'USER.ui', 'Connect.ui'
# Created by: PyQt5 UI code generator 5.15.4
# writer : Zmh
import asyncio
import logging
import os
import socket
import sys
from datetime import datetime
from threading import Thread
from time import sleep
from traceback import format_exc
from PyQt5 import QtCore, QtGui, QtWidgets
from ProgressBar import Animation
from socket_queue import SocketClient
# Root logger: DEBUG level with timestamp, file, line and level in each record.
logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s')
TIMEOUT = 2
base = 1024
segment = base * 10  # size of one file segment in bytes (kept small to avoid cut-off)
bufsize = base ** 2
delay = 0.04  # seconds to wait between two file segments
# Message type tags used as the first element of every encoded tuple.
new_file = 0
update_file = 1
request_file = 2
normal_text = 3
# A dedicated loop for file transfers. asyncio.get_event_loop() is deprecated
# when no loop is running, and this loop is only ever driven by file_thread,
# so it is created explicitly.
loop = asyncio.new_event_loop()
file_thread = Thread(target=loop.run_forever)
def get_host_ip() -> str:
    """Return the local IP address used for outbound traffic.

    A UDP socket is connected towards 8.8.8.8 and its own address is read
    back. When that is not possible (for example no network), the loopback
    address ``"127.0.0.1"`` is returned instead of raising, because this
    function is called while the module is being imported.
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
            probe.connect(('8.8.8.8', 80))
            return probe.getsockname()[0]
    except OSError:
        return "127.0.0.1"
Username = str()
# Probe the local address once so the label and the value always agree
# (the original ran the network probe twice).
_host_ip = get_host_ip()
# Display label -> "host:port" choices offered in the login combo box.
dicts = {"127.0.0.1:429 (默认)": "127.0.0.1:429",
         "103.46.128.21:51203 (公网)": "103.46.128.21:51203",
         f"{_host_ip}:429 (私网)": f"{_host_ip}:429",
         "EXAM-41:429 (微机室)": "EXAM-41:429"}
def threading(Daemon, **kwargs):
    """Create, start and return a ``Thread`` built from ``kwargs``.

    :param Daemon: value for the thread's ``daemon`` flag.
    :param kwargs: passed unchanged to ``Thread(...)``.
    """
    thread = Thread(**kwargs)
    thread.daemon = Daemon  # attribute form; setDaemon() is deprecated
    thread.start()
    return thread
def to_logging(command):
    """Decorator that turns exceptions from ``command`` into status values.

    The wrapped call returns:
      * the command's result, or True when that result is None;
      * None on ``socket.timeout``;
      * "CLOSE" on a reset/refused/aborted connection (the main window's
        ConnectionError signal is emitted when ``main`` exists);
      * False on any other ordinary exception (the traceback goes to the main
        window when ``main`` exists, otherwise to the log).
    """
    import functools

    @functools.wraps(command)  # keep the wrapped function's name and docstring
    def logs(*args, **kwargs):
        try:
            _result = command(*args, **kwargs)
        except socket.timeout:
            return None
        except (ConnectionResetError, ConnectionRefusedError, ConnectionAbortedError):
            if "main" in globals():
                main.ConnectionError_signal.emit()
            return "CLOSE"
        except Exception:
            # Narrowed from a bare ``except:`` so KeyboardInterrupt and
            # SystemExit are no longer swallowed.
            if "main" in globals():
                main.MessageUpdate_signal.emit(format_exc())
            else:
                logging.exception(str())
            return False
        return True if _result is None else _result

    return logs
@to_logging
def address_split(address):
    """Split ``"host:port"`` text into ``(host, port)``.

    Only the first two colon-separated fields are used; a missing port
    becomes 0. A non-numeric port raises inside the function, which the
    ``to_logging`` wrapper handles.
    """
    fields = address.strip().split(':')[:2]
    host = fields[0]
    port = int(fields[1]) if len(fields) > 1 else int(0)
    return host, port
class Socket(SocketClient):
    """Client connection that routes every received message through ``message_handle``."""

    def __init__(self, Function=lambda i: None, code='utf-8'):
        super().__init__(code)
        # The handler sends its own messages through this socket's send().
        self.handler = message_handle(self.send)

    def parse_argument(self, arg: str) -> str:
        """Hand the stripped message to the handler and return its result."""
        message = arg.strip()
        return self.handler.handle(message)

    def recv(self):
        """Receive one message, report it with its size, return whether still open."""
        result = super().recv()
        has_text = isinstance(result, str) and bool(result)
        if has_text:
            self._traceback(f'{result} <font size=1>{covert(len(result))}</font>')
        return self.isOpen()

    def forever_receive(self) -> None:
        """Send the header once, then receive until recv() reports a closed link."""
        self.handler.send_text(self.header)
        while self.recv():
            pass

    def run(self):
        """Start the receive loop in a daemon thread."""
        threading(True, target=self.forever_receive)

    def quitEvent(self):
        """Called on disconnect: clear the flag and notify the main window."""
        # NOTE(review): name mangling makes this ``_Socket__is_connect``; a
        # parent attribute of the same source name would not be affected.
        self.__is_connect = False
        if main.is_setup:
            main.ConnectionError_signal.emit()
class login(QtWidgets.QDialog):
    """Login dialog: asks for a user name and a server address, then connects.

    Fixes over the extracted source: the ``\\n`` escapes inside the style
    sheets are restored, ``_connect`` stops after reporting a malformed
    address, and the login button is disabled again when the name becomes
    invalid.
    """

    def __init__(self, parent=None):
        super(login, self).__init__(parent)
        self.Fadein = Animation(self)
        self.setupUi()
        self.setWindowIcon(QtGui.QIcon("images/login.ico"))

    def destory_window(self):
        """Fade-out callback: open the main window after a successful login, then close."""
        if hasattr(self, "success"):
            main.SetupUi()
        self.close()

    def closeEvent(self, event):
        """Swallow the first close request and play the fade-out animation instead."""
        if not hasattr(self, "Fadeout"):
            self.Fadeout = Animation(self, from_value=1, to_value=0, connect=self.destory_window)
            event.ignore()
            return

    def setupUi(self):
        """Build the widgets, apply style sheets and texts, connect signals, show."""
        self.setObjectName("Dialog")
        self.resize(252, 339)
        font = QtGui.QFont()
        font.setFamily("Consolas")
        self.setFont(font)
        self.setStyleSheet("QWidget\n"
                           "{border:15px;\n"
                           "margin:10px;\n"
                           "background-color:rgb(255, 255, 255);\n"
                           "}\n"
                           "QWidget#right_widget\n"
                           "{border-radius:15;}\n"
                           "\n"
                           "")
        self.gridLayout = QtWidgets.QGridLayout(self)
        self.gridLayout.setObjectName("gridLayout")
        self.label_3 = QtWidgets.QLabel(self)
        font = QtGui.QFont()
        font.setFamily("Adobe Arabic")
        font.setPointSize(11)
        self.label_3.setFont(font)
        self.label_3.setStyleSheet("font-color:rgb(0, 0, 0)\n"
                                   "font-size:20px")
        self.label_3.setObjectName("label_3")
        self.gridLayout.addWidget(self.label_3, 2, 0, 1, 1)
        self.label_2 = QtWidgets.QLabel(self)
        self.label_2.setStyleSheet("color: rgb(99, 99, 99);")
        self.label_2.setObjectName("label_2")
        self.gridLayout.addWidget(self.label_2, 3, 0, 1, 3)
        self.label = QtWidgets.QLabel(self)
        font = QtGui.QFont()
        font.setFamily("Consolas")
        font.setPointSize(22)
        font.setBold(False)
        font.setWeight(50)
        self.label.setFont(font)
        self.label.setStyleSheet("color: rgb(0, 0, 0);")
        self.label.setObjectName("label")
        self.gridLayout.addWidget(self.label, 0, 0, 1, 3)
        self.lineEdit = QtWidgets.QLineEdit(self)
        self.lineEdit.setStyleSheet(" QLineEdit\n"
                                    " {border:0px;\n"
                                    " border-radius:0;\n"
                                    " margin:10px;\n"
                                    " border-bottom: 2px solid #B3B3B3;\n"
                                    " font-family:'等线';\n"
                                    " font-size:20px;\n"
                                    " font-weight:bold;}\n"
                                    " QLineEdit:hover{\n"
                                    " border-bottom:3px solid #66A3FF;\n"
                                    " }\n"
                                    " QLineEdit:focus{\n"
                                    " border-bottom:3px solid #E680BD\n"
                                    " }")
        self.lineEdit.setClearButtonEnabled(True)
        self.lineEdit.setObjectName("lineEdit")
        self.gridLayout.addWidget(self.lineEdit, 1, 0, 1, 3)
        self.pushButton = QtWidgets.QPushButton(self)
        self.pushButton.setEnabled(False)
        self.pushButton.setAccessibleName("")
        self.pushButton.setStyleSheet(" QPushButton\n"
                                      " {text-align : center;\n"
                                      " background-color:qlineargradient(spread:pad, x1:0, y1:0, x2:1, y2:0, stop:0 #fbc2eb, stop:1 #a6c1ee);\n"
                                      " font: bold;\n"
                                      " border-color: grey;\n"
                                      " border-width: 2px;\n"
                                      " border-radius: 10px;\n"
                                      " padding: 6px;\n"
                                      " height: 28px;\n"
                                      " border-style: outset;\n"
                                      " margin-left:30px;\n"
                                      " margin-right:30px;\n"
                                      " font-family:'黑体';\n"
                                      " font : 18px;}\n"
                                      "\n"
                                      " QPushButton:pressed\n"
                                      " {text-align : center;\n"
                                      " background-color : light gray;\n"
                                      " background-color:qlineargradient(spread:pad, x1:0, y1:0, x2:1, y2:0, stop:0 #e1aad2, stop:1 #92adda);\n"
                                      " font: bold;\n"
                                      " color:lightblue;\n"
                                      " border-color: gray;\n"
                                      " border-width: 2px;\n"
                                      " border-radius: 10px;\n"
                                      " padding: 6px;\n"
                                      " height : 28px;\n"
                                      " border-style: outset;\n"
                                      " font-family:'黑体';\n"
                                      " font : 18px;}\n"
                                      " QPushButton:hover:!pressed\n"
                                      " {color:red;}")
        self.pushButton.setObjectName("pushButton")
        self.gridLayout.addWidget(self.pushButton, 4, 0, 1, 3)
        self.comboBox = QtWidgets.QComboBox(self)
        self.comboBox.addItems(dicts)
        self.comboBox.setStyleSheet(" QComboBox\n"
                                    " {border:0px;\n"
                                    " border-radius:0;\n"
                                    " margin:10px;\n"
                                    " border-bottom: 2px solid #B3B3B3;\n"
                                    " font-family:'等线';\n"
                                    " font-size:20px;}\n"
                                    " QComboBox:hover{\n"
                                    " border-bottom:3px solid #66A3FF;\n"
                                    " }\n"
                                    " QComboBox:focus{\n"
                                    " border-bottom:3px solid #E680BD;\n"
                                    " }\n"
                                    "")
        self.comboBox.setEditable(True)
        self.comboBox.setObjectName("comboBox")
        self.gridLayout.addWidget(self.comboBox, 2, 1, 1, 2)
        QtCore.QMetaObject.connectSlotsByName(self)
        _translate = QtCore.QCoreApplication.translate
        self.setWindowTitle(_translate("Dialog", "Login"))
        self.label_3.setText(_translate("Dialog", "连接地址"))
        self.label_2.setText(_translate("Dialog", "[!]未注册将会自动注册"))
        self.label.setText(_translate("Dialog", "Zmh server"))
        self.lineEdit.setPlaceholderText(_translate("Dialog", "用户名(3-15字符)"))
        self.pushButton.setText(_translate("Dialog", " 登录"))
        self.lineEdit.textChanged.connect(self.setEnableds)
        self.pushButton.clicked.connect(self._connect)
        self.show()  # shown with the fade-in animation

    def setEnableds(self, _):
        """Enable the login button only while the name has 3 to 15 characters."""
        valid = 3 <= len(self.lineEdit.text().strip()) <= 15
        # Previously the button was only ever enabled, never disabled again.
        self.pushButton.setEnabled(valid)

    @to_logging
    def _connect(self, i):
        """Slot for the login button: parse the address and try to connect."""
        texts = self.comboBox.currentText()
        # A known label maps to its address; anything else is used as typed.
        addr = address_split(dicts.get(texts, texts))
        if addr is False:
            QtWidgets.QMessageBox.warning(self, "Error", "[IP地址:端口]格式不正确!")
            # Stop here: unpacking ``False`` below would raise.
            return
        user = self.lineEdit.text().strip()
        self.pushButton.setEnabled(False)
        QtWidgets.QApplication.processEvents()
        s.change_address(*addr)
        s.change_header(user)
        s.set_failEvent(self.info)
        if s.connect():
            self.success = 1
            self.close()
            return
        self.pushButton.setEnabled(True)

    def info(self, text):
        """Show ``text`` in an information box titled "Login"."""
        QtWidgets.QMessageBox.information(self, "Login", text)

    def main_surface(self):
        """Open the main window and close this dialog."""
        main.SetupUi()
        self.destory_window()
class MainTalk(QtWidgets.QMainWindow):
    """Main chat window: message log, input box, send/upload/reconnect buttons.

    Fixes over the extracted source: escaped quotes and ``\\n`` in the button
    style sheet, the missing ``+`` in the "ip: " menu text, and splitting
    incoming text on newlines instead of on the letter ``n``.
    """

    ConnectionError_signal = QtCore.pyqtSignal()
    MessageUpdate_signal = QtCore.pyqtSignal(str)

    def __init__(self):
        super(MainTalk, self).__init__()
        self.ConnectionError_signal.connect(self.ConnectionError)
        self.MessageUpdate_signal.connect(self.Show_Message)
        self.is_setup = False

    def SetupUi(self):
        """Build all widgets, menus and signal connections, then show the window."""
        self.anim = Animation(self)
        self.setObjectName("MainWindow")
        self.resize(800, 619)
        font = QtGui.QFont()
        font.setFamily("Consolas")
        self.setFont(font)
        self.centralwidget = QtWidgets.QWidget(self)
        self.centralwidget.setObjectName("centralwidget")
        self.gridLayout = QtWidgets.QGridLayout(self.centralwidget)
        self.gridLayout.setObjectName("gridLayout")
        self.lineEdit_3 = QtWidgets.QLineEdit(self.centralwidget)
        self.lineEdit_3.setReadOnly(True)
        self.lineEdit_3.setObjectName("lineEdit_3")
        self.gridLayout.addWidget(self.lineEdit_3, 7, 3, 1, 1)
        self.pushButton = QtWidgets.QPushButton(self.centralwidget)
        self.pushButton.setStyleSheet("background-color:rgb(44, 176, 13);\n"
                                      "color:rgb(255, 255, 255);\n"
                                      "font: 200 10pt \"Consolas\";")
        self.pushButton.setObjectName("pushButton")
        self.gridLayout.addWidget(self.pushButton, 8, 6, 1, 1)
        spacerItem = QtWidgets.QSpacerItem(40, 20, QtWidgets.QSizePolicy.Expanding, QtWidgets.QSizePolicy.Minimum)
        self.gridLayout.addItem(spacerItem, 8, 5, 1, 1)
        self.label_2 = QtWidgets.QLabel(self.centralwidget)
        self.label_2.setObjectName("label_2")
        self.gridLayout.addWidget(self.label_2, 6, 1, 1, 1)
        spacerItem1 = QtWidgets.QSpacerItem(40, 20, QtWidgets.QSizePolicy.Expanding, QtWidgets.QSizePolicy.Minimum)
        self.gridLayout.addItem(spacerItem1, 8, 3, 1, 1)
        self.lineEdit_2 = QtWidgets.QLineEdit(self.centralwidget)
        self.lineEdit_2.setReadOnly(True)
        self.lineEdit_2.setObjectName("lineEdit_2")
        self.gridLayout.addWidget(self.lineEdit_2, 6, 3, 1, 1)
        self.label = QtWidgets.QLabel(self.centralwidget)
        self.label.setObjectName("label")
        self.gridLayout.addWidget(self.label, 5, 1, 1, 1)
        self.lineEdit = QtWidgets.QLineEdit(self.centralwidget)
        self.lineEdit.setCursor(QtGui.QCursor(QtCore.Qt.IBeamCursor))
        self.lineEdit.setDragEnabled(False)
        self.lineEdit.setReadOnly(True)
        self.lineEdit.setObjectName("lineEdit")
        self.gridLayout.addWidget(self.lineEdit, 5, 3, 1, 1)
        self.label_3 = QtWidgets.QLabel(self.centralwidget)
        self.label_3.setObjectName("label_3")
        self.gridLayout.addWidget(self.label_3, 7, 1, 1, 1)
        self.line = QtWidgets.QFrame(self.centralwidget)
        self.line.setFrameShape(QtWidgets.QFrame.VLine)
        self.line.setFrameShadow(QtWidgets.QFrame.Sunken)
        self.line.setObjectName("line")
        self.gridLayout.addWidget(self.line, 5, 4, 3, 1)
        self.textEdit = QtWidgets.QTextEdit(self.centralwidget)
        self.textEdit.setObjectName("textEdit")
        self.gridLayout.addWidget(self.textEdit, 5, 5, 3, 2)
        spacerItem2 = QtWidgets.QSpacerItem(20, 40, QtWidgets.QSizePolicy.Minimum, QtWidgets.QSizePolicy.Expanding)
        self.gridLayout.addItem(spacerItem2, 1, 1, 1, 1)
        self.textEdit_2 = QtWidgets.QTextBrowser(self.centralwidget)
        self.textEdit_2.setObjectName("textEdit_2")
        self.textEdit_2.setReadOnly(True)
        self.gridLayout.addWidget(self.textEdit_2, 0, 3, 2, 4)
        self.pushButton_2 = QtWidgets.QPushButton(self.centralwidget)
        self.pushButton_2.setObjectName("pushButton_2")
        # Upload button
        self.sendButton = QtWidgets.QPushButton(QtGui.QIcon("images/upload.png"), "上传文件", self.centralwidget)
        self.sendButton.setObjectName("send - pushButton")
        self.gridLayout.addWidget(self.sendButton, 8, 5)
        #
        self.gridLayout.addWidget(self.pushButton_2, 8, 1, 1, 1)
        self.setCentralWidget(self.centralwidget)
        self.menubar = QtWidgets.QMenuBar(self)
        self.menubar.setGeometry(QtCore.QRect(0, 0, 800, 24))
        self.menubar.setObjectName("menubar")
        self.menu = QtWidgets.QMenu(self.menubar)
        self.menu.setObjectName("menu")
        self.menulanguage = QtWidgets.QMenu(self.menu)
        self.menulanguage.setObjectName("menulanguage")
        self.setMenuBar(self.menubar)
        self.statusbar = QtWidgets.QStatusBar(self)
        self.statusbar.setObjectName("statusbar")
        self.setStatusBar(self.statusbar)
        self.actionsocket_connet = QtWidgets.QAction(self)
        self.actionsocket_connet.setObjectName("actionsocket_connet")
        self.actionChinese = QtWidgets.QAction(self)
        self.actionChinese.setObjectName("actionChinese")
        self.actionip_socket_gethostbyname_socket_gethostname = QtWidgets.QAction(self)
        self.actionip_socket_gethostbyname_socket_gethostname.setObjectName(
            "actionip_socket_gethostbyname_socket_gethostname")
        self.menulanguage.addSeparator()
        self.menulanguage.addAction(self.actionChinese)
        self.menu.addSeparator()
        self.menu.addAction(self.menulanguage.menuAction())
        self.menu.addAction(self.actionip_socket_gethostbyname_socket_gethostname)
        self.menubar.addAction(self.menu.menuAction())
        self.socket_peername = s.addr[0]
        self.retranslateUi()
        QtCore.QMetaObject.connectSlotsByName(self)
        self.pushButton.clicked.connect(self.send("MSG"))
        self.pushButton_2.clicked.connect(self.re_connect)
        self.textEdit.textChanged.connect(self.tc)
        # Links in the log are handled by anchor() instead of being opened.
        self.textEdit_2.setOpenLinks(False)
        self.textEdit_2.setOpenExternalLinks(False)
        self.textEdit_2.anchorClicked.connect(self.anchor)
        self.sendButton.clicked.connect(self.send("FILE"))
        self.connectEnabled(True)
        self.is_setup = True
        self.show()

    def anchor(self, res: QtCore.QUrl):
        """A file link was clicked: pass its target text to the message handler."""
        index = res.toString()
        s.handler.request_file(index)

    @to_logging
    def sendfile(self):
        """Let the user pick files and send every one that exists."""
        for file in QtWidgets.QFileDialog.getOpenFileNames(self, "上传文件")[0]:
            if os.path.isfile(file):
                s.handler.send(file)

    @to_logging
    def sendmsg(self):
        """Send the text in the input box and clear the box."""
        s.handler.send_text(self.textEdit.toPlainText().strip())
        self.textEdit.clear()

    def tc(self, _=0):
        """Update the send button's state and its character counter."""
        length = len(self.textEdit.toPlainText().strip())
        self.pushButton.setEnabled(0 < length <= 1000)
        self.pushButton.setText(
            QtCore.QCoreApplication.translate("MainWindow", f"发送({length} / 1000)"))

    def send(self, type):
        """Return a click slot that runs the "MSG" or "FILE" action when connected."""
        _call = {"MSG": self.sendmsg, "FILE": self.sendfile}.get(type, lambda: True)

        @to_logging
        def function(_):
            if s.is_connect():
                if not _call():
                    self.ConnectionError_signal.emit()
            else:
                self.MessageUpdate_signal.emit("<font color='red'>发送异常. 未连接至服务器.请点击[重新连接服务器]按钮尝试重新连接.</font>")

        return function

    def ConnectionError(self):
        """Tell the user the server connection was lost and enable reconnecting."""
        QtWidgets.QMessageBox.information(self, 'TraceBack', f'Socket Server<{self.socket_peername}> 断开连接')
        self.connectEnabled(False)

    def retranslateUi(self):
        """Fill in all texts, emit the welcome message and start receiving."""
        _translate = QtCore.QCoreApplication.translate
        self.setWindowTitle(_translate("MainWindow", "Socket"))
        self.lineEdit_2.setText(socket.gethostname())
        self.lineEdit.setText(socket.gethostbyname(socket.gethostname()))
        self.lineEdit_3.setText(self.socket_peername)
        self.tc()
        self.label_2.setText(_translate("MainWindow", "主机名:"))
        self.label.setText(_translate("MainWindow", "本地端口:"))
        self.label_3.setText(_translate("MainWindow", "连接端口:"))
        self.pushButton_2.setText(_translate("MainWindow", "重新连接服务器"))
        self.menu.setTitle(_translate("MainWindow", "设置"))
        self.menulanguage.setTitle(_translate("MainWindow", "language"))
        self.actionsocket_connet.setText(_translate("MainWindow", "socket connect"))
        self.actionChinese.setText(_translate("MainWindow", "Chinese"))
        self.MessageUpdate_signal.emit(f'<font color="red">欢迎来到服务器[{s.socket.getpeername()[0]}].您的ip地址为{s.addr[0]}')
        self.actionip_socket_gethostbyname_socket_gethostname.setText(
            _translate("MainWindow", "ip: " + socket.gethostbyname(socket.gethostname())))
        s.set_failEvent(self.MessageUpdate_signal.emit)
        s.run()

    @to_logging
    def re_connect(self, _):
        """Slot for the reconnect button: try to connect again and restart receiving."""
        self.MessageUpdate_signal.emit(
            "[{}]: 尝试连接服务器[{}],最大超时报错 {}s".format(datetime.now().strftime('%Y %m %d %H:%M:%S'), s.addr[0], TIMEOUT))
        QtWidgets.QApplication.processEvents()
        s.set_failEvent(self.MessageUpdate_signal.emit)
        status = s.connect()
        self.connectEnabled(status)
        if status:
            s.run()

    def connectEnabled(self, status):
        """Enable the reconnect button only while disconnected."""
        self.pushButton_2.setEnabled(not status)

    def Show_Message(self, data: str) -> None:
        """Slot for MessageUpdate_signal: append each non-empty line to the log."""
        if data:
            lines = data.split('\n')
            for i in lines:
                if i:
                    # Short pause so very fast output does not leave the view blank.
                    sleep(0.06 / len(lines))
                    self.textEdit_2.append(i)
            self.textEdit_2.moveCursor(QtGui.QTextCursor.End)
def save_bytes(file, byte: bytes):
    """Write ``byte`` to the path ``file`` in binary mode, replacing old content."""
    with open(file, "wb") as sink:
        sink.write(byte)
def get_eval(str, defined=None):
    """Evaluate the text ``str`` and return the value if it has ``defined``'s type.

    Returns ``defined`` when evaluation fails for any reason or when the
    result is not an instance of ``type(defined)``.
    """
    # NOTE(review): SECURITY - eval() executes arbitrary expressions. If the
    # text comes from the network this is remote code execution;
    # ast.literal_eval would cover literal tuples/ints/str/bytes. Confirm and
    # replace deliberately.
    try:
        value = eval(str)
    except BaseException:  # same reach as the original bare ``except:``
        return defined
    if isinstance(value, type(defined)):
        return value
    return defined
class SEND:
    """One outgoing file transfer, mirrored as a row in the file dialog."""

    def __init__(self, index, name, fbytes: bytes, conn: callable, encode='utf8', localfile=""):
        """
        :param index: transfer index placed in every message.
        :param name: file name announced in the header.
        :param fbytes: complete file content.
        :param conn: callable that receives each encoded message.
        :param encode: codec used to encode the repr() of each message.
        :param localfile: path reported to the file dialog for this upload.
        """
        self.segs = self.cut(fbytes)
        self.encode = encode
        self.total = len(self.segs)
        self.index = index
        self.conn = conn
        self.finish = False
        self.name = name
        self.size = len(fbytes)
        # Register with the dialog BEFORE scheduling update(): the coroutine
        # runs on another thread and reads self.fd_index, which previously
        # was assigned only after scheduling.
        self.fd_index = len(file_dialog.filedict)  # a pyqtSignal has no return value
        file_dialog.new_file.emit([self.name, self.total, self.size, (file_dialog.UPLOAD, localfile)])
        asyncio.run_coroutine_threadsafe(self.update(), loop)

    async def update(self):
        """Send the header, then every segment, reporting progress to the dialog."""
        self.conn(self.header().encode(self.encode))
        for n, data in enumerate(self.segs):
            file_dialog.update_file.emit(self.fd_index)
            self.conn(self.format(n, data).encode(self.encode))
            await asyncio.sleep(delay)
        self.finish = True

    def cut(self, byte: bytes, seg=None) -> list:
        """Split ``byte`` into chunks of at most ``seg`` bytes.

        ``seg`` defaults to the module-level ``segment``, read at call time.
        """
        if seg is None:
            seg = segment
        return [byte[x:x + seg] for x in range(0, len(byte), seg)]

    def format(self, process, data) -> str:
        """Return the repr of an ``update_file`` message for segment ``process``."""
        return repr((update_file, (self.index, process, data)))

    def header(self) -> str:
        """Return the repr of the ``new_file`` message announcing this transfer."""
        return repr((new_file, (self.index,
                                self.name,
                                self.total,
                                self.size)
                     ))
class RECV:
    """One incoming file transfer, mirrored as a row in the file dialog."""

    def __init__(self, index: int, name: str, total: int, size: int):
        self.index, self.name, self.total, self.size = index, name, total, size
        self.progress = -1
        self.file = []
        self.finish = False
        # Initialised so savepath()/start() never hit a missing attribute.
        self.__save = ""
        self.fd_index = len(file_dialog.filedict)  # a pyqtSignal has no return value
        file_dialog.new_file.emit(
            [self.name, self.total, self.size,
             (file_dialog.DOWNLOAD, os.path.join(save_path, self._safe_name(self.name)))])

    @staticmethod
    def _safe_name(name) -> str:
        """Reduce a peer-supplied name to a bare file name (no directories)."""
        candidate = os.path.basename(str(name).replace("\\", "/"))
        if candidate in ("", ".", ".."):
            return "unnamed"
        return candidate

    def update(self, p, data):
        """Accept segment ``p`` if it is the next one expected.

        Returns True once the number of stored segments equals ``total``;
        otherwise returns None. Out-of-order segments are ignored.
        """
        if isinstance(p, int) and p - 1 == self.progress:
            self.progress = p
            self.file.append(data)
            file_dialog.update_file.emit(self.fd_index)
            if len(self.file) == self.total:
                self.finish = True
                return True

    def mix(self):
        """Return all stored segments joined into one bytes object."""
        return b''.join(self.file)

    def save(self, path, ):
        """Write the finished file into directory ``path``.

        The name is sanitised so a hostile name cannot escape ``path``.
        Returns True when written, False when the transfer is unfinished.
        """
        if not self.finish:
            return False
        b = self.mix()
        target = os.path.join(path, self._safe_name(self.name))
        save_bytes(target, b)
        self.__save = target
        # Release memory but keep the attribute: ``del self.file`` made any
        # later update()/mix() call raise AttributeError.
        self.file = []
        return True

    def savepath(self) -> (str, bool):
        """Return the saved path ("" if not saved yet), or False while unfinished."""
        if self.finish:
            return self.__save
        return False

    def start(self):
        """Open the saved file with the system's default application."""
        path = self.savepath()
        if path:
            # NOTE(review): os.startfile is only available on Windows.
            os.startfile(path)
class send_files:
    """Registry of outgoing transfers; each new one gets the next index."""

    def __init__(self, encode='utf8'):
        self.sends = []
        self.encode = encode

    def new_sends(self, name, file, conn, locfile=""):
        """Start sending ``file`` (bytes, or str encoded with ``self.encode``)."""
        if isinstance(file, str):
            file = file.encode(self.encode)
        # The new transfer's index is the current length of the list. The
        # configured codec is passed on so SEND does not use its own default.
        self.sends.append(SEND(len(self.sends), name, file, conn,
                               encode=self.encode, localfile=locfile))

    def localfile(self, file, conn):
        """Read the file at path ``file`` and send it; do nothing if it is not a file."""
        if not os.path.isfile(file):
            return
        _, name = os.path.split(file)
        with open(file, "rb") as f:
            payload = f.read()
        # NOTE(review): the base name (not the full path) is passed as
        # ``locfile``, as in the original - confirm this is what the file
        # dialog expects.
        self.new_sends(name, payload, conn, name)
class recv_files:
    """Registry of incoming transfers, looked up by each entry's own ``index``."""

    def __init__(self, decode='utf8', path=None):
        self.recvs = []
        self.decode = decode
        self.path = sys.path[0] if path is None else path

    def new_files(self, index, name, total, size):
        """Register a new incoming transfer."""
        entry = RECV(index, name, total, size)
        self.recvs.append(entry)

    def get_index(self, index: int) -> "RECV":
        """Return the first entry whose ``index`` attribute equals ``index``, else None.

        Entries are matched by their index attribute, not by list position:
        files may be requested in any order, so positions and indexes need
        not line up (this was a fatal bug according to the original note).
        """
        return next((entry for entry in self.recvs if entry.index == index), None)

    def in_list(self, index) -> bool:
        """True when an entry with this index exists."""
        return bool(self.get_index(index))

    def update(self, index, progess, data) -> bool:
        """Feed one segment to transfer ``index``; save and return the result when complete."""
        if not self.in_list(index):
            return None
        if self.get_index(index).update(progess, data):
            return self.save(index)

    def save(self, index):
        """Save transfer ``index`` into ``self.path``; None if it is unknown."""
        if not self.in_list(index):
            return None
        return self.get_index(index).save(self.path)
class message_handle:
    """Client-side dispatcher for the tuple-encoded messages of the protocol."""

    codec = "utf8"

    def __init__(self, func: callable = lambda _: True):
        """
        :param func: callable used to send encoded messages to the server.
        """
        self.Sender = send_files(self.codec, )
        self.Receiver = recv_files(self.codec, save_path)
        self.func = func
        self.Progress = []

    def handle(self, data):
        """Decode one received message and act on it.

        Returns the text for ``normal_text`` messages; every other message
        type returns None.
        """
        _res = get_eval(data, (None,))
        if len(_res) != 2:
            return
        kind, arguments = _res
        if kind == new_file:
            self.Receiver.new_files(*arguments)
        elif kind == update_file:
            self.Receiver.update(*arguments)
        elif kind == request_file:
            # get_index() returns None for an unknown index; calling
            # .savepath() on it raised AttributeError and ended the receive loop.
            entry = self.Receiver.get_index(arguments)
            path = entry.savepath() if entry is not None else False
            if path:
                self.Sender.localfile(path, self.func)
        elif kind == normal_text:
            return arguments

    def send(self, sendpath):
        """Send the local file at ``sendpath``."""
        return self.Sender.localfile(sendpath, self.func)

    def send_text(self, mes: str):
        """Send ``mes`` as a ``normal_text`` message."""
        return self.func(repr((normal_text, mes)).encode(self.codec))

    def request_file(self, name):
        """Open file ``name`` if already received, otherwise request it from the server.

        ``name`` is the link text; it is evaluated to an int index (0 on failure).
        """
        index = get_eval(name, 0)
        if self.in_list(index):
            self.get_index(index).start()
            return
        self.func(repr((request_file, index)).encode(self.codec))

    def get_index(self, index) -> "RECV":
        """Return the received-file entry with this index, or None."""
        return self.Receiver.get_index(index)

    def in_list(self, index):
        """True when a received-file entry with this index exists."""
        return self.Receiver.in_list(index)
def covert(byte, fine=False):
    """Format a byte count as a short human-readable size string.

    Units step by 1024: b, Kb, Mb, Gb, Tb, Pb, Eb (values beyond Eb stay in
    Eb). Background from the original note: a bit is one binary digit, a
    byte is 8 bits, 1 KB = 1024 B, 1 MB = 1024 KB, and so on.
    (Unit table credited to 「山木兮」 in the original.)

    :param byte: a number of bytes, or any sized object whose ``len()`` is used.
    :param fine: when True, append the exact byte count in parentheses.
    :return: the exact-count string for values below 1024, otherwise a value
        with one decimal place and a unit suffix.
    """
    if not isinstance(byte, (int, float)):
        byte = len(byte)
    DEI = f"{byte} 字节"
    base = 1024
    units = ["b",
             "Kb",
             "Mb",
             "Gb",
             "Tb",
             "Pb",
             "Eb"]
    index = 0
    # Move up one unit per division; stop at the last known unit.
    while byte >= base and index + 1 < len(units):
        byte /= base
        index += 1
    if index == 0:
        return DEI
    if fine:
        return "%0.1f%s(%s)" % (byte, units[index], DEI)
    return "%0.1f%s" % (byte, units[index])
class ListWidgetLayout(QtWidgets.QWidget):
    """Row widget for one transfer: a name label plus a progress bar.

    Fixes over the extracted source: ``update()`` increments the step counter
    (``+= 1``; the extracted text assigned 1 every time) and the ``\\n``
    escapes inside the style sheet are restored.
    """

    def __init__(self, name, total, parent=None):
        """
        :param name: text shown in the label.
        :param total: number of steps that make up a complete transfer.
        """
        super(ListWidgetLayout, self).__init__(parent)
        hbox = QtWidgets.QHBoxLayout(self)
        hbox.addWidget(QtWidgets.QLabel(name, self))
        progress = QtWidgets.QProgressBar()
        progress.setMaximum(total)
        progress.setStyleSheet("QProgressBar{\n"
                               "text-align: center;\n"
                               "}")
        hbox.addWidget(progress)
        progress.setTextVisible(True)
        # Minimum and maximum both 0 puts the bar into its "busy" display
        # until the first update() restores the real maximum.
        progress.setRange(0, 0)
        self.total = total
        self.index = 0
        self.progress = progress
        self.setLayout(hbox)
        font = QtGui.QFont()
        font.setFamily("Consolas")
        self.setFont(font)
        progress.setFont(font)

    def get_progress(self) -> QtWidgets.QProgressBar:
        """Return the progress bar widget."""
        return self.progress

    def is_finished(self) -> bool:
        """True once the step counter has reached ``total``."""
        return bool(self.index >= self.total)

    def update(self) -> None:
        """Advance the bar by one step; does nothing once finished."""
        if self.index == 0:
            self.progress.setMaximum(self.total)
        if self.is_finished():
            return
        self.index += 1
        self.progress.setValue(self.index)

    def get_total(self) -> int:
        """Return the total number of steps."""
        return self.total

    def get_index(self) -> int:
        """Return the number of steps completed so far."""
        return self.index
class FileDialog(QtWidgets.QDialog):
    """Dialog listing file transfers with a per-file progress row and a detail panel.

    Signals:
        update_file(int): advance the transfer registered under that index.
        new_file(list): register a transfer; the payload is
            ``[name, total, size, (type, path)]`` where ``type`` is
            ``DOWNLOAD`` (0) or ``UPLOAD`` (1) and selects the row icon.
    """
    update_file = QtCore.pyqtSignal(int)
    new_file = QtCore.pyqtSignal(list)

    def __init__(self, save_path, parent=None):
        super(FileDialog, self).__init__(parent)
        self.current = ""  # index of the selected transfer; "" until one exists
        self.icon = QtGui.QIcon("images/file.png")
        self.download = QtGui.QIcon("images/download.png")
        self.upload = QtGui.QIcon("images/upload.png")
        self.DOWNLOAD = 0
        self.UPLOAD = 1
        self.LOAD_dict = {self.DOWNLOAD: self.download,
                          self.UPLOAD: self.upload}
        self.setWindowFlags(QtCore.Qt.WindowMinimizeButtonHint)  # show only the minimize button
        self.save_path = save_path
        self.filedict = {}  # index -> (ListWidgetLayout, size)
        self.pathdict = {}  # index -> file path
        self.namedict = {}  # index -> display name
        self.update_file.connect(self.fileUpdate)
        self.new_file.connect(self.newFile)

        self.setObjectName("Dialog")
        self.resize(666, 421)
        font = QtGui.QFont()
        font.setFamily("Consolas")
        self.setFont(font)
        self.setWindowIcon(self.icon)
        self.gridLayout = QtWidgets.QGridLayout(self)
        self.gridLayout.setObjectName("gridLayout")
        self.label_4 = QtWidgets.QLabel(self)
        self.label_4.setObjectName("label_4")
        self.gridLayout.addWidget(self.label_4, 1, 2, 1, 1)
        self.listWidget_2 = QtWidgets.QListWidget(self)
        self.listWidget_2.setObjectName("listWidget_2")
        self.gridLayout.addWidget(self.listWidget_2, 0, 0, 4, 1)
        self.groupBox = QtWidgets.QGroupBox(self)
        self.groupBox.setObjectName("groupBox")
        self.gridLayout_2 = QtWidgets.QGridLayout(self.groupBox)
        self.gridLayout_2.setObjectName("gridLayout_2")
        self.label = QtWidgets.QLabel(self.groupBox)
        self.label.setObjectName("label")
        self.gridLayout_2.addWidget(self.label, 1, 0, 1, 2)
        self.progressBar = QtWidgets.QProgressBar(self.groupBox)
        font = QtGui.QFont()
        font.setFamily("Consolas")
        self.progressBar.setFont(font)
        self.progressBar.setProperty("value", 0)
        self.progressBar.setTextVisible(True)
        self.progressBar.setObjectName("progressBar")
        self.gridLayout_2.addWidget(self.progressBar, 0, 0, 1, 2)
        self.label_3 = QtWidgets.QLabel(self.groupBox)
        self.label_3.setObjectName("label_3")
        self.gridLayout_2.addWidget(self.label_3, 2, 0, 1, 2)
        self.pushButton = QtWidgets.QPushButton(self.groupBox)
        self.pushButton.setObjectName("pushButton")
        self.pushButton.setEnabled(False)
        self.gridLayout_2.addWidget(self.pushButton, 4, 0, 1, 1)
        self.label_2 = QtWidgets.QLabel(self.groupBox)
        self.label_2.setObjectName("label_2")
        self.gridLayout_2.addWidget(self.label_2, 3, 0, 1, 2)
        self.pushButton_2 = QtWidgets.QPushButton(self.groupBox)
        self.pushButton_2.setObjectName("pushButton_2")
        self.gridLayout_2.addWidget(self.pushButton_2, 4, 1, 1, 1)
        self.gridLayout.addWidget(self.groupBox, 0, 2, 1, 1)
        self.lineEdit = QtWidgets.QLineEdit(self)
        self.lineEdit.setReadOnly(True)
        self.lineEdit.setObjectName("lineEdit")
        self.gridLayout.addWidget(self.lineEdit, 2, 2, 1, 1)
        self.line = QtWidgets.QFrame(self)
        self.line.setFrameShape(QtWidgets.QFrame.VLine)
        self.line.setFrameShadow(QtWidgets.QFrame.Sunken)
        self.line.setObjectName("line")
        self.gridLayout.addWidget(self.line, 0, 1, 4, 1)
        spacerItem = QtWidgets.QSpacerItem(20, 40, QtWidgets.QSizePolicy.Minimum, QtWidgets.QSizePolicy.Expanding)
        self.gridLayout.addItem(spacerItem, 3, 2, 1, 1)
        self.retranslateUi()
        QtCore.QMetaObject.connectSlotsByName(self)

    def retranslateUi(self):
        """Set the visible texts and wire the button / list click handlers."""
        _translate = QtCore.QCoreApplication.translate
        self.setWindowTitle(_translate("Dialog", "Files"))
        self.pushButton.setText(_translate("Dialog", "打开"))
        self.pushButton_2.setText(_translate("Dialog", "打开文件夹"))
        self.pushButton.clicked.connect(self.startfile)
        self.pushButton_2.clicked.connect(self.startpath)
        self.label_4.setText(_translate("Dialog", f"下载位置:"))
        self.lineEdit.setText(_translate("Dialog", self.save_path))
        self.listWidget_2.itemClicked.connect(self.fileChanged)

    def newFile(self, args: list):
        """Register a transfer, add its row to the list and select it.

        Args:
            args: ``[name, total, size, (type, path)]``.
        """
        self.activateWindow()  # bring the window to the front
        index = len(self.filedict)
        name, total, size, (kind, path) = args
        # Unknown transfer kinds fall back to the upload *icon*; the former
        # fallback was the integer constant UPLOAD, which is not a QIcon.
        png = self.LOAD_dict.get(kind, self.upload)
        layout = ListWidgetLayout(name, total)
        self.filedict[index] = (layout, size)
        self.pathdict[index] = path
        Item = QtWidgets.QListWidgetItem(png, "", self.listWidget_2)
        Item.index_name = index  # lets fileChanged map a clicked row back to its index
        Item.setSizeHint(QtCore.QSize(200, 40))
        self.listWidget_2.addItem(Item)
        self.listWidget_2.setItemWidget(Item, layout)
        self.namedict[index] = name
        self.changeCurrent(index)

    def changeCurrent(self, index: int):
        """Select transfer *index*, refresh the detail panel and show the dialog."""
        self.current = index
        self.changeGroupBox()
        self.show()

    def fileChanged(self, widget: QtWidgets.QListWidgetItem):
        """Slot for list clicks: select the transfer attached to the clicked row."""
        self.current = widget.index_name
        self.changeGroupBox()

    def changeGroupBox(self):
        """Refresh the detail panel from the currently selected transfer."""
        name = self.namedict[self.current]
        layout, size = self.filedict[self.current]
        layout: ListWidgetLayout
        size = covert(size, True)
        _translate = QtCore.QCoreApplication.translate
        self.groupBox.setTitle(_translate("Dialog", f"{name}"))
        self.label.setText(_translate("Dialog", f"文件大小: {size}"))
        self.label_3.setText(_translate("Dialog", f"文件名: {name}"))
        self.label_2.setText(_translate("Dialog", f"位置: {os.path.join(self.save_path, name)}"))
        self.progressBar.setMaximum(layout.get_total())
        self.progressBar.setValue(layout.get_index())
        # The "open" button is only usable once the transfer has finished.
        self.pushButton.setEnabled(layout.is_finished())
        self.fpath = self.pathdict[self.current]
        self.path = os.path.dirname(self.fpath)

    def fileUpdate(self, index: int):
        """Slot for ``update_file``: advance transfer *index* by one step."""
        layout, size = self.filedict[index]
        layout: ListWidgetLayout
        layout.update()
        if index == self.current:
            self.changeGroupBox()

    def startfile(self, _):
        """Open the selected file with ``os.startfile`` if it exists."""
        if hasattr(self, "fpath") and os.path.isfile(self.fpath):
            os.startfile(self.fpath)

    def startpath(self, _):
        """Open the folder containing the selected file with ``os.startfile``."""
        if hasattr(self, "path") and os.path.isdir(self.path):
            os.startfile(self.path)
# Received files are stored in a "resource" folder next to this script.
save_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "resource")
os.makedirs(save_path, exist_ok=True)

if __name__ == "__main__":
    # Start the file worker first, then build the socket and the Qt windows;
    # the module-level names below are kept because they are set as globals.
    file_thread.start()
    s = Socket()
    app = QtWidgets.QApplication(sys.argv)
    conn = login()
    main = MainTalk()
    file_dialog = FileDialog(save_path)
    sys.exit(app.exec_())
socket_queue.py
代码语言:python
import socket
from datetime import datetime
def get_time():
    """Return the current local time formatted as ``HH:MM:SS``."""
    now = datetime.now()
    return now.strftime('%H:%M:%S')


# Socket timeout in seconds (set on client sockets and quoted in the
# connection-timeout message).
TIMEOUT = 2
def ignore(function):
    """Decorator: call *function* and swallow any exception it raises.

    The wrapper returns the function's result, or None when an exception
    was suppressed. KeyboardInterrupt / SystemExit are not suppressed.
    """
    from functools import wraps

    @wraps(function)
    def func(*args, **kwargs):
        try:
            return function(*args, **kwargs)
        except Exception:
            # Deliberate best-effort call: failures are reported as None.
            return None
    return func


class SocketQueue:
    """Delimiter-framed message queue on top of a stream socket.

    Outgoing payloads get ``split_text`` appended. Incoming bytes are split
    on ``split_text``: complete messages go to ``ReadyQueue`` and the
    trailing incomplete fragment is kept in ``waitKey`` until the rest
    arrives, so sticky or split TCP packets are reassembled.
    """
    split_text = "\n"  # class-level default delimiter: newline

    class QuitError(ConnectionError):
        """Returned (not raised) by the receive methods once the queue is closed."""

    def __init__(self, sock=None, bufsize=1024, codec="utf8"):
        """
        Args:
            sock: Socket to wrap. A fresh ``socket.socket()`` is created per
                instance when omitted (a default created in the signature
                would be shared by every instance).
            bufsize: Maximum number of bytes read per ``recv`` call.
            codec: Text encoding used for sending and receiving.
        """
        if sock is None:
            sock = socket.socket()
        self.socket, self.bufsize, self.codec = sock, bufsize, codec
        self.waitKey = str()
        self.ReadyQueue = []
        self._closed = False

    def re_setup(self):
        """Reset the buffers and the closed flag (used after replacing the socket)."""
        self.waitKey = str()
        self.ReadyQueue = []
        self._closed = False

    def __close__(self):
        self.quit()

    def __del__(self):
        self.quit()

    def isOpen(self) -> bool:
        """Return False only when both this queue and its socket are marked closed."""
        return not (self._closed and getattr(self.socket, "_closed", False))

    def quitEvent(self) -> None:
        """Hook called once by :meth:`quit`; subclasses may override it."""
        pass

    def quit(self) -> None:
        """Close the queue and its socket; later calls do nothing."""
        if not self._closed:
            self._closed = True
            self.quitEvent()
            self.socket.close()

    def normal_text(self, string: str):
        """Encode *string* with the configured codec."""
        return string.encode(self.codec)

    def __recv(self):
        """Read one chunk from the socket and feed it to the parser.

        Returns:
            None after a successful read or a timeout; the ``QuitError``
            class when the connection failed or the peer closed it.
        """
        try:
            raw = self.socket.recv(self.bufsize)
        except socket.timeout:
            return None
        except OSError:
            # Covers ConnectionAborted/Refused/Reset as well.
            self.quit()
            return self.QuitError
        if not raw:
            # An empty read on a stream socket means the peer closed the
            # connection; without this the caller would spin forever.
            self.quit()
            return self.QuitError
        # bytes.strip() without arguments would also remove the "\n" delimiter.
        data = raw.strip(b" ")
        if data:
            self.parse_data(self.handle(data))
        return None

    def __send(self, data: bytes) -> bool:
        """Send *data* completely; on a socket error close the queue and return False."""
        try:
            self.socket.sendall(data)
            return True
        except OSError:
            self.quit()
            return False

    def send(self, data) -> bool:
        """Serialize *data*, append the delimiter and send it.

        str is encoded; set/list/tuple are sent as their ``repr``; int/float
        as ``str``; bytes unchanged; anything else through ``bytes(data)``.

        Returns:
            True when the payload was sent, False when sending failed.
        """
        if isinstance(data, str):
            payload = self.normal_text(data)
        elif isinstance(data, (set, list, tuple)):
            payload = repr(data).encode(self.codec)
        elif isinstance(data, (int, float)):
            payload = str(data).encode(self.codec)
        elif isinstance(data, bytes):
            payload = data
        else:
            payload = bytes(data)
        return self.__send(payload + self.split_text.encode(self.codec))

    def input(self, text):
        """Send *text* and return the next received message."""
        self.send(text)
        return self.recv()

    def forever_receive(self) -> (str, None):
        """Keep calling :meth:`recv` until the queue is closed; results are discarded."""
        while self.isOpen():
            self.recv()

    def handle(self, data: bytes):
        """Decode *data* and split it on the delimiter, stripping each piece."""
        return [d.strip() for d in data.decode(self.codec).split(self.split_text)]

    @ignore
    def parse_data(self, generator: (tuple, list, set)) -> None:
        """Merge the pieces of one chunk into ``ReadyQueue`` / ``waitKey``.

        One piece means the chunk held no delimiter, so it extends the
        pending fragment. Otherwise the first piece completes the pending
        fragment, the last piece becomes the new pending fragment and the
        pieces in between are complete messages.
        """
        pieces = list(generator)
        if len(pieces) == 1:
            self.waitKey += pieces[0]
            return
        self.ReadyQueue.append(self.waitKey + pieces.pop(0))
        self.waitKey = pieces.pop()
        self.ReadyQueue.extend(pieces)

    def recv(self) -> (str, Exception):
        """Return the next non-empty message, or the ``QuitError`` class once closed."""
        while True:
            while (not self.ReadyQueue) and self.isOpen():
                self.__recv()
            if not self.isOpen():
                return self.QuitError
            data = self.parse_argument(self.ReadyQueue.pop(0))
            if isinstance(data, str) and data:
                return data

    def parse_argument(self, arg: str) -> str:
        """Post-process one queued message; the default strips whitespace."""
        return arg.strip()

    def recv_list(self) -> list:
        """Return all queued messages and empty the queue."""
        queue = self.ReadyQueue[:]
        self.ReadyQueue = []
        return queue

    @ignore
    def connect(self, host: str, port: int):
        """Connect the socket to ``(host, port)``; any failure is swallowed by ``ignore``."""
        if not 0 <= port <= (2 ** 16) - 1:
            raise ValueError("port must be in 0..65535")
        self.socket.connect((host, port))
def err_connect(sock, addr: tuple) -> (str, bool):
    """Connect *sock* to *addr* and translate failures into a message.

    Args:
        sock: Object with a ``connect(addr)`` method.
        addr: ``(host, port)`` pair.

    Returns:
        True when the connection succeeded, otherwise a string describing
        the failure. No exception raised by ``connect`` propagates (apart
        from KeyboardInterrupt / SystemExit).
    """
    try:
        sock.connect(addr)
    except socket.gaierror:
        return f"获取地址信息失败.请确保{addr[0]}是有效地址或ipv4/ipv6"
    except socket.timeout:
        return f"连接超时({TIMEOUT}s).服务器[{addr[0]}:{addr[1]}]连接失败."
    except OverflowError:
        return f"输入的端口号为{addr[1]},端口号必须在0~65535间."
    except ConnectionError:
        # Also covers the Reset / Refused / Aborted subclasses.
        return "请求拒绝."
    except OSError as e:
        if int(addr[1]) == 0:
            return "[WinError 10049] 在其上下文中,该请求的地址无效"
        # Prefer the message part of (errno, message); an OSError built
        # from a single argument has no args[1], so fall back to str(e).
        reason = e.args[1] if len(e.args) > 1 else e
        return str(reason).rstrip('。')
    except TypeError:
        return "网络地址格式错误: 格式[ip:port] port必须为整型变量0~65535间."
    except Exception:
        return "连接错误"
    else:
        return True
class SocketClient(SocketQueue):
    """Client-side queue that connects to ``addr`` and reports failures through a callback."""
    addr = "localhost", 429  # default target, replaced through change_address()
    header = ""

    def __init__(self, addr, *args, **kwargs):
        # NOTE(review): ``addr`` is accepted but never stored; the target
        # address comes from the class default or change_address(). Confirm
        # with the callers whether this is intended.
        super().__init__(*args, **kwargs)
        self.socket.settimeout(TIMEOUT)
        self.connected = False  # True once connect() has been attempted
        self.__is_connect = False  # result of the most recent attempt
        self._traceback = lambda *_: None  # failure callback; no-op by default

    def is_connect(self) -> bool:
        """Return whether the most recent connect attempt succeeded and is still valid."""
        return self.__is_connect

    def change_address(self, host: str, port: int):
        """Set the address used by the next :meth:`connect`."""
        self.addr = (host, port)

    def change_header(self, header: str):
        """Replace the stored header string."""
        self.header = header

    def set_failEvent(self, function):
        """Install *function* as the callback receiving failure messages."""
        self._traceback = function

    def failEvent(self, reason):
        """Pass *reason*, prefixed with the current time, to the failure callback."""
        self._traceback(f"[{get_time()}]: {reason}")

    def connect(self):
        """Try to connect to ``self.addr``.

        Every attempt after the first one starts from a new socket and
        reset buffers. On failure the reason is sent to the failure callback.

        Returns:
            True when connected, False otherwise.
        """
        if self.connected:
            self.socket = socket.socket()
            self.socket.settimeout(TIMEOUT)
            self.re_setup()
        self.connected = True
        outcome = err_connect(self.socket, self.addr)
        succeeded = outcome is True
        if not succeeded:
            self.failEvent(outcome)
        self.__is_connect = succeeded
        return self.__is_connect

    def quitEvent(self) -> None:
        """Mark the client as disconnected when the queue is closed."""
        self.__is_connect = False
file.py
代码语言:python
# -*- coding: utf-8 -*-
# Form implementation generated from reading ui file 'file.ui'
#
# Created by: PyQt5 UI code generator 5.15.4
#
# WARNING: Any manual changes made to this file will be lost when pyuic5 is
# run again. Do not edit this file unless you know what you are doing.
import sys
from PyQt5 import QtCore, QtGui, QtWidgets
import os
def covert(byte, fine=False):
    """Format a byte count as a human-readable size string.

    The value is divided by 1024 until it drops below 1024 or the largest
    known unit ("Eb") is reached (1 Kb = 1024 b, 1 Mb = 1024 Kb, ...).

    Args:
        byte: Number of bytes (int or float). Any other object is measured
            with ``len()``.
        fine: When True, append the exact byte count in parentheses.

    Returns:
        ``"<n> 字节"`` for values below 1024, otherwise the scaled value with
        one decimal, e.g. ``"1.5Kb"`` or ``"1.5Kb(1536 字节)"`` when *fine*.
    """
    if not isinstance(byte, (int, float)):
        byte = len(byte)
    exact = f"{byte} 字节"
    base = 1024
    units = ["b", "Kb", "Mb", "Gb", "Tb", "Pb", "Eb"]
    index = 0
    # Stop one step before running off the end of the unit table.
    while byte >= base and index + 1 < len(units):
        byte /= base
        index += 1
    if index == 0:
        return exact
    scaled = "%0.1f%s" % (byte, units[index])
    if fine:
        return "%s(%s)" % (scaled, exact)
    return scaled
class ListWidgetLayout(QtWidgets.QWidget):
    """Row widget for one list entry: a name label beside a progress bar.

    ``total`` is the number of steps expected; ``index`` counts the steps
    recorded so far through :meth:`update`.
    """

    def __init__(self, name, total, parent=None):
        super(ListWidgetLayout, self).__init__(parent)
        hbox = QtWidgets.QHBoxLayout(self)
        hbox.addWidget(QtWidgets.QLabel(name, self))

        progress = QtWidgets.QProgressBar()
        progress.setMaximum(total)
        progress.setStyleSheet("QProgressBar{\n"
                               "text-align: center;\n"
                               "}")
        hbox.addWidget(progress)
        progress.setTextVisible(True)
        # Minimum and maximum both 0 puts the bar in "busy" mode until the
        # first update() restores the real maximum.
        # See https://blog.csdn.net/weixin_43496130/article/details/104238051
        progress.setRange(0, 0)

        self.total = total
        self.index = 0
        self.progress = progress
        self.setLayout(hbox)

        font = QtGui.QFont()
        font.setFamily("Consolas")
        self.setFont(font)
        progress.setFont(font)

    def get_progress(self) -> QtWidgets.QProgressBar:
        """Return the progress bar shown in this row."""
        return self.progress

    def is_finished(self) -> bool:
        """Return True once ``index`` has reached ``total``."""
        return bool(self.index >= self.total)

    def update(self) -> None:
        """Record one more completed step and refresh the bar.

        Note: this name replaces ``QWidget.update`` on this class; it is kept
        because callers use it to advance the progress.
        """
        if self.index == 0:
            # Leave busy mode on the first step.
            self.progress.setMaximum(self.total)
        if self.is_finished():
            return
        self.index += 1
        self.progress.setValue(self.index)

    def get_total(self) -> int:
        """Return the total number of steps."""
        return self.total

    def get_index(self) -> int:
        """Return the number of steps recorded so far."""
        return self.index
class FileDialog(QtWidgets.QDialog):
    """Dialog listing received files with a per-file progress row and a detail panel.

    Signals:
        update_file(int): advance the file registered under that index.
            Declared as ``int`` because the slot looks the value up in
            dictionaries keyed by integer index.
        new_file(list): register a file; the payload is ``[name, total, size]``.
    """
    update_file = QtCore.pyqtSignal(int)
    new_file = QtCore.pyqtSignal(list)

    def __init__(self, save_path, parent=None):
        super(FileDialog, self).__init__(parent)
        self.current = ""  # index of the selected file; "" until one exists
        self.setWindowFlags(QtCore.Qt.WindowMinimizeButtonHint)  # show only the minimize button
        self.save_path = save_path
        self.filedict = {}  # index -> (ListWidgetLayout, size)
        self.namedict = {}  # index -> display name
        self.update_file.connect(self.fileUpdate)
        self.new_file.connect(self.newFile)

        self.setObjectName("Dialog")
        self.resize(666, 421)
        font = QtGui.QFont()
        font.setFamily("Consolas")
        self.setFont(font)
        icon = QtGui.QIcon.fromTheme("fusion")
        self.setWindowIcon(icon)
        self.gridLayout = QtWidgets.QGridLayout(self)
        self.gridLayout.setObjectName("gridLayout")
        self.label_4 = QtWidgets.QLabel(self)
        self.label_4.setObjectName("label_4")
        self.gridLayout.addWidget(self.label_4, 1, 2, 1, 1)
        self.listWidget_2 = QtWidgets.QListWidget(self)
        self.listWidget_2.setObjectName("listWidget_2")
        self.gridLayout.addWidget(self.listWidget_2, 0, 0, 4, 1)
        self.groupBox = QtWidgets.QGroupBox(self)
        self.groupBox.setObjectName("groupBox")
        self.gridLayout_2 = QtWidgets.QGridLayout(self.groupBox)
        self.gridLayout_2.setObjectName("gridLayout_2")
        self.label = QtWidgets.QLabel(self.groupBox)
        self.label.setObjectName("label")
        self.gridLayout_2.addWidget(self.label, 1, 0, 1, 2)
        self.progressBar = QtWidgets.QProgressBar(self.groupBox)
        font = QtGui.QFont()
        font.setFamily("Consolas")
        self.progressBar.setFont(font)
        self.progressBar.setProperty("value", 0)
        self.progressBar.setTextVisible(True)
        self.progressBar.setObjectName("progressBar")
        self.gridLayout_2.addWidget(self.progressBar, 0, 0, 1, 2)
        self.label_3 = QtWidgets.QLabel(self.groupBox)
        self.label_3.setObjectName("label_3")
        self.gridLayout_2.addWidget(self.label_3, 2, 0, 1, 2)
        self.pushButton = QtWidgets.QPushButton(self.groupBox)
        self.pushButton.setObjectName("pushButton")
        self.pushButton.setEnabled(False)
        self.gridLayout_2.addWidget(self.pushButton, 4, 0, 1, 1)
        self.label_2 = QtWidgets.QLabel(self.groupBox)
        self.label_2.setObjectName("label_2")
        self.gridLayout_2.addWidget(self.label_2, 3, 0, 1, 2)
        self.pushButton_2 = QtWidgets.QPushButton(self.groupBox)
        self.pushButton_2.setObjectName("pushButton_2")
        self.gridLayout_2.addWidget(self.pushButton_2, 4, 1, 1, 1)
        self.gridLayout.addWidget(self.groupBox, 0, 2, 1, 1)
        self.lineEdit = QtWidgets.QLineEdit(self)
        self.lineEdit.setReadOnly(True)
        self.lineEdit.setObjectName("lineEdit")
        self.gridLayout.addWidget(self.lineEdit, 2, 2, 1, 1)
        self.line = QtWidgets.QFrame(self)
        self.line.setFrameShape(QtWidgets.QFrame.VLine)
        self.line.setFrameShadow(QtWidgets.QFrame.Sunken)
        self.line.setObjectName("line")
        self.gridLayout.addWidget(self.line, 0, 1, 4, 1)
        spacerItem = QtWidgets.QSpacerItem(20, 40, QtWidgets.QSizePolicy.Minimum, QtWidgets.QSizePolicy.Expanding)
        self.gridLayout.addItem(spacerItem, 3, 2, 1, 1)
        self.retranslateUi()
        QtCore.QMetaObject.connectSlotsByName(self)

    def retranslateUi(self):
        """Set the visible texts and wire the button / list click handlers."""
        _translate = QtCore.QCoreApplication.translate
        self.setWindowTitle(_translate("Dialog", "Files"))
        self.pushButton.setText(_translate("Dialog", "打开"))
        self.pushButton_2.setText(_translate("Dialog", "打开文件夹"))
        self.pushButton.clicked.connect(self.startfile)
        self.pushButton_2.clicked.connect(self.startpath)
        self.label_4.setText(_translate("Dialog", f"下载位置:"))
        self.lineEdit.setText(_translate("Dialog", self.save_path))
        self.listWidget_2.itemClicked.connect(self.fileChanged)

    def newFile(self, args: list):
        """Register a file, add its row to the list and show the dialog.

        Args:
            args: ``[name, total, size]``.
        """
        index = len(self.filedict)
        name, total, size = args
        layout = ListWidgetLayout(name, total)
        self.filedict[index] = (layout, size)
        Item = QtWidgets.QListWidgetItem(QtGui.QIcon("file.png"), "", self.listWidget_2)
        Item.myname = index  # lets fileChanged map a clicked row back to its index
        Item.setSizeHint(QtCore.QSize(200, 40))
        self.listWidget_2.addItem(Item)
        self.listWidget_2.setItemWidget(Item, layout)
        self.namedict[index] = name
        # Keep the user's selection; only fall back to the first entry when
        # nothing (or entry 0) is selected.
        if not self.current:
            self.current = 0
        self.changeGroupBox()
        self.show()

    def fileChanged(self, widget: QtWidgets.QListWidgetItem):
        """Slot for list clicks: select the file attached to the clicked row."""
        self.current = widget.myname
        self.changeGroupBox()

    def changeGroupBox(self):
        """Refresh the detail panel from the currently selected file."""
        name = self.namedict[self.current]
        layout, size = self.filedict[self.current]
        layout: ListWidgetLayout
        size = covert(size, True)
        _translate = QtCore.QCoreApplication.translate
        self.groupBox.setTitle(_translate("Dialog", f"{name}"))
        self.label.setText(_translate("Dialog", f"文件大小: {size}"))
        self.label_3.setText(_translate("Dialog", f"文件名: {name}"))
        self.label_2.setText(_translate("Dialog", f"位置: {os.path.join(self.save_path, name)}"))
        self.progressBar.setMaximum(layout.get_total())
        self.progressBar.setValue(layout.get_index())
        # The "open" button is only usable once the transfer has finished.
        self.pushButton.setEnabled(layout.is_finished())
        self.fpath = os.path.join(self.save_path, name)

    def fileUpdate(self, index: int):
        """Slot for ``update_file``: advance file *index* by one step."""
        layout, size = self.filedict[index]
        layout: ListWidgetLayout
        layout.update()
        if index == self.current:
            self.changeGroupBox()

    def startfile(self, _):
        """Open the selected file with ``os.startfile`` if it exists."""
        if hasattr(self, "fpath") and os.path.isfile(self.fpath):
            os.startfile(self.fpath)

    def startpath(self, _):
        """Open the download folder with ``os.startfile`` if it exists."""
        if os.path.isdir(self.save_path):
            os.startfile(self.save_path)
if __name__ == "__main__":
    # Manual demo: open the dialog with one sample entry.
    app = QtWidgets.QApplication([])
    a = FileDialog("")
    sample_entry = ["asdf", 10, 100]
    a.new_file.emit(sample_entry)
    sys.exit(app.exec_())
ProgressBar.py
代码语言:python
import logging
import sys
from PyQt5.QtCore import *
from PyQt5.QtGui import *
from PyQt5.QtWidgets import *
import math
from threading import Thread
# Font family applied to the font QProgress uses for its percentage text.
font_name = "Share-TechMono"
def threading(Daemon, **kwargs):
    """Create a ``Thread`` from *kwargs*, set its daemon flag, start it and return it.

    Args:
        Daemon: Value assigned to the thread's ``daemon`` attribute.
        **kwargs: Passed unchanged to ``Thread(...)`` (e.g. ``target``, ``args``).

    Returns:
        The started ``Thread``.
    """
    thread = Thread(**kwargs)
    # Thread.setDaemon() is deprecated since Python 3.10; the attribute is
    # the supported spelling and behaves the same.
    thread.daemon = Daemon
    thread.start()
    return thread
def Animation(parent, type=b"windowOpacity", from_value=0, to_value=1, ms=1000, connect=None):
    """Start a ``QPropertyAnimation`` on *parent* and return it.

    Args:
        parent: Object whose property is animated.
        type: Property name as bytes.
        from_value: Start value.
        to_value: End value.
        ms: Duration in milliseconds.
        connect: Optional callable connected to the ``finished`` signal.
    """
    animation = QPropertyAnimation(parent, type)
    animation.setDuration(ms)
    animation.setStartValue(from_value)
    animation.setEndValue(to_value)
    if connect:
        animation.finished.connect(connect)
    animation.start()
    return animation
class ProgressThread(QThread):
    """QThread subclass that only declares an ``int`` signal; ``run`` is not overridden here."""

    signal = pyqtSignal(int)
class QProgress(QWidget):
    """Circular progress widget: a gradient ring, the percentage text and two animated waves.

    Note: the ``size`` and ``font`` attributes replace the ``QWidget``
    methods of the same name on this class; they are kept for compatibility.
    """
    fsize = 10  # margin between the widget edge and the ring
    bg_color = QColor("#95BBFF")  # base colour of the waves
    m_waterOffset = 0.005  # wave amplitude as a fraction of the height
    m_offset = 50  # horizontal phase of the waves
    m_borderwidth = 10
    percent = 0
    pen = QPen()
    pen.setWidth(8)
    pen.setCapStyle(Qt.RoundCap)
    gradient = QConicalGradient(50, 50, 91)
    gradient.setColorAt(0, QColor(255, 10, 10))
    gradient.setColorAt(1, QColor(255, 201, 14))
    pen.setBrush(gradient)
    font = QFont()
    font.setFamily(font_name)  # Share-TechMono

    def __init__(self, size, parent=None):
        """
        Args:
            size: ``(width, height)`` of the widget.
            parent: Optional parent widget.
        """
        super(QProgress, self).__init__(parent)
        self.resize(*size)
        self.size = size
        self.font.setPointSize(self.size[0] // 4)

    def paintEvent(self, _):
        """Draw the ring, the percentage text and the two waves, then schedule a repaint."""
        width, height = self.size
        ring_rect = QRectF(self.fsize, self.fsize,
                           width - self.fsize * 2, height - self.fsize * 2)
        painter = QPainter(self)
        rotateAngle = 360 * self.percent / 100
        # Enable antialiasing before drawing.
        painter.setRenderHints(QPainter.Antialiasing)
        painter.setPen(self.pen)
        # drawArc takes integer angles in 1/16 degree; convert explicitly
        # because PyQt5 rejects floats for int parameters on Python 3.10+.
        painter.drawArc(ring_rect, (90 - 0) * 16, int(-rotateAngle * 16))  # the ring
        painter.setFont(self.font)
        # Text colour shifts with the percentage; QColor needs int channels.
        painter.setPen(QColor(int(153 - 1.53 * self.percent),
                              int(217 - 0.55 * self.percent),
                              int(234 - 0.02 * self.percent)))
        painter.drawText(ring_rect, Qt.AlignCenter, f"{self.percent}%")  # current progress
        painter.setPen(Qt.NoPen)

        percentage = 1 - self.percent / 100
        # Wave shape: y = A * sin(w * x + offset) + k
        # w: angular frequency, larger means denser waves
        w = 2 * math.pi / (width)
        # A: amplitude of the waves
        A = height * self.m_waterOffset
        # k: vertical offset, i.e. the fill level
        k = height * percentage
        water1 = QPainterPath()
        water2 = QPainterPath()
        # Starting point
        water1.moveTo(5, height)
        water2.moveTo(5, height)
        # Advance the phase on every repaint so the waves move.
        self.m_offset += 0.6
        if self.m_offset > (width / 2):
            self.m_offset = 0
        water_rect = QRectF(self.fsize + 2, self.fsize + 2,
                            width - self.fsize * 2 - 4, height - self.fsize * 2 - 4)
        i = 5
        while i < width - 5:
            water1.lineTo(i, A * math.sin(w * i + self.m_offset) + k)
            # The second wave is shifted by half the widget width.
            water2.lineTo(i, A * math.sin(w * i + self.m_offset + width / 2 * w) + k)
            i += 1
        water1.lineTo(width - 5, height)
        water2.lineTo(width - 5, height)

        totalpath = QPainterPath()
        painter.drawRect(self.rect())
        painter.save()
        totalpath.addEllipse(water_rect)
        painter.setPen(Qt.NoPen)
        # Wave transparency
        watercolor1 = QColor(self.bg_color)
        watercolor1.setAlpha(100)
        watercolor2 = QColor(self.bg_color)
        watercolor2.setAlpha(150)
        # Clip each wave to the inner ellipse before filling it.
        painter.setBrush(watercolor1)
        painter.drawPath(totalpath.intersected(water1))
        painter.setBrush(watercolor2)
        painter.drawPath(totalpath.intersected(water2))
        painter.restore()
        painter.end()
        self.update()  # request the next frame

    def update_percent(self, p):
        """Store the new percentage *p*, grow the wave amplitude slightly and return *p*."""
        self.percent = p
        if self.m_waterOffset < 0.05:
            self.m_waterOffset += 0.001
        return p
class Progress(QDialog):
    """Frameless, translucent dialog hosting a ``QProgress`` ring, a label and a button.

    The button stays disabled until ``percentUpdate(100)`` is received.
    """
    percent = 0

    def __init__(self, text="", parent=None):
        """
        Args:
            text: Caption of the button.
            parent: Accepted for compatibility.
        """
        # NOTE(review): ``parent`` is not forwarded to QDialog; kept as-is
        # because forwarding it would change how the window is owned.
        super(Progress, self).__init__()
        Font = QFont()
        Font.setFamily("Consolas")
        Font.setPointSize(12)
        self.setFont(Font)
        self.setWindowFlags(Qt.FramelessWindowHint)  # no window frame
        self.setAttribute(Qt.WA_TranslucentBackground)  # transparent window background
        self.ProgressThread = ProgressThread()
        self.ProgressThread.signal.connect(self.percentUpdate)
        self.ProgressThread.start()
        width, height = 230, 230
        self.resize(width, height)
        self.pg = QProgress((width, height), self)
        self.label = QLabel(self)
        # move() takes ints; truncate explicitly because PyQt5 rejects
        # floats for int parameters on Python 3.10+.
        self.label.move(int((width - self.label.width()) / 3), int(height / 3))
        self.button = QPushButton(self)
        self.button.setText(QCoreApplication.translate("Dialog", text))
        self.button.move(int((width - self.button.width()) / 3), int(height / 3 * 2))
        self.button.setEnabled(False)
        self.button.clicked.connect(self.start)
        QMetaObject.connectSlotsByName(self)
        # Widgets must be used from the GUI thread, so show() is called
        # directly instead of from a worker thread; it does not block.
        self.show()

    def percentUpdate(self, num: int = 0):
        """Slot: store the percentage, enable the button at 100 and repaint the ring."""
        self.percent = num
        if self.percent == 100:
            self.button.setEnabled(True)
        self.pg.update_percent(self.percent)

    def updateStrProgress(self, text):
        """Show *text* in the label."""
        self.label.setText(QCoreApplication.translate("Dialog", text))

    def start(self, _=None):
        """Do nothing."""
if __name__ == '__main__':
    # Manual demo: show the progress dialog with a sample button caption.
    app = QApplication(sys.argv)
    r = Progress(text="write something")
    sys.exit(app.exec_())
资源(0积分)
资源 - TCP聊天文件服务器v2.2 - 服务端客户端套接字解决分包/粘包问题 - SocketQueue继承以及减少冗余