TCP聊天 传输文件服务器服务器套接字v2.4
所有版本记录:
v1.0
: TCP聊天服务器套接字|PyQt5 socket(TCP端口映射 端口放行) logging Thread(含日志,html) anaconda打包32位exe(3.4万字)|python高阶v1.1
: python TCP套接字服务器v1.1-新增服务端命令功能及修改bug(socket PyQt5)v1.2
: python TCP服务器v1.2 - 服务端新增用户登录注册(json, md5加密)v1.3
: python TCP服务器v1.3 - 服务器抗压测试及关闭套接字处理v1.4
: python TCP服务器v1.4 - 客户端连接服务器异常(异常情况分类)处理v1.5
: PyQt5可编辑下拉框(comboBox):editable - python TCP服务器v1.5 - 客户端连接界面增加自定义参数(设置超时, 连接地址可选)v1.6
: Python TCP服务器v1.6 - multiprocessing多进程及Ctrl-c(SIGINT)退出v1.7
: Python TCP服务器v1.7 - PyQt5 server服务端来临v1.8
: python TCP服务器v1.8 - PyQt5登录界面美化 淡入淡出v1.9
: socketTCP协程文件 信息传递 - TCP聊天文件服务器v1.9 - 划时代的版本更新(4.6万字)v2.0
: TCP聊天文件服务器v2.0 - 重大bug修复 PyQt5文件传输可视化v2.1
: TCP聊天文件服务器v2.1 - 服务端线程管理(threading.enumerate)v2.2
: TCP聊天文件服务器v2.2 - 服务端客户端套接字解决分包/粘包问题 - SocketQueue继承以及减少冗余v2.3
: gzip的使用 - TCP聊天文件服务器v2.3 - 文件传输建立缓存制度和.gz的解压缩/压缩解决运行内存过大
速度 = 传送字段大小 进度) / 结束时间(若无完成,取当前时间) - 起始时间
def get_time(self) -> float:
return getattr(self, "end_time", time.time()) - self.start_time # math.ceil(...)
def timers(self, *args) -> None:
timeit = self.get_time()
if timeit == 0:
return # ZeroDivisionError: integer division or modulo by zero
size = self.index * segment
contect = f"{covert(int(size / timeit))}/s ({self.str_time()} 秒)"
self.label.setText(contect)
Server服务端
代码语言:javascript复制class Item(QtWidgets.QListWidgetItem):
def __init__(self, *args, **kwargs):
super(Item, self).__init__(*args, **kwargs)
self.setSizeHint(QtCore.QSize(200, 80))
class FileArgumentWidget(QtWidgets.QWidget):
def __init__(self, name, total, itemindex, parent=None):
super(FileArgumentWidget, self).__init__(parent)
self.setContextMenuPolicy(QtCore.Qt.CustomContextMenu)
hbox = QtWidgets.QHBoxLayout(self)
vbox = QtWidgets.QVBoxLayout()
vbox.addWidget(QtWidgets.QLabel(name f"n({covert(total * segment)})", self))
progress = QtWidgets.QProgressBar()
progress.setMaximum(total)
progress.setStyleSheet("QProgressBar{n"
"text-align: center;n"
'font: 9pt "Consolas";n'
"}")
vbox.setObjectName("name, speed Info")
hbox.setObjectName("Info and Progress")
progress.setTextVisible(True)
progress.setRange(0, 0)
self.label = QtWidgets.QLabel(self)
self.label.setStyleSheet("color: rgb(60, 112, 255);")
vbox.addWidget(self.label)
self.total = total
self.index = 0
self.progress = progress
vbox.setSpacing(2)
hbox.addLayout(vbox)
hbox.addWidget(progress)
self.setLayout(hbox)
fonts = QtGui.QFont()
fonts.setFamily("Consolas")
fonts.setPointSize(9)
self.setFont(fonts)
self.label.setFont(fonts)
self.start_time = time.time()
self.timer = QtCore.QTimer(self)
self.timer.timeout.connect(self.timers)
self.timer.start(50)
item = Item(parent.icon, "", parent)
item.index_name = itemindex
parent.addItem(item)
parent.setItemWidget(item, self)
def timers(self, *args) -> None:
timeit = self.get_time()
if timeit == 0:
return # ZeroDivisionError: integer division or modulo by zero
size = self.index * segment
contect = f"{covert(int(size / timeit))}/s ({self.str_time()} 秒)"
self.label.setText(contect)
def get_progress(self) -> QtWidgets.QProgressBar:
return self.progress
def is_finished(self) -> bool:
return bool(self.index >= self.total)
def update(self) -> None:
if self.index == 0:
self.progress.setMaximum(self.total)
if self.is_finished():
return
self.index = 1
if self.is_finished():
self.end_time = time.time()
self.progress.setValue(self.index)
def get_total(self) -> int:
return self.total
def get_time(self) -> float:
return getattr(self, "end_time", time.time()) - self.start_time
def str_time(self) -> str:
return "%0.1f" % (self.get_time())
def get_index(self) -> int:
return self.index
def customContextMenuRequested(self, pos: QtCore.QPoint) -> None:
print(pos)
class FileListWidget(QtWidgets.QListWidget):
def __init__(self, parent=None):
super(FileListWidget, self).__init__(parent)
self.icon = QtGui.QIcon("images/file.png")
self.files = {}
def new_file(self, name, size, index):
widget = FileArgumentWidget(name, size, index, self)
self.files[index] = widget
def update_file(self, index) -> None:
self.files[index].update()
Client客户端
代码语言:javascript复制class ListWidgetLayout(QtWidgets.QWidget):
def __init__(self, name, total, parent=None):
super(ListWidgetLayout, self).__init__(parent)
hbox = QtWidgets.QHBoxLayout(self)
vbox = QtWidgets.QVBoxLayout()
vbox.addWidget(QtWidgets.QLabel(name f"n({covert(total * segment)})", self))
progress = QtWidgets.QProgressBar()
progress.setMaximum(total)
progress.setStyleSheet("QProgressBar{n"
"text-align: center;n"
'font: 9pt "Consolas";n'
"}")
vbox.setObjectName("name, speed Info")
hbox.setObjectName("Info and Progress")
progress.setTextVisible(True)
progress.setRange(0, 0)
self.label = QtWidgets.QLabel(self)
self.label.setStyleSheet("color: rgb(60, 112, 255);")
vbox.addWidget(self.label)
self.total = total
self.index = 0
self.progress = progress
vbox.setSpacing(2)
hbox.addLayout(vbox)
hbox.addWidget(progress)
self.setLayout(hbox)
fonts = QtGui.QFont()
fonts.setFamily("Consolas")
fonts.setPointSize(9)
self.setFont(fonts)
self.label.setFont(fonts)
self.start_time = time.time()
self.timer = QtCore.QTimer(self)
self.timer.timeout.connect(self.timers)
self.timer.start(50)
def timers(self, *args) -> None:
timeit = self.get_time()
if timeit == 0:
return # ZeroDivisionError: integer division or modulo by zero
size = self.index * segment
contect = f"{covert(int(size / timeit))}/s ({self.str_time()} 秒)"
self.label.setText(contect)
def get_progress(self) -> QtWidgets.QProgressBar:
return self.progress
def is_finished(self) -> bool:
return bool(self.index >= self.total)
def update(self) -> None:
if self.index == 0:
self.progress.setMaximum(self.total)
if self.is_finished():
return
self.index = 1
if self.is_finished():
self.end_time = time.time()
self.progress.setValue(self.index)
def get_total(self) -> int:
return self.total
def get_time(self) -> float:
return getattr(self, "end_time", time.time()) - self.start_time
def str_time(self) -> str:
return "%0.1f" % (self.get_time())
def get_index(self) -> int:
return self.index
class FileDialog(QtWidgets.QDialog):
update_file = QtCore.pyqtSignal(int)
new_file = QtCore.pyqtSignal(list)
def __init__(self, save_path, parent=None):
self.current = ""
super(FileDialog, self).__init__(parent)
self.icon = QtGui.QIcon("images/file.png")
self.download = QtGui.QIcon("images/download.png")
self.upload = QtGui.QIcon("images/upload.png")
self.DOWNLOAD = 0
self.UPLOAD = 1
self.LOAD_dict = {self.DOWNLOAD: self.download,
self.UPLOAD: self.upload}
self.setWindowFlags(QtCore.Qt.WindowMinimizeButtonHint) # 只显示最小化按钮
self.save_path = save_path
self.filedict = {}
self.pathdict = {}
self.namedict = {}
self.update_file.connect(self.fileUpdate)
self.new_file.connect(self.newFile)
self.setObjectName("Dialog")
self.resize(666, 421)
font = QtGui.QFont()
font.setFamily("Consolas")
self.setFont(font)
self.setWindowIcon(self.icon)
self.gridLayout = QtWidgets.QGridLayout(self)
self.gridLayout.setObjectName("gridLayout")
self.label_4 = QtWidgets.QLabel(self)
self.label_4.setObjectName("label_4")
self.gridLayout.addWidget(self.label_4, 1, 2, 1, 1)
self.listWidget_2 = QtWidgets.QListWidget(self)
self.listWidget_2.setObjectName("listWidget_2")
self.gridLayout.addWidget(self.listWidget_2, 0, 0, 4, 1)
self.groupBox = QtWidgets.QGroupBox(self)
self.groupBox.setObjectName("groupBox")
self.gridLayout_2 = QtWidgets.QGridLayout(self.groupBox)
self.gridLayout_2.setObjectName("gridLayout_2")
self.label = QtWidgets.QLabel(self.groupBox)
self.label.setObjectName("label")
self.gridLayout_2.addWidget(self.label, 1, 0, 1, 2)
self.progressBar = QtWidgets.QProgressBar(self.groupBox)
font = QtGui.QFont()
font.setFamily("Consolas")
self.progressBar.setFont(font)
self.progressBar.setProperty("value", 0)
self.progressBar.setTextVisible(True)
self.progressBar.setObjectName("progressBar")
self.gridLayout_2.addWidget(self.progressBar, 0, 0, 1, 2)
self.label_3 = QtWidgets.QLabel(self.groupBox)
self.label_3.setObjectName("label_3")
self.gridLayout_2.addWidget(self.label_3, 2, 0, 1, 2)
self.pushButton = QtWidgets.QPushButton(self.groupBox)
self.pushButton.setObjectName("pushButton")
self.pushButton.setEnabled(False)
self.gridLayout_2.addWidget(self.pushButton, 4, 0, 1, 1)
self.label_2 = QtWidgets.QLabel(self.groupBox)
self.label_2.setObjectName("label_2")
self.gridLayout_2.addWidget(self.label_2, 3, 0, 1, 2)
self.pushButton_2 = QtWidgets.QPushButton(self.groupBox)
self.pushButton_2.setObjectName("pushButton_2")
self.gridLayout_2.addWidget(self.pushButton_2, 4, 1, 1, 1)
self.gridLayout.addWidget(self.groupBox, 0, 2, 1, 1)
self.lineEdit = QtWidgets.QLineEdit(self)
self.lineEdit.setReadOnly(True)
self.lineEdit.setObjectName("lineEdit")
self.gridLayout.addWidget(self.lineEdit, 2, 2, 1, 1)
self.line = QtWidgets.QFrame(self)
self.line.setFrameShape(QtWidgets.QFrame.VLine)
self.line.setFrameShadow(QtWidgets.QFrame.Sunken)
self.line.setObjectName("line")
self.gridLayout.addWidget(self.line, 0, 1, 4, 1)
spacerItem = QtWidgets.QSpacerItem(20, 40, QtWidgets.QSizePolicy.Minimum, QtWidgets.QSizePolicy.Expanding)
self.gridLayout.addItem(spacerItem, 3, 2, 1, 1)
self.retranslateUi()
QtCore.QMetaObject.connectSlotsByName(self)
def retranslateUi(self):
_translate = QtCore.QCoreApplication.translate
self.setWindowTitle(_translate("Dialog", "Files"))
self.pushButton.setText(_translate("Dialog", "打开"))
self.pushButton_2.setText(_translate("Dialog", "打开文件夹"))
self.pushButton.clicked.connect(self.startfile)
self.pushButton_2.clicked.connect(self.startpath)
self.label_4.setText(_translate("Dialog", f"下载位置:"))
self.lineEdit.setText(_translate("Dialog", self.save_path))
self.listWidget_2.itemClicked.connect(self.fileChanged)
def newFile(self, args: list):
self.activateWindow() # 窗口置顶
index = len(self.filedict)
name, total, size, (type, path) = args
png = self.LOAD_dict.get(type, self.UPLOAD)
layout = ListWidgetLayout(name, total)
progress = layout.get_progress()
self.filedict[index] = (layout, size)
self.pathdict[index] = path
Item = QtWidgets.QListWidgetItem(png, "", self.listWidget_2)
Item.index_name = index # <-
Item.setSizeHint(QtCore.QSize(200, 80))
self.listWidget_2.addItem(Item)
self.listWidget_2.setItemWidget(Item, layout)
self.namedict[index] = name
# if not self.current:
# self.changeCurrent(0)
self.changeCurrent(index)
def changeCurrent(self, index: int):
self.current = index
self.changeGroupBox()
self.show()
def fileChanged(self, widget: QtWidgets.QListWidgetItem):
self.current = widget.index_name
self.changeGroupBox()
def changeGroupBox(self):
name = self.namedict[self.current]
layout, size = self.filedict[self.current]
layout: ListWidgetLayout
size = covert(size, True)
_translate = QtCore.QCoreApplication.translate
self.groupBox.setTitle(_translate("Dialog", f"{name}"))
self.label.setText(_translate("Dialog", f"文件大小: {size}"))
self.label_3.setText(_translate("Dialog", f"文件名: {name}"))
self.label_2.setText(_translate("Dialog", f"位置: {os.path.join(self.save_path, name)}"))
self.progressBar.setMaximum(layout.get_total())
self.progressBar.setValue(layout.get_index())
self.pushButton.setEnabled(layout.is_finished())
self.fpath = self.pathdict[self.current]
self.path = os.path.dirname(self.fpath)
def fileUpdate(self, index: int):
layout, size = self.filedict[index]
layout: ListWidgetLayout
layout.update()
if index == self.current:
self.changeGroupBox()
def startfile(self, _):
if hasattr(self, "fpath") and os.path.isfile(self.fpath):
os.startfile(self.fpath)
def startpath(self, _):
if hasattr(self, "path") and os.path.isdir(self.path):
os.startfile(self.path)
全部代码
行了, 其他文件都没改, 就只有两个主文件server.pyw和user.pyw更改
服务端
代码语言:javascript复制import asyncio
import logging
import math
import os
import socket
import sys
import time
from threading import Thread
from PyQt5 import QtCore, QtGui, QtWidgets
from psutil import cpu_percent
import data # data.py
from socket_queue import SocketQueue # socket_queue.py
from gzip import compress, decompress
__version__ = 2.4
base = 1024
segment = base * 2 # 防止切断
delay = 0.04
new_file = 0
update_file = 1
request_file = 2
normal_text = 3
loop = asyncio.get_event_loop()
class QLogger(logging.Handler):
def __init__(self, *args, **kwargs):
logging.Handler.__init__(self, *args, **kwargs)
self.output = lambda *_: None
self.setFormatter(logging.Formatter(
"[<font color='darkgreen'>%(asctime)s</font>(<font color='blue'>%(levelname)s</font>)]: <font color='brown'>%(message)s</font>"))
def emit(self, record):
record = self.format(record)
if hasattr(self, "output"):
self.output(record)
def connect(self, func):
if callable(func):
self.output = func
def threading(Daemon, name=None, **kwargs):
thread = Thread(**kwargs)
thread.setDaemon(Daemon)
if name:
thread.setName(name)
thread.start()
return thread
file_thread = threading(True, "文件传输", target=loop.run_forever)
def thread_info(thread: Thread):
return f"{str(thread._name).ljust(12)}{thread._ident}({thread.__class__.__name__})"
def ignore(function):
def i(*args, **kwargs):
try:
function(*args, **kwargs)
except:
return
return i
logger = logging.getLogger(__name__)
logger.setLevel(level=logging.DEBUG)
Qlog = QLogger()
logger.addHandler(Qlog)
filehandle = logging.FileHandler("log.txt")
filehandle.setFormatter(logging.Formatter("[%(asctime)s(%(levelname)s)]: %(message)s"))
logger.addHandler(filehandle)
logger.setLevel(logging.DEBUG)
def to_logging(command):
def logs(*args, **kwargs):
try:
_result = command(*args, **kwargs)
if _result is None:
return True
return _result
except:
logger.exception(str())
return False
return logs
class Command_Handler(object):
def __init__(self, bind):
"""Bind Client class"""
assert isinstance(bind, Client)
self.client = bind
def _function(self, _list):
data = {"/info": {"-v": self.get_version(),
"-id": self.get_id(),
"-i": self.info(),
"-h": self.help(),
"-name": self.name()},
}
_dict = data
for n in range(len(_list)):
if type(_dict) == dict:
_dict = _dict.get(_list[n], self.unknown(" ".join(_list)))
else:
break
if type(_dict) == dict:
_dict = "Error:n<font color='blue'>This command must take more arguments. Such as %s.</font>" % list(
_dict.keys())
return _dict
@staticmethod
def help():
return """/info [-v] [-id] [-i]
-v : get version of program.
-id : get your id.
-i : get information.
-h : help.
-name : get your name
For example, <font color=red>/info -id</font>"""
@staticmethod
def get_version():
return "version : " str(__version__)
def get_id(self):
return "Your id is {}.".format(id(self.client))
def name(self):
return "Your name is {}.".format(self.client.username)
def info(self):
return f"Socket Server[version {self.get_version()}] By zmh."
def unknown(self, s):
return """Error:
No command named "%s". Please search [/info -h] to help.
%s""" % (s, self.help())
def cut(self, string):
return string.strip().split()
def handler(self, c):
return "<font color='gray'>[command]</font><font color='brown'>%s</font>n%s" % (
c, str(self._function(self.cut(c))))
def iscommand(self, i):
return i.strip().startswith("/")
class Server:
join_message = "<font color='red'>Server></font> <font color='blue'>%s(%s)</font> 连接服务器. 当前在线人数: <font color='red'>%s</font>"
user_message = "<font color='%s'>%s(%s)%s></font> %s"
quit_message = "%s(%s) 下线了, %s"
def __init__(self, usernumUpdate=lambda _: None):
self.user_num_change = usernumUpdate
def Setup(self, addr, port, backlog=10, max_count=base ** 2, encode='utf8'):
self.user_handle = message_handle(self)
self.address = addr, port
self.backlog = backlog
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.bind(self.address)
self.socket.listen(backlog)
self.max_count = max_count
self.connect = []
self.encode = encode
self.user_record = data.user()
return self.run()
def clear_socket(self, clear_ms=500):
logger.info(f"Clear the closed socket once every {clear_ms} ms.")
while True:
del_list = list(filter(lambda c: hasattr(c, 'Quitted') or (not c.isOpen()), self.connect))
for user in del_list:
self.connect.remove(user)
# if del_list:
# logger.info(f"Clear the closed client socket, number is {len(del_list)}.")
# else:
# logger.info('None of the sockets have been cleaned.')
time.sleep(clear_ms / 1000)
def run(self):
logger.debug(f"Server [{':'.join(map(str, self.address))}] on.")
logger.info(f"Server pid {os.getpid()}.")
logger.info(f"Max receive length {covert(self.max_count, fine=True)}.")
logger.info(
f"Single file transfer speed: <font color='blue'>{covert(segment * (1 // delay))}/s<font>({covert(segment)} × {int(1 // delay)})")
gui.Database_signal.emit("<font color='gray'>[Transfer speed[-SEGMENT]] = [Maximum load] ÷ 2.</font>")
logger.info("Backlog number: " str(self.backlog))
logger.info('The CODEC is sent as ' self.encode)
logger.info("The server is listening on the port.")
threading(Daemon=True, name="离线清理", target=self.clear_socket)
return threading(Daemon=True, name="监听端口", target=self.accept_client)
def _get_Clients(self) -> list:
def func(c):
return c.__filter__()
return list(filter(func, self.connect))
def _get_sockets(self): # return int
i = len(self._get_Clients())
self.user_num_change(i)
return i
def _str_sockets(self):
return f"当前人数 {self._get_sockets()}"
def ServerMessage(self, mes, inc=True):
for user in self._get_Clients():
if user.__filter__():
user.send(mes)
def UserMessage(self, address, _user, mes, inc=True):
if not mes:
return
for user in self.connect:
if user.__filter__():
username = user.username
send_message = Server.user_message % ("brown" if _user == username else "red",
_user,
address,
"(我自己)" if _user == username else "",
mes)
user.send(send_message)
logger.info(f"{address}[{_user}] : {mes}")
def error_handle(self):
for user in filter(lambda c: not c.isOpen(), self.connect):
self.connect.remove(user)
def accept_client(self):
while True:
sock, (address, _) = self.socket.accept() # 阻塞,等待客户端连接
self.connect.append(Client(sock, address, self))
logger.info(f'The address {address} is connected to the server.')
def quit(self, username, address):
QuitMessage = Server.quit_message % (username, address, self._str_sockets())
logger.info(QuitMessage)
self.ServerMessage(QuitMessage, False)
def login(self, username, address):
logger.info(f"{address}[{username}] 登录服务器 , " self._str_sockets())
self.ServerMessage(Server.join_message % (username, address, self._get_sockets()))
class Client(SocketQueue):
def __init__(self, socket, addr, server: Server):
super(Client, self).__init__(socket, server.max_count, server.encode)
self.addr = addr
if not isinstance(server, Server):
raise ValueError
self.server = server
self.username = str()
self.com = Command_Handler(self)
self.thread = threading(True, name=f"客户端{self.addr}", target=self.forever_receive)
self._login = False
def normal_text(self, s):
return repr((normal_text, s)).encode(self.codec)
def isLogin(self) -> bool:
return getattr(self, "_login", False)
def __filter__(self) -> bool:
"""返回是否在线并已可接受消息"""
return self.isLogin() and self.isOpen()
@ignore
def login(self):
self.username = self.recv()[:15]
if self.server.user_record.__in__(self.username):
if self.server.user_record.handler(self.username,
self.input("<font color='red'>请输入您的密码: (右下[send]键发送)</font>")):
self.send(f'<font color="green">欢迎回来, {self.username}.</font>')
else:
self.send('<font color="red">密码错误,请重试.</font>')
self.__del__()
else:
while True:
p1 = self.input(
"<font color='blue'>[i]提示: 密码需在4 ~ 10位之间, 且不能换行.</font>n<font color='red'>请输入您的密码: (右下[send]键发送)</font>")
if (4 <= len(p1) <= 10) and not ('n' in p1):
break
while True:
p2 = self.input("<font color='red'>再次输入您的密码: (右下[send]键发送)</font>")
if p1 == p2:
break
else:
self.send("<font color='red'>密码与前次不符!</font>")
self.server.user_record.handler(self.username, p1)
self.send(f'初来乍到, {self.username}')
self._login = True
self.server.login(self.username, self.addr)
def quitEvent(self) -> None:
self.server.quit(self.username, self.addr)
def parse_argument(self, arg: str) -> str:
return self.server.user_handle.handle(arg.strip(), self)
@ignore
def forever_receive(self):
self.login()
while self.__filter__():
string = self.recv()
if string is None:
continue
elif self.com.iscommand(string):
self.send(self.com.handler(string))
else:
self.server.UserMessage(self.addr, self.username, string)
def get_host_ip() -> str:
"""get current IP address"""
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
s.connect(('8.8.8.8', 80))
ip = s.getsockname()[0]
finally:
s.close()
return ip
class Interface(QtWidgets.QMainWindow):
Database_signal = QtCore.pyqtSignal(str)
Usernum_signal = QtCore.pyqtSignal(int)
New_file_signal = QtCore.pyqtSignal(str, int, int)
Update_file_signal = QtCore.pyqtSignal(int)
def __init__(self):
super(Interface, self).__init__()
self.setWindowIcon(QtGui.QIcon("images/server.png"))
self.setupUi()
self.show()
def setupUi(self):
self.setObjectName("MainWindow")
self.resize(1088, 685)
font = QtGui.QFont()
font.setFamily("Consolas")
font.setPointSize(11)
self.setFont(font)
self.setStyleSheet("")
self.centralwidget = QtWidgets.QWidget(self)
self.centralwidget.setObjectName("centralwidget")
self.gridLayout = QtWidgets.QGridLayout(self.centralwidget)
self.gridLayout.setObjectName("gridLayout")
self.label_6 = QtWidgets.QLabel(self.centralwidget)
self.label_6.setObjectName("label_6")
self.gridLayout.addWidget(self.label_6, 4, 0, 1, 1)
self.textEdit_2 = QtWidgets.QTextEdit(self.centralwidget)
self.textEdit_2.setObjectName("textEdit_2")
self.gridLayout.addWidget(self.textEdit_2, 5, 0, 1, 1)
self.groupBox = QtWidgets.QGroupBox(self.centralwidget)
self.groupBox.setObjectName("groupBox")
self.formLayout_2 = QtWidgets.QFormLayout(self.groupBox)
self.formLayout_2.setObjectName("formLayout_2")
self.label_2 = QtWidgets.QLabel(self.groupBox)
self.label_2.setObjectName("label_2")
self.formLayout_2.setWidget(0, QtWidgets.QFormLayout.LabelRole, self.label_2)
self.lineEdit = QtWidgets.QLineEdit(self.groupBox)
self.lineEdit.setObjectName("lineEdit")
self.formLayout_2.setWidget(0, QtWidgets.QFormLayout.FieldRole, self.lineEdit)
self.label_8 = QtWidgets.QLabel(self.groupBox)
self.label_8.setObjectName("label_8")
self.formLayout_2.setWidget(1, QtWidgets.QFormLayout.LabelRole, self.label_8)
self.lineEdit_3 = QtWidgets.QLineEdit(self.groupBox)
self.lineEdit_3.setObjectName("lineEdit_3")
self.formLayout_2.setWidget(1, QtWidgets.QFormLayout.FieldRole, self.lineEdit_3)
self.label_7 = QtWidgets.QLabel(self.groupBox)
self.label_7.setObjectName("label_7")
self.formLayout_2.setWidget(2, QtWidgets.QFormLayout.LabelRole, self.label_7)
self.lineEdit_2 = QtWidgets.QLineEdit(self.groupBox)
self.lineEdit_2.setReadOnly(True)
self.lineEdit_2.setObjectName("lineEdit_2")
self.formLayout_2.setWidget(2, QtWidgets.QFormLayout.FieldRole, self.lineEdit_2)
spacerItem = QtWidgets.QSpacerItem(40, 20, QtWidgets.QSizePolicy.Expanding, QtWidgets.QSizePolicy.Minimum)
self.formLayout_2.setItem(5, QtWidgets.QFormLayout.LabelRole, spacerItem)
self.pushButton = QtWidgets.QPushButton(self.groupBox)
self.pushButton.setObjectName("pushButton")
self.formLayout_2.setWidget(5, QtWidgets.QFormLayout.FieldRole, self.pushButton)
self.lineEdit_4 = QtWidgets.QLineEdit(self.groupBox)
self.lineEdit_4.setObjectName("lineEdit_4")
self.formLayout_2.setWidget(3, QtWidgets.QFormLayout.FieldRole, self.lineEdit_4)
self.label_9 = QtWidgets.QLabel(self.groupBox)
self.label_9.setObjectName("label_9")
self.formLayout_2.setWidget(3, QtWidgets.QFormLayout.LabelRole, self.label_9)
self.label_10 = QtWidgets.QLabel(self.groupBox)
self.label_10.setObjectName("label_10")
self.formLayout_2.setWidget(4, QtWidgets.QFormLayout.LabelRole, self.label_10)
self.lineEdit_5 = QtWidgets.QLineEdit(self.groupBox)
self.lineEdit_5.setObjectName("lineEdit_5")
self.formLayout_2.setWidget(4, QtWidgets.QFormLayout.FieldRole, self.lineEdit_5)
self.gridLayout.addWidget(self.groupBox, 0, 0, 4, 1)
self.line = QtWidgets.QFrame(self.centralwidget)
self.line.setFrameShape(QtWidgets.QFrame.VLine)
self.line.setFrameShadow(QtWidgets.QFrame.Sunken)
self.line.setObjectName("line")
self.gridLayout.addWidget(self.line, 2, 1, 4, 1)
self.label_5 = QtWidgets.QLabel(self.centralwidget)
font = QtGui.QFont()
font.setFamily("Comic Sans MS")
font.setPointSize(14)
font.setBold(False)
font.setWeight(50)
self.label_5.setFont(font)
self.label_5.setStyleSheet(
"background-color:qlineargradient(spread:pad, x1:0, y1:0, x2:1, y2:1, stop:0 rgba(0, 255, 241, 255), stop:0.930348 rgba(0, 158, 255, 255));n"
"color:rgb(85, 0, 255)")
self.label_5.setObjectName("label_5")
self.gridLayout.addWidget(self.label_5, 0, 1, 1, 3)
self.groupBox_2 = QtWidgets.QGroupBox(self.centralwidget)
self.groupBox_2.setEnabled(False)
self.groupBox_2.setObjectName("groupBox_2")
self.gridLayout_2 = QtWidgets.QGridLayout(self.groupBox_2)
self.gridLayout_2.setObjectName("gridLayout_2")
self.lcdNumber = QtWidgets.QLCDNumber(self.groupBox_2)
self.lcdNumber.setObjectName("lcdNumber")
self.gridLayout_2.addWidget(self.lcdNumber, 1, 1, 1, 1)
self.line_3 = QtWidgets.QFrame(self.groupBox_2)
self.line_3.setFrameShape(QtWidgets.QFrame.HLine)
self.line_3.setFrameShadow(QtWidgets.QFrame.Sunken)
self.line_3.setObjectName("line_3")
self.gridLayout_2.addWidget(self.line_3, 3, 0, 1, 2)
self.listView_2 = FileListWidget(self.groupBox_2)
self.listView_2.setObjectName("file_list")
self.gridLayout_2.addWidget(self.listView_2, 5, 0, 1, 2)
self.label_4 = QtWidgets.QLabel(self.groupBox_2)
self.label_4.setObjectName("label_4")
self.gridLayout_2.addWidget(self.label_4, 4, 0, 1, 1)
self.label_3 = QtWidgets.QLabel(self.groupBox_2)
self.label_3.setObjectName("label_3")
self.gridLayout_2.addWidget(self.label_3, 1, 0, 1, 1)
self.label = QtWidgets.QLabel(self.groupBox_2)
self.label.setObjectName("label")
self.gridLayout_2.addWidget(self.label, 0, 0, 1, 1)
self.progressBar = QtWidgets.QProgressBar(self.groupBox_2)
self.progressBar.setProperty("value", 0)
self.progressBar.setObjectName("progressBar")
self.gridLayout_2.addWidget(self.progressBar, 0, 1, 1, 1)
self.gridLayout.addWidget(self.groupBox_2, 1, 2, 5, 2)
self.textEdit_2.setReadOnly(True)
self.setCentralWidget(self.centralwidget)
self.statusbar = QtWidgets.QStatusBar(self)
self.statusbar.setObjectName("statusbar")
self.setStatusBar(self.statusbar)
self.groupBox_2.setEnabled(False)
self.retranslateUi()
self.pushButton.clicked.connect(self.run)
QtCore.QMetaObject.connectSlotsByName(self)
self.lcdNumber.display(0)
self.cpu = cpuThread()
self.cpu.signal.connect(self.progressUpdate)
self.Database_signal.connect(self.databaseUpdate)
self.Usernum_signal.connect(self.usernumUpdate)
Qlog.connect(self.Database_signal.emit)
self.New_file_signal.connect(self.listView_2.new_file)
self.Update_file_signal.connect(self.listView_2.update_file)
def progressUpdate(self, v):
self.progressBar.setValue(int(v))
@to_logging
def handle(self):
self.max_recv = int(float(self.lineEdit.text()) * 1024) # 单位是kb, 换算为字节.
global segment
segment = self.max_recv // 2
self.backlog = int(self.lineEdit_3.text())
self.addr = self.lineEdit_4.text()
self.port = int(self.lineEdit_5.text())
server.Setup(self.addr, self.port, self.backlog, self.max_recv)
def run(self, _):
if self.handle():
self.groupBox.setEnabled(False)
self.groupBox_2.setEnabled(True)
self.cpu.start()
def databaseUpdate(self, data: str):
if data:
time.sleep(0.01) # Qpainter 过快导致死机
self.textEdit_2.append(data.strip())
self.textEdit_2.moveCursor(QtGui.QTextCursor.End)
def usernumUpdate(self, i):
if i != int(self.lcdNumber.value()):
self.lcdNumber.display(i)
def retranslateUi(self, ):
_translate = QtCore.QCoreApplication.translate
self.setWindowTitle(_translate("MainWindow", "Socket Server"))
self.label_6.setText(_translate("MainWindow", "Database(Logging and traceback):"))
self.groupBox.setTitle(_translate("MainWindow", "Server Setup"))
self.label_2.setText(_translate("MainWindow", "Maximum load(kb):"))
self.lineEdit.setText(_translate("MainWindow", str(1024)))
self.label_8.setText(_translate("MainWindow", "backlog:"))
self.lineEdit_3.setText(_translate("MainWindow", "10"))
self.label_7.setText(_translate("MainWindow", "CODEC(Unalterable):"))
self.lineEdit_2.setText(_translate("MainWindow", "utf8"))
self.pushButton.setText(_translate("MainWindow", "Run"))
self.lineEdit_4.setText(_translate("MainWindow", "127.0.0.1"))
self.label_9.setText(_translate("MainWindow", "Address:"))
self.label_10.setText(_translate("MainWindow", "Port:"))
self.lineEdit_5.setText(_translate("MainWindow", "429"))
self.label_5.setText(_translate("MainWindow", f"TCP Server v{__version__}"))
self.groupBox_2.setTitle(_translate("MainWindow", "Run"))
self.label_4.setText(_translate("MainWindow", "Receive files:"))
self.label_3.setText(_translate("MainWindow", "Online user(s):"))
self.label.setText(_translate("MainWindow", "Running memory with CPU"))
class Item(QtWidgets.QListWidgetItem):
def __init__(self, *args, **kwargs):
super(Item, self).__init__(*args, **kwargs)
self.setSizeHint(QtCore.QSize(200, 80))
class FileArgumentWidget(QtWidgets.QWidget):
def __init__(self, name, total, itemindex, parent=None):
super(FileArgumentWidget, self).__init__(parent)
self.setContextMenuPolicy(QtCore.Qt.CustomContextMenu)
hbox = QtWidgets.QHBoxLayout(self)
vbox = QtWidgets.QVBoxLayout()
vbox.addWidget(QtWidgets.QLabel(name f"n({covert(total * segment)})", self))
progress = QtWidgets.QProgressBar()
progress.setMaximum(total)
progress.setStyleSheet("QProgressBar{n"
"text-align: center;n"
'font: 9pt "Consolas";n'
"}")
vbox.setObjectName("name, speed Info")
hbox.setObjectName("Info and Progress")
progress.setTextVisible(True)
progress.setRange(0, 0)
self.label = QtWidgets.QLabel(self)
self.label.setStyleSheet("color: rgb(60, 112, 255);")
vbox.addWidget(self.label)
self.total = total
self.index = 0
self.progress = progress
vbox.setSpacing(2)
hbox.addLayout(vbox)
hbox.addWidget(progress)
self.setLayout(hbox)
fonts = QtGui.QFont()
fonts.setFamily("Consolas")
fonts.setPointSize(9)
self.setFont(fonts)
self.label.setFont(fonts)
self.start_time = time.time()
self.timer = QtCore.QTimer(self)
self.timer.timeout.connect(self.timers)
self.timer.start(50)
item = Item(parent.icon, "", parent)
item.index_name = itemindex
parent.addItem(item)
parent.setItemWidget(item, self)
def timers(self, *args) -> None:
timeit = self.get_time()
if timeit == 0:
return # ZeroDivisionError: integer division or modulo by zero
size = self.index * segment
contect = f"{covert(int(size / timeit))}/s ({self.str_time()} 秒)"
self.label.setText(contect)
def get_progress(self) -> QtWidgets.QProgressBar:
return self.progress
def is_finished(self) -> bool:
return bool(self.index >= self.total)
def update(self) -> None:
if self.index == 0:
self.progress.setMaximum(self.total)
if self.is_finished():
return
self.index = 1
if self.is_finished():
self.end_time = time.time()
self.progress.setValue(self.index)
def get_total(self) -> int:
return self.total
def get_time(self) -> float:
return getattr(self, "end_time", time.time()) - self.start_time
def str_time(self) -> str:
return "%0.1f" % (self.get_time())
def get_index(self) -> int:
return self.index
def customContextMenuRequested(self, pos: QtCore.QPoint) -> None:
print(pos)
class FileListWidget(QtWidgets.QListWidget):
def __init__(self, parent=None):
super(FileListWidget, self).__init__(parent)
self.icon = QtGui.QIcon("images/file.png")
self.files = {}
def new_file(self, name, size, index):
widget = FileArgumentWidget(name, size, index, self)
self.files[index] = widget
def update_file(self, index) -> None:
self.files[index].update()
class cpuThread(QtCore.QThread):
signal = QtCore.pyqtSignal(int)
def run(self) -> None:
while True:
self.signal.emit(int(cpu_percent(interval=1)))
def save_bytes(file, byte: bytes):
with open(file, "wb") as f:
f.write(byte)
def get_eval(str, defined=None):
try:
res = eval(str)
if isinstance(res, type(defined)):
return res
raise TypeError
except:
return defined
class SEND:
def __init__(self, index, name, conn: callable, encode='utf8', localfile=""):
self.localfile = localfile
self.encode = encode
self.size = os.path.getsize(self.localfile)
self.total = math.ceil(self.size / segment)
self.index = index
self.conn = conn
self.finish = False
self.name = name
asyncio.run_coroutine_threadsafe(self.update(), loop)
async def update(self):
self.conn(self.header().encode(self.encode))
with open(self.localfile, "rb") as f:
for n in range(self.total):
self.conn(self.format(n, compress(f.read(segment))).encode(self.encode))
await asyncio.sleep(delay)
self.finish = True
def cut(self, byte: bytes, seg=segment) -> list:
return [byte[x:x seg] for x in range(0, len(byte), seg)]
def format(self, process, data) -> str:
return repr((update_file, (self.index, process, data)))
def header(self) -> str:
return repr((new_file, (self.index,
self.name,
self.total,
self.size)
))
class RECV:
def __init__(self, index: int, name: str, total: int, size: int):
self.index, self.name, self.total, self.size = index, name, total, size
self.progress = -1
self.file = []
self.finish = False
self.save_path = os.path.join(save_path, self.name)
gui.New_file_signal.emit(name, total, index)
def update(self, p, data):
if isinstance(p, int) and p - 1 == self.progress:
self.progress = p
self.file.append(decompress(data))
gui.Update_file_signal.emit(self.index)
self.update_file()
if len(self.file) == self.total:
logger.info(f"Save {self.name} at {self.save_path}, size {self.size} b.")
self.finish = True
return True
def update_file(self):
with open(self.save_path, "ab") as f:
f.write(self.file[-1])
def save(self):
return self.finish
def savepath(self) -> str:
if self.finish:
return self.save_path
return ""
class send_files:
def __init__(self, encode='utf8'):
self.sends = []
self.encode = encode
def localfile(self, file, conn):
if os.path.isfile(file):
_, name = os.path.split(file)
self.sends.append(
SEND(len(self.sends), name, conn, localfile=file)) # index: len(self.sends)-1 1 => len(self.sends)
class recv_files:
def __init__(self, decode='utf8', path=None):
self.recvs = []
self.decode = decode
if path is None:
path = sys.path[0]
self.path = path
def new_files(self, index, name, total, size):
self.recvs.append(RECV(index, name, total, size))
logger.info(f"New file - {name} - {covert(size, fine=True)}.")
def apply(self, index, progress, data):
if len(self.recvs) - 1 >= index:
if self.recvs[index].update(progress, data):
if self.save(index):
return index, self.recvs[index].name
else:
return False
def save(self, index):
if len(self.recvs) - 1 >= index:
return self.recvs[index].save()
class message_handle:
codec = "utf8"
def __init__(self, server: Server):
if not os.path.isdir(save_path):
os.makedirs(save_path)
self.Sender = send_files(self.codec, )
self.Receiver = recv_files(self.codec, save_path)
self.files_record = {}
self.server = server
@to_logging
def handle(self, data, client: Client):
_res = get_eval(data, tuple())
if len(_res) == 2:
type, arguments = _res
if type == new_file:
index, name, total, size = arguments
if not client.username in self.files_record:
if index == 0:
self.files_record[client.username] = [len(self.Receiver.recvs), ]
else:
self.files_record[client.username].append(len(self.Receiver.recvs))
self.Receiver.new_files(len(self.Receiver.recvs), name, total, size)
elif type == update_file:
index, progress, data = arguments
if client.username in self.files_record:
if not len(self.files_record[client.username]) >= index 1:
index = len(self.files_record[client.username]) - 1
_res = self.Receiver.apply(self.files_record[client.username][index], progress, data)
if _res:
INDEX, NAME = _res
self.server.UserMessage(client.addr, client.username, f'<a href="{INDEX}">{NAME}</a>')
elif type == request_file:
path = self.Receiver.recvs[arguments].savepath()
if path:
self.Sender.localfile(path, client.send) # 如若无, 报错False
elif type == normal_text:
return arguments
def send(self, sendpath, conn):
return self.Sender.localfile(sendpath, conn)
def get_index(self, index):
if index 1 <= len(self.Receiver.recvs):
return self.Receiver.recvs[index]
def covert(byte, fine=False):
"""
位 bit (比特)(Binary Digits):存放一位二进制数,即 0 或 1,最小的存储单位。
字节 byte:8个二进制位为一个字节(B),最常用的单位。
其中1024=2^10 ( 2 的10次方),
1KB (Kilobyte 千字节)=1024B,
1MB (Megabyte 兆字节 简称“兆”)=1024KB,
1GB (Gigabyte 吉字节 又称“千兆”)=1024MB,
1TB (Trillionbyte 万亿字节 太字节)=1024GB,
1PB(Petabyte 千万亿字节 拍字节)=1024TB,
1EB(Exabyte 百亿亿字节 艾字节)=1024PB,
1ZB (Zettabyte 十万亿亿字节 泽字节)= 1024 EB,
1YB (Jottabyte 一亿亿亿字节 尧字节)= 1024 ZB,
1BB (Brontobyte 一千亿亿亿字节)= 1024 YB.
「山木兮」"""
if not isinstance(byte, (int, float)):
byte = len(byte)
DEI = f"{byte} bytes"
base = 1024
units = ["b",
"Kb",
"Mb",
"Gb",
"Tb",
"Pb",
"Eb"]
index = 0
while True:
if byte < 1024 or index 1 >= len(units):
break
byte /= base
index = 1
if index == 0:
return DEI
else:
if fine:
return "%0.1f%s(%s)" % (byte, units[index], DEI)
else:
return "%0.1f%s" % (byte, units[index])
if __name__ == "__main__":
save_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "resource")
app = QtWidgets.QApplication(sys.argv)
app.setStyle("fusion")
gui = Interface()
server = Server(gui.Usernum_signal.emit)
sys.exit(app.exec_())
客户端
代码语言:javascript复制# -*- coding: utf-8 -*-
# Form implementation generated from reading ui files 'USER.ui', 'Connect.ui'
# Created by: PyQt5 UI code generator 5.15.4
# writer : Zmh
import asyncio
import logging
import math
import os
import socket
import sys
import time
from datetime import datetime
from threading import Thread
from time import sleep
from traceback import format_exc
from PyQt5 import QtCore, QtGui, QtWidgets
from ProgressBar import Animation
from socket_queue import SocketClient
from gzip import compress, decompress
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s')
TIMEOUT = 2
base = 1024
segment = base * 10 # 防止切断
bufsize = base ** 2
delay = 0.04
new_file = 0
update_file = 1
request_file = 2
normal_text = 3
loop = asyncio.get_event_loop()
file_thread = Thread(target=loop.run_forever)
def get_host_ip() -> str:
"""get current IP address"""
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
s.connect(('8.8.8.8', 80))
ip = s.getsockname()[0]
finally:
s.close()
return ip
Username = str()
dicts = {"127.0.0.1:429 (默认)": "127.0.0.1:429",
f"103.46.128.21:51203 (公网)": "103.46.128.21:51203",
f"{get_host_ip()}:429 (私网)": f"{get_host_ip()}:429",
"EXAM-41:429 (微机室)": "EXAM-41:429"}
def threading(Daemon, **kwargs):
thread = Thread(**kwargs)
thread.setDaemon(Daemon)
thread.start()
return thread
def to_logging(command):
def logs(*args, **kwargs):
try:
_result = command(*args, **kwargs)
if _result is None:
return True
return _result
except socket.timeout:
return
except (ConnectionResetError, ConnectionRefusedError, ConnectionAbortedError):
if "main" in globals():
main.ConnectionError_signal.emit()
return "CLOSE"
except:
if "main" in globals():
main.MessageUpdate_signal.emit(format_exc())
else:
logging.exception(str())
return False
return logs
@to_logging
def address_split(address):
def func(ip="", port=0) -> (str, int):
return ip, int(port)
ip, port = func(*address.strip().split(':')[:2])
return ip, port
class Socket(SocketClient):
def __init__(self, Function=lambda i: None, code='utf-8'):
super(Socket, self).__init__(code)
self.handler = message_handle(self.send)
def parse_argument(self, arg: str) -> str:
return self.handler.handle(arg.strip())
def recv(self):
result = super(Socket, self).recv()
if isinstance(result, str) and result:
self._traceback(f'{result} <font size=1>{covert(len(result))}</font>')
return self.isOpen()
def forever_receive(self) -> None:
self.handler.send_text(self.header)
while True:
if not self.recv():
return
def run(self): # 线程
threading(True, target=self.forever_receive)
def quitEvent(self):
self.__is_connect = False
if main.is_setup:
main.ConnectionError_signal.emit()
class login(QtWidgets.QDialog):
def __init__(self, parent=None):
super(login, self).__init__(parent)
self.Fadein = Animation(self)
self.setupUi()
self.setWindowIcon(QtGui.QIcon("images/login.ico"))
def destory_window(self):
if hasattr(self, "success"):
main.SetupUi()
self.close()
def closeEvent(self, event):
if not hasattr(self, "Fadeout"):
self.Fadeout = Animation(self, from_value=1, to_value=0, connect=self.destory_window)
event.ignore()
return
def setupUi(self):
self.setObjectName("Dialog")
self.resize(252, 339)
font = QtGui.QFont()
font.setFamily("Consolas")
self.setFont(font)
self.setStyleSheet("QWidgetn"
"{border:15px;n"
"margin:10px;n"
"background-color:rgb(255, 255, 255);n"
"}n"
"QWidget#right_widgetn"
"{border-radius:15;}n"
"n"
"")
self.gridLayout = QtWidgets.QGridLayout(self)
self.gridLayout.setObjectName("gridLayout")
self.label_3 = QtWidgets.QLabel(self)
font = QtGui.QFont()
font.setFamily("Adobe Arabic")
font.setPointSize(11)
self.label_3.setFont(font)
self.label_3.setStyleSheet("font-color:rgb(0, 0, 0)n"
"font-size:20px")
self.label_3.setObjectName("label_3")
self.gridLayout.addWidget(self.label_3, 2, 0, 1, 1)
self.label_2 = QtWidgets.QLabel(self)
self.label_2.setStyleSheet("color: rgb(99, 99, 99);")
self.label_2.setObjectName("label_2")
self.gridLayout.addWidget(self.label_2, 3, 0, 1, 3)
self.label = QtWidgets.QLabel(self)
font = QtGui.QFont()
font.setFamily("Consolas")
font.setPointSize(22)
font.setBold(False)
font.setWeight(50)
self.label.setFont(font)
self.label.setStyleSheet("color: rgb(0, 0, 0);")
self.label.setObjectName("label")
self.gridLayout.addWidget(self.label, 0, 0, 1, 3)
self.lineEdit = QtWidgets.QLineEdit(self)
self.lineEdit.setStyleSheet(" QLineEditn"
" {border:0px;n"
" border-radius:0;n"
" margin:10px;n"
" border-bottom: 2px solid #B3B3B3;n"
" font-family:'等线';n"
" font-size:20px;n"
" font-weight:bold;}n"
" QLineEdit:hover{n"
" border-bottom:3px solid #66A3FF;n"
" }n"
" QLineEdit:focus{n"
" border-bottom:3px solid #E680BDn"
" }")
self.lineEdit.setClearButtonEnabled(True)
self.lineEdit.setObjectName("lineEdit")
self.gridLayout.addWidget(self.lineEdit, 1, 0, 1, 3)
self.pushButton = QtWidgets.QPushButton(self)
self.pushButton.setEnabled(False)
self.pushButton.setAccessibleName("")
self.pushButton.setStyleSheet(" QPushButtonn"
" {text-align : center;n"
" background-color:qlineargradient(spread:pad, x1:0, y1:0, x2:1, y2:0, stop:0 #fbc2eb, stop:1 #a6c1ee);n"
" font: bold;n"
" border-color: grey;n"
" border-width: 2px;n"
" border-radius: 10px;n"
" padding: 6px;n"
" height: 28px;n"
" border-style: outset;n"
" margin-left:30px;n"
" margin-right:30px;n"
" font-family:'黑体';n"
" font : 18px;}n"
"n"
" QPushButton:pressedn"
" {text-align : center;n"
" background-color : light gray;n"
" background-color:qlineargradient(spread:pad, x1:0, y1:0, x2:1, y2:0, stop:0 #e1aad2, stop:1 #92adda);n"
" font: bold;n"
" color:lightblue;n"
" border-color: gray;n"
" border-width: 2px;n"
" border-radius: 10px;n"
" padding: 6px;n"
" height : 28px;n"
" border-style: outset;n"
" font-family:'黑体';n"
" font : 18px;}n"
" QPushButton:hover:!pressedn"
" {color:red;}")
self.pushButton.setObjectName("pushButton")
self.gridLayout.addWidget(self.pushButton, 4, 0, 1, 3)
self.comboBox = QtWidgets.QComboBox(self)
self.comboBox.addItems(dicts)
self.comboBox.setStyleSheet(" QComboBoxn"
" {border:0px;n"
" border-radius:0;n"
" margin:10px;n"
" border-bottom: 2px solid #B3B3B3;n"
" font-family:'等线';n"
" font-size:20px;}n"
" QComboBox:hover{n"
" border-bottom:3px solid #66A3FF;n"
" }n"
" QComboBox:focus{n"
" border-bottom:3px solid #E680BD;n"
" }n"
"")
self.comboBox.setEditable(True)
self.comboBox.setObjectName("comboBox")
self.gridLayout.addWidget(self.comboBox, 2, 1, 1, 2)
QtCore.QMetaObject.connectSlotsByName(self)
_translate = QtCore.QCoreApplication.translate
self.setWindowTitle(_translate("Dialog", "Login"))
self.label_3.setText(_translate("Dialog", "连接地址"))
self.label_2.setText(_translate("Dialog", "[!]未注册将会自动注册"))
self.label.setText(_translate("Dialog", "Zmh server"))
self.lineEdit.setPlaceholderText(_translate("Dialog", "用户名(3-15字符)"))
self.pushButton.setText(_translate("Dialog", " 登录"))
self.lineEdit.textChanged.connect(self.setEnableds)
self.pushButton.clicked.connect(self._connect)
self.show() # 有动画了!
def setEnableds(self, _):
if 3 <= len(self.lineEdit.text().strip()) <= 15:
self.pushButton.setEnabled(True)
@to_logging
def _connect(self, i):
texts = self.comboBox.currentText()
addr = address_split(dicts.get(texts, texts))
if addr is False:
QtWidgets.QMessageBox.warning(self, "Error", f"[IP地址:端口]格式不正确!")
user = self.lineEdit.text().strip()
self.pushButton.setEnabled(False)
QtWidgets.QApplication.processEvents()
s.change_address(*addr)
s.change_header(user)
s.set_failEvent(self.info)
if s.connect():
self.success = 1
self.close()
return
self.pushButton.setEnabled(True)
def info(self, text):
QtWidgets.QMessageBox.information(self, "Login", text)
def main_surface(self):
main.SetupUi()
self.destory_window()
class MainTalk(QtWidgets.QMainWindow):
ConnectionError_signal = QtCore.pyqtSignal()
MessageUpdate_signal = QtCore.pyqtSignal(str)
def __init__(self):
super(MainTalk, self).__init__()
self.ConnectionError_signal.connect(self.ConnectionError)
self.MessageUpdate_signal.connect(self.Show_Message)
self.is_setup = False
def SetupUi(self):
self.anim = Animation(self)
self.setObjectName("MainWindow")
self.resize(800, 619)
font = QtGui.QFont()
font.setFamily("Consolas")
self.setFont(font)
self.centralwidget = QtWidgets.QWidget(self)
self.centralwidget.setObjectName("centralwidget")
self.gridLayout = QtWidgets.QGridLayout(self.centralwidget)
self.gridLayout.setObjectName("gridLayout")
self.lineEdit_3 = QtWidgets.QLineEdit(self.centralwidget)
self.lineEdit_3.setReadOnly(True)
self.lineEdit_3.setObjectName("lineEdit_3")
self.gridLayout.addWidget(self.lineEdit_3, 7, 3, 1, 1)
self.pushButton = QtWidgets.QPushButton(self.centralwidget)
self.pushButton.setStyleSheet("background-color:rgb(44, 176, 13);n"
"color:rgb(255, 255, 255);n"
"font: 200 10pt "Consolas";")
self.pushButton.setObjectName("pushButton")
self.gridLayout.addWidget(self.pushButton, 8, 6, 1, 1)
spacerItem = QtWidgets.QSpacerItem(40, 20, QtWidgets.QSizePolicy.Expanding, QtWidgets.QSizePolicy.Minimum)
self.gridLayout.addItem(spacerItem, 8, 5, 1, 1)
self.label_2 = QtWidgets.QLabel(self.centralwidget)
self.label_2.setObjectName("label_2")
self.gridLayout.addWidget(self.label_2, 6, 1, 1, 1)
spacerItem1 = QtWidgets.QSpacerItem(40, 20, QtWidgets.QSizePolicy.Expanding, QtWidgets.QSizePolicy.Minimum)
self.gridLayout.addItem(spacerItem1, 8, 3, 1, 1)
self.lineEdit_2 = QtWidgets.QLineEdit(self.centralwidget)
self.lineEdit_2.setReadOnly(True)
self.lineEdit_2.setObjectName("lineEdit_2")
self.gridLayout.addWidget(self.lineEdit_2, 6, 3, 1, 1)
self.label = QtWidgets.QLabel(self.centralwidget)
self.label.setObjectName("label")
self.gridLayout.addWidget(self.label, 5, 1, 1, 1)
self.lineEdit = QtWidgets.QLineEdit(self.centralwidget)
self.lineEdit.setCursor(QtGui.QCursor(QtCore.Qt.IBeamCursor))
self.lineEdit.setDragEnabled(False)
self.lineEdit.setReadOnly(True)
self.lineEdit.setObjectName("lineEdit")
self.gridLayout.addWidget(self.lineEdit, 5, 3, 1, 1)
self.label_3 = QtWidgets.QLabel(self.centralwidget)
self.label_3.setObjectName("label_3")
self.gridLayout.addWidget(self.label_3, 7, 1, 1, 1)
self.line = QtWidgets.QFrame(self.centralwidget)
self.line.setFrameShape(QtWidgets.QFrame.VLine)
self.line.setFrameShadow(QtWidgets.QFrame.Sunken)
self.line.setObjectName("line")
self.gridLayout.addWidget(self.line, 5, 4, 3, 1)
self.textEdit = QtWidgets.QTextEdit(self.centralwidget)
self.textEdit.setObjectName("textEdit")
self.gridLayout.addWidget(self.textEdit, 5, 5, 3, 2)
spacerItem2 = QtWidgets.QSpacerItem(20, 40, QtWidgets.QSizePolicy.Minimum, QtWidgets.QSizePolicy.Expanding)
self.gridLayout.addItem(spacerItem2, 1, 1, 1, 1)
self.textEdit_2 = QtWidgets.QTextBrowser(self.centralwidget)
self.textEdit_2.setObjectName("textEdit_2")
self.textEdit_2.setReadOnly(True)
self.gridLayout.addWidget(self.textEdit_2, 0, 3, 2, 4)
self.pushButton_2 = QtWidgets.QPushButton(self.centralwidget)
self.pushButton_2.setObjectName("pushButton_2")
#
self.sendButton = QtWidgets.QPushButton(QtGui.QIcon("images/upload.png"), "上传文件", self.centralwidget)
self.sendButton.setObjectName("send - pushButton")
self.gridLayout.addWidget(self.sendButton, 8, 5)
#
self.gridLayout.addWidget(self.pushButton_2, 8, 1, 1, 1)
self.setCentralWidget(self.centralwidget)
self.menubar = QtWidgets.QMenuBar(self)
self.menubar.setGeometry(QtCore.QRect(0, 0, 800, 24))
self.menubar.setObjectName("menubar")
self.menu = QtWidgets.QMenu(self.menubar)
self.menu.setObjectName("menu")
self.menulanguage = QtWidgets.QMenu(self.menu)
self.menulanguage.setObjectName("menulanguage")
self.setMenuBar(self.menubar)
self.statusbar = QtWidgets.QStatusBar(self)
self.statusbar.setObjectName("statusbar")
self.setStatusBar(self.statusbar)
self.actionsocket_connet = QtWidgets.QAction(self)
self.actionsocket_connet.setObjectName("actionsocket_connet")
self.actionChinese = QtWidgets.QAction(self)
self.actionChinese.setObjectName("actionChinese")
self.actionip_socket_gethostbyname_socket_gethostname = QtWidgets.QAction(self)
self.actionip_socket_gethostbyname_socket_gethostname.setObjectName(
"actionip_socket_gethostbyname_socket_gethostname")
self.menulanguage.addSeparator()
self.menulanguage.addAction(self.actionChinese)
self.menu.addSeparator()
self.menu.addAction(self.menulanguage.menuAction())
self.menu.addAction(self.actionip_socket_gethostbyname_socket_gethostname)
self.menubar.addAction(self.menu.menuAction())
self.socket_peername = s.addr[0]
self.retranslateUi()
QtCore.QMetaObject.connectSlotsByName(self)
self.pushButton.clicked.connect(self.send("MSG"))
self.pushButton_2.clicked.connect(self.re_connect)
self.textEdit.textChanged.connect(self.tc)
self.textEdit_2.setOpenLinks(False)
self.textEdit_2.setOpenExternalLinks(False)
self.textEdit_2.anchorClicked.connect(self.anchor)
self.sendButton.clicked.connect(self.send("FILE"))
self.connectEnabled(True)
self.is_setup = True
self.show()
def anchor(self, res: QtCore.QUrl):
index = res.toString()
s.handler.request_file(index)
@to_logging
def sendfile(self):
for file in QtWidgets.QFileDialog.getOpenFileNames(self, "上传文件")[0]:
if os.path.isfile(file):
s.handler.send(file)
@to_logging
def sendmsg(self):
s.handler.send_text(self.textEdit.toPlainText().strip())
self.textEdit.clear()
def tc(self, _=0):
if 0 < len(self.textEdit.toPlainText().strip()) <= 1000:
self.pushButton.setEnabled(True)
else:
self.pushButton.setEnabled(False)
self.pushButton.setText(
QtCore.QCoreApplication.translate("MainWindow", f"发送({len(self.textEdit.toPlainText().strip())} / 1000)"))
def send(self, type):
_call = {"MSG": self.sendmsg, "FILE": self.sendfile}.get(type, lambda: True)
@to_logging
def function(_):
if s.is_connect():
if not _call():
self.ConnectionError_signal.emit()
else:
self.MessageUpdate_signal.emit("<font color='red'>发送异常. 未连接至服务器.请点击[重新连接服务器]按钮尝试重新连接.</font>")
return function
def ConnectionError(self):
QtWidgets.QMessageBox.information(self, 'TraceBack', f'Socket Server<{self.socket_peername}> 断开连接')
self.connectEnabled(False)
def retranslateUi(self):
_translate = QtCore.QCoreApplication.translate
self.setWindowTitle(_translate("MainWindow", "Socket"))
self.lineEdit_2.setText(socket.gethostname())
self.lineEdit.setText(socket.gethostbyname(socket.gethostname()))
self.lineEdit_3.setText(self.socket_peername)
self.tc()
self.label_2.setText(_translate("MainWindow", "主机名:"))
self.label.setText(_translate("MainWindow", "本地端口:"))
self.label_3.setText(_translate("MainWindow", "连接端口:"))
self.pushButton_2.setText(_translate("MainWindow", "重新连接服务器"))
self.menu.setTitle(_translate("MainWindow", "设置"))
self.menulanguage.setTitle(_translate("MainWindow", "language"))
self.actionsocket_connet.setText(_translate("MainWindow", "socket connect"))
self.actionChinese.setText(_translate("MainWindow", "Chinese"))
self.MessageUpdate_signal.emit(f'<font color="red">欢迎来到服务器[{s.socket.getpeername()[0]}].您的ip地址为{s.addr[0]}')
self.actionip_socket_gethostbyname_socket_gethostname.setText(
_translate("MainWindow", "ip: " socket.gethostbyname(socket.gethostname())))
s.set_failEvent(self.MessageUpdate_signal.emit)
s.run()
@to_logging
def re_connect(self, _):
self.MessageUpdate_signal.emit(
"[{}]: 尝试连接服务器[{}],最大超时报错 {}s".format(datetime.now().strftime('%Y %m %d %H:%M:%S'), s.addr[0], TIMEOUT))
QtWidgets.QApplication.processEvents()
s.set_failEvent(self.MessageUpdate_signal.emit)
status = s.connect()
self.connectEnabled(status)
if status:
s.run()
def connectEnabled(self, status):
self.pushButton_2.setEnabled(not status)
def Show_Message(self, data: str) -> None:
# self.MessageUpdate_signal -> [signal]; self.MessageUpdate_signal.emit(self, *args, **kwargs). #
if data:
for i in data.split('n'):
if i:
sleep(0.06 / len(data.split('n'))) # 防止信息过快使Textedit刷新空白
self.textEdit_2.append(i)
self.textEdit_2.moveCursor(QtGui.QTextCursor.End)
def save_bytes(file, byte: bytes):
with open(file, "wb") as f:
f.write(byte)
def get_eval(str, defined=None):
try:
res = eval(str)
if isinstance(res, type(defined)):
return res
raise TypeError
except:
return defined
class SEND:
def __init__(self, index, name, conn: callable, encode='utf8', localfile=""):
self.localfile = localfile
self.encode = encode
self.size = os.path.getsize(self.localfile)
self.total = math.ceil(self.size / segment)
self.index = index
self.conn = conn
self.finish = False
self.name = name
asyncio.run_coroutine_threadsafe(self.update(), loop)
self.fd_index = len(file_dialog.filedict) # pyqtSignal无返回值
file_dialog.new_file.emit([self.name, self.total, self.size, (file_dialog.UPLOAD, localfile)])
async def update(self):
self.conn(self.header().encode(self.encode))
with open(self.localfile, "rb") as f:
for n in range(self.total):
self.conn(self.format(n, compress(f.read(segment))).encode(self.encode))
file_dialog.update_file.emit(self.fd_index)
await asyncio.sleep(delay)
self.finish = True
def cut(self, byte: bytes, seg=segment) -> list:
return [byte[x:x seg] for x in range(0, len(byte), seg)]
def format(self, process, data) -> str:
return repr((update_file, (self.index, process, data)))
def header(self) -> str:
return repr((new_file, (self.index,
self.name,
self.total,
self.size)
))
class RECV:
def __init__(self, index: int, name: str, total: int, size: int):
self.index, self.name, self.total, self.size = index, name, total, size
self.progress = -1
self.file = []
self.finish = False
self.fd_index = len(file_dialog.filedict) # pyqtSignal无返回值
self.save_path = os.path.join(save_path, self.name)
file_dialog.new_file.emit(
[self.name, self.total, self.size, (file_dialog.DOWNLOAD, os.path.join(save_path, self.name))])
def update(self, p, data):
if isinstance(p, int) and p - 1 == self.progress:
self.progress = p
self.file.append(decompress(data))
self.update_file()
file_dialog.update_file.emit(self.fd_index)
if len(self.file) == self.total:
logging.info(f"Save {self.name} at {self.save_path}, size {self.size} b.")
self.finish = True
return True
def update_file(self):
with open(self.save_path, "ab") as f:
f.write(self.file[-1])
def save(self):
return self.finish
def savepath(self) -> (str, bool):
if self.finish:
return self.save_path
return False
def start(self):
if self.finish:
os.startfile(self.savepath())
class send_files:
def __init__(self, encode='utf8'):
self.sends = []
self.encode = encode
def localfile(self, file, conn):
if os.path.isfile(file):
_, name = os.path.split(file)
self.sends.append(SEND(len(self.sends), name, conn, localfile=file)) # index: len(self.sends)-1 1 => len(self.sends)
class recv_files:
def __init__(self, decode='utf8', path=None):
self.recvs = []
self.decode = decode
if path is None:
path = sys.path[0]
self.path = path
def new_files(self, index, name, total, size):
self.recvs.append(RECV(index, name, total, size))
def get_index(self,
index: int) -> RECV: # 致命bug, 如若用self.recvs[index] 排序方法不一, 用户端不可能从第一个文件开始请求, 然后排着序请求1,2,3,4,5,6... 一个不能少, 所以用以下方法
for x in self.recvs:
if x.index == index:
return x
def in_list(self, index) -> bool:
return bool(self.get_index(index))
def update(self, index, progess, data) -> bool:
if self.in_list(index):
if self.get_index(index).update(progess, data):
return self.save(index)
def save(self, index):
if self.in_list(index):
return self.get_index(index).save()
class message_handle:
codec = "utf8"
def __init__(self, func: callable = lambda _: True):
self.Sender = send_files(self.codec, )
self.Receiver = recv_files(self.codec, save_path)
self.func = func
self.Progress = []
def handle(self, data):
_res = get_eval(data, (None,))
if len(_res) == 2:
type, arguments = _res
if type == new_file:
self.Receiver.new_files(*arguments)
elif type == update_file:
self.Receiver.update(*arguments)
elif type == request_file:
path = self.Receiver.get_index(arguments).savepath()
if path:
self.Sender.localfile(path, self.func) # 如若无, 报错False
elif type == normal_text:
return arguments
def send(self, sendpath):
return self.Sender.localfile(sendpath, self.func)
def send_text(self, mes: str):
return self.func(repr((normal_text, mes)).encode(self.codec))
def request_file(self, name):
index = get_eval(name, 0)
if self.in_list(index):
self.get_index(index).start()
return
self.func(repr((request_file, index)).encode(self.codec))
def get_index(self, index) -> RECV:
return self.Receiver.get_index(index)
def in_list(self, index):
return self.Receiver.in_list(index)
def covert(byte, fine=False):
"""
位 bit (比特)(Binary Digits):存放一位二进制数,即 0 或 1,最小的存储单位。
字节 byte:8个二进制位为一个字节(B),最常用的单位。
其中1024=2^10 ( 2 的10次方),
1KB (Kilobyte 千字节)=1024B,
1MB (Megabyte 兆字节 简称“兆”)=1024KB,
1GB (Gigabyte 吉字节 又称“千兆”)=1024MB,
1TB (Trillionbyte 万亿字节 太字节)=1024GB,
1PB(Petabyte 千万亿字节 拍字节)=1024TB,
1EB(Exabyte 百亿亿字节 艾字节)=1024PB,
1ZB (Zettabyte 十万亿亿字节 泽字节)= 1024 EB,
1YB (Jottabyte 一亿亿亿字节 尧字节)= 1024 ZB,
1BB (Brontobyte 一千亿亿亿字节)= 1024 YB.
「山木兮」"""
if not isinstance(byte, (int, float)):
byte = len(byte)
DEI = f"{byte} bytes"
base = 1024
units = ["b",
"Kb",
"Mb",
"Gb",
"Tb",
"Pb",
"Eb"]
index = 0
while True:
if byte < 1024 or index 1 >= len(units):
break
byte /= base
index = 1
if index == 0:
return DEI
else:
if fine:
return "%0.1f%s(%s)" % (byte, units[index], DEI)
else:
return "%0.1f%s" % (byte, units[index])
class ListWidgetLayout(QtWidgets.QWidget):
def __init__(self, name, total, parent=None):
super(ListWidgetLayout, self).__init__(parent)
hbox = QtWidgets.QHBoxLayout(self)
vbox = QtWidgets.QVBoxLayout()
vbox.addWidget(QtWidgets.QLabel(name f"n({covert(total * segment)})", self))
progress = QtWidgets.QProgressBar()
progress.setMaximum(total)
progress.setStyleSheet("QProgressBar{n"
"text-align: center;n"
'font: 9pt "Consolas";n'
"}")
vbox.setObjectName("name, speed Info")
hbox.setObjectName("Info and Progress")
progress.setTextVisible(True)
progress.setRange(0, 0)
self.label = QtWidgets.QLabel(self)
self.label.setStyleSheet("color: rgb(60, 112, 255);")
vbox.addWidget(self.label)
self.total = total
self.index = 0
self.progress = progress
vbox.setSpacing(2)
hbox.addLayout(vbox)
hbox.addWidget(progress)
self.setLayout(hbox)
fonts = QtGui.QFont()
fonts.setFamily("Consolas")
fonts.setPointSize(9)
self.setFont(fonts)
self.label.setFont(fonts)
self.start_time = time.time()
self.timer = QtCore.QTimer(self)
self.timer.timeout.connect(self.timers)
self.timer.start(50)
def timers(self, *args) -> None:
timeit = self.get_time()
if timeit == 0:
return # ZeroDivisionError: integer division or modulo by zero
size = self.index * segment
contect = f"{covert(int(size / timeit))}/s ({self.str_time()} 秒)"
self.label.setText(contect)
def get_progress(self) -> QtWidgets.QProgressBar:
return self.progress
def is_finished(self) -> bool:
return bool(self.index >= self.total)
def update(self) -> None:
if self.index == 0:
self.progress.setMaximum(self.total)
if self.is_finished():
return
self.index = 1
if self.is_finished():
self.end_time = time.time()
self.progress.setValue(self.index)
def get_total(self) -> int:
return self.total
def get_time(self) -> float:
return getattr(self, "end_time", time.time()) - self.start_time
def str_time(self) -> str:
return "%0.1f" % (self.get_time())
def get_index(self) -> int:
return self.index
class FileDialog(QtWidgets.QDialog):
update_file = QtCore.pyqtSignal(int)
new_file = QtCore.pyqtSignal(list)
def __init__(self, save_path, parent=None):
self.current = ""
super(FileDialog, self).__init__(parent)
self.icon = QtGui.QIcon("images/file.png")
self.download = QtGui.QIcon("images/download.png")
self.upload = QtGui.QIcon("images/upload.png")
self.DOWNLOAD = 0
self.UPLOAD = 1
self.LOAD_dict = {self.DOWNLOAD: self.download,
self.UPLOAD: self.upload}
self.setWindowFlags(QtCore.Qt.WindowMinimizeButtonHint) # 只显示最小化按钮
self.save_path = save_path
self.filedict = {}
self.pathdict = {}
self.namedict = {}
self.update_file.connect(self.fileUpdate)
self.new_file.connect(self.newFile)
self.setObjectName("Dialog")
self.resize(666, 421)
font = QtGui.QFont()
font.setFamily("Consolas")
self.setFont(font)
self.setWindowIcon(self.icon)
self.gridLayout = QtWidgets.QGridLayout(self)
self.gridLayout.setObjectName("gridLayout")
self.label_4 = QtWidgets.QLabel(self)
self.label_4.setObjectName("label_4")
self.gridLayout.addWidget(self.label_4, 1, 2, 1, 1)
self.listWidget_2 = QtWidgets.QListWidget(self)
self.listWidget_2.setObjectName("listWidget_2")
self.gridLayout.addWidget(self.listWidget_2, 0, 0, 4, 1)
self.groupBox = QtWidgets.QGroupBox(self)
self.groupBox.setObjectName("groupBox")
self.gridLayout_2 = QtWidgets.QGridLayout(self.groupBox)
self.gridLayout_2.setObjectName("gridLayout_2")
self.label = QtWidgets.QLabel(self.groupBox)
self.label.setObjectName("label")
self.gridLayout_2.addWidget(self.label, 1, 0, 1, 2)
self.progressBar = QtWidgets.QProgressBar(self.groupBox)
font = QtGui.QFont()
font.setFamily("Consolas")
self.progressBar.setFont(font)
self.progressBar.setProperty("value", 0)
self.progressBar.setTextVisible(True)
self.progressBar.setObjectName("progressBar")
self.gridLayout_2.addWidget(self.progressBar, 0, 0, 1, 2)
self.label_3 = QtWidgets.QLabel(self.groupBox)
self.label_3.setObjectName("label_3")
self.gridLayout_2.addWidget(self.label_3, 2, 0, 1, 2)
self.pushButton = QtWidgets.QPushButton(self.groupBox)
self.pushButton.setObjectName("pushButton")
self.pushButton.setEnabled(False)
self.gridLayout_2.addWidget(self.pushButton, 4, 0, 1, 1)
self.label_2 = QtWidgets.QLabel(self.groupBox)
self.label_2.setObjectName("label_2")
self.gridLayout_2.addWidget(self.label_2, 3, 0, 1, 2)
self.pushButton_2 = QtWidgets.QPushButton(self.groupBox)
self.pushButton_2.setObjectName("pushButton_2")
self.gridLayout_2.addWidget(self.pushButton_2, 4, 1, 1, 1)
self.gridLayout.addWidget(self.groupBox, 0, 2, 1, 1)
self.lineEdit = QtWidgets.QLineEdit(self)
self.lineEdit.setReadOnly(True)
self.lineEdit.setObjectName("lineEdit")
self.gridLayout.addWidget(self.lineEdit, 2, 2, 1, 1)
self.line = QtWidgets.QFrame(self)
self.line.setFrameShape(QtWidgets.QFrame.VLine)
self.line.setFrameShadow(QtWidgets.QFrame.Sunken)
self.line.setObjectName("line")
self.gridLayout.addWidget(self.line, 0, 1, 4, 1)
spacerItem = QtWidgets.QSpacerItem(20, 40, QtWidgets.QSizePolicy.Minimum, QtWidgets.QSizePolicy.Expanding)
self.gridLayout.addItem(spacerItem, 3, 2, 1, 1)
self.retranslateUi()
QtCore.QMetaObject.connectSlotsByName(self)
def retranslateUi(self):
_translate = QtCore.QCoreApplication.translate
self.setWindowTitle(_translate("Dialog", "Files"))
self.pushButton.setText(_translate("Dialog", "打开"))
self.pushButton_2.setText(_translate("Dialog", "打开文件夹"))
self.pushButton.clicked.connect(self.startfile)
self.pushButton_2.clicked.connect(self.startpath)
self.label_4.setText(_translate("Dialog", f"下载位置:"))
self.lineEdit.setText(_translate("Dialog", self.save_path))
self.listWidget_2.itemClicked.connect(self.fileChanged)
def newFile(self, args: list):
self.activateWindow() # 窗口置顶
index = len(self.filedict)
name, total, size, (type, path) = args
png = self.LOAD_dict.get(type, self.UPLOAD)
layout = ListWidgetLayout(name, total)
progress = layout.get_progress()
self.filedict[index] = (layout, size)
self.pathdict[index] = path
Item = QtWidgets.QListWidgetItem(png, "", self.listWidget_2)
Item.index_name = index # <-
Item.setSizeHint(QtCore.QSize(200, 80))
self.listWidget_2.addItem(Item)
self.listWidget_2.setItemWidget(Item, layout)
self.namedict[index] = name
# if not self.current:
# self.changeCurrent(0)
self.changeCurrent(index)
def changeCurrent(self, index: int):
self.current = index
self.changeGroupBox()
self.show()
def fileChanged(self, widget: QtWidgets.QListWidgetItem):
self.current = widget.index_name
self.changeGroupBox()
def changeGroupBox(self):
name = self.namedict[self.current]
layout, size = self.filedict[self.current]
layout: ListWidgetLayout
size = covert(size, True)
_translate = QtCore.QCoreApplication.translate
self.groupBox.setTitle(_translate("Dialog", f"{name}"))
self.label.setText(_translate("Dialog", f"文件大小: {size}"))
self.label_3.setText(_translate("Dialog", f"文件名: {name}"))
self.label_2.setText(_translate("Dialog", f"位置: {os.path.join(self.save_path, name)}"))
self.progressBar.setMaximum(layout.get_total())
self.progressBar.setValue(layout.get_index())
self.pushButton.setEnabled(layout.is_finished())
self.fpath = self.pathdict[self.current]
self.path = os.path.dirname(self.fpath)
def fileUpdate(self, index: int):
layout, size = self.filedict[index]
layout: ListWidgetLayout
layout.update()
if index == self.current:
self.changeGroupBox()
def startfile(self, _):
if hasattr(self, "fpath") and os.path.isfile(self.fpath):
os.startfile(self.fpath)
def startpath(self, _):
if hasattr(self, "path") and os.path.isdir(self.path):
os.startfile(self.path)
save_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "resource")
if not os.path.isdir(save_path):
os.makedirs(save_path)
if __name__ == "__main__":
file_thread.start()
s = Socket()
app = QtWidgets.QApplication(sys.argv)
conn = login()
main = MainTalk()
file_dialog = FileDialog(save_path)
sys.exit(app.exec_())