TCP聊天服务器套接字v1.1
所有版本记录:
v1.0
: TCP聊天服务器套接字|PyQt5 socket(TCP端口映射 端口放行) logging Thread(含日志,html) anaconda打包32位exe(3.4万字)|python高阶
文章目录
- | 1. 服务器代码改进 / bug改进
-
-
- (1).发送函数改为@function
- (2).异常运行函数改为三叠函数
- (3).服务端在下线时列表在遍历时 增加下线的服务端 -> 触发RuntimeError
- (4)获取真正本机的ip地址
-
- | 2.新增命令功能 (在输入框添加"/")
- | 3.新增cmd控制台颜色改变
- | 4.客户端PyQt5信息过快使Textedit刷新空白
- | 5.PyQt5 消息超过一页下拉浪费时间: 指针至最后
- | 6.服务端socket多次连接不同地址错误
- | 7. 完整代码
| 1. 服务器代码改进 / bug改进
(1).发送函数改为@function
代码语言:javascript复制class Server():
...
def send(self, sock, user, mes):
self.QUIT(user, lambda: sock.sendall(mes.encode(self.encode)))()
代码语言:javascript复制 def send(self, sock, user, mes):
@self.QUIT(user)
def func():
sock.sendall(mes.encode(self.encode))
return func()
(2).异常运行函数改为三叠函数
代码语言:javascript复制 def QUIT(self, user, command):
sock = list(self.connect.keys())[list(self.connect.values()).index(user)]
def logs(*args, **kargs):
try:
command(*args, **kargs)
except:
self.errs.append(sock)
return logs
代码语言:javascript复制 def QUIT(self, user):
sock = list(self.connect.keys())[list(self.connect.values()).index(user)]
def prop(command, *args, **kwargs):
def logs(*args, **kwargs):
try:
command(*args, **kwargs)
return True
except:
logging.exception(str())
if self.errduring is False:
self.errs.append((sock,user)) #在遍历时列表增加 -> 触发RuntimeError
return False
return logs
return prop
(3).服务端在下线时列表在遍历时 增加下线的服务端 -> 触发RuntimeError
bug改进方法:
增加errduring
; inc
参数
def ServerMessage(self, mes):
for sock, user in self.connect.items():
if not sock in self.errs:
self.send(sock, user, mes)
def UserMessage(self, address, _user, mes):
if not mes:
return
mes = mes.decode(encoding="utf8")
for sock, user in self.connect.items():
if not sock in self.errs:
send_message = Server.user_message % ("brown" if _user == user else "red",
_user,
address,
"(我自己)" if _user == user else "",
mes)
self.send(sock, user, send_message)
logger.info(f"{address}[{_user}] : {mes}")
self.error_handle()
def error_handle(self):
for sock in self.errs:
sock.close()
logger.exception(msg = str())
Q = Server.quit_message % user
logger.info(Q)
self.connect.pop(sock)
self.ServerMessage(Q)
代码语言:javascript复制 def ServerMessage(self, mes, inc=True):
self.errduring = True
for sock, user in self.connect.items():
if not sock in self.errs:
self.send(sock, user, mes)
if inc:
self.error_handle()
self.errduring = False
def UserMessage(self, address, _user, mes,inc=True):
if not mes:
return
self.errduring = True
for sock, user in self.connect.items():
if not sock in self.errs:
send_message = Server.user_message % ("brown" if _user == user else "red",
_user,
address,
"(我自己)" if _user == user else "",
mes)
self.send(sock, user, send_message)
logger.info(f"{address}[{_user}] : {mes}")
if inc:
self.error_handle()
self.errduring = False
def error_handle(self):
for _, user in self.errs: #使用变量_ : 为了防止触发KeyError: <socket.socket [closed] fd=-1, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0>
if user in list(self.connect.values()):
sock = list(self.connect.keys())[list(self.connect.values()).index(user)]
logger.exception(msg = str())
Q = Server.quit_message % (user, self._str_sockets())
logger.info(Q)
self.connect.pop(sock)
self.ServerMessage(Q, False) #防止多次调用error_handle函数
sock.close()
(4)获取真正本机的ip地址
如果直接用socket.gethostbyname(socket.gethostname())
获取地址,很有可能是错误的(Vmware虚拟机的地址
、127.0.0.1
等)
搜索后我得到了下面这段精巧的代码:
代码语言:javascript复制...
def get_host_ip() -> str:
"""get current IP address"""
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(('8.8.8.8', 80))
ip = s.getsockname()[0]
finally:
s.close()
return ip
通过UDP
尝试连接8.8.8.8:80
,不管是否连接成功,获得的本机IP一定是正确的。在Ubuntu
(kivydev)和Windows
上都可用,还省去了判断操作系统的大段代码.
那么代码可以改为:
代码语言:javascript复制server = Server(socket.gethostname(),429)
代码语言:javascript复制server = Server(get_host_ip(),429)
| 2.新增命令功能 (在输入框添加"/")
代码语言:javascript复制class Command_Handler(object):
def __init__(self, bind=None):
"""Bind Server class"""
self.bind = bind
def _function(self, _list, client):
data = {"/info" : {"-v": self.get_version(),
"-id" : self.get_id(client),
"-i" : self.info(),
"-h" : self.help()},
}
_dict = data
for n in range(len(_list)):
if type(_dict) == dict:
_dict = _dict.get(_list[n], self.unknown(" ".join(_list)))
else:
break
if type(_dict) == dict:
_dict = "Error:n<font color='blue'>This command must take more arguments. Such as %s.</font>" % list(_dict.keys())
return _dict
@staticmethod
def help():
return """/info [-v] [-id] [-i]
-v : get version of program.
-id : get your id.
-i : get information.
-h : help.
For example, <font color=red>/info -id</font>"""
@staticmethod
def get_version():
return "version : " str(__version__)
def get_id(self, client):
return "Your id is {}.".format(id(client))
def info(self):
return f"Socket Server[version {self.get_version()}] By zmh."
def unknown(self,s):
return """Error:
No command named "%s". Please search [/info -h] to help.
%s""" % (s, self.help())
def cut(self, string):
return string.strip().split()
def handler(self, c, client=None):
return "<font color='gray'>[command] </font><font color='brown'>%s</font>n%s" % (c,str(self._function(self.cut(c),client)))
def iscommand(self,i):
return i.strip().startswith("/")
代码语言:javascript复制class Server():
def __init__(self, addr, port, backlog = 10, encode = 'utf8'):
...
self.com = Command_Handler(self)
self.encode = encode
self.errs = []
self.errduring = False
| 3.新增cmd控制台颜色改变
我的文章: Python 命令行cmd指定颜色设置
代码语言:javascript复制#color.py
import ctypes,time
STD_INPUT_HANDLE = -10
STD_OUTPUT_HANDLE= -11
STD_ERROR_HANDLE = -12
std_out_handle=ctypes.windll.kernel32.GetStdHandle(STD_OUTPUT_HANDLE)
class Text():
DARKBLUE = 0x01 # 暗蓝色
DARKGREEN = 0x02 # 暗绿色
DARKSKYBLUE = 0x03 # 暗天蓝色
DARKRED = 0x04 # 暗红色
DARKPINK = 0x05 # 暗粉红色
DARKYELLOW = 0x06 # 暗黄色
DARKWHITE = 0x07 # 暗白色
DARKGRAY = 0x08 # 暗灰色/亮
BLUE = 0x09 # 蓝色
GREEN = 0x0a # 绿色
SKYBLUE = 0x0b # 天蓝色
RED = 0x0c # 红色
PINK = 0x0d # 粉红色
YELLOW = 0x0e # 黄色
WHITE = 0x0f # 白色
class Background():
BLUE = 0x10 # 蓝
GREEN = 0x20 # 绿
RED = 0x40 # 红
INTENSITY = 0x80 # 亮
def SetCmdColor(color, handle=std_out_handle):
Bool=ctypes.windll.kernel32.SetConsoleTextAttribute(handle, color)
return Bool
def Print(color:int,*args, **kwargs):
SetCmdColor(color)
print(*args, **kwargs)
SetCmdColor(Text.DARKWHITE)
代码语言:javascript复制#server.py
from color import Text, Background, Print
| 4.客户端PyQt5信息过快使Textedit刷新空白
增加延迟呗
代码语言:javascript复制...
def Show_Message(self, data):
if data:
for i in data.split('n'):
sleep(0.2 / len(data.split('n'))) #防止信息过快使Textedit刷新空白
self.textEdit_2.append(i)
print(i)
| 5.PyQt5 消息超过一页下拉浪费时间: 指针至最后
代码语言:javascript复制 def Show_Message(self, data):
if data:
...
self.textEdit_2.moveCursor(QtGui.QTextCursor.End)
| 6.服务端socket多次连接不同地址错误
OSError: [WinError 10022] 提供了一个无效的参数.
@to_logging
def socket_connect(self):
if hasattr(self, "_socc"):
self.retry() #socket多次连接不同地址会出现 `OSError: [WinError 10022] 提供了一个无效的参数.`
else:
self._socc = True
self.socket.connect(self.addr)
| 7. 完整代码
客户端:
代码语言:javascript复制# -*- coding: utf-8 -*-
# Form implementation generated from reading ui files 'USER.ui', 'Connect.ui'
#
# Created by: PyQt5 UI code generator 5.15.4
#
# WARNING: Any manual changes made to this file will be lost when pyuic5 is
# run again. Do not edit this file unless you know what you are doing.
from PyQt5 import QtCore, QtGui, QtWidgets
import socket, sys, logging
from traceback import format_exc
from datetime import datetime
from time import sleep
from threading import Thread
from random import randint as rand
import logging # 引入logging模块
logging.basicConfig(level=logging.DEBUG,format='%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s')
Username = f'zmh {rand(0,1000)}'
TIMEOUT = 3
IP = "114.67.206.19"
def get_host_ip() -> str:
"""get current IP address"""
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(('8.8.8.8', 80))
ip = s.getsockname()[0]
finally:
s.close()
return ip
selfIP = get_host_ip()
bytecount = 1024
dicts = {f"{IP} (公网)" : {"ip":IP, "port":16223},
f"{selfIP} (私网)" : {"ip":selfIP,"port":429},
"EXAM-41 (微机室)" : {"ip":"EXAM-41","port":429}}
get_time = lambda: datetime.now().strftime('%Y %m %d %H:%M:%S')
def threading(Daemon, **kwargs):
thread = Thread(**kwargs)
thread.setDaemon(Daemon)
thread.start()
return thread
def to_logging(command):
def logs(*args, **kwargs):
try:
command(*args, **kwargs)
except Exception as e:
if "main" in dir():
main.Show_Message(format_exc())
else:
logging.exception(str())
return False
else:
return True
return logs
class Socket:
def __init__(self,Function=lambda i:None,code='utf-8'):
self.socket = socket.socket()
self.code = code
self._logger = Function
self.socket.settimeout(TIMEOUT)
self._connect = False
def set_func(self, f):
self._logger = f
def retry(self):
del self.socket
self.socket = socket.socket()
self.socket.settimeout(TIMEOUT)
@to_logging
def socket_connect(self):
if hasattr(self, "_socc"):
self.retry() #socket多次连接不同地址会出现 `OSError: [WinError 10022] 提供了一个无效的参数.`
else:
self._socc = True
self.socket.connect(self.addr)
def connect(self, ip = None,port:int=0000, show=None):
self.addr = (ip, port)
if not self.socket_connect():
show("[{}]: 连接服务器[{}]失败".format(get_time(),self.addr[0]))
return False
else:
show("[{}]: 连接成功".format(get_time()))
return True
def _handler(self):
self.socket.send(Username.encode(self.code))
while True:
try:
byte = self.socket.recv(bytecount **2)
if len(byte) == 0:
break
kb = len(byte) / bytecount
self._logger(f'[{datetime.now().strftime("%H:%M:%S")}]{byte.decode(encoding=self.code)} {"<font size=1>%0.2f kb</font>" % kb}')
except Exception as e:
if not type(e) == socket.timeout:
for n in ["","ERROR".center(20,"-")] format_exc().split('n') ["".center(20,"-"),""]:
self._logger(f"<font color='red'>{n}</font>")
self.socket.close()
return
def run(self): #线程
self._connect = True
self.thread = threading(False, target=self._handler)
class Ui_Dialog(object):
def __init__(self):
Dialog = QtWidgets.QDialog()
self.Dialog = Dialog
Dialog.setObjectName("Dialog")
Dialog.resize(495, 81)
self.gridLayout = QtWidgets.QGridLayout(Dialog)
self.gridLayout.setObjectName("gridLayout")
self.comboBox = QtWidgets.QComboBox(Dialog)
self.comboBox.setObjectName("comboBox")
self.comboBox.addItems(list(dicts.keys()))
self.gridLayout.addWidget(self.comboBox, 0, 1, 1, 1)
self.pushButton = QtWidgets.QPushButton(Dialog)
self.pushButton.setObjectName("pushButton")
self.gridLayout.addWidget(self.pushButton, 0, 2, 1, 1)
self.label = QtWidgets.QLabel(Dialog)
font = QtGui.QFont()
font.setFamily("Consolas")
self.label.setFont(font)
self.label.setObjectName("label")
self.gridLayout.addWidget(self.label, 0, 0, 1, 1)
self.lineEdit = QtWidgets.QLineEdit(Dialog)
self.lineEdit.setReadOnly(True)
self.lineEdit.setObjectName("lineEdit")
self.gridLayout.addWidget(self.lineEdit, 1, 0, 2, 3)
self.retranslateUi(Dialog)
self.pushButton.clicked.connect(self._connect)
QtCore.QMetaObject.connectSlotsByName(Dialog)
self.num = 0
def retranslateUi(self, Dialog):
_translate = QtCore.QCoreApplication.translate
Dialog.setWindowTitle(_translate("Dialog", "Dialog"))
self.comboBox.setItemText(0, _translate("Dialog", list(dicts.keys())[0]))
self.pushButton.setText(_translate("Dialog", "Connecting"))
self.label.setText(_translate("Dialog", "Select Socket Connect Address:"))
self.set_text("No return value.")
Dialog.show()
def set_text(self, m):
self.lineEdit.setText(QtCore.QCoreApplication.translate("Dialog", str(m)))
self.lineEdit.update()
@to_logging
def _connect(self, i):
addr = dicts[self.comboBox.currentText()]
self.set_text("[{}]: 尝试连接服务器[{}],最大超时报错 {}s".format(datetime.now().strftime('%Y %m %d %H:%M:%S'),addr["ip"],TIMEOUT))
if s.connect(**addr, show=self.set_text):
global main
main = Ui_MainWindow()
main.show()
def close(widget):
sleep(1)
widget.close()
threading(False, target=close, args=(self.Dialog, ))
class Ui_MainWindow(object):
def __init__(self):
MainWindow = QtWidgets.QMainWindow()
MainWindow.setObjectName("MainWindow")
MainWindow.resize(800, 619)
font = QtGui.QFont()
font.setFamily("Consolas")
MainWindow.setFont(font)
self.centralwidget = QtWidgets.QWidget(MainWindow)
self.centralwidget.setObjectName("centralwidget")
self.gridLayout = QtWidgets.QGridLayout(self.centralwidget)
self.gridLayout.setObjectName("gridLayout")
self.lineEdit_3 = QtWidgets.QLineEdit(self.centralwidget)
self.lineEdit_3.setReadOnly(True)
self.lineEdit_3.setObjectName("lineEdit_3")
self.gridLayout.addWidget(self.lineEdit_3, 7, 3, 1, 1)
self.pushButton = QtWidgets.QPushButton(self.centralwidget)
self.pushButton.setStyleSheet("background-color:rgb(44, 176, 13);n"
"color:rgb(255, 255, 255);n"
"font: 200 10pt "Consolas";")
self.pushButton.setObjectName("pushButton")
self.gridLayout.addWidget(self.pushButton, 8, 6, 1, 1)
spacerItem = QtWidgets.QSpacerItem(40, 20, QtWidgets.QSizePolicy.Expanding, QtWidgets.QSizePolicy.Minimum)
self.gridLayout.addItem(spacerItem, 8, 5, 1, 1)
self.label_2 = QtWidgets.QLabel(self.centralwidget)
self.label_2.setObjectName("label_2")
self.gridLayout.addWidget(self.label_2, 6, 1, 1, 1)
spacerItem1 = QtWidgets.QSpacerItem(40, 20, QtWidgets.QSizePolicy.Expanding, QtWidgets.QSizePolicy.Minimum)
self.gridLayout.addItem(spacerItem1, 8, 3, 1, 1)
self.lineEdit_2 = QtWidgets.QLineEdit(self.centralwidget)
self.lineEdit_2.setReadOnly(True)
self.lineEdit_2.setObjectName("lineEdit_2")
self.gridLayout.addWidget(self.lineEdit_2, 6, 3, 1, 1)
self.label = QtWidgets.QLabel(self.centralwidget)
self.label.setObjectName("label")
self.gridLayout.addWidget(self.label, 5, 1, 1, 1)
self.lineEdit = QtWidgets.QLineEdit(self.centralwidget)
self.lineEdit.setCursor(QtGui.QCursor(QtCore.Qt.IBeamCursor))
self.lineEdit.setDragEnabled(False)
self.lineEdit.setReadOnly(True)
self.lineEdit.setObjectName("lineEdit")
self.gridLayout.addWidget(self.lineEdit, 5, 3, 1, 1)
self.label_3 = QtWidgets.QLabel(self.centralwidget)
self.label_3.setObjectName("label_3")
self.gridLayout.addWidget(self.label_3, 7, 1, 1, 1)
self.line = QtWidgets.QFrame(self.centralwidget)
self.line.setFrameShape(QtWidgets.QFrame.VLine)
self.line.setFrameShadow(QtWidgets.QFrame.Sunken)
self.line.setObjectName("line")
self.gridLayout.addWidget(self.line, 5, 4, 3, 1)
self.textEdit = QtWidgets.QTextEdit(self.centralwidget)
self.textEdit.setObjectName("textEdit")
self.gridLayout.addWidget(self.textEdit, 5, 5, 3, 2)
spacerItem2 = QtWidgets.QSpacerItem(20, 40, QtWidgets.QSizePolicy.Minimum, QtWidgets.QSizePolicy.Expanding)
self.gridLayout.addItem(spacerItem2, 1, 1, 1, 1)
self.textEdit_2 = QtWidgets.QTextEdit(self.centralwidget)
self.textEdit_2.setObjectName("textEdit_2")
self.textEdit_2.setReadOnly(True)
self.gridLayout.addWidget(self.textEdit_2, 0, 3, 2, 4)
self.pushButton_2 = QtWidgets.QPushButton(self.centralwidget)
self.pushButton_2.setObjectName("pushButton_2")
self.gridLayout.addWidget(self.pushButton_2, 8, 1, 1, 1)
MainWindow.setCentralWidget(self.centralwidget)
self.menubar = QtWidgets.QMenuBar(MainWindow)
self.menubar.setGeometry(QtCore.QRect(0, 0, 800, 24))
self.menubar.setObjectName("menubar")
self.menu = QtWidgets.QMenu(self.menubar)
self.menu.setObjectName("menu")
self.menulanguage = QtWidgets.QMenu(self.menu)
self.menulanguage.setObjectName("menulanguage")
MainWindow.setMenuBar(self.menubar)
self.statusbar = QtWidgets.QStatusBar(MainWindow)
self.statusbar.setObjectName("statusbar")
MainWindow.setStatusBar(self.statusbar)
self.actionsocket_connet = QtWidgets.QAction(MainWindow)
self.actionsocket_connet.setObjectName("actionsocket_connet")
self.actionChinese = QtWidgets.QAction(MainWindow)
self.actionChinese.setObjectName("actionChinese")
self.actionip_socket_gethostbyname_socket_gethostname = QtWidgets.QAction(MainWindow)
self.actionip_socket_gethostbyname_socket_gethostname.setObjectName("actionip_socket_gethostbyname_socket_gethostname")
self.menulanguage.addSeparator()
self.menulanguage.addAction(self.actionChinese)
self.menu.addSeparator()
self.menu.addAction(self.menulanguage.menuAction())
self.menu.addAction(self.actionip_socket_gethostbyname_socket_gethostname)
self.menubar.addAction(self.menu.menuAction())
self.socket_peername = s.addr[0]
self.retranslateUi(MainWindow)
QtCore.QMetaObject.connectSlotsByName(MainWindow)
self.MainWindow = MainWindow
self.pushButton.clicked.connect(self.send)
@to_logging
def sendmsg(self):
data = self.textEdit.toPlainText().strip()
if data:
s.socket.send(data.encode('utf8'));self.textEdit.clear()
@to_logging
def send(self, i):
if hasattr(s,"_connect") and s._connect:
if not self.sendmsg():
QtWidgets.QMessageBox.information(self.MainWindow, 'TraceBack',f'Socket Server<{self.socket_peername}> 断开连接')
s._connect = False
else:
self.Show_Message("<font color='red'>发送异常. 未连接至服务器.请点击[重新连接服务器]按钮尝试重新连接.</font>")
def retranslateUi(self, MainWindow):
_translate = QtCore.QCoreApplication.translate
MainWindow.setWindowTitle(_translate("MainWindow", "Socket"))
self.lineEdit_2.setText(socket.gethostname())
self.lineEdit.setText(socket.gethostbyname(socket.gethostname()))
self.lineEdit_3.setText(self.socket_peername)
self.pushButton.setText(_translate("MainWindow", "send"))
self.label_2.setText(_translate("MainWindow", "主机名:"))
self.label.setText(_translate("MainWindow", "本地端口:"))
self.label_3.setText(_translate("MainWindow", "连接端口:"))
self.pushButton_2.setText(_translate("MainWindow", "重新连接服务器"))
self.menu.setTitle(_translate("MainWindow", "设置"))
self.menulanguage.setTitle(_translate("MainWindow", "language"))
self.actionsocket_connet.setText(_translate("MainWindow", "socket connect"))
self.actionChinese.setText(_translate("MainWindow", "Chinese"))
self.actionip_socket_gethostbyname_socket_gethostname.setText(_translate("MainWindow", "ip: " socket.gethostbyname(socket.gethostname()) ))
s.set_func(self.Show_Message)
s.run()
def show(self):
self.MainWindow.show()
def Show_Message(self, data):
if data:
for i in data.split('n'):
sleep(0.2 / len(data.split('n'))) #防止信息过快使Textedit刷新空白
self.textEdit_2.append(i)
print(i)
self.textEdit_2.moveCursor(QtGui.QTextCursor.End)
if __name__ == "__main__":
app = QtWidgets.QApplication(sys.argv)
s = Socket()
conn = Ui_Dialog()
sys.exit(app.exec_())
服务端:
server.py
代码语言:javascript复制import socket # 导入 socket 模块
from threading import Thread
import logging
from color import Text, Background, Print
__version__ = 1.1
def threading(Daemon, **kwargs):
thread = Thread(**kwargs)
thread.setDaemon(Daemon)
thread.start()
return thread
logger = logging.getLogger(__name__)
logger.setLevel(level = logging.INFO)
handler = logging.FileHandler("log.txt")
handler.setLevel(logging.INFO)
handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
logger.addHandler(handler)
console = logging.StreamHandler()
console.setLevel(logging.INFO)
logger.addHandler(handler)
logger.addHandler(console)
bytecount = 1024
class Command_Handler(object):
def __init__(self, bind=None):
"""Bind Server class"""
self.bind = bind
def _function(self, _list, client):
data = {"/info" : {"-v": self.get_version(),
"-id" : self.get_id(client),
"-i" : self.info(),
"-h" : self.help()},
}
_dict = data
for n in range(len(_list)):
if type(_dict) == dict:
_dict = _dict.get(_list[n], self.unknown(" ".join(_list)))
else:
break
if type(_dict) == dict:
_dict = "Error:n<font color='blue'>This command must take more arguments. Such as %s.</font>" % list(_dict.keys())
return _dict
@staticmethod
def help():
return """/info [-v] [-id] [-i]
-v : get version of program.
-id : get your id.
-i : get information.
-h : help.
For example, <font color=red>/info -id</font>"""
@staticmethod
def get_version():
return "version : " str(__version__)
def get_id(self, client):
return "Your id is {}.".format(id(client))
def info(self):
return f"Socket Server[version {self.get_version()}] By zmh."
def unknown(self,s):
return """Error:
No command named "%s". Please search [/info -h] to help.
%s""" % (s, self.help())
def cut(self, string):
return string.strip().split()
def handler(self, c, client=None):
return "<font color='gray'>[command] </font><font color='brown'>%s</font>n%s" % (c,str(self._function(self.cut(c),client)))
def iscommand(self,i):
return i.strip().startswith("/")
class Server(object):
join_message = "<font color='red'>Server></font> <font color='blue'>%s</font> 连接服务器. 当前在线人数: <font color='red'>%s</font>"
user_message = "<font color='%s'>%s(%s端)%s></font> %s"
quit_message = "%s 下线了, %s"
def __init__(self, addr, port, backlog = 10, encode = 'utf8'):
self.address = addr, port
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.bind(self.address)
self.socket.listen(backlog)
self.connect = {}
self.com = Command_Handler(self)
self.encode = encode
self.errs = []
self.errduring = False
def run(self):
logger.info("服务器[%s]开启,端口[%s]" % (socket.gethostbyname(socket.gethostname()), self.address[1]))
self.accept_client()
def _get_sockets(self): #return int
return len(self.connect.items())
def _str_sockets(self):
return f"当前人数 {self._get_sockets()}"
def send(self, sock, user, mes):
@self.QUIT(user)
def func():
sock.sendall(mes.encode(self.encode))
return func()
def QUIT(self, user):
sock = list(self.connect.keys())[list(self.connect.values()).index(user)]
def prop(command, *args, **kwargs):
def logs(*args, **kwargs):
try:
command(*args, **kwargs)
return True
except:
logging.exception(str())
if self.errduring is False:
self.errs.append((sock,user)) #在遍历时列表增加 -> 触发RuntimeError
return False
return logs
return prop
def ServerMessage(self, mes, inc=True):
self.errduring = True
for sock, user in self.connect.items():
if not sock in self.errs:
self.send(sock, user, mes)
if inc:
self.error_handle()
self.errduring = False
def UserMessage(self, address, _user, mes,inc=True):
if not mes:
return
self.errduring = True
for sock, user in self.connect.items():
if not sock in self.errs:
send_message = Server.user_message % ("brown" if _user == user else "red",
_user,
address,
"(我自己)" if _user == user else "",
mes)
self.send(sock, user, send_message)
logger.info(f"{address}[{_user}] : {mes}")
if inc:
self.error_handle()
self.errduring = False
def error_handle(self):
for _, user in self.errs: #使用变量_ : 为了防止触发KeyError: <socket.socket [closed] fd=-1, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0>
if user in list(self.connect.values()):
sock = list(self.connect.keys())[list(self.connect.values()).index(user)]
logger.exception(msg = str())
Q = Server.quit_message % (user, self._str_sockets())
logger.info(Q)
self.connect.pop(sock)
self.ServerMessage(Q, False) #防止多次调用error_handle函数
sock.close()
def accept_client(self):
while True:
client, address = self.socket.accept() # 阻塞,等待客户端连接
thread = threading(Daemon=True, target=self.message_handle, args=(client,address[0]))
def message_handle(self,client,address):
username = client.recv(1024).decode(encoding='utf8')
self.connect[client] = username
logger.info(f"{address}[{username}] 加入服务器 , " self._str_sockets())
self.ServerMessage(Server.join_message % (address, self._get_sockets()) )
@self.QUIT(username)
def update():
string = client.recv(bytecount **2).decode(encoding="utf8")
Print(Text.BLUE,f"{username}>{string}")
if self.com.iscommand(string):
self.send(client, username, self.com.handler(string))
else:
self.UserMessage(address,username,string)
while True:
status = update()
if not status:
break
def get_host_ip() -> str:
"""get current IP address"""
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(('8.8.8.8', 80))
ip = s.getsockname()[0]
finally:
s.close()
return ip
server = Server(get_host_ip(),429)
server.run()
sys.exit(app.exec_())
color.py
代码语言:javascript复制import ctypes,time
STD_INPUT_HANDLE = -10
STD_OUTPUT_HANDLE= -11
STD_ERROR_HANDLE = -12
std_out_handle=ctypes.windll.kernel32.GetStdHandle(STD_OUTPUT_HANDLE)
class Text():
DARKBLUE = 0x01 # 暗蓝色
DARKGREEN = 0x02 # 暗绿色
DARKSKYBLUE = 0x03 # 暗天蓝色
DARKRED = 0x04 # 暗红色
DARKPINK = 0x05 # 暗粉红色
DARKYELLOW = 0x06 # 暗黄色
DARKWHITE = 0x07 # 暗白色
DARKGRAY = 0x08 # 暗灰色/亮
BLUE = 0x09 # 蓝色
GREEN = 0x0a # 绿色
SKYBLUE = 0x0b # 天蓝色
RED = 0x0c # 红色
PINK = 0x0d # 粉红色
YELLOW = 0x0e # 黄色
WHITE = 0x0f # 白色
class Background():
BLUE = 0x10 # 蓝
GREEN = 0x20 # 绿
RED = 0x40 # 红
INTENSITY = 0x80 # 亮
def SetCmdColor(color, handle=std_out_handle):
Bool=ctypes.windll.kernel32.SetConsoleTextAttribute(handle, color)
return Bool
def Print(color:int,*args, **kwargs):
SetCmdColor(color)
print(*args, **kwargs)
SetCmdColor(Text.DARKWHITE)