python TCP套接字服务器v1.1-新增服务端命令功能及修改bug(socket+PyQt5)

2023-02-06 09:53:08 浏览数 (1)

TCP聊天服务器套接字v1.1

所有版本记录: v1.0 : TCP聊天服务器套接字|PyQt5 socket(TCP端口映射 端口放行) logging Thread(含日志,html) anaconda打包32位exe(3.4万字)|python高阶

文章目录

  • | 1. 服务器代码改进 / bug改进
      • (1).发送函数改为@function
      • (2).异常运行函数改为三叠函数
      • (3).服务端在下线时列表在遍历时 增加下线的服务端 -> 触发RuntimeError
      • (4)获取真正本机的ip地址
  • | 2.新增命令功能 (在输入框添加"/")
  • | 3.新增cmd控制台颜色改变
  • | 4.客户端PyQt5信息过快使Textedit刷新空白
  • | 5.PyQt5 消息超过一页下拉浪费时间: 指针至最后
  • | 6.服务端socket多次连接不同地址错误
  • | 7. 完整代码

| 1. 服务器代码改进 / bug改进

(1).发送函数改为@function

代码语言:javascript复制
class Server():
	...
    def send(self, sock, user, mes):
        self.QUIT(user, lambda: sock.sendall(mes.encode(self.encode)))()
代码语言:javascript复制
    def send(self, sock, user, mes):
        @self.QUIT(user)
        def func():
            sock.sendall(mes.encode(self.encode))
        return func()

(2).异常运行函数改为三叠函数

代码语言:javascript复制
    def QUIT(self, user, command):
        sock = list(self.connect.keys())[list(self.connect.values()).index(user)]
        def logs(*args, **kargs):
            try:
                command(*args, **kargs)
            except:
                self.errs.append(sock)
        return logs
代码语言:javascript复制
    def QUIT(self, user):
        sock = list(self.connect.keys())[list(self.connect.values()).index(user)]
        def prop(command, *args, **kwargs):
            def logs(*args, **kwargs):
                try:
                    command(*args, **kwargs)
                    return True
                except:
                    logging.exception(str())
                    if self.errduring is False:
                        self.errs.append((sock,user)) #在遍历时列表增加 -> 触发RuntimeError
                    return False
            return logs
        return prop

(3).服务端在下线时列表在遍历时 增加下线的服务端 -> 触发RuntimeError

bug改进方法:

增加errduring; inc参数

代码语言:javascript复制
    def ServerMessage(self, mes):
        for sock, user in self.connect.items():
            if not sock in self.errs:
                self.send(sock, user, mes)
    def UserMessage(self, address, _user, mes):
        if not mes:
            return
        mes = mes.decode(encoding="utf8")
        for sock, user in self.connect.items():
            if not sock in self.errs:
                send_message = Server.user_message % ("brown" if _user == user else "red",
                                                      _user,
                                                      address,
                                                      "(我自己)" if _user == user else "",
                                                      mes)
                self.send(sock, user, send_message)
        logger.info(f"{address}[{_user}] : {mes}")
        self.error_handle()
    def error_handle(self):
        for sock in self.errs:
            sock.close()
            logger.exception(msg = str())
            Q = Server.quit_message % user
            logger.info(Q)
            self.connect.pop(sock)
            self.ServerMessage(Q)
代码语言:javascript复制
    def ServerMessage(self, mes, inc=True):
        self.errduring = True
        for sock, user in self.connect.items():
            if not sock in self.errs:
                self.send(sock, user, mes)
        if inc:
            self.error_handle()
        self.errduring = False
        
    def UserMessage(self, address, _user, mes,inc=True):
        if not mes:
            return
        self.errduring = True
        for sock, user in self.connect.items():
            if not sock in self.errs:
                send_message = Server.user_message % ("brown" if _user == user else "red",
                                                      _user,
                                                      address,
                                                      "(我自己)" if _user == user else "",
                                                      mes)
                self.send(sock, user, send_message)
        logger.info(f"{address}[{_user}] : {mes}")
        if inc:
            self.error_handle()
        self.errduring = False
        
    def error_handle(self):
        for _, user in self.errs: #使用变量_ : 为了防止触发KeyError: <socket.socket [closed] fd=-1, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0>
            if user in list(self.connect.values()):
                sock = list(self.connect.keys())[list(self.connect.values()).index(user)]
                logger.exception(msg = str())
                Q = Server.quit_message % (user, self._str_sockets())
                logger.info(Q)
                self.connect.pop(sock) 
                self.ServerMessage(Q, False) #防止多次调用error_handle函数
                sock.close()

(4)获取真正本机的ip地址

如果直接用socket.gethostbyname(socket.gethostname())获取地址,很有可能是错误的(Vmware虚拟机的地址127.0.0.1等)

搜索后我得到了下面这段精巧的代码:

代码语言:javascript复制
...
def get_host_ip() -> str:
    """get current IP address"""
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        s.connect(('8.8.8.8', 80))
        ip = s.getsockname()[0]
    finally:
        s.close()
    return ip

通过UDP尝试连接8.8.8.8:80,不管是否连接成功,获得的本机IP一定是正确的。在Ubuntu(kivydev)和Windows上都可用,还省去了判断操作系统的大段代码.

那么代码可以改为:

代码语言:javascript复制
server = Server(socket.gethostname(),429)
代码语言:javascript复制
server = Server(get_host_ip(),429)

| 2.新增命令功能 (在输入框添加"/")

代码语言:javascript复制
class Command_Handler(object):
    def __init__(self, bind=None):
        """Bind Server class"""
        self.bind = bind
    def _function(self, _list, client):
        
        data = {"/info" : {"-v": self.get_version(),
                           "-id" : self.get_id(client),
                           "-i" : self.info(),
                           "-h" : self.help()},
                }
        _dict = data
        for n in range(len(_list)):
            if type(_dict) == dict:
                _dict = _dict.get(_list[n],  self.unknown(" ".join(_list)))
            else:
                break
        if  type(_dict) == dict:
             _dict = "Error:n<font color='blue'>This command must take more arguments. Such as %s.</font>" % list(_dict.keys()) 
        return _dict
    @staticmethod
    def help():
        return """/info [-v] [-id] [-i]
-v : get version of program.
-id : get your id.
-i : get information.
-h : help.
For example, <font color=red>/info -id</font>"""
    @staticmethod
    def get_version():
        return "version : "   str(__version__)
    def get_id(self, client):
        return "Your id is {}.".format(id(client))
    
    def info(self):
        return f"Socket Server[version {self.get_version()}] By zmh."
    def unknown(self,s):
        return """Error:
No command named "%s". Please search [/info -h] to help.
%s""" % (s, self.help())
    def cut(self, string):
        return string.strip().split()
    def handler(self, c, client=None):
        return "<font color='gray'>[command] </font><font color='brown'>%s</font>n%s"  % (c,str(self._function(self.cut(c),client)))
    def iscommand(self,i):
        return i.strip().startswith("/")
代码语言:javascript复制
class Server():
    def __init__(self, addr, port, backlog = 10, encode = 'utf8'):
		...
        self.com = Command_Handler(self)
        self.encode = encode
        self.errs = []
        self.errduring = False

| 3.新增cmd控制台颜色改变

我的文章: Python 命令行cmd指定颜色设置

代码语言:javascript复制
#color.py

import ctypes,time
STD_INPUT_HANDLE = -10  
STD_OUTPUT_HANDLE= -11  
STD_ERROR_HANDLE = -12
std_out_handle=ctypes.windll.kernel32.GetStdHandle(STD_OUTPUT_HANDLE)
class Text():
    DARKBLUE = 0x01 # 暗蓝色
    DARKGREEN = 0x02 # 暗绿色
    DARKSKYBLUE = 0x03 # 暗天蓝色
    DARKRED = 0x04 # 暗红色
    DARKPINK = 0x05 # 暗粉红色
    DARKYELLOW = 0x06 # 暗黄色
    DARKWHITE = 0x07 # 暗白色
    DARKGRAY = 0x08 # 暗灰色/亮
    BLUE = 0x09 # 蓝色
    GREEN = 0x0a # 绿色
    SKYBLUE = 0x0b # 天蓝色
    RED = 0x0c # 红色
    PINK = 0x0d # 粉红色
    YELLOW = 0x0e # 黄色
    WHITE = 0x0f # 白色

class Background():
    BLUE     = 0x10 # 蓝
    GREEN    = 0x20 # 绿
    RED      = 0x40  # 红
    INTENSITY = 0x80 # 亮
 
def SetCmdColor(color, handle=std_out_handle):
    Bool=ctypes.windll.kernel32.SetConsoleTextAttribute(handle, color)
    return Bool
 
def Print(color:int,*args, **kwargs):
    SetCmdColor(color)
    print(*args, **kwargs)
    SetCmdColor(Text.DARKWHITE)
代码语言:javascript复制
#server.py
from color import Text, Background, Print

| 4.客户端PyQt5信息过快使Textedit刷新空白

增加延迟呗

代码语言:javascript复制
...
    def Show_Message(self, data):
        if data:
            for i in data.split('n'):
                sleep(0.2 / len(data.split('n'))) #防止信息过快使Textedit刷新空白
                self.textEdit_2.append(i)
                print(i)

| 5.PyQt5 消息超过一页下拉浪费时间: 指针至最后

代码语言:javascript复制
    def Show_Message(self, data):
        if data:
			...
        self.textEdit_2.moveCursor(QtGui.QTextCursor.End)

| 6.服务端socket多次连接不同地址错误

OSError: [WinError 10022] 提供了一个无效的参数.

代码语言:javascript复制
    @to_logging
    def socket_connect(self):
        if hasattr(self, "_socc"):
            self.retry() #socket多次连接不同地址会出现 `OSError: [WinError 10022] 提供了一个无效的参数.`
        else:
            self._socc = True
       self.socket.connect(self.addr)

| 7. 完整代码

客户端:

代码语言:javascript复制
# -*- coding: utf-8 -*-

# Form implementation generated from reading ui files 'USER.ui', 'Connect.ui'
#
# Created by: PyQt5 UI code generator 5.15.4
#
# WARNING: Any manual changes made to this file will be lost when pyuic5 is
# run again.  Do not edit this file unless you know what you are doing.


from PyQt5 import QtCore, QtGui, QtWidgets
import socket, sys, logging
from traceback import format_exc
from datetime import datetime
from time import sleep
from threading import Thread
from random import randint as rand
import logging  # 引入logging模块
logging.basicConfig(level=logging.DEBUG,format='%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s')
Username = f'zmh {rand(0,1000)}'
TIMEOUT = 3
IP = "114.67.206.19"
def get_host_ip() -> str:
    """get current IP address"""
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        s.connect(('8.8.8.8', 80))
        ip = s.getsockname()[0]
    finally:
        s.close()
    return ip
selfIP = get_host_ip()
bytecount = 1024
dicts = {f"{IP} (公网)" : {"ip":IP, "port":16223},
         f"{selfIP} (私网)" : {"ip":selfIP,"port":429},
         "EXAM-41 (微机室)" : {"ip":"EXAM-41","port":429}}
get_time = lambda: datetime.now().strftime('%Y %m %d %H:%M:%S')
def threading(Daemon, **kwargs):
        thread = Thread(**kwargs)
        thread.setDaemon(Daemon)
        thread.start()
        return thread

def to_logging(command):
    def logs(*args, **kwargs):
        try:
            command(*args, **kwargs)
        except Exception as e:
            if "main" in dir():
                main.Show_Message(format_exc())
            else:
                logging.exception(str())
            return False
        else:
            return True
    return logs

class Socket:
    def __init__(self,Function=lambda i:None,code='utf-8'):
        self.socket = socket.socket()
        self.code = code
        self._logger = Function
        self.socket.settimeout(TIMEOUT)
        self._connect = False
    def set_func(self, f):
        self._logger = f

    def retry(self):
        del self.socket
        self.socket = socket.socket()
        self.socket.settimeout(TIMEOUT)
    @to_logging
    def socket_connect(self):
        if hasattr(self, "_socc"):
            self.retry() #socket多次连接不同地址会出现 `OSError: [WinError 10022] 提供了一个无效的参数.`
        else:
            self._socc = True
        self.socket.connect(self.addr)
    def connect(self, ip = None,port:int=0000, show=None):
        self.addr = (ip, port)   
        if not self.socket_connect():
                show("[{}]: 连接服务器[{}]失败".format(get_time(),self.addr[0]))
                return False
        else:
                show("[{}]: 连接成功".format(get_time()))
                return True
    def _handler(self):
            self.socket.send(Username.encode(self.code))
            while True:
                try:
                    byte = self.socket.recv(bytecount **2)
                    if len(byte) == 0:
                            break
                    kb = len(byte) / bytecount
                    self._logger(f'[{datetime.now().strftime("%H:%M:%S")}]{byte.decode(encoding=self.code)}                  {"<font size=1>%0.2f kb</font>" % kb}')
                except Exception as e:
                    if not type(e) == socket.timeout:
                        for n in ["","ERROR".center(20,"-")] format_exc().split('n') ["".center(20,"-"),""]:
                            self._logger(f"<font color='red'>{n}</font>")
                        self.socket.close()
                        return
    def run(self): #线程
        self._connect = True
        self.thread = threading(False, target=self._handler)

class Ui_Dialog(object):
    def __init__(self):
        Dialog = QtWidgets.QDialog()
        self.Dialog = Dialog
        Dialog.setObjectName("Dialog")
        Dialog.resize(495, 81)
        self.gridLayout = QtWidgets.QGridLayout(Dialog)
        self.gridLayout.setObjectName("gridLayout")
        self.comboBox = QtWidgets.QComboBox(Dialog)
        self.comboBox.setObjectName("comboBox")
        self.comboBox.addItems(list(dicts.keys()))
        self.gridLayout.addWidget(self.comboBox, 0, 1, 1, 1)
        self.pushButton = QtWidgets.QPushButton(Dialog)
        self.pushButton.setObjectName("pushButton")
        self.gridLayout.addWidget(self.pushButton, 0, 2, 1, 1)
        self.label = QtWidgets.QLabel(Dialog)
        font = QtGui.QFont()
        font.setFamily("Consolas")
        self.label.setFont(font)
        self.label.setObjectName("label")
        self.gridLayout.addWidget(self.label, 0, 0, 1, 1)
        self.lineEdit = QtWidgets.QLineEdit(Dialog)
        self.lineEdit.setReadOnly(True)
        self.lineEdit.setObjectName("lineEdit")
        self.gridLayout.addWidget(self.lineEdit, 1, 0, 2, 3)

        self.retranslateUi(Dialog)
        self.pushButton.clicked.connect(self._connect)
        QtCore.QMetaObject.connectSlotsByName(Dialog)
        self.num = 0
    def retranslateUi(self, Dialog):
        _translate = QtCore.QCoreApplication.translate
        Dialog.setWindowTitle(_translate("Dialog", "Dialog"))
        self.comboBox.setItemText(0, _translate("Dialog", list(dicts.keys())[0]))
        self.pushButton.setText(_translate("Dialog", "Connecting"))
        self.label.setText(_translate("Dialog", "Select Socket Connect Address:"))
        self.set_text("No return value.")
        Dialog.show()
    def set_text(self, m):
        self.lineEdit.setText(QtCore.QCoreApplication.translate("Dialog", str(m)))
        self.lineEdit.update()
    @to_logging
    def _connect(self, i):
        addr = dicts[self.comboBox.currentText()]
        self.set_text("[{}]: 尝试连接服务器[{}],最大超时报错 {}s".format(datetime.now().strftime('%Y %m %d %H:%M:%S'),addr["ip"],TIMEOUT))
        if s.connect(**addr, show=self.set_text):
            global main
            main = Ui_MainWindow()
            main.show()
            def close(widget):
                sleep(1)
                widget.close()
            threading(False, target=close, args=(self.Dialog, ))
class Ui_MainWindow(object):
    def __init__(self):
        MainWindow = QtWidgets.QMainWindow()
        MainWindow.setObjectName("MainWindow")
        MainWindow.resize(800, 619)
        font = QtGui.QFont()
        font.setFamily("Consolas")
        MainWindow.setFont(font)
        self.centralwidget = QtWidgets.QWidget(MainWindow)
        self.centralwidget.setObjectName("centralwidget")
        self.gridLayout = QtWidgets.QGridLayout(self.centralwidget)
        self.gridLayout.setObjectName("gridLayout")
        self.lineEdit_3 = QtWidgets.QLineEdit(self.centralwidget)
        self.lineEdit_3.setReadOnly(True)
        self.lineEdit_3.setObjectName("lineEdit_3")
        self.gridLayout.addWidget(self.lineEdit_3, 7, 3, 1, 1)
        self.pushButton = QtWidgets.QPushButton(self.centralwidget)
        self.pushButton.setStyleSheet("background-color:rgb(44, 176, 13);n"
"color:rgb(255, 255, 255);n"
"font: 200 10pt "Consolas";")
        self.pushButton.setObjectName("pushButton")
        self.gridLayout.addWidget(self.pushButton, 8, 6, 1, 1)
        spacerItem = QtWidgets.QSpacerItem(40, 20, QtWidgets.QSizePolicy.Expanding, QtWidgets.QSizePolicy.Minimum)
        self.gridLayout.addItem(spacerItem, 8, 5, 1, 1)
        self.label_2 = QtWidgets.QLabel(self.centralwidget)
        self.label_2.setObjectName("label_2")
        self.gridLayout.addWidget(self.label_2, 6, 1, 1, 1)
        spacerItem1 = QtWidgets.QSpacerItem(40, 20, QtWidgets.QSizePolicy.Expanding, QtWidgets.QSizePolicy.Minimum)
        self.gridLayout.addItem(spacerItem1, 8, 3, 1, 1)
        self.lineEdit_2 = QtWidgets.QLineEdit(self.centralwidget)
        self.lineEdit_2.setReadOnly(True)
        self.lineEdit_2.setObjectName("lineEdit_2")
        self.gridLayout.addWidget(self.lineEdit_2, 6, 3, 1, 1)
        self.label = QtWidgets.QLabel(self.centralwidget)
        self.label.setObjectName("label")
        self.gridLayout.addWidget(self.label, 5, 1, 1, 1)
        self.lineEdit = QtWidgets.QLineEdit(self.centralwidget)
        self.lineEdit.setCursor(QtGui.QCursor(QtCore.Qt.IBeamCursor))
        self.lineEdit.setDragEnabled(False)
        self.lineEdit.setReadOnly(True)
        self.lineEdit.setObjectName("lineEdit")
        self.gridLayout.addWidget(self.lineEdit, 5, 3, 1, 1)
        self.label_3 = QtWidgets.QLabel(self.centralwidget)
        self.label_3.setObjectName("label_3")
        self.gridLayout.addWidget(self.label_3, 7, 1, 1, 1)
        self.line = QtWidgets.QFrame(self.centralwidget)
        self.line.setFrameShape(QtWidgets.QFrame.VLine)
        self.line.setFrameShadow(QtWidgets.QFrame.Sunken)
        self.line.setObjectName("line")
        self.gridLayout.addWidget(self.line, 5, 4, 3, 1)
        self.textEdit = QtWidgets.QTextEdit(self.centralwidget)
        self.textEdit.setObjectName("textEdit")
        self.gridLayout.addWidget(self.textEdit, 5, 5, 3, 2)
        spacerItem2 = QtWidgets.QSpacerItem(20, 40, QtWidgets.QSizePolicy.Minimum, QtWidgets.QSizePolicy.Expanding)
        self.gridLayout.addItem(spacerItem2, 1, 1, 1, 1)
        self.textEdit_2 = QtWidgets.QTextEdit(self.centralwidget)
        self.textEdit_2.setObjectName("textEdit_2")
        self.textEdit_2.setReadOnly(True)
        self.gridLayout.addWidget(self.textEdit_2, 0, 3, 2, 4)
        self.pushButton_2 = QtWidgets.QPushButton(self.centralwidget)
        self.pushButton_2.setObjectName("pushButton_2")
        self.gridLayout.addWidget(self.pushButton_2, 8, 1, 1, 1)
        MainWindow.setCentralWidget(self.centralwidget)
        self.menubar = QtWidgets.QMenuBar(MainWindow)
        self.menubar.setGeometry(QtCore.QRect(0, 0, 800, 24))
        self.menubar.setObjectName("menubar")
        self.menu = QtWidgets.QMenu(self.menubar)
        self.menu.setObjectName("menu")
        self.menulanguage = QtWidgets.QMenu(self.menu)
        self.menulanguage.setObjectName("menulanguage")
        MainWindow.setMenuBar(self.menubar)
        self.statusbar = QtWidgets.QStatusBar(MainWindow)
        self.statusbar.setObjectName("statusbar")
        MainWindow.setStatusBar(self.statusbar)
        self.actionsocket_connet = QtWidgets.QAction(MainWindow)
        self.actionsocket_connet.setObjectName("actionsocket_connet")
        self.actionChinese = QtWidgets.QAction(MainWindow)
        self.actionChinese.setObjectName("actionChinese")
        self.actionip_socket_gethostbyname_socket_gethostname = QtWidgets.QAction(MainWindow)
        self.actionip_socket_gethostbyname_socket_gethostname.setObjectName("actionip_socket_gethostbyname_socket_gethostname")
        self.menulanguage.addSeparator()
        self.menulanguage.addAction(self.actionChinese)
        self.menu.addSeparator()
        self.menu.addAction(self.menulanguage.menuAction())
        self.menu.addAction(self.actionip_socket_gethostbyname_socket_gethostname)
        self.menubar.addAction(self.menu.menuAction())
        self.socket_peername = s.addr[0]
        self.retranslateUi(MainWindow)
        QtCore.QMetaObject.connectSlotsByName(MainWindow)
        self.MainWindow = MainWindow
        self.pushButton.clicked.connect(self.send)
    @to_logging
    def sendmsg(self):
        data = self.textEdit.toPlainText().strip()
        if data:
            s.socket.send(data.encode('utf8'));self.textEdit.clear()
    @to_logging
    def send(self, i):
        if hasattr(s,"_connect") and s._connect:
            if not self.sendmsg():
                QtWidgets.QMessageBox.information(self.MainWindow, 'TraceBack',f'Socket Server<{self.socket_peername}> 断开连接')
                s._connect = False
        else:
            self.Show_Message("<font color='red'>发送异常. 未连接至服务器.请点击[重新连接服务器]按钮尝试重新连接.</font>")
    def retranslateUi(self, MainWindow):
        _translate = QtCore.QCoreApplication.translate
        MainWindow.setWindowTitle(_translate("MainWindow", "Socket"))
        self.lineEdit_2.setText(socket.gethostname())
        self.lineEdit.setText(socket.gethostbyname(socket.gethostname()))
        self.lineEdit_3.setText(self.socket_peername)
        self.pushButton.setText(_translate("MainWindow", "send"))
        self.label_2.setText(_translate("MainWindow", "主机名:"))
        self.label.setText(_translate("MainWindow", "本地端口:"))
        self.label_3.setText(_translate("MainWindow", "连接端口:"))
        self.pushButton_2.setText(_translate("MainWindow", "重新连接服务器"))
        self.menu.setTitle(_translate("MainWindow", "设置"))
        self.menulanguage.setTitle(_translate("MainWindow", "language"))
        self.actionsocket_connet.setText(_translate("MainWindow", "socket connect"))
        self.actionChinese.setText(_translate("MainWindow", "Chinese"))
        self.actionip_socket_gethostbyname_socket_gethostname.setText(_translate("MainWindow", "ip: " socket.gethostbyname(socket.gethostname()) ))

        s.set_func(self.Show_Message)
        s.run()
    def show(self):
        self.MainWindow.show()
    def Show_Message(self, data):
        if data:
            for i in data.split('n'):
                sleep(0.2 / len(data.split('n'))) #防止信息过快使Textedit刷新空白
                self.textEdit_2.append(i)
                print(i)
        self.textEdit_2.moveCursor(QtGui.QTextCursor.End)
if __name__ == "__main__":
        app = QtWidgets.QApplication(sys.argv)
        s = Socket()
        conn = Ui_Dialog()
        sys.exit(app.exec_())

服务端:

server.py

代码语言:javascript复制
import socket # 导入 socket 模块
from threading import Thread
import logging
from color import Text, Background, Print
__version__ = 1.1
def threading(Daemon, **kwargs):
        thread = Thread(**kwargs)
        thread.setDaemon(Daemon)
        thread.start()
        return thread




logger = logging.getLogger(__name__)
logger.setLevel(level = logging.INFO)
handler = logging.FileHandler("log.txt")
handler.setLevel(logging.INFO)
handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
logger.addHandler(handler)
console = logging.StreamHandler()
console.setLevel(logging.INFO)
logger.addHandler(handler)
logger.addHandler(console)
bytecount = 1024
class Command_Handler(object):
    def __init__(self, bind=None):
        """Bind Server class"""
        self.bind = bind
    def _function(self, _list, client):
        
        data = {"/info" : {"-v": self.get_version(),
                           "-id" : self.get_id(client),
                           "-i" : self.info(),
                           "-h" : self.help()},
                }
        _dict = data
        for n in range(len(_list)):
            if type(_dict) == dict:
                _dict = _dict.get(_list[n],  self.unknown(" ".join(_list)))
            else:
                break
        if  type(_dict) == dict:
             _dict = "Error:n<font color='blue'>This command must take more arguments. Such as %s.</font>" % list(_dict.keys()) 
        return _dict
    @staticmethod
    def help():
        return """/info [-v] [-id] [-i]
-v : get version of program.
-id : get your id.
-i : get information.
-h : help.
For example, <font color=red>/info -id</font>"""
    @staticmethod
    def get_version():
        return "version : "   str(__version__)
    def get_id(self, client):
        return "Your id is {}.".format(id(client))
    
    def info(self):
        return f"Socket Server[version {self.get_version()}] By zmh."
    def unknown(self,s):
        return """Error:
No command named "%s". Please search [/info -h] to help.
%s""" % (s, self.help())
    def cut(self, string):
        return string.strip().split()
    def handler(self, c, client=None):
        return "<font color='gray'>[command] </font><font color='brown'>%s</font>n%s"  % (c,str(self._function(self.cut(c),client)))
    def iscommand(self,i):
        return i.strip().startswith("/")
class Server(object):
    join_message = "<font color='red'>Server></font> <font color='blue'>%s</font> 连接服务器. 当前在线人数: <font color='red'>%s</font>"
    user_message = "<font color='%s'>%s(%s端)%s></font> %s"
    quit_message = "%s 下线了, %s"
    def __init__(self, addr, port, backlog = 10, encode = 'utf8'):
        self.address = addr, port
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.bind(self.address)
        self.socket.listen(backlog)
        self.connect = {}
        self.com = Command_Handler(self)
        self.encode = encode
        self.errs = []
        self.errduring = False
    def run(self):
        logger.info("服务器[%s]开启,端口[%s]" % (socket.gethostbyname(socket.gethostname()), self.address[1]))
        self.accept_client()
    def _get_sockets(self): #return int
        return len(self.connect.items())
    def _str_sockets(self):
        return f"当前人数 {self._get_sockets()}"
    def send(self, sock, user, mes):
        @self.QUIT(user)
        def func():
            sock.sendall(mes.encode(self.encode))
        return func()
    def QUIT(self, user):
        sock = list(self.connect.keys())[list(self.connect.values()).index(user)]
        def prop(command, *args, **kwargs):
            def logs(*args, **kwargs):
                try:
                    command(*args, **kwargs)
                    return True
                except:
                    logging.exception(str())
                    if self.errduring is False:
                        self.errs.append((sock,user)) #在遍历时列表增加 -> 触发RuntimeError
                    return False
            return logs
        return prop
    def ServerMessage(self, mes, inc=True):
        self.errduring = True
        for sock, user in self.connect.items():
            if not sock in self.errs:
                self.send(sock, user, mes)
        if inc:
            self.error_handle()
        self.errduring = False
    def UserMessage(self, address, _user, mes,inc=True):
        if not mes:
            return
        self.errduring = True
        for sock, user in self.connect.items():
            if not sock in self.errs:
                send_message = Server.user_message % ("brown" if _user == user else "red",
                                                      _user,
                                                      address,
                                                      "(我自己)" if _user == user else "",
                                                      mes)
                self.send(sock, user, send_message)
        logger.info(f"{address}[{_user}] : {mes}")
        if inc:
            self.error_handle()
        self.errduring = False
    def error_handle(self):
        for _, user in self.errs: #使用变量_ : 为了防止触发KeyError: <socket.socket [closed] fd=-1, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0>
            if user in list(self.connect.values()):
                sock = list(self.connect.keys())[list(self.connect.values()).index(user)]
                logger.exception(msg = str())
                Q = Server.quit_message % (user, self._str_sockets())
                logger.info(Q)
                self.connect.pop(sock) 
                self.ServerMessage(Q, False) #防止多次调用error_handle函数
                sock.close()
    def accept_client(self):
      while True:
        client, address = self.socket.accept() # 阻塞,等待客户端连接
        thread = threading(Daemon=True, target=self.message_handle, args=(client,address[0]))

    def message_handle(self,client,address):
        username = client.recv(1024).decode(encoding='utf8')
        self.connect[client] = username
        logger.info(f"{address}[{username}] 加入服务器 , "   self._str_sockets())
        self.ServerMessage(Server.join_message % (address, self._get_sockets()) )
        @self.QUIT(username)
        def update():
            string = client.recv(bytecount **2).decode(encoding="utf8")
            Print(Text.BLUE,f"{username}>{string}")
            if self.com.iscommand(string):
                self.send(client, username, self.com.handler(string))
            else:
                self.UserMessage(address,username,string)
        while True:
            status = update()
            if not status:
                break
def get_host_ip() -> str:
    """get current IP address"""
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        s.connect(('8.8.8.8', 80))
        ip = s.getsockname()[0]
    finally:
        s.close()
    return ip

server = Server(get_host_ip(),429)
server.run()
sys.exit(app.exec_())

color.py

代码语言:javascript复制
import ctypes,time
  
STD_INPUT_HANDLE = -10  
STD_OUTPUT_HANDLE= -11  
STD_ERROR_HANDLE = -12
std_out_handle=ctypes.windll.kernel32.GetStdHandle(STD_OUTPUT_HANDLE)
class Text():
    DARKBLUE = 0x01 # 暗蓝色
    DARKGREEN = 0x02 # 暗绿色
    DARKSKYBLUE = 0x03 # 暗天蓝色
    DARKRED = 0x04 # 暗红色
    DARKPINK = 0x05 # 暗粉红色
    DARKYELLOW = 0x06 # 暗黄色
    DARKWHITE = 0x07 # 暗白色
    DARKGRAY = 0x08 # 暗灰色/亮
    BLUE = 0x09 # 蓝色
    GREEN = 0x0a # 绿色
    SKYBLUE = 0x0b # 天蓝色
    RED = 0x0c # 红色
    PINK = 0x0d # 粉红色
    YELLOW = 0x0e # 黄色
    WHITE = 0x0f # 白色

class Background():
    BLUE     = 0x10 # 蓝
    GREEN    = 0x20 # 绿
    RED      = 0x40  # 红
    INTENSITY = 0x80 # 亮
 
def SetCmdColor(color, handle=std_out_handle):
    Bool=ctypes.windll.kernel32.SetConsoleTextAttribute(handle, color)
    return Bool
 
def Print(color:int,*args, **kwargs):
    SetCmdColor(color)
    print(*args, **kwargs)
    SetCmdColor(Text.DARKWHITE)

0 人点赞