TCP聊天 + 传输文件服务器套接字 v1.9
这是划时代的版本更新: 修复若干大bug, 区分文件/信息, 而最最最重要的一点是, 程序可以传文件了!!!
类似于QQ等软件(只不过.css比不上)
所有版本记录:
v1.0: TCP聊天服务器套接字|PyQt5+socket(TCP端口映射+端口放行)+logging+Thread(含日志,html)+anaconda打包32位exe(3.4万字)|python高阶
v1.1: python TCP套接字服务器v1.1-新增服务端命令功能及修改bug(socket+PyQt5)
v1.2: python TCP服务器v1.2 - 服务端新增用户登录注册(json, md5加密)
v1.3: python TCP服务器v1.3 - 服务器抗压测试及关闭套接字处理
v1.4: python TCP服务器v1.4 - 客户端连接服务器异常(异常情况分类)处理
v1.5: PyQt5可编辑下拉框(comboBox):editable - python TCP服务器v1.5 - 客户端连接界面增加自定义参数(设置超时, 连接地址可选)
v1.6: Python TCP服务器v1.6 - multiprocessing多进程及Ctrl-c(SIGINT)退出
v1.7: Python TCP服务器v1.7 - PyQt5 server服务端来临
v1.8: python TCP服务器v1.8 - PyQt5登录界面美化+淡入淡出
我也懒得解释了
原理是:
文章目录
- 客户端
-
-
-
-
-
- user.pyw
- ProgressBar.py
- file_handle.py
-
- 别急啊, 这还只是客户端, 代码还少.
-
-
- 服务端
-
-
-
-
-
- server.pyw
- data.py
-
-
-
- 我的v1.8->1.9日志(800mb)
上代码!
客户端
user.pyw
# -*- coding: utf-8 -*-
# Form implementation generated from reading ui files 'USER.ui', 'Connect.ui'
#
# Created by: PyQt5 UI code generator 5.15.4
#
# WARNING: Any manual changes made to this file will be lost when pyuic5 is
# run again. Do not edit this file unless you know what you are doing.
import time
import traceback
from PyQt5 import QtCore, QtGui, QtWidgets
import socket, sys, logging
from traceback import format_exc
from datetime import datetime
from time import sleep
from threading import Thread
from random import randint as rand
import logging # 引入logging模块
from file_handle import message_handle, Animation
# Root logger configuration: DEBUG level, each record stamped with the time,
# the source file and line number, and the level name.
logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s')
# Socket timeout in seconds; Socket applies it with settimeout() and it is
# quoted in the "connection timed out" messages.
TIMEOUT = 2
def get_host_ip() -> str:
    """Return the IPv4 address of the interface used for outbound traffic.

    A UDP socket is "connected" to a public address so that the OS selects
    the outgoing interface; its address is then read with ``getsockname``.
    Connecting a datagram socket does not send any packet.

    Returns:
        The local IPv4 address, or ``"127.0.0.1"`` when no route is available.
        The fallback matters because this function is called while the module
        is being imported; an ``OSError`` here would stop the client from
        starting on an offline machine.
    """
    try:
        # The context manager closes the socket even when connect() fails.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
            probe.connect(('8.8.8.8', 80))
            return probe.getsockname()[0]
    except OSError:
        return "127.0.0.1"
Username = str()
# Unit used for size display (bytes per kb); bytecount ** 2 is the recv() size.
bytecount = 1024
# Probe the LAN address once so that the label (key) and the address (value)
# of the "私网" entry can never disagree, and the network is only queried once.
_local_ip = get_host_ip()
# Maps the label shown in the login combo box to the "ip:port" string to use.
dicts = {"127.0.0.1:429 (默认)": "127.0.0.1:429",
         "103.46.128.21:51203 (公网)": "103.46.128.21:51203",
         f"{_local_ip}:429 (私网)": f"{_local_ip}:429",
         "EXAM-41:429 (微机室)": "EXAM-41:429"}


def get_time() -> str:
    """Return the current local time formatted as ``HH:MM:SS``."""
    return datetime.now().strftime('%H:%M:%S')
def threading(Daemon, **kwargs):
    """Create, configure and start a ``Thread``.

    Args:
        Daemon: Value assigned to the thread's ``daemon`` flag.
        **kwargs: Forwarded unchanged to ``threading.Thread`` (``target``,
            ``args``, ...).

    Returns:
        The already started ``Thread`` object.
    """
    thread = Thread(**kwargs)
    # Thread.setDaemon() is deprecated since Python 3.10; set the attribute.
    thread.daemon = Daemon
    thread.start()
    return thread
def to_logging(command):
    """Decorator that turns exceptions of *command* into a ``False`` result.

    The wrapped callable returns:
        * ``True`` when *command* returns ``None``,
        * the value of *command* when it returns anything else,
        * ``False`` when *command* raises.

    On ``ConnectionResetError`` the module-level client ``s`` (if it exists)
    is flagged as disconnected.  Any other exception is shown in the main
    window through the module-level ``main`` object when that is possible,
    otherwise it is written to the log.
    """
    from functools import wraps

    @wraps(command)
    def logs(*args, **kwargs):
        try:
            _result = command(*args, **kwargs)
        except ConnectionResetError:
            # dir() without arguments lists the *local* names of this wrapper,
            # so module-level objects have to be looked up in globals().
            client = globals().get("s")
            if client is not None:
                client._connect = False
        except Exception:
            trace = format_exc()
            window = globals().get("main")
            shown = False
            if window is not None:
                try:
                    window.Show_Message(trace)
                    shown = True
                except Exception:
                    # The window may not have built its widgets yet.
                    shown = False
            if not shown:
                logging.error("%s", trace)
        else:
            return True if _result is None else _result
        return False

    return logs
@to_logging
def address_split(address):
    """Split an ``"ip:port"`` string into ``{"ip": str, "port": int}``.

    Only the first two colon-separated fields are used.  A missing port
    becomes ``0``.  A port that is not a valid integer raises inside this
    function, which the ``to_logging`` wrapper reports as ``False``.
    """
    fields = address.strip().split(':')[:2]
    host = fields[0]
    port = int(fields[1]) if len(fields) > 1 else int(0)
    return dict(ip=host, port=port)
class Socket:
    """Client-side TCP connection plus the background receive loop.

    Attributes set over the object's life:
        socket:    the underlying ``socket.socket`` (timeout ``TIMEOUT``).
        _logger:   callable that receives every text line to display.
        _connect:  ``True`` while the client considers itself connected.
        addr:      ``(ip, port)`` of the last connection attempt.
        dt:        the user data (user name) sent right after connecting.
        handle:    ``message_handle`` bound to ``socket.send`` (see ``run``).
    """

    def __init__(self, Function=lambda i: None, code='utf-8'):
        self.socket = socket.socket()
        self.code = code
        self._logger = Function
        self.socket.settimeout(TIMEOUT)
        self._connect = False

    def set_func(self, f):
        """Replace the callable used to display received text."""
        self._logger = f

    def retry(self):
        """Swap in a fresh socket, closing the previous one first."""
        try:
            self.socket.close()
        except OSError:
            pass
        self.socket = socket.socket()
        self.socket.settimeout(TIMEOUT)

    def socket_connect(self):
        """Connect to ``self.addr``.

        Returns:
            ``True`` on success, otherwise a human-readable error string.
        """
        if hasattr(self, "_socc"):
            # Re-using a socket for another address fails with
            # `OSError: [WinError 10022]`, so every retry gets a new socket.
            self.retry()
        else:
            self._socc = True
        try:
            self.socket.connect(self.addr)
        except socket.gaierror:
            return f"获取地址信息失败.请确保{self.addr[0]}是有效地址或ipv4/ipv6"
        except socket.timeout:
            return f"连接超时({TIMEOUT}s).服务器[{self.addr[0]}:{self.addr[1]}]连接失败."
        except OverflowError:
            return f"输入的端口号为{self.addr[1]},端口号必须在0~65535间."
        except ConnectionResetError as e:
            # Previously fell through and produced the text "None".
            return str(e)
        except OSError as e:
            if int(self.addr[1]) == 0:
                return "[WinError 10049] 在其上下文中,该请求的地址无效"
            # strerror equals args[1] when present; fall back to str(e) so a
            # one-argument OSError cannot raise IndexError here.
            return str(e.strerror or e).rstrip('。')
        except TypeError:
            return "网络地址格式错误: 格式[ip:port] port必须为整型变量0~65535间."
        except Exception as e:
            logging.exception(msg=str())
            return repr(e)
        else:
            return True

    def connect(self, data=None, ip=None, port: int = 0, show=lambda _: None):
        """Store the user data / address (when given) and try to connect.

        Args:
            data: user data to send after connecting; ``None`` re-uses the
                value of an earlier call.
            ip, port: server address; ``ip=None`` re-uses the stored address.
            show: callable receiving the one-line status message.

        Returns:
            ``True`` when the connection was established, else ``False``.

        Raises:
            AssertionError: no data / address was given now or earlier.
        """
        if data is None:
            # Explicit raise instead of `assert` so the check survives -O.
            if not hasattr(self, "dt"):
                raise AssertionError("user data is empty.")
        else:
            self.dt = data
        if ip is None:
            if not hasattr(self, "addr"):
                raise AssertionError("ip address is empty.")
        else:
            self.addr = (ip, port)
        result = self.socket_connect()
        if result is True:
            show("[{}]: 连接成功".format(get_time()))
            # Update this object, not whatever the global `s` happens to be.
            self._connect = True
            return True
        show(f"[{get_time()}]: {result}")
        self._connect = False
        return False

    def info(self):
        """Return ``repr`` of the stored user data."""
        return repr(self.dt)

    @to_logging
    def _handler(self):
        """Receive loop (runs in a worker thread started by ``run``).

        Sends the user data once, then forwards every text message produced
        by ``self.handle.handle`` to ``self._logger``.  A receive timeout just
        polls again; any other error is displayed, the socket is closed and
        the loop ends.
        """
        # NOTE(review): _logger is a GUI method called from this worker
        # thread; confirm this is safe for the widgets involved.
        self.handle.send_text(self.dt)
        while True:
            try:
                chunk = self.socket.recv(bytecount ** 2)
                if not chunk:
                    # recv() returning b"" means the peer closed the
                    # connection; without this check the loop would spin.
                    self.socket.close()
                    return
                for data in self.handle.handle(chunk):
                    if data and isinstance(data, str):
                        kb = len(data) / bytecount
                        self._logger(f'{data} {"<font size=1>%0.2f kb</font>" % kb}')
            except socket.timeout:
                continue
            except Exception:
                banner = ["", "ERROR".center(20, "-")] + format_exc().split('\n') + ["".center(20, "-"), ""]
                for n in banner:
                    self._logger(f"<font color='red'>{n}</font>")
                self.socket.close()
                return

    def run(self):
        """Create the message handler and start the receive thread."""
        self.handle = message_handle(self.socket.send)
        self._connect = True
        self.thread = threading(True, target=self._handler)
class login(QtWidgets.QWidget):
    """Login window: asks for a user name and a server address and connects.

    The window fades in when created.  The first close request starts a
    fade-out animation instead of closing; when that animation finishes
    ``destory_window`` opens the main window (after a successful login) and
    closes this one for real.
    """

    def __init__(self, parent=None):
        super(login, self).__init__(parent)
        self.Fadein = Animation(self)
        self.setupUi()

    def destory_window(self):
        """Open the main window if the login succeeded, then close."""
        if hasattr(self, "success"):
            main.SetupUi()
        self.close()

    def closeEvent(self, event):
        """Defer the first close request until the fade-out has finished."""
        if not hasattr(self, "Fadeout"):
            self.Fadeout = Animation(self, from_value=1, to_value=0, connect=self.destory_window)
            event.ignore()
            return

    def setupUi(self):
        """Build the widgets, apply the style sheets and show the window."""
        self.setObjectName("Dialog")
        self.resize(252, 339)
        font = QtGui.QFont()
        font.setFamily("Consolas")
        self.setFont(font)
        self.setStyleSheet("QWidget\n"
                           "{border:15px;\n"
                           "margin:10px;\n"
                           "background-color:rgb(255, 255, 255);\n"
                           "}\n"
                           "QWidget#right_widget\n"
                           "{border-radius:15;}\n"
                           "\n"
                           "")
        self.gridLayout = QtWidgets.QGridLayout(self)
        self.gridLayout.setObjectName("gridLayout")
        self.label_3 = QtWidgets.QLabel(self)
        font = QtGui.QFont()
        font.setFamily("Adobe Arabic")
        font.setPointSize(11)
        self.label_3.setFont(font)
        self.label_3.setStyleSheet("font-color:rgb(0, 0, 0)\n"
                                   "font-size:20px")
        self.label_3.setObjectName("label_3")
        self.gridLayout.addWidget(self.label_3, 2, 0, 1, 1)
        self.label_2 = QtWidgets.QLabel(self)
        self.label_2.setStyleSheet("color: rgb(99, 99, 99);")
        self.label_2.setObjectName("label_2")
        self.gridLayout.addWidget(self.label_2, 3, 0, 1, 3)
        self.label = QtWidgets.QLabel(self)
        font = QtGui.QFont()
        font.setFamily("Consolas")
        font.setPointSize(22)
        font.setBold(False)
        font.setWeight(50)
        self.label.setFont(font)
        self.label.setStyleSheet("color: rgb(0, 0, 0);")
        self.label.setObjectName("label")
        self.gridLayout.addWidget(self.label, 0, 0, 1, 3)
        self.lineEdit = QtWidgets.QLineEdit(self)
        self.lineEdit.setStyleSheet(" QLineEdit\n"
                                    " {border:0px;\n"
                                    " border-radius:0;\n"
                                    " margin:10px;\n"
                                    " border-bottom: 2px solid #B3B3B3;\n"
                                    " font-family:'等线';\n"
                                    " font-size:20px;\n"
                                    " font-weight:bold;}\n"
                                    " QLineEdit:hover{\n"
                                    " border-bottom:3px solid #66A3FF;\n"
                                    " }\n"
                                    " QLineEdit:focus{\n"
                                    " border-bottom:3px solid #E680BD\n"
                                    " }")
        self.lineEdit.setClearButtonEnabled(True)
        self.lineEdit.setObjectName("lineEdit")
        self.gridLayout.addWidget(self.lineEdit, 1, 0, 1, 3)
        self.pushButton = QtWidgets.QPushButton(self)
        self.pushButton.setEnabled(False)
        self.pushButton.setAccessibleName("")
        self.pushButton.setStyleSheet(" QPushButton\n"
                                      " {text-align : center;\n"
                                      " background-color:qlineargradient(spread:pad, x1:0, y1:0, x2:1, y2:0, stop:0 #fbc2eb, stop:1 #a6c1ee);\n"
                                      " font: bold;\n"
                                      " border-color: grey;\n"
                                      " border-width: 2px;\n"
                                      " border-radius: 10px;\n"
                                      " padding: 6px;\n"
                                      " height: 28px;\n"
                                      " border-style: outset;\n"
                                      " margin-left:30px;\n"
                                      " margin-right:30px;\n"
                                      " font-family:'黑体';\n"
                                      " font : 18px;}\n"
                                      "\n"
                                      " QPushButton:pressed\n"
                                      " {text-align : center;\n"
                                      " background-color : light gray;\n"
                                      " background-color:qlineargradient(spread:pad, x1:0, y1:0, x2:1, y2:0, stop:0 #e1aad2, stop:1 #92adda);\n"
                                      " font: bold;\n"
                                      " color:lightblue;\n"
                                      " border-color: gray;\n"
                                      " border-width: 2px;\n"
                                      " border-radius: 10px;\n"
                                      " padding: 6px;\n"
                                      " height : 28px;\n"
                                      " border-style: outset;\n"
                                      " font-family:'黑体';\n"
                                      " font : 18px;}\n"
                                      " QPushButton:hover:!pressed\n"
                                      " {color:red;}")
        self.pushButton.setObjectName("pushButton")
        self.gridLayout.addWidget(self.pushButton, 4, 0, 1, 3)
        self.comboBox = QtWidgets.QComboBox(self)
        # Iterating the dict yields its keys, i.e. the display labels.
        self.comboBox.addItems(dicts)
        self.comboBox.setStyleSheet(" QComboBox\n"
                                    " {border:0px;\n"
                                    " border-radius:0;\n"
                                    " margin:10px;\n"
                                    " border-bottom: 2px solid #B3B3B3;\n"
                                    " font-family:'等线';\n"
                                    " font-size:20px;}\n"
                                    " QComboBox:hover{\n"
                                    " border-bottom:3px solid #66A3FF;\n"
                                    " }\n"
                                    " QComboBox:focus{\n"
                                    " border-bottom:3px solid #E680BD;\n"
                                    " }\n"
                                    "")
        self.comboBox.setEditable(True)
        self.comboBox.setObjectName("comboBox")
        self.gridLayout.addWidget(self.comboBox, 2, 1, 1, 2)
        QtCore.QMetaObject.connectSlotsByName(self)
        _translate = QtCore.QCoreApplication.translate
        self.setWindowTitle(_translate("Dialog", "Login"))
        self.label_3.setText(_translate("Dialog", "连接地址"))
        self.label_2.setText(_translate("Dialog", "[!]未注册将会自动注册"))
        self.label.setText(_translate("Dialog", "Zmh server"))
        self.lineEdit.setPlaceholderText(_translate("Dialog", "用户名(3-15字符)"))
        self.pushButton.setText(_translate("Dialog", " 登录"))
        self.lineEdit.textChanged.connect(self.setEnableds)
        self.pushButton.clicked.connect(self._connect)
        self.show()

    def setEnableds(self, _):
        """Enable the login button only while the name has 3-15 characters."""
        # Set the state both ways: the button must be disabled again when the
        # text is shortened or grows beyond the limit.
        self.pushButton.setEnabled(3 <= len(self.lineEdit.text().strip()) <= 15)

    @to_logging
    def _connect(self, i):
        """Slot of the login button: resolve the address and connect."""
        texts = self.comboBox.currentText()
        # A known label maps to its address; free text is used as typed.
        addr = address_split(dicts.get(texts, texts))
        if addr is False:
            QtWidgets.QMessageBox.warning(self, "Error", "[IP地址:端口]格式不正确!")
            # Nothing to connect to; continuing would call connect(**False).
            return
        user = self.lineEdit.text().strip()
        self.pushButton.setEnabled(False)
        if s.connect(**addr, data=user, show=self.info):
            self.success = 1
            self.close()
            return
        self.pushButton.setEnabled(True)

    def info(self, text):
        """Show *text* in an information message box."""
        QtWidgets.QMessageBox.information(self, "Login", text)

    def main_surface(self):
        """Open the main window and close this one."""
        main.SetupUi()
        self.destory_window()
class Ui_MainWindow(object):
    """Main chat window: message log, input box, send and reconnect buttons.

    The widgets are not created in ``__init__``; ``SetupUi`` builds and shows
    the window and (through ``retranslateUi``) starts the receive thread of
    the module-level client ``s``.
    """

    def SetupUi(self):
        """Create all widgets, wire the signals and show the window."""
        MainWindow = QtWidgets.QMainWindow()
        self.anim = Animation(MainWindow, )
        MainWindow.setObjectName("MainWindow")
        MainWindow.resize(800, 619)
        font = QtGui.QFont()
        font.setFamily("Consolas")
        MainWindow.setFont(font)
        self.centralwidget = QtWidgets.QWidget(MainWindow)
        self.centralwidget.setObjectName("centralwidget")
        self.gridLayout = QtWidgets.QGridLayout(self.centralwidget)
        self.gridLayout.setObjectName("gridLayout")
        self.lineEdit_3 = QtWidgets.QLineEdit(self.centralwidget)
        self.lineEdit_3.setReadOnly(True)
        self.lineEdit_3.setObjectName("lineEdit_3")
        self.gridLayout.addWidget(self.lineEdit_3, 7, 3, 1, 1)
        self.pushButton = QtWidgets.QPushButton(self.centralwidget)
        self.pushButton.setStyleSheet("background-color:rgb(44, 176, 13);\n"
                                      "color:rgb(255, 255, 255);\n"
                                      "font: 200 10pt \"Consolas\";")
        self.pushButton.setObjectName("pushButton")
        self.gridLayout.addWidget(self.pushButton, 8, 6, 1, 1)
        spacerItem = QtWidgets.QSpacerItem(40, 20, QtWidgets.QSizePolicy.Expanding, QtWidgets.QSizePolicy.Minimum)
        self.gridLayout.addItem(spacerItem, 8, 5, 1, 1)
        self.label_2 = QtWidgets.QLabel(self.centralwidget)
        self.label_2.setObjectName("label_2")
        self.gridLayout.addWidget(self.label_2, 6, 1, 1, 1)
        spacerItem1 = QtWidgets.QSpacerItem(40, 20, QtWidgets.QSizePolicy.Expanding, QtWidgets.QSizePolicy.Minimum)
        self.gridLayout.addItem(spacerItem1, 8, 3, 1, 1)
        self.lineEdit_2 = QtWidgets.QLineEdit(self.centralwidget)
        self.lineEdit_2.setReadOnly(True)
        self.lineEdit_2.setObjectName("lineEdit_2")
        self.gridLayout.addWidget(self.lineEdit_2, 6, 3, 1, 1)
        self.label = QtWidgets.QLabel(self.centralwidget)
        self.label.setObjectName("label")
        self.gridLayout.addWidget(self.label, 5, 1, 1, 1)
        self.lineEdit = QtWidgets.QLineEdit(self.centralwidget)
        self.lineEdit.setCursor(QtGui.QCursor(QtCore.Qt.IBeamCursor))
        self.lineEdit.setDragEnabled(False)
        self.lineEdit.setReadOnly(True)
        self.lineEdit.setObjectName("lineEdit")
        self.gridLayout.addWidget(self.lineEdit, 5, 3, 1, 1)
        self.label_3 = QtWidgets.QLabel(self.centralwidget)
        self.label_3.setObjectName("label_3")
        self.gridLayout.addWidget(self.label_3, 7, 1, 1, 1)
        self.line = QtWidgets.QFrame(self.centralwidget)
        self.line.setFrameShape(QtWidgets.QFrame.VLine)
        self.line.setFrameShadow(QtWidgets.QFrame.Sunken)
        self.line.setObjectName("line")
        self.gridLayout.addWidget(self.line, 5, 4, 3, 1)
        self.textEdit = QtWidgets.QTextEdit(self.centralwidget)
        self.textEdit.setObjectName("textEdit")
        self.gridLayout.addWidget(self.textEdit, 5, 5, 3, 2)
        spacerItem2 = QtWidgets.QSpacerItem(20, 40, QtWidgets.QSizePolicy.Minimum, QtWidgets.QSizePolicy.Expanding)
        self.gridLayout.addItem(spacerItem2, 1, 1, 1, 1)
        self.textEdit_2 = QtWidgets.QTextBrowser(self.centralwidget)
        self.textEdit_2.setObjectName("textEdit_2")
        self.textEdit_2.setReadOnly(True)
        self.gridLayout.addWidget(self.textEdit_2, 0, 3, 2, 4)
        self.pushButton_2 = QtWidgets.QPushButton(self.centralwidget)
        self.pushButton_2.setObjectName("pushButton_2")
        self.gridLayout.addWidget(self.pushButton_2, 8, 1, 1, 1)
        MainWindow.setCentralWidget(self.centralwidget)
        self.menubar = QtWidgets.QMenuBar(MainWindow)
        self.menubar.setGeometry(QtCore.QRect(0, 0, 800, 24))
        self.menubar.setObjectName("menubar")
        self.menu = QtWidgets.QMenu(self.menubar)
        self.menu.setObjectName("menu")
        self.menulanguage = QtWidgets.QMenu(self.menu)
        self.menulanguage.setObjectName("menulanguage")
        MainWindow.setMenuBar(self.menubar)
        self.statusbar = QtWidgets.QStatusBar(MainWindow)
        self.statusbar.setObjectName("statusbar")
        MainWindow.setStatusBar(self.statusbar)
        self.actionsocket_connet = QtWidgets.QAction(MainWindow)
        self.actionsocket_connet.setObjectName("actionsocket_connet")
        self.actionChinese = QtWidgets.QAction(MainWindow)
        self.actionChinese.setObjectName("actionChinese")
        self.actionip_socket_gethostbyname_socket_gethostname = QtWidgets.QAction(MainWindow)
        self.actionip_socket_gethostbyname_socket_gethostname.setObjectName(
            "actionip_socket_gethostbyname_socket_gethostname")
        self.menulanguage.addSeparator()
        self.menulanguage.addAction(self.actionChinese)
        self.menu.addSeparator()
        self.menu.addAction(self.menulanguage.menuAction())
        self.menu.addAction(self.actionip_socket_gethostbyname_socket_gethostname)
        self.menubar.addAction(self.menu.menuAction())
        self.socket_peername = s.addr[0]
        self.retranslateUi(MainWindow)
        QtCore.QMetaObject.connectSlotsByName(MainWindow)
        self.MainWindow = MainWindow
        self.pushButton.clicked.connect(self.send)
        self.pushButton_2.clicked.connect(self.re_connect)
        self.textEdit.textChanged.connect(self.tc)
        # Links in the log are handled by anchor() instead of being opened.
        self.textEdit_2.setOpenLinks(False)
        self.textEdit_2.setOpenExternalLinks(False)
        self.textEdit_2.anchorClicked.connect(self.anchor)
        self.connectEnabled(True)
        MainWindow.show()

    def anchor(self, res: QtCore.QUrl):
        """Request the file whose index is encoded in the clicked link."""
        index = res.toString()
        s.handle.request_file(index)

    @to_logging
    def sendmsg(self):
        """Send the text of the input box to the server and clear the box."""
        if self.textEdit.toPlainText().strip() == "a":
            # NOTE(review): developer test hook - typing "a" uploads a
            # hard-coded local file.  Replace with a real file picker.
            s.handle.send(r"C:\Users\zhnwe\Desktop\DDNS.txt")
        s.handle.send_text(self.textEdit.toPlainText().strip())
        self.textEdit.clear()

    def tc(self, _=0):
        """Update the send button: enabled for 1-1000 characters, with count."""
        length = len(self.textEdit.toPlainText().strip())
        self.pushButton.setEnabled(0 < length <= 1000)
        self.pushButton.setText(QtCore.QCoreApplication.translate("MainWindow", f"发送({length} / 1000)"))

    @to_logging
    def send(self, _):
        """Slot of the send button."""
        if getattr(s, "_connect", False):
            if not self.sendmsg():
                QtWidgets.QMessageBox.information(self.MainWindow, 'TraceBack',
                                                  f'Socket Server<{self.socket_peername}> 断开连接')
                self.connectEnabled(False)
                s._connect = False
        else:
            self.Show_Message("<font color='red'>发送异常. 未连接至服务器.请点击[重新连接服务器]按钮尝试重新连接.</font>")
            # The message asks the user to press the reconnect button, so make
            # sure that button is actually enabled.
            self.connectEnabled(False)

    def retranslateUi(self, MainWindow):
        """Fill in all texts, greet the user and start receiving."""
        _translate = QtCore.QCoreApplication.translate
        MainWindow.setWindowTitle(_translate("MainWindow", "Socket"))
        host_name = socket.gethostname()
        try:
            local_ip = socket.gethostbyname(host_name)
        except OSError:
            # Some hosts cannot resolve their own name; do not abort the UI.
            local_ip = "127.0.0.1"
        self.lineEdit_2.setText(host_name)
        self.lineEdit.setText(local_ip)
        self.lineEdit_3.setText(self.socket_peername)
        self.tc()
        self.label_2.setText(_translate("MainWindow", "主机名:"))
        self.label.setText(_translate("MainWindow", "本地端口:"))
        self.label_3.setText(_translate("MainWindow", "连接端口:"))
        self.pushButton_2.setText(_translate("MainWindow", "重新连接服务器"))
        self.menu.setTitle(_translate("MainWindow", "设置"))
        self.menulanguage.setTitle(_translate("MainWindow", "language"))
        self.actionsocket_connet.setText(_translate("MainWindow", "socket connect"))
        self.actionChinese.setText(_translate("MainWindow", "Chinese"))
        self.Show_Message(f'<font color="red">欢迎来到服务器[{s.socket.getpeername()[0]}].您的ip地址为{s.addr[0]}')
        self.actionip_socket_gethostbyname_socket_gethostname.setText(
            _translate("MainWindow", "ip: " + local_ip))
        s.set_func(self.Show_Message)
        s.run()

    @to_logging
    def re_connect(self, _):
        """Slot of the reconnect button: reconnect with the stored address."""
        self.Show_Message(
            "[{}]: 尝试连接服务器[{}],最大超时报错 {}s".format(datetime.now().strftime('%Y %m %d %H:%M:%S'), s.addr[0], TIMEOUT))
        status = s.connect(show=self.Show_Message)
        self.connectEnabled(status)
        if status:
            s.run()

    def connectEnabled(self, status):
        """Enable the reconnect button exactly when *status* is falsy."""
        self.pushButton_2.setEnabled(not status)

    def Show_Message(self, data):
        """Append every non-empty line of *data* to the message log."""
        if not data:
            return
        lines = data.split('\n')
        pause = 0.06 / len(lines)
        for i in lines:
            if i:
                # Short pause so that fast bursts do not leave the view blank.
                sleep(pause)
                self.textEdit_2.append(i)
                self.textEdit_2.moveCursor(QtGui.QTextCursor.End)
if __name__ == "__main__":
    # The QApplication has to exist before any widget is created.
    app = QtWidgets.QApplication(sys.argv)
    # `s` and `main` are referenced by name from the classes above, so these
    # module-level names must not be changed.
    s = Socket()
    conn = login()  # login.__init__ runs setupUi(), which ends with show()
    main = Ui_MainWindow()  # its widgets are built later by SetupUi()
    sys.exit(app.exec_())
ProgressBar.py
import sys
from PyQt5.QtCore import *
from PyQt5.QtGui import *
from PyQt5.QtWidgets import *
import math
font_name = "Share-TechMono"
def Animation(parent, type=b"windowOpacity", from_value=0, to_value=1, ms=1000, connect=None):
    """Start a ``QPropertyAnimation`` on *parent* and return it.

    Args:
        parent: object whose property is animated.
        type: name of the animated property, as bytes.
        from_value, to_value: start and end value of the property.
        ms: duration in milliseconds.
        connect: optional callable connected to the ``finished`` signal.

    The caller has to keep the returned object alive while it is running.
    """
    animation = QPropertyAnimation(parent, type)
    for configure, value in ((animation.setDuration, ms),
                             (animation.setStartValue, from_value),
                             (animation.setEndValue, to_value)):
        configure(value)
    if connect:
        animation.finished.connect(connect)
    animation.start()
    return animation
class ProgressThread(QThread):
    """QThread that only declares ``signal``; ``run`` is not overridden here."""
    # Carries one int; Progress connects it to percentUpdate().
    signal = pyqtSignal(int)
class QProgress(QWidget):
    """Circular progress widget: a gradient arc, a percentage and two waves.

    ``percent`` (0-100) controls the arc length, the text and the water
    level.  ``paintEvent`` schedules another repaint at its end, which is
    what keeps the waves moving.
    """
    fsize = 10                      # margin between widget edge and ring
    bg_color = QColor("#95BBFF")    # base colour of both waves
    m_waterOffset = 0.005           # wave amplitude as a fraction of height
    m_offset = 50                   # current phase of the waves
    m_borderwidth = 10
    percent = 0
    pen = QPen()
    pen.setWidth(8)
    pen.setCapStyle(Qt.RoundCap)
    gradient = QConicalGradient(50, 50, 91)
    gradient.setColorAt(0, QColor(255, 10, 10))
    gradient.setColorAt(1, QColor(255, 201, 14))
    pen.setBrush(gradient)
    font = QFont()
    font.setFamily(font_name)  # Share-TechMono

    def __init__(self, size, parent=None):
        """*size* is a ``(width, height)`` pair in pixels."""
        super(QProgress, self).__init__(parent)
        self.resize(*size)
        # NOTE(review): this attribute hides QWidget.size() on the instance.
        self.size = size
        self.font.setPointSize(self.size[0] // 4)

    def paintEvent(self, _):
        """Draw the ring, the percentage text and the two water waves."""
        width, height = self.size
        rect = QRectF(self.fsize, self.fsize, width - self.fsize * 2, height - self.fsize * 2)
        painter = QPainter(self)
        rotateAngle = 360 * self.percent / 100
        # Enable anti-aliasing before drawing.
        painter.setRenderHints(QPainter.Antialiasing)
        painter.setPen(self.pen)
        # Angles are given in 1/16 degree and must be ints: since Python 3.10
        # floats are no longer converted implicitly for int parameters.
        painter.drawArc(rect, (90 - 0) * 16, int(-rotateAngle * 16))  # ring
        painter.setFont(self.font)
        # Text colour shifts with the progress; QColor needs int components.
        painter.setPen(QColor(int(153 - 1.53 * self.percent),
                              int(217 - 0.55 * self.percent),
                              int(234 - 0.02 * self.percent)))
        painter.drawText(rect, Qt.AlignCenter, f"{self.percent}%")  # current progress
        painter.setPen(Qt.NoPen)
        percentage = 1 - self.percent / 100
        # Wave shape: y = A * sin(w * x + offset) + k
        # w: angular frequency (one full period across the widget width)
        w = 2 * math.pi / width
        # A: amplitude of the wave
        A = height * self.m_waterOffset
        # k: vertical offset, i.e. the water level for the current progress
        k = height * percentage
        water1 = QPainterPath()
        water2 = QPainterPath()
        # Both paths start at the bottom-left corner.
        water1.moveTo(5, height)
        water2.moveTo(5, height)
        # Advance the phase on every repaint so the waves travel.
        self.m_offset += 0.6
        if self.m_offset > (width / 2):
            self.m_offset = 0
        i = 5
        rect = QRectF(self.fsize + 2, self.fsize + 2,
                      width - self.fsize * 2 - 4, height - self.fsize * 2 - 4)
        while i < width - 5:
            water1.lineTo(i, A * math.sin(w * i + self.m_offset) + k)
            # Second wave is shifted by half a period.
            water2.lineTo(i, A * math.sin(w * i + self.m_offset + width / 2 * w) + k)
            i += 1
        water1.lineTo(width - 5, height)
        water2.lineTo(width - 5, height)
        totalpath = QPainterPath()
        painter.drawRect(self.rect())
        painter.save()
        # Clip both waves to the inner circle.
        totalpath.addEllipse(rect)
        painter.setPen(Qt.NoPen)
        # Two transparency levels for the two waves.
        watercolor1 = QColor(self.bg_color)
        watercolor1.setAlpha(100)
        watercolor2 = QColor(self.bg_color)
        watercolor2.setAlpha(150)
        path = totalpath.intersected(water1)
        painter.setBrush(watercolor1)
        painter.drawPath(path)
        path = totalpath.intersected(water2)
        painter.setBrush(watercolor2)
        painter.drawPath(path)
        painter.restore()
        painter.end()
        # Schedule the next frame of the animation.
        self.update()

    def update_percent(self, p):
        """Set the progress to *p* and let the wave amplitude grow a little.

        Returns *p* unchanged.
        """
        self.percent = p
        if self.m_waterOffset < 0.05:
            self.m_waterOffset += 0.001
        return p
class Progress(QDialog):
    """Frameless, translucent dialog hosting a ``QProgress`` ring.

    It contains a label, and a button that stays disabled until the progress
    reaches 100.  Progress values arrive through ``ProgressThread.signal``.
    """
    percent = 0

    def __init__(self, text="", parent=None):
        """*text* is the caption of the button."""
        super(Progress, self).__init__(parent)
        Font = QFont()
        Font.setFamily("Consolas")
        Font.setPointSize(12)
        self.setFont(Font)
        self.setWindowFlags(Qt.FramelessWindowHint)      # no window frame
        self.setAttribute(Qt.WA_TranslucentBackground)   # transparent background
        self.ProgressThread = ProgressThread()
        self.ProgressThread.signal.connect(self.percentUpdate)
        self.ProgressThread.start()
        width, height = 230, 230
        self.resize(width, height)
        self.pg = QProgress((width, height), self)
        self.label = QLabel(self)
        # QWidget.move() takes ints; since Python 3.10 a float raises
        # TypeError instead of being truncated implicitly.
        self.label.move(int((width - self.label.width()) / 3), int(height / 3))
        self.button = QPushButton(self)
        self.button.setText(QCoreApplication.translate("Dialog", text))
        self.button.move(int((width - self.button.width()) / 3), int(height / 3 * 2))
        self.button.setEnabled(False)
        self.button.clicked.connect(self.start)
        QMetaObject.connectSlotsByName(self)
        self.anim = Animation(self, )
        self.show()

    def percentUpdate(self, num: int = 0):
        """Store the new percentage and enable the button at 100."""
        self.percent = num
        if self.percent == 100:
            self.button.setEnabled(True)
        self.pg.update_percent(self.percent)

    def updateStrProgress(self, text):
        """Show *text* in the label."""
        self.label.setText(QCoreApplication.translate("Dialog", text))

    def start(self, _=None):
        """Do nothing."""
if __name__ == '__main__':
    # Manual demo: show the progress dialog with a sample button caption.
    app = QApplication(sys.argv)
    r = Progress("write something")
    sys.exit(app.exec_())
file_handle.py
import asyncio
import os
import sys
from threading import Thread
from ProgressBar import Progress, Animation
base = 1024
segment = base*2   # size in bytes of one file chunk (see SEND.cut)
delay = 0.2        # pause in seconds between two chunks (see SEND.update)
# Message-type tags: first element of every (type, payload) tuple on the wire.
new_file = 0
update_file = 1
request_file = 2
normal_text = 3
# asyncio.get_event_loop() is deprecated when no loop is running; create the
# loop explicitly and register it as the current loop of this thread.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# Daemon thread: run_forever() never returns, so a non-daemon thread would
# keep the process alive after the GUI has been closed.
main_thread = Thread(target=loop.run_forever, daemon=True)
def save_bytes(file, byte: bytes):
    """Write *byte* to the path *file*, replacing any previous content."""
    with open(file, mode="wb") as sink:
        sink.write(byte)
def get_eval(str, defined=None):
    """Parse the Python literal in *str*; return *defined* when that fails.

    Args:
        str: text holding a literal (tuple, int, str, bytes, ...).
        defined: fallback value; it also fixes the accepted result type - a
            parsed value of any other type is rejected.

    Returns:
        The parsed value when it is an instance of ``type(defined)``,
        otherwise *defined*.
    """
    # SECURITY: the text comes from the network.  It used to be passed to
    # eval(), which let any peer run arbitrary code on this machine.
    # literal_eval() accepts only literals, which is all the protocol
    # (repr() of tuples of int/str/bytes) needs.
    from ast import literal_eval

    try:
        res = literal_eval(str)
    except Exception:
        return defined
    if isinstance(res, type(defined)):
        return res
    return defined
class SEND:
    """One outgoing file transfer.

    The file is cut into chunks of ``segment`` bytes.  Creating the object
    schedules ``update`` on the module-level event loop, which sends a header
    message followed by one message per chunk.

    Attributes:
        segs:   list of byte chunks.
        total:  number of chunks.
        index:  transfer index chosen by the caller.
        conn:   callable that takes the encoded bytes of one message.
        finish: ``True`` once every chunk has been handed to ``conn``.
        name, size: file name and total size in bytes.
    """

    def __init__(self, index, name, fbytes: bytes, conn: callable, encode='utf8'):
        self.segs = self.cut(fbytes)
        self.encode = encode
        self.total = len(self.segs)
        self.index = index
        self.conn = conn
        self.finish = False
        self.name = name
        self.size = len(fbytes)
        asyncio.run_coroutine_threadsafe(self.update(), loop)

    async def update(self):
        """Send the header, then every chunk with ``delay`` seconds between."""
        self.conn(self.header().encode(self.encode))
        for n, data in enumerate(self.segs):
            self.conn(self.format(n, data).encode(self.encode))
            await asyncio.sleep(delay)
        self.finish = True

    def cut(self, byte: bytes, seg=segment) -> list:
        """Split *byte* into consecutive chunks of at most *seg* bytes."""
        return [byte[x:x + seg] for x in range(0, len(byte), seg)]

    def format(self, process, data) -> str:
        """Return the text of the chunk message for chunk number *process*."""
        return repr((update_file, (self.index, process, data)))

    def header(self) -> str:
        """Return the text of the message that announces the transfer."""
        return repr((new_file, (self.index,
                                self.name,
                                self.total,
                                self.size)
                     ))
class RECV:
    """One incoming file transfer, assembled chunk by chunk.

    Attributes:
        index, name, total, size: values announced in the header message.
        progress: number of the last accepted chunk (``-1`` before the first).
        file:     list of the chunks accepted so far.
        finish:   ``True`` once ``total`` chunks have been accepted.
    """

    def __init__(self, index: int, name: str, total: int, size: int):
        self.index, self.name, self.total, self.size = index, name, total, size
        self.progress = -1
        self.file = []
        self.finish = False
        # Path of the saved file; None until save() has succeeded.
        self.__save = None

    def update(self, p, data):
        """Accept chunk number *p* when it is the next one expected.

        Returns:
            ``True`` when this chunk completed the file, otherwise ``None``.
        """
        if self.finish:
            # Chunks after completion are ignored.
            return None
        if isinstance(p, int) and p - 1 == self.progress:
            self.progress = p
            self.file.append(data)
            if len(self.file) == self.total:
                self.finish = True
                return True

    def mix(self):
        """Return all accepted chunks joined into one bytes object."""
        return b''.join(self.file)

    def save(self, path, ):
        """Write the completed file into the directory *path*.

        Returns:
            ``True`` when the file is (already) saved, ``False`` when the
            transfer is not complete yet.
        """
        if not self.finish:
            return False
        if self.__save is not None:
            return True
        b = self.mix()
        # SECURITY: the name is chosen by the peer; keep only its last
        # component so that it cannot point outside of *path*.
        target = os.path.join(path, os.path.basename(self.name))
        save_bytes(target, b)
        self.__save = target
        print(f"Save {self.name} at {path}, size {len(b)} b.")
        # os.startfile exists on Windows only.
        if hasattr(os, "startfile"):
            os.startfile(path)
        self.file = []  # release the memory held by the chunks
        return True

    def savepath(self) -> str:
        """Return the path of the saved file, or ``False`` if not saved yet."""
        if self.finish and self.__save is not None:
            return self.__save
        return False
class send_files:
    """Registry of outgoing transfers; creates one ``SEND`` per file."""

    def __init__(self, encode='utf8'):
        self.sends = []
        self.encode = encode

    def new_sends(self, name, file, conn):
        """Start sending *file* (``bytes`` or ``str``) under the name *name*.

        A ``str`` is encoded with ``self.encode`` first.  The transfer index
        is the position the new entry gets in ``self.sends``.
        """
        if isinstance(file, str):
            file = file.encode(self.encode)
        self.sends.append(SEND(len(self.sends), name, file, conn))

    def localfile(self, file, conn):
        """Send the file stored at path *file*; ignore paths that are no file."""
        if not os.path.isfile(file):
            return
        name = os.path.basename(file)
        with open(file, "rb") as f:
            content = f.read()
        self.new_sends(name, content, conn)
class recv_files:
    """Registry of incoming transfers.

    Attributes:
        recvs:  list of ``RECV`` objects, in order of arrival.
        decode: codec name handed in by the owner.
        path:   directory completed files are written to
                (defaults to ``sys.path[0]``).
    """

    def __init__(self, decode='utf8', path=None):
        self.recvs = []
        self.decode = decode
        if path is None:
            path = sys.path[0]
        self.path = path

    def _valid(self, index) -> bool:
        """Tell whether *index* refers to an existing entry of ``recvs``."""
        # The index comes from the peer: reject non-ints and negative values
        # (a negative index would silently address an entry from the end).
        return isinstance(index, int) and 0 <= index < len(self.recvs)

    def new_files(self, index, name, total, size):
        """Register a newly announced transfer."""
        self.recvs.append(RECV(index, name, total, size))

    def apply(self, index, progess, data) -> bool:
        """Hand chunk *progess* to transfer *index*; save it when complete.

        Returns:
            ``True`` when this chunk completed the file and it was saved,
            otherwise ``False``.
        """
        if self._valid(index) and self.recvs[index].update(progess, data):
            return self.save(index)
        return False

    def save(self, index):
        """Save transfer *index*; ``False`` for an unknown index."""
        if self._valid(index):
            return self.recvs[index].save(self.path)
        return False
class message_handle:
    """Encodes outgoing and decodes incoming protocol messages.

    Every message is the ``repr`` of a ``(type, payload)`` tuple, where type
    is one of ``new_file``, ``update_file``, ``request_file``,
    ``normal_text``.  Received files are stored in a ``resource`` directory
    next to this module.
    """
    codec = "utf8"

    def __init__(self, func: callable = lambda _: None):
        """*func* is called with the encoded bytes of each outgoing message."""
        save_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "resource")
        if not os.path.isdir(save_path):
            os.makedirs(save_path)
        self.Sender = send_files(self.codec, )
        self.Receiver = recv_files(self.codec, save_path)
        self.func = func

    def handle(self, bytes,):
        """Process received data; yield the payload of every text message.

        The data is split into lines and each line is parsed on its own.
        File messages are applied to the receiver as a side effect.
        """
        for data in bytes.decode(self.codec).split("\n"):
            _res = get_eval(data, (None, ))
            if len(_res) != 2:
                continue
            kind, arguments = _res
            if kind == new_file:
                self.Receiver.new_files(*arguments)
            elif kind == update_file:
                self.Receiver.apply(*arguments)
            elif kind == request_file:
                known = self.Receiver.recvs
                # The index is chosen by the peer; ignore unknown ones.
                if isinstance(arguments, int) and 0 <= arguments < len(known):
                    # savepath() is False while the file is not saved yet.
                    path = known[arguments].savepath()
                    if path:
                        self.Sender.localfile(path, self.func)
            elif kind == normal_text:
                yield arguments

    def send(self, sendpath):
        """Send the local file at *sendpath*."""
        return self.Sender.localfile(sendpath, self.func)

    def send_text(self, mes: str):
        """Send *mes* as a text message."""
        return self.func(repr((normal_text, mes)).encode(self.codec))

    def request_file(self, name):
        """Ask the peer for the file whose index is written in *name*.

        Text that is not an int literal falls back to index ``0``.
        """
        index = get_eval(name, 0)
        self.func(repr((request_file, index)).encode(self.codec))

    def get_index(self, index):
        """Return the received transfer at *index*, or ``None`` if too large."""
        if index + 1 <= len(self.Receiver.recvs):
            return self.Receiver.recvs[index]
main_thread.start()
别急啊, 这还只是客户端, 代码还少.
服务端
server.pyw
import os, socket, sys, time, logging
from threading import Thread
import data #同目录data.py
from psutil import cpu_percent
from PyQt5 import QtCore, QtGui, QtWidgets
#from signal import SIGINT, signal
import asyncio
import os
import sys
from threading import Thread
__version__ = 1.9  # reported to clients by Command_Handler.get_version()
base = 1024
segment = base*2
delay = 0.2
# Message-type tags; Client._send wraps text as repr((normal_text, message)).
# NOTE(review): the other three presumably mirror the client's file_handle.py
# protocol - confirm against the server's message handler.
new_file = 0
update_file = 1
request_file = 2
normal_text = 3
class QLogger(logging.Handler):
    """Logging handler that forwards HTML-formatted records to a callable.

    Records are dropped until a callable has been registered with
    ``connect``.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        html_layout = logging.Formatter("[<font color='darkgreen'>%(asctime)s</font>(<font color='blue'>%(levelname)s</font>)]: <font color='brown'>%(message)s</font>")
        self.setFormatter(html_layout)

    def emit(self, record):
        """Format *record* and pass the text to the registered callable."""
        rendered = self.format(record)
        if not hasattr(self, "output"):
            return
        self.output(rendered)

    def connect(self, func):
        """Register *func* as the output callable; non-callables are ignored."""
        if not callable(func):
            return
        self.output = func
# asyncio.get_event_loop() is deprecated when no loop is running; create the
# loop explicitly and register it as the current loop of this thread.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# Thread meant to run the loop; it is not started in this part of the file.
main_thread = Thread(target=loop.run_forever)
def threading(Daemon, **kwargs):
    """Create, configure and start a ``Thread``.

    Args:
        Daemon: Value assigned to the thread's ``daemon`` flag.
        **kwargs: Forwarded unchanged to ``threading.Thread``.

    Returns:
        The already started ``Thread`` object.
    """
    thread = Thread(**kwargs)
    # Thread.setDaemon() is deprecated since Python 3.10; set the attribute.
    thread.daemon = Daemon
    thread.start()
    return thread
def ignore(function):
    """Decorator: run *function* best-effort and always return ``None``.

    Exceptions derived from ``Exception`` are suppressed (and recorded at
    DEBUG level so they can still be diagnosed).  ``KeyboardInterrupt`` and
    ``SystemExit`` are not suppressed, so the process can still be stopped.
    """
    from functools import wraps

    @wraps(function)
    def i(*args, **kwargs):
        try:
            function(*args, **kwargs)
        except Exception:
            logging.getLogger(__name__).debug(
                "ignored exception in %s", getattr(function, "__name__", function),
                exc_info=True)
            return
    return i
# Module logger with two sinks: the GUI (through Qlog, once a callable has
# been connected to it) and the plain-text file log.txt.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
Qlog = QLogger()
logger.addHandler(Qlog)
# Messages contain Chinese text; fix the encoding so that writing does not
# depend on the locale of the machine.
filehandle = logging.FileHandler("log.txt", encoding="utf-8")
filehandle.setFormatter(logging.Formatter("[%(asctime)s(%(levelname)s)]: %(message)s"))
logger.addHandler(filehandle)
# Unit for buffer sizes; Server.Setup defaults max_count to bytecount ** 2.
bytecount = 1024
def to_logging(command):
    """Decorator that logs exceptions of *command* and returns ``False``.

    The wrapped callable returns:
        * ``True`` when *command* returns ``None``,
        * the value of *command* when it returns anything else,
        * ``False`` when *command* raises an ``Exception`` (the traceback is
          written to the module logger).

    ``KeyboardInterrupt`` and ``SystemExit`` are not intercepted.
    """
    from functools import wraps

    @wraps(command)
    def logs(*args, **kwargs):
        try:
            _result = command(*args, **kwargs)
        except Exception:
            logger.exception(str())
            return False
        return True if _result is None else _result
    return logs
class Command_Handler(object):
    """Interprets the slash commands a client types, e.g. ``/info -id``."""

    def __init__(self, bind):
        """Bind Client class"""
        assert isinstance(bind, Client)
        self.client = bind

    def _function(self, _list):
        """Resolve the tokens in *_list* and return the answer text.

        The command table is walked token by token.  An unknown token gives
        the "no such command" text, stopping on a sub-table gives the "more
        arguments" text, tokens after a complete command are ignored.
        """
        # The table stores the bound methods; only the selected one is
        # called, instead of computing every answer for every command.
        commands = {"/info": {"-v": self.get_version,
                              "-id": self.get_id,
                              "-i": self.info,
                              "-h": self.help,
                              "-name": self.name},
                    }
        node = commands
        for token in _list:
            if not isinstance(node, dict):
                break
            if token not in node:
                return self.unknown(" ".join(_list))
            node = node[token]
        if isinstance(node, dict):
            return "Error:\n<font color='blue'>This command must take more arguments. Such as %s.</font>" % list(
                node.keys())
        return node()

    @staticmethod
    def help():
        """Return the usage text."""
        return """/info [-v] [-id] [-i]
-v : get version of program.
-id : get your id.
-i : get information.
-h : help.
-name : get your name
For example, <font color=red>/info -id</font>"""

    @staticmethod
    def get_version():
        """Return the program version as text."""
        return "version : " + str(__version__)

    def get_id(self):
        """Return the ``id()`` of the bound client as text."""
        return "Your id is {}.".format(id(self.client))

    def name(self):
        """Return the user name of the bound client as text."""
        return "Your name is {}.".format(self.client.username)

    def info(self):
        """Return the one-line program description."""
        # get_version() already contains the word "version"; embedding it
        # produced "version version : 1.9".
        return f"Socket Server[version {__version__}] By zmh."

    def unknown(self, s):
        """Return the error text for the unknown command *s* plus the usage."""
        return """Error:
No command named "%s". Please search [/info -h] to help.
%s""" % (s, self.help())

    def cut(self, string):
        """Split *string* into whitespace-separated tokens."""
        return string.strip().split()

    def handler(self, c):
        """Return the complete HTML answer for the command line *c*."""
        return "<font color='gray'>[command]</font><font color='brown'>%s</font>\n%s" % (
            c, str(self._function(self.cut(c))))

    def iscommand(self, i):
        """Tell whether the text *i* is a command (starts with ``/``)."""
        return i.strip().startswith("/")
class Server():
    """TCP chat server: accepts clients and relays messages between them.

    When a *gui* object is given, user joins, leaves and the number of online
    users are reported to it through ``add_user``, ``remove_user`` and
    ``usernumChanged``.
    """
    join_message = "<font color='red'>Server></font> <font color='blue'>%s(%s)</font> 连接服务器. 当前在线人数: <font color='red'>%s</font>"
    user_message = "<font color='%s'>%s(%s)%s></font> %s"
    quit_message = "%s(%s) 下线了, %s"

    def __init__(self, gui=None):
        self._call_func = bool(gui)
        if self._call_func:
            self.add_user = gui.add_user
            self.remove_user = gui.remove_user
            self.user_num_change = gui.usernumChanged

    def Setup(self, addr, port, backlog=10, max_count=bytecount**2, encode='utf8'):
        """Bind to ``(addr, port)``, start listening and start the threads.

        Returns:
            The thread that accepts clients (see ``run``).
        """
        self.user_handle = message_handle(self)
        self.address = addr, port
        self.backlog = backlog
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.bind(self.address)
        self.socket.listen(backlog)
        self.max_count = max_count
        self.connect = []
        self.encode = encode
        self.user_record = data.user()
        return self.run()

    def clear_socket(self, clear_ms=500):
        """Forever: every *clear_ms* ms drop clients that quit or are closed."""
        logger.info(f"Clear the closed socket once every {clear_ms} ms.")
        while True:
            # Collect first, remove afterwards, so the list is not changed
            # while it is being scanned.
            del_list = [c for c in self.connect if hasattr(c, 'Quitted') or (not c.isOpen())]
            for user in del_list:
                self.connect.remove(user)
            time.sleep(clear_ms / 1000)

    def run(self):
        """Log the configuration and start the cleaner and accept threads."""
        logger.debug(f"pid {os.getpid()}.")
        logger.info(f"Server [{':'.join(map(str, self.address))}] on.")
        logger.info("Backlog number: " + str(self.backlog))
        logger.info('The CODEC is sent as ' + self.encode)
        threading(Daemon=True, target=self.clear_socket)
        return threading(Daemon=True, target=self.accept_client)

    def _get_Clients(self) -> list:
        """Return the clients that are logged in and whose socket is open."""
        return [c for c in self.connect if c.__filter__()]

    def _get_sockets(self):  # return int
        """Return the number of online clients and report it to the GUI."""
        i = len(self._get_Clients())
        if self._call_func:
            self.user_num_change(i)
        return i

    def _str_sockets(self):
        """Return the number of online clients as display text."""
        return f"当前人数 {self._get_sockets()}"

    def ServerMessage(self, mes, inc=True):
        """Send *mes* to every online client."""
        for user in self._get_Clients():
            if user.__filter__():
                user._send(mes)

    def UserMessage(self, address, _user, mes, inc=True):
        """Relay the chat message *mes* of *_user* to every online client.

        The sender sees the own message in another colour, marked "(我自己)".
        Empty messages are ignored.
        """
        if not mes:
            return
        for user in self.connect:
            if user.__filter__():
                own = _user == user.username
                send_message = Server.user_message % ("brown" if own else "red",
                                                      _user,
                                                      address,
                                                      "(我自己)" if own else "",
                                                      mes)
                user._send(send_message)
        logger.info(f"{address}[{_user}] : {mes}")

    def error_handle(self):
        """Drop every client whose socket is no longer open."""
        # Build the list before removing: removing while a lazy filter() is
        # still walking the same list makes it skip entries.
        for user in [c for c in self.connect if not c.isOpen()]:
            self.connect.remove(user)

    def accept_client(self):
        """Forever: accept a connection and start a ``Client`` for it."""
        while True:
            logger.info("The server is listening on the port.")
            client, address = self.socket.accept()  # blocks until a client connects
            NewClient = Client(client, address[0], self)
            self.connect.append(NewClient)
            NewClient.run()
            logger.info(f'The address {address[0]} is connected to the server')

    def quit(self, username, address):
        """Announce that *username* went offline."""
        if self._call_func:
            self.remove_user(username)
        QuitMessage = Server.quit_message % (username, address, self._str_sockets())
        logger.info(QuitMessage)
        self.ServerMessage(QuitMessage, False)

    def login(self, username, address):
        """Announce that *username* logged in."""
        if self._call_func:
            self.add_user(username)
        logger.info(f"{address}[{username}] 登录服务器 , " + self._str_sockets())
        self.ServerMessage(Server.join_message % (username, address, self._get_sockets()))
class Client(object):
    """One connected peer: owns its socket, performs login and relays messages.

    ``_recv`` and ``_send`` are built per instance in ``__init__`` and wrapped
    by ``error``, so socket failures never propagate to callers; a failed call
    returns the ``Client.QuitError`` class object as a sentinel instead.
    """

    class QuitError(Exception):
        """Marks a connection that can no longer be used.

        The class object itself doubles as the sentinel value returned by the
        ``error`` wrapper and by ``recv``.
        """

        def __init__(self, *args):
            super().__init__(*args)

    def __init__(self, socket, addr, server: Server):
        """Bind the accepted ``socket`` to ``server``.

        Raises:
            ValueError: if ``server`` is not a ``Server`` instance.
        """
        self.socket = socket
        self.addr = addr
        if not isinstance(server, Server):
            raise ValueError
        self.server = server
        self.encode = self.server.encode
        self.max_count = self.server.max_count
        self.com = Command_Handler(self)

        @self.error
        def _recv(self) -> str:
            data = self.socket.recv(self.max_count)
            if not data:
                # recv() yields b"" once the peer has closed the connection.
                # Treat it as a disconnect so callers do not spin on a dead
                # socket.
                raise ConnectionAbortedError("peer closed the connection")
            return data.decode(encoding=self.encode).strip()
        self._recv = lambda: _recv(self)

        @self.error
        def _send(self, message=None) -> None:
            if message:
                if isinstance(message, str):
                    message = repr((normal_text, message)).encode(self.encode)
                # Append (not assign) the newline delimiter so consecutive
                # packets do not stick together on the wire.
                message += b'\n'
                self.socket.sendall(message)
        self._send = lambda m=None: _send(self, m)

    def __del__(self):
        """Close the socket, if the instance got far enough to own one."""
        sock = getattr(self, "socket", None)
        if sock is not None:
            sock.close()

    def isLogin(self) -> bool:
        """Return True once ``login`` has completed successfully."""
        return hasattr(self, "_login") and self._login

    def isOpen(self) -> bool:
        """Return True while the socket's ``_closed`` flag is false."""
        return not getattr(self.socket, "_closed", True)

    def __filter__(self) -> bool:
        """Return whether the client is logged in and able to receive messages."""
        return self.isLogin() and self.isOpen()

    def recv(self):
        """Block until the peer delivers a text message and return it.

        Non-text packets are consumed by ``server.user_handle.handle`` and the
        wait continues. If the socket fails, ``Client.QuitError`` (the class)
        is returned instead of a string.
        """
        while True:
            raw = self._recv()
            if not isinstance(raw, str):
                # The error wrapper reported a failure; do not hand the
                # sentinel to the packet handler.
                return Client.QuitError
            for x in self.server.user_handle.handle(raw, self):
                if isinstance(x, str):
                    # Remaining items of this batch are deliberately dropped.
                    return x

    def _recv_text(self) -> str:
        """Return the next text message, raising ``QuitError`` if the link died."""
        text = self.recv()
        if not isinstance(text, str):
            raise Client.QuitError("connection lost")
        return text

    @ignore
    def login(self):
        """Run the login / registration dialogue with the peer.

        Known users must supply their password; unknown users register by
        entering a 4-10 character password twice. ``self._login`` is set and
        the server notified only after success. A wrong password closes the
        socket and returns without logging in.
        """
        def normal(string):
            return (4 <= len(string) <= 10) and '\n' not in string

        records = self.server.user_record
        try:
            self.username = self._recv_text()[:15]
            if records.__in__(self.username):
                self._send("<font color='red'>请输入您的密码: (右下[send]键发送)</font>")
                password = self._recv_text()
                if not records.handler(self.username, password):
                    self._send('<font color="red">密码错误,请重试.</font>')
                    self.__del__()
                    # Rejected users must not be marked as logged in or
                    # announced to the room.
                    return
                self._send(f'<font color="green">欢迎回来, {self.username}.</font>')
            else:
                while True:
                    self._send(
                        "<font color='blue'>[i]提示: 密码需在4 ~ 10位之间, 且不能换行.</font>\n<font color='red'>请输入您的密码: (右下[send]键发送)</font>")
                    p1 = self._recv_text()
                    if normal(p1):
                        break
                while True:
                    self._send("<font color='red'>再次输入您的密码: (右下[send]键发送)</font>")
                    p2 = self._recv_text()
                    if p1 == p2:
                        break
                    self._send("<font color='red'>密码与前次不符!</font>")
                records.handler(self.username, p1)
                self._send(f'初来乍到, {self.username}')
        except Client.QuitError:
            # Peer vanished in the middle of the dialogue.
            return
        self._login = True
        self.server.login(self.username, self.addr)

    def quit(self) -> None:
        """Close the socket once and, if the user was logged in, notify the server."""
        if hasattr(self, 'Quitted'):
            return
        self.Quitted = True
        was_logged_in = self.isLogin()
        if self.isOpen():
            self.socket.close()
        # A peer that never finished logging in has no username to announce.
        if was_logged_in:
            self.server.quit(self.username, self.addr)

    @to_logging
    def forever_receive(self):
        """Thread body: log in, then dispatch incoming text until disconnected."""
        self.login()
        while self.__filter__():
            string = self.recv()
            if string is None:
                continue
            elif string is Client.QuitError:
                return
            elif self.com.iscommand(string):
                self._send(self.com.handler(string))
            else:
                self.server.UserMessage(self.addr, self.username, string)

    def error(self, func):
        """Wrap ``func`` so socket errors are absorbed.

        Connection errors trigger ``quit()``; any other exception is logged.
        In both cases the wrapper returns ``Client.QuitError``.
        """
        def function(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except ConnectionError:
                # Covers aborted, reset and broken-pipe connections.
                self.quit()
            except Exception:
                logger.exception("error")
            return Client.QuitError
        return function

    def run(self):
        """Start ``forever_receive`` through the project's ``threading`` helper."""
        self.thread = threading(True, target=self.forever_receive)
def get_host_ip() -> str:
    """Return the local IP address used for outbound traffic.

    A UDP socket is "connected" to 8.8.8.8:80 and the address the OS bound it
    to is read back. The ``with`` block closes the socket; previously the
    ``finally`` clause referenced ``s`` even when creating it had failed.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.connect(('8.8.8.8', 80))
        return s.getsockname()[0]
class Interface(QtWidgets.QMainWindow):
    """Main window of the server: setup form, log view and live statistics."""

    # Carries log text into the GUI; connected to ``databaseUpdate``.
    Database_signal = QtCore.pyqtSignal(str)

    def __init__(self):
        super(Interface, self).__init__()
        self.setupUi()
        self.show()

    def setupUi(self):
        """Build all widgets and layouts and wire the signals."""
        self.setObjectName("MainWindow")
        self.resize(788, 685)
        font = QtGui.QFont()
        font.setFamily("Consolas")
        font.setPointSize(11)
        self.setFont(font)
        self.setStyleSheet("")
        self.centralwidget = QtWidgets.QWidget(self)
        self.centralwidget.setObjectName("centralwidget")
        self.gridLayout = QtWidgets.QGridLayout(self.centralwidget)
        self.gridLayout.setObjectName("gridLayout")
        self.label_6 = QtWidgets.QLabel(self.centralwidget)
        self.label_6.setObjectName("label_6")
        self.gridLayout.addWidget(self.label_6, 4, 0, 1, 1)
        self.textEdit_2 = QtWidgets.QTextEdit(self.centralwidget)
        self.textEdit_2.setObjectName("textEdit_2")
        self.gridLayout.addWidget(self.textEdit_2, 5, 0, 1, 1)

        # --- "Server Setup" form -------------------------------------------
        self.groupBox = QtWidgets.QGroupBox(self.centralwidget)
        self.groupBox.setObjectName("groupBox")
        self.formLayout_2 = QtWidgets.QFormLayout(self.groupBox)
        self.formLayout_2.setObjectName("formLayout_2")
        self.label_2 = QtWidgets.QLabel(self.groupBox)
        self.label_2.setObjectName("label_2")
        self.formLayout_2.setWidget(0, QtWidgets.QFormLayout.LabelRole, self.label_2)
        self.lineEdit = QtWidgets.QLineEdit(self.groupBox)
        self.lineEdit.setObjectName("lineEdit")
        self.formLayout_2.setWidget(0, QtWidgets.QFormLayout.FieldRole, self.lineEdit)
        self.label_8 = QtWidgets.QLabel(self.groupBox)
        self.label_8.setObjectName("label_8")
        self.formLayout_2.setWidget(1, QtWidgets.QFormLayout.LabelRole, self.label_8)
        self.lineEdit_3 = QtWidgets.QLineEdit(self.groupBox)
        self.lineEdit_3.setObjectName("lineEdit_3")
        self.formLayout_2.setWidget(1, QtWidgets.QFormLayout.FieldRole, self.lineEdit_3)
        self.label_7 = QtWidgets.QLabel(self.groupBox)
        self.label_7.setObjectName("label_7")
        self.formLayout_2.setWidget(2, QtWidgets.QFormLayout.LabelRole, self.label_7)
        self.lineEdit_2 = QtWidgets.QLineEdit(self.groupBox)
        self.lineEdit_2.setReadOnly(True)
        self.lineEdit_2.setObjectName("lineEdit_2")
        self.formLayout_2.setWidget(2, QtWidgets.QFormLayout.FieldRole, self.lineEdit_2)
        spacerItem = QtWidgets.QSpacerItem(40, 20, QtWidgets.QSizePolicy.Expanding, QtWidgets.QSizePolicy.Minimum)
        self.formLayout_2.setItem(5, QtWidgets.QFormLayout.LabelRole, spacerItem)
        self.pushButton = QtWidgets.QPushButton(self.groupBox)
        self.pushButton.setObjectName("pushButton")
        self.formLayout_2.setWidget(5, QtWidgets.QFormLayout.FieldRole, self.pushButton)
        self.lineEdit_4 = QtWidgets.QLineEdit(self.groupBox)
        self.lineEdit_4.setObjectName("lineEdit_4")
        self.formLayout_2.setWidget(3, QtWidgets.QFormLayout.FieldRole, self.lineEdit_4)
        self.label_9 = QtWidgets.QLabel(self.groupBox)
        self.label_9.setObjectName("label_9")
        self.formLayout_2.setWidget(3, QtWidgets.QFormLayout.LabelRole, self.label_9)
        self.label_10 = QtWidgets.QLabel(self.groupBox)
        self.label_10.setObjectName("label_10")
        self.formLayout_2.setWidget(4, QtWidgets.QFormLayout.LabelRole, self.label_10)
        self.lineEdit_5 = QtWidgets.QLineEdit(self.groupBox)
        self.lineEdit_5.setObjectName("lineEdit_5")
        self.formLayout_2.setWidget(4, QtWidgets.QFormLayout.FieldRole, self.lineEdit_5)
        self.gridLayout.addWidget(self.groupBox, 0, 0, 4, 1)

        self.line = QtWidgets.QFrame(self.centralwidget)
        self.line.setFrameShape(QtWidgets.QFrame.VLine)
        self.line.setFrameShadow(QtWidgets.QFrame.Sunken)
        self.line.setObjectName("line")
        self.gridLayout.addWidget(self.line, 2, 1, 4, 1)

        # --- title banner -----------------------------------------------------
        self.label_5 = QtWidgets.QLabel(self.centralwidget)
        font = QtGui.QFont()
        font.setFamily("Comic Sans MS")
        font.setPointSize(14)
        font.setBold(False)
        font.setWeight(50)
        self.label_5.setFont(font)
        # The two declarations are separated by a real newline; a stray "n"
        # in its place turned the second property into "ncolor".
        self.label_5.setStyleSheet("background-color:qlineargradient(spread:pad, x1:0, y1:0, x2:1, y2:1, stop:0 rgba(0, 255, 241, 255), stop:0.930348 rgba(0, 158, 255, 255));\n"
                                   "color:rgb(85, 0, 255)")
        self.label_5.setObjectName("label_5")
        self.gridLayout.addWidget(self.label_5, 0, 1, 1, 3)

        # --- "Run" panel (disabled until the server is started) --------------
        self.groupBox_2 = QtWidgets.QGroupBox(self.centralwidget)
        self.groupBox_2.setEnabled(False)
        self.groupBox_2.setObjectName("groupBox_2")
        self.gridLayout_2 = QtWidgets.QGridLayout(self.groupBox_2)
        self.gridLayout_2.setObjectName("gridLayout_2")
        self.lcdNumber = QtWidgets.QLCDNumber(self.groupBox_2)
        self.lcdNumber.setObjectName("lcdNumber")
        self.gridLayout_2.addWidget(self.lcdNumber, 1, 1, 1, 1)
        self.line_3 = QtWidgets.QFrame(self.groupBox_2)
        self.line_3.setFrameShape(QtWidgets.QFrame.HLine)
        self.line_3.setFrameShadow(QtWidgets.QFrame.Sunken)
        self.line_3.setObjectName("line_3")
        self.gridLayout_2.addWidget(self.line_3, 3, 0, 1, 2)
        self.listView_2 = QtWidgets.QListWidget(self.groupBox_2)
        self.listView_2.setObjectName("listView_2")
        self.gridLayout_2.addWidget(self.listView_2, 5, 0, 1, 2)
        self.label_4 = QtWidgets.QLabel(self.groupBox_2)
        self.label_4.setObjectName("label_4")
        self.gridLayout_2.addWidget(self.label_4, 4, 0, 1, 1)
        self.label_3 = QtWidgets.QLabel(self.groupBox_2)
        self.label_3.setObjectName("label_3")
        self.gridLayout_2.addWidget(self.label_3, 1, 0, 1, 1)
        self.label = QtWidgets.QLabel(self.groupBox_2)
        self.label.setObjectName("label")
        self.gridLayout_2.addWidget(self.label, 0, 0, 1, 1)
        self.progressBar = QtWidgets.QProgressBar(self.groupBox_2)
        self.progressBar.setProperty("value", 0)
        self.progressBar.setObjectName("progressBar")
        self.gridLayout_2.addWidget(self.progressBar, 0, 1, 1, 1)
        self.gridLayout.addWidget(self.groupBox_2, 1, 2, 5, 2)

        self.textEdit_2.setReadOnly(True)
        self.setCentralWidget(self.centralwidget)
        self.statusbar = QtWidgets.QStatusBar(self)
        self.statusbar.setObjectName("statusbar")
        self.setStatusBar(self.statusbar)
        self.groupBox_2.setEnabled(False)
        self.retranslateUi()

        # --- signal wiring ----------------------------------------------------
        self.pushButton.clicked.connect(self.run)
        QtCore.QMetaObject.connectSlotsByName(self)
        self.lcdNumber.display(0)
        self.cpu = cpuThread()
        self.cpu.signal.connect(self.progessUpdate)
        self.Database_signal.connect(self.databaseUpdate)
        Qlog.connect(self.Database_signal.emit)

    def progessUpdate(self, v):
        """Show ``v`` (converted to int) on the progress bar."""
        self.progressBar.setValue(int(v))

    @to_logging
    def handle(self):
        """Read the setup form and pass the values to ``server.Setup``.

        ``int()`` raises ValueError for non-numeric fields; what the caller
        sees then depends on the ``to_logging`` decorator.
        """
        self.max_recv = int(self.lineEdit.text())
        self.backlog = int(self.lineEdit_3.text())
        self.addr = self.lineEdit_4.text()
        self.port = int(self.lineEdit_5.text())
        server.Setup(self.addr, self.port, self.backlog, self.max_recv)

    def run(self, _):
        """Slot of the "Run" button: configure the server and switch panels.

        NOTE(review): the panels switch only when ``handle()`` returns a
        truthy value, which relies on what ``to_logging`` returns -- confirm.
        """
        if self.handle():
            self.groupBox.setEnabled(False)
            self.groupBox_2.setEnabled(True)
            self.timer = QtCore.QTimer()
            self.cpu.start()

    def databaseUpdate(self, data: str):
        """Append a log line to the log view and scroll to the end."""
        if data:
            time.sleep(0.01)  # throttle: painting too fast freezes the GUI
            self.textEdit_2.append(data.strip())
            self.textEdit_2.moveCursor(QtGui.QTextCursor.End)

    def usernumChanged(self, int):
        """Show the new online-user count on the LCD if it changed.

        The parameter keeps its historical name ``int`` for call
        compatibility. Because it shadows the builtin, the builtin must not be
        called here; the old ``int(self.lcdNumber.value())`` raised TypeError.
        ``QLCDNumber.intValue()`` provides the integer directly.
        """
        if int != self.lcdNumber.intValue():
            self.lcdNumber.display(int)

    def add_user(self, name: str):
        """Append ``name`` to the logged-in user list."""
        self.listView_2.addItem(name)

    def remove_user(self, name: str):
        """Remove the first list entry equal to ``name``; return whether one was found."""
        for i in range(self.listView_2.count()):
            if name == self.listView_2.item(i).text():
                self.listView_2.takeItem(i)
                return True
        return False

    def retranslateUi(self,):
        """Set every user-visible caption and default field value."""
        _translate = QtCore.QCoreApplication.translate
        self.setWindowTitle(_translate("MainWindow", "Socket Server"))
        self.label_6.setText(_translate("MainWindow", "Database(Logging and traceback):"))
        self.groupBox.setTitle(_translate("MainWindow", "Server Setup"))
        self.label_2.setText(_translate("MainWindow", "Maximum load(kb):"))
        self.lineEdit.setText(_translate("MainWindow", "1024"))
        self.label_8.setText(_translate("MainWindow", "backlog:"))
        self.lineEdit_3.setText(_translate("MainWindow", "10"))
        self.label_7.setText(_translate("MainWindow", "CODEC(Unalterable):"))
        self.lineEdit_2.setText(_translate("MainWindow", "utf8"))
        self.pushButton.setText(_translate("MainWindow", "Run"))
        self.lineEdit_4.setText(_translate("MainWindow", "127.0.0.1"))
        self.label_9.setText(_translate("MainWindow", "Address:"))
        self.label_10.setText(_translate("MainWindow", "Port:"))
        self.lineEdit_5.setText(_translate("MainWindow", "429"))
        self.label_5.setText(_translate("MainWindow", f"TCP Server v{__version__}"))
        self.groupBox_2.setTitle(_translate("MainWindow", "Run"))
        self.label_4.setText(_translate("MainWindow", "Logged in users:"))
        self.label_3.setText(_translate("MainWindow", "Online user(s):"))
        self.label.setText(_translate("MainWindow", "Running memory with CPU"))
class cpuThread(QtCore.QThread):
    """Background thread that emits the result of ``cpu_percent`` about once a second."""

    # Emitted with the sampled value truncated to int.
    signal = QtCore.pyqtSignal(int)

    def run(self) -> None:
        """Sample forever; ``cpu_percent(interval=1)`` is called once per iteration."""
        while True:
            self.signal.emit(int(cpu_percent(interval=1)))
def save_bytes(file, byte: bytes):
    """Write ``byte`` to the path ``file`` in binary mode, replacing existing content."""
    with open(file, "wb") as f:
        f.write(byte)
def get_eval(str, defined=None):
    """Parse a Python literal from text, falling back to ``defined``.

    The parsed value is returned only when it is an instance of
    ``type(defined)``; on any parse failure or type mismatch ``defined``
    itself is returned.

    SECURITY: the input comes from the network, so it is parsed with
    ``ast.literal_eval`` instead of ``eval``. Only literals (tuples, numbers,
    str, bytes, ...) are accepted, which matches what the senders in this file
    produce with ``repr``. Arbitrary expressions are rejected.

    The parameter keeps its historical name ``str`` for call compatibility.
    """
    import ast  # local import: keeps the block self-contained

    try:
        res = ast.literal_eval(str)
    except Exception:
        # Malformed or non-literal input.
        return defined
    return res if isinstance(res, type(defined)) else defined
class SEND():
    """One outgoing file transfer, split into segments.

    Constructing the object schedules ``update()`` on the module-level
    event ``loop``; ``update`` pushes a header packet followed by one packet
    per segment through ``conn``.
    """

    def __init__(self, index, name, fbytes: bytes, conn: callable, encode='utf8'):
        """
        Args:
            index: transfer number placed in every packet.
            name: file name announced in the header.
            fbytes: complete file content.
            conn: callable that receives each encoded packet.
            encode: codec used to encode the packet text.
        """
        self.segs = self.cut(fbytes)
        self.encode = encode
        self.total = len(self.segs)
        self.index = index
        self.conn = conn
        self.finish = False
        self.name = name
        self.size = len(fbytes)
        asyncio.run_coroutine_threadsafe(self.update(), loop)

    async def update(self):
        """Send the header, then every segment, pausing ``delay`` between segments."""
        self.conn(self.header().encode(self.encode))
        for n, data in enumerate(self.segs):
            self.conn(self.format(n, data).encode(self.encode))
            await asyncio.sleep(delay)
        self.finish = True

    def cut(self, byte: bytes, seg=segment) -> list:
        """Split ``byte`` into consecutive chunks of at most ``seg`` bytes."""
        # The "+" in the slice bound was missing (syntax error).
        return [byte[x:x + seg] for x in range(0, len(byte), seg)]

    def format(self, process, data) -> str:
        """Return the text of an ``update_file`` packet for chunk number ``process``."""
        return repr((update_file, (self.index, process, data)))

    def header(self) -> str:
        """Return the text of the ``new_file`` packet announcing this transfer."""
        return repr((new_file, (self.index,
                                self.name,
                                self.total,
                                self.size)
                     ))
class RECV():
    """One incoming file transfer, assembled from in-order chunks."""

    def __init__(self, index: int, name: str, total: int, size: int):
        """
        Args:
            index: slot number of this transfer.
            name: file name used when saving.
            total: number of chunks expected.
            size: announced size; stored but not checked by this class.
        """
        self.index, self.name, self.total, self.size = index, name, total, size
        self.progress = -1   # number of the last accepted chunk
        self.file = []       # accepted chunks, in order
        self.finish = False
        # Set by save(); initialised so savepath() is safe before saving.
        self.__save = ""

    def update(self, p, data):
        """Accept chunk ``p`` if it is the next one in sequence.

        Returns True when this chunk completes the file, otherwise False.
        Out-of-order chunks and chunks arriving after completion are ignored.
        """
        if self.finish:
            return False
        if isinstance(p, int) and p - 1 == self.progress:
            self.progress = p
            self.file.append(data)
            if len(self.file) == self.total:
                self.finish = True
                return True
        return False

    def mix(self):
        """Return all accepted chunks joined into one bytes object."""
        return b''.join(self.file)

    def save(self, path, ):
        """Write the completed file into directory ``path``.

        Returns True after writing, False if the transfer is not finished.
        """
        if not self.finish:
            return False
        b = self.mix()
        target = os.path.join(path, self.name)
        save_bytes(target, b)
        self.__save = target
        logger.info(f"Save {self.name} at {path}, size {len(b)} b.")
        self.file = []  # release the buffered chunks
        return True

    def savepath(self) -> str:
        """Return where the file was saved, or "" if it has not been saved."""
        if self.finish:
            return self.__save
        return ""
class send_files():
    """Creates and keeps track of outgoing ``SEND`` transfers."""

    def __init__(self, encode='utf8'):
        self.sends = []
        self.encode = encode

    def new_sends(self, name, file, conn):
        """Start sending ``file`` (bytes, or str encoded with ``self.encode``) as ``name``.

        The new transfer's index is its position in ``self.sends``.
        """
        if isinstance(file, str):
            file = file.encode(self.encode)
        # Pass the configured codec on instead of relying on SEND's default.
        self.sends.append(SEND(len(self.sends), name, file, conn, self.encode))

    def localfile(self, file, conn):
        """Send the file at path ``file`` through ``conn``; do nothing if it is not a file."""
        if os.path.isfile(file):
            _, name = os.path.split(file)
            with open(file, "rb") as f:
                content = f.read()
            self.new_sends(name, content, conn)
class recv_files():
    """Registry of incoming ``RECV`` transfers, addressed by list position."""

    def __init__(self, decode='utf8', path=None):
        """
        Args:
            decode: codec name; stored on the instance.
            path: directory for completed files; defaults to ``sys.path[0]``.
        """
        self.recvs = []
        self.decode = decode
        if path is None:
            path = sys.path[0]
        self.path = path

    def _valid(self, index) -> bool:
        """Return True if ``index`` addresses an existing slot (no negatives)."""
        return isinstance(index, int) and 0 <= index < len(self.recvs)

    def new_files(self, index, name, total, size):
        """Append a new ``RECV`` built from the given header fields."""
        self.recvs.append(RECV(index, name, total, size))

    def apply(self, index, progess, data):
        """Feed one chunk to transfer ``index``.

        Returns ``(index, name)`` when the chunk completed the file and it
        was saved; otherwise False.
        """
        if not self._valid(index):
            return False
        record = self.recvs[index]
        if record.update(progess, data) and self.save(index):
            return index, record.name
        return False

    def save(self, index):
        """Save transfer ``index`` into ``self.path``; False for an unknown index."""
        if not self._valid(index):
            return False
        return self.recvs[index].save(self.path)
class message_handle():
    """Decodes packets received from clients and routes files and text."""

    codec = "utf8"

    def __init__(self, server: Server, path=None):
        """
        Args:
            server: server used to broadcast "file received" notices.
            path: directory for received files; defaults to a ``resource``
                folder next to this source file. Created if missing.
        """
        if path is None:
            path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "resource")
        os.makedirs(path, exist_ok=True)
        self.Sender = send_files(self.codec, )
        self.Receiver = recv_files(self.codec, path)
        # username -> list of Receiver slots, in the order the user opened them
        self.files_record = {}
        self.server = server

    @to_logging
    def handle(self, string, client: Client):
        """Process every newline-separated packet in ``string``.

        This is a generator: the payload of each ``normal_text`` packet is
        yielded. File packets are consumed as side effects. Packets that do
        not parse to a 2-tuple are skipped.
        """
        # Packets are delimited by a real newline (the sender appends b"\n").
        for data in string.split("\n"):
            _res = get_eval(data, (None, ))
            if len(_res) != 2:
                continue
            kind, arguments = _res
            if kind == new_file:
                index, name, total, size = arguments
                slot = len(self.Receiver.recvs)
                # Always remember the slot, including for a user's first file
                # whose own index is not 0; otherwise its chunks are dropped.
                self.files_record.setdefault(client.username, []).append(slot)
                self.Receiver.new_files(slot, name, total, size)
            elif kind == update_file:
                index, progess, data = arguments
                slots = self.files_record.get(client.username)
                if slots:
                    if not len(slots) >= index + 1:
                        # Unknown index: fall back to the newest transfer.
                        index = len(slots) - 1
                    _res = self.Receiver.apply(slots[index], progess, data)
                    if _res:
                        INDEX, NAME = _res
                        self.server.UserMessage(client.addr, client.username, f'<a href="{INDEX}">{NAME}</a>')
            elif kind == request_file:
                # The index is client-supplied; validate before using it.
                record = self.get_index(arguments) if isinstance(arguments, int) else None
                path = record.savepath() if record is not None else ""
                if path:
                    self.Sender.localfile(path, client._send)
            elif kind == normal_text:
                yield arguments

    def send(self, sendpath, conn):
        """Send the local file ``sendpath`` through ``conn``."""
        return self.Sender.localfile(sendpath, conn)

    def get_index(self, index):
        """Return the transfer stored at ``index``, or None if out of range."""
        if 0 <= index and index + 1 <= len(self.Receiver.recvs):
            return self.Receiver.recvs[index]
        return None
if __name__ == "__main__":
    # Entry point: build the Qt application, start the helper thread, then
    # create the window and the server before entering the event loop.
    app = QtWidgets.QApplication(sys.argv)
    main_thread.start()
    gui = Interface()
    server = Server()
    sys.exit(app.exec_())
data.py
from json import load, dump
from os import path, mkdir
from hashlib import md5
from time import time
def encode(data: str) -> str:
    """Return the hexadecimal MD5 digest of ``data`` encoded as UTF-8.

    NOTE(review): MD5 without a salt is weak for password storage. It is kept
    because stored records depend on this exact digest.
    """
    return md5(data.encode('utf8')).hexdigest()
# Location of the user database. The path separators had been lost, leaving
# '.clientsdata.json' and '.clients', so the folder created below was never
# the one holding the file. The file now lives inside the folder.
# NOTE(review): reconstructed as ./clients/data.json -- confirm against any
# existing deployment before relying on stored accounts.
folder = path.join('.', 'clients')
file = path.join(folder, 'data.json')
if not path.exists(folder):
    mkdir(folder)
class user():
    """JSON-backed store of user records.

    ``self.data`` maps a username to ``(password_digest, registration_time)``.
    After a reload from JSON the pair is a list rather than a tuple.
    """

    def __init__(self):
        """Load the records from ``file`` if it exists, else start empty."""
        if path.exists(file):
            with open(file, 'r') as f:
                self.data = load(f)
        else:
            self.data = {}

    def __get__(self, username, default=None) -> tuple:
        """Return the record stored for ``username``, or ``default``."""
        return self.data.get(username, default)

    def __in__(self, username) -> bool:
        """Return True if ``username`` is registered."""
        return username in self.data

    def __write__(self) -> None:
        """Write all records to ``file``."""
        with open(file, 'w') as f:
            dump(self.data, f)

    def __register__(self, username, password, time: (int, float)) -> None:
        """Store a new record for ``username`` and persist it immediately."""
        self.data[username] = (encode(password), int(time))
        self.__write__()

    def __login__(self, username, password) -> bool:
        """Return True if ``password`` matches the stored digest.

        The stored digest is of the password, so the supplied password must
        be hashed here. Hashing the username made every login fail unless the
        password happened to equal the username.
        """
        return self.data[username][0] == encode(password)

    def handler(self, username, password) -> bool:
        """Log in an existing user or register a new one.

        Returns the password check result for known users, and True after
        registering an unknown user.
        """
        if self.__in__(username):
            return self.__login__(username, password)
        self.__register__(username, password, time())
        return True

    def get_time(self, username):
        """Return the registration timestamp stored for ``username``."""
        return self.data[username][1]