Python TCP服务器v1.6 - multiprocessing多进程及Ctrl-c(SIGINT)退出

2023-02-06 09:56:27 浏览数 (2)

TCP聊天服务器套接字v1.6

所有版本记录: v1.0 : TCP聊天服务器套接字|PyQt5 socket(TCP端口映射 端口放行) logging Thread(含日志,html) anaconda打包32位exe(3.4万字)|python高阶 v1.1 : python TCP套接字服务器v1.1-新增服务端命令功能及修改bug(socket PyQt5) v1.2 : python TCP服务器v1.2 - 服务端新增用户登录注册(json, md5加密) v1.3 : python TCP服务器v1.3 - 服务器抗压测试及关闭套接字处理 v1.4 : python TCP服务器v1.4 - 客户端连接服务器异常(异常情况分类)处理 v1.5 : PyQt5可编辑下拉框(comboBox):editable - python TCP服务器v1.5 - 客户端连接界面增加自定义参数(设置超时, 连接地址可选)

文章目录

  • | 多线程threading 与 多进程multiprocess
  • | signal
  • | 使用
    • |全部代码
      • 最后, 向大家公布一件事,v1.7会带来服务端的GUI界面(PyQt5)!

| 多线程threading 与 多进程multiprocess

线程:所有的线程运行在同一个进程中,共享相同的运行环境。每个独立的线程有一个程序入口,顺序执行序列和程序的出口

python对线程的支持并不是非常好(GIL),所以你可以在很多文章上看到对python多线程弊端的批评

GIL像是解释器里一个Bug一样的存在,不过我们也有一些解决方法:一般可以用ctypes调用C代码来绕过解释器,而更主要的做法是用multiprocessing来绕过多线程的瓶颈。此外,退出线程也很麻烦。

进程:程序的一次执行(程序载入内存,系统分配资源运行)。每个进程有自己的内存空间,数据栈等,进程之间可以进行通讯,但是不能直接共享数据。要退出进程,可以直接调用multiprocessing模块中的Process.terminate()方法

| signal

Python中要捕获信号,需要signal包来处理。

几个常用信号:

SIGINT 终止进程 中断进程 (control c)

SIGTERM 终止进程 软件终止信号

SIGKILL 终止进程 杀死进程

SIGALRM 闹钟信号

| 使用

代码语言:python
from  signal import SIGINT, signal
import multiprocessing
代码语言:python
if __name__ == "__main__":
    # Run the server in a child process so that Ctrl-C in this (parent)
    # process can stop it with Process.terminate().
    server = Server("127.0.0.1", 429)
    process = multiprocessing.Process(target=server.run)

    def shutdown(signum, frame):
        """SIGINT handler: terminate the server process if it is still running."""
        if process.is_alive():
            process.terminate()

    signal(SIGINT, shutdown)
    print("[i]Press Ctrl-c to shutdown the server.")
    process.start()
    # Wait for the child with a short timed join instead of spinning on
    # is_alive(): the tight loop kept one CPU core at 100%, while the timeout
    # still lets the SIGINT handler run promptly.
    while process.is_alive():
        process.join(timeout=0.1)
    logger.info("Server shutdown complete.")
    os.system("pause")

感觉这次更新内容不怎么多

|全部代码

server.py

那个data.py就不上传了

代码语言:python
import os
import socket  # 导入 socket 模块
from threading import Thread
import logging
# from color import Text, Background, Print
from  signal import SIGINT, signal
import data
import multiprocessing
import time
# Program version; Command_Handler.get_version() formats it into the "/info -v" reply.
__version__ = 1.6


def threading(Daemon, **kwargs):
    """Create, configure and start a thread in one call.

    Args:
        Daemon: Whether the new thread is a daemon thread.
        **kwargs: Forwarded unchanged to ``Thread(...)`` (e.g. ``target``,
            ``args``).

    Returns:
        The already-started ``Thread`` object.
    """
    thread = Thread(**kwargs)
    # Thread.setDaemon() is deprecated since Python 3.10; assigning the
    # ``daemon`` attribute is the supported equivalent.
    thread.daemon = Daemon
    thread.start()
    return thread


def ignore(function):
    """Decorator that runs ``function`` and suppresses ordinary exceptions.

    The wrapper returns whatever ``function`` returns. If ``function`` raises
    an ``Exception`` subclass, the error is recorded at DEBUG level and the
    wrapper returns ``None``. ``KeyboardInterrupt`` and ``SystemExit`` are no
    longer swallowed (the previous bare ``except:`` caught them too).

    Args:
        function: The callable to wrap.

    Returns:
        The wrapping callable, carrying the metadata of ``function``.
    """
    import functools  # local import keeps this helper self-contained

    @functools.wraps(function)
    def i(*args, **kwargs):
        try:
            return function(*args, **kwargs)
        except Exception:
            # Best-effort by design: keep going, but leave a trace for debugging.
            logging.getLogger(__name__).debug(
                "ignored exception in %s",
                getattr(function, "__name__", function),
                exc_info=True,
            )
            return None

    return i


# Module logger: INFO and above go both to "log.txt" and to the console.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

handler = logging.FileHandler("log.txt")
console = logging.StreamHandler()

# The two formats differ only in the spacing after the colon.
for _hdlr, _fmt in (
    (handler, "[%(asctime)s(%(levelname)s)]:  %(message)s"),
    (console, "[%(asctime)s(%(levelname)s)]: %(message)s"),
):
    _hdlr.setLevel(logging.INFO)
    _hdlr.setFormatter(logging.Formatter(_fmt))
    logger.addHandler(_hdlr)
del _hdlr, _fmt

# Base unit for the receive buffer size (Client reads bytecount ** 2 bytes).
bytecount = 1024


class Command_Handler(object):
    """Answer slash commands (currently only ``/info``) for one client."""

    def __init__(self, bind):
        """Bind the handler to the client whose commands it answers.

        Args:
            bind: The ``Client`` instance this handler serves.

        Raises:
            TypeError: If ``bind`` is not a ``Client`` (an explicit check, so
                it is not stripped when Python runs with ``-O``).
        """
        if not isinstance(bind, Client):
            raise TypeError("bind must be a Client instance")
        self.client = bind

    def _function(self, _list):
        """Resolve a tokenised command to its reply text.

        Args:
            _list: Command tokens, e.g. ``["/info", "-v"]``.

        Returns:
            The reply string: the answer of the matched option, an "unknown
            command" message when a token is not found, or a "needs more
            arguments" message when the tokens stop at a command group.
            Tokens after a fully resolved option are ignored.
        """
        # Store callables so only the requested answer is computed.
        table = {"/info": {"-v": self.get_version,
                           "-id": self.get_id,
                           "-i": self.info,
                           "-h": self.help,
                           "-name": self.name},
                 }
        node = table
        for token in _list:
            if not isinstance(node, dict):
                break
            if token not in node:
                return self.unknown(" ".join(_list))
            node = node[token]
        if isinstance(node, dict):
            return "Error:\n<font color='blue'>This command must take more arguments. Such as %s.</font>" % (
                list(node.keys()),)
        return node()

    @staticmethod
    def help():
        """Return the usage text of the ``/info`` command."""
        return """/info [-v] [-id] [-i]
-v : get version of program.
-id : get your id.
-i : get information.
-h : help.
-name : get your name
For example, <font color=red>/info -id</font>"""

    @staticmethod
    def get_version():
        """Return the program version line built from ``__version__``."""
        return "version : " + str(__version__)

    def get_id(self):
        """Return a line with ``id()`` of the bound client object."""
        return "Your id is {}.".format(id(self.client))

    def name(self):
        """Return a line with the bound client's username."""
        return "Your name is {}.".format(self.client.username)

    def info(self):
        """Return the server banner, which embeds the version line."""
        return f"Socket Server[version {self.get_version()}] By zmh."

    def unknown(self, s):
        """Return the "no such command" message for ``s`` followed by the help text."""
        return """Error:
No command named "%s". Please search [/info -h] to help.
%s""" % (s, self.help())

    def cut(self, string):
        """Split a raw command line into whitespace-separated tokens."""
        return string.strip().split()

    def handler(self, c):
        """Return the HTML reply for command line ``c``: an echo of the command, then its answer."""
        return "<font color='gray'>[command]</font><font color='brown'>%s</font>\n%s" % (
            c, str(self._function(self.cut(c))))

    def iscommand(self, i):
        """Return True when the text ``i`` starts with ``/`` after stripping whitespace."""
        return i.strip().startswith("/")


class Server(object):
    """TCP chat server: accepts clients and relays messages between them.

    The three class attributes are the message templates sent to clients.
    """
    join_message = "<font color='red'>Server></font> <font color='blue'>%s(%s)</font> 连接服务器. 当前在线人数: <font color='red'>%s</font>"
    user_message = "<font color='%s'>%s(%s)%s></font> %s"
    quit_message = "%s(%s) 下线了, %s"

    def __init__(self, addr, port, backlog=10, encode='utf8'):
        """Create the listening socket bound to ``(addr, port)``.

        Args:
            addr: Address to bind to.
            port: Port to bind to.
            backlog: Value passed to ``socket.listen``.
            encode: Text encoding name, copied by each ``Client``.

        Raises:
            OSError: If binding or listening fails; the socket is closed first.
        """
        self.address = addr, port
        self.backlog = backlog
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.socket.bind(self.address)
            self.socket.listen(backlog)
        except OSError:
            # Do not leak the descriptor when the address is unavailable.
            self.socket.close()
            raise
        self.connect = []
        self.encode = encode
        self.user_record = data.user()

    def clear_socket(self, clear_ms=500):
        """Forever remove quitted or closed clients from ``self.connect``.

        Args:
            clear_ms: Pause between two sweeps, in milliseconds.
        """
        logger.info(f"Clear the closed socket once every {clear_ms} ms.")
        while True:
            # Collect first, then remove, so the list is not mutated while scanned.
            stale = [c for c in self.connect
                     if hasattr(c, 'Quitted') or not c.isOpen()]
            for user in stale:
                self.connect.remove(user)
            time.sleep(clear_ms / 1000)

    def run(self):
        """Log the configuration, start the cleanup thread and accept clients (blocks)."""
        logger.debug(f"pid {os.getpid()}.")
        logger.info(f"Server [{':'.join(map(str, self.address))}] on.")
        logger.info("Backlog number: " + str(self.backlog))
        logger.info('The CODEC is sent as ' + self.encode)
        threading(Daemon=True, target=self.clear_socket)
        self.accept_client()

    def _get_Clients(self) -> list:
        """Return the clients whose ``__filter__()`` is true."""
        return [c for c in self.connect if c.__filter__()]

    def _get_sockets(self):  # return int
        """Return how many clients ``_get_Clients()`` currently reports."""
        return len(self._get_Clients())

    def _str_sockets(self):
        """Return the online-count text used in log and quit messages."""
        return f"当前人数 {self._get_sockets()}"

    def ServerMessage(self, mes, inc=True):
        """Send ``mes`` to every client that passes ``__filter__()``.

        Args:
            mes: Text to send.
            inc: Unused; kept for call compatibility.
        """
        for user in self._get_Clients():
            # Re-check at send time: an earlier send may have changed a client's state.
            if user.__filter__():
                user._send(mes)

    def UserMessage(self, address, _user, mes, inc=True):
        """Relay a chat message from ``_user`` to every receiving client and log it.

        Recipients whose username equals ``_user`` get the "(我自己)" variant.

        Args:
            address: Sender address shown in the message.
            _user: Sender username.
            mes: Message text; nothing happens when it is empty.
            inc: Unused; kept for call compatibility.
        """
        if not mes:
            return
        # Iterate a snapshot: the cleanup thread may remove entries meanwhile.
        for user in list(self.connect):
            if not user.__filter__():
                continue
            own = _user == user.username
            user._send(Server.user_message % ("brown" if own else "red",
                                              _user,
                                              address,
                                              "(我自己)" if own else "",
                                              mes))
        logger.info(f"{address}[{_user}] : {mes}")

    def error_handle(self):
        """Print and remove every client whose socket is no longer open."""
        # Build the list first: removing while a lazy filter walks
        # self.connect skipped the entry after each removed one.
        closed = [c for c in self.connect if not c.isOpen()]
        for user in closed:
            print(user)
            self.connect.remove(user)

    def accept_client(self):
        """Accept connections forever, wrapping each one in a started ``Client``."""
        while True:
            logger.info("The server is listening on the port.")
            client, address = self.socket.accept()  # blocks until a client connects
            NewClient = Client(client, address[0], self)
            self.connect.append(NewClient)
            NewClient.run()
            logger.info(f'The address {address[0]} is connected to the server')

    def quit(self, username, address):
        """Log that a user went offline and broadcast it."""
        QuitMessage = Server.quit_message % (username, address, self._str_sockets())
        logger.info(QuitMessage)
        self.ServerMessage(QuitMessage, False)

    def login(self, username, address):
        """Log that a user logged in and broadcast the join message."""
        logger.info(f"{address}[{username}] 登录服务器 , " + self._str_sockets())
        self.ServerMessage(Server.join_message % (username, address, self._get_sockets()))


class Client(object):
    """One connected chat client: wraps its socket and runs login and receiving.

    The I/O wrappers built by ``error`` return the ``Client.QuitError`` class
    itself (not an instance) as a sentinel when an I/O call failed.
    """

    class QuitError(Exception):
        """Sentinel type meaning "this client's I/O call failed"."""

        def __init__(self, *args):
            super().__init__(*args)

    def __init__(self, socket, addr, server: Server):
        """Store the connection and build the guarded ``_recv`` / ``_send``.

        Args:
            socket: The connected socket of this client.
            addr: The client's address as shown in messages.
            server: The owning ``Server``.

        Raises:
            ValueError: If ``server`` is not a ``Server`` instance.
        """
        self.socket = socket
        self.addr = addr
        if not isinstance(server, Server):
            raise ValueError
        self.server = server
        self.encode = self.server.encode

        self.com = Command_Handler(self)

        @self.error
        def _recv() -> str:
            raw = self.socket.recv(bytecount ** 2)
            if not raw:
                # recv() returning b"" means the peer closed the connection.
                # Without this check recv() below would spin forever.
                raise ConnectionAbortedError("connection closed by peer")
            return raw.decode(encoding=self.encode).strip()

        self._recv = _recv

        @self.error
        def _send(message=str()) -> None:
            self.socket.sendall(message.encode(self.encode))

        self._send = _send

    def __del__(self):
        """Close the socket when the object is collected."""
        sock = getattr(self, "socket", None)
        if sock is not None:
            sock.close()

    def isLogin(self) -> bool:
        """Return True once ``login`` has completed successfully."""
        return bool(getattr(self, "_login", False))

    def isOpen(self) -> bool:
        """Return True while the socket object reports itself as not closed.

        Reads the socket's private ``_closed`` flag; a missing flag counts as closed.
        """
        return not getattr(self.socket, "_closed", True)

    def __filter__(self) -> bool:
        """Return whether the client is logged in and its socket is open."""
        return self.isLogin() and self.isOpen()

    def recv(self) -> str:
        """Block until a non-empty message arrives and return it.

        Whitespace-only messages are skipped. Returns ``Client.QuitError``
        when the underlying receive failed.
        """
        data = self._recv()
        while not data:
            data = self._recv()
        # A walrus loop is avoided on purpose: the author also targets Python 3.7.
        return data

    def _read_or_quit(self):
        """Return the next message, or ``None`` after ``quit()`` if receiving failed."""
        text = self.recv()
        if text is Client.QuitError:
            self.quit()
            return None
        return text

    @staticmethod
    def _valid_password(string) -> bool:
        """Return True for a 4-10 character password without a line break."""
        return (4 <= len(string) <= 10) and '\n' not in string

    @ignore
    def login(self):
        """Run the login / registration dialogue with the client.

        A known username must give the stored password; a new username
        registers by entering a valid password twice. On success the client
        is marked logged in and the server announces it. On a wrong password
        or a lost connection the method returns without marking it logged in.
        """
        self._send(f'<font color="red">欢迎来到服务器[{self.server.address[0]}].您的ip地址为{self.socket.getpeername()[0]}')
        name = self._read_or_quit()
        if name is None:
            return
        self.username = name[:8]
        records = self.server.user_record
        if records.__in__(self.username):
            self._send("<font color='red'>请输入您的密码: (右下[send]键发送)</font>")
            password = self._read_or_quit()
            if password is None:
                return
            if not records.handler(self.username, password):
                self._send('<font color="red">密码错误,请重试.</font>')
                # Close and stop here; falling through used to mark the
                # client logged in and broadcast a join for a failed login.
                self.socket.close()
                return
            self._send(f'<font color="green">欢迎回来, {self.username}.</font>')
        else:
            while True:
                self._send(
                    "<font color='blue'>[i]提示: 密码需在4 ~ 10位之间, 且不能换行.</font>\n<font color='red'>请输入您的密码: (右下[send]键发送)</font>")
                p1 = self._read_or_quit()
                if p1 is None:
                    return
                if self._valid_password(p1):
                    break
            while True:
                self._send("<font color='red'>再次输入您的密码: (右下[send]键发送)</font>")
                p2 = self._read_or_quit()
                if p2 is None:
                    # Without this exit a disconnect made this loop spin forever.
                    return
                if p1 == p2:
                    break
                self._send("<font color='red'>密码与前次不符!</font>")
            records.handler(self.username, p1)
            self._send(f'初来乍到, {self.username}')
        self._login = True
        self.server.login(self.username, self.addr)

    def quit(self) -> None:
        """Close the connection once and announce the departure.

        Later calls are no-ops. The server is only told when a username was
        already received, so a drop before that no longer raises
        ``AttributeError``.
        """
        if hasattr(self, 'Quitted'):
            return
        self.Quitted = True
        if self.isOpen():
            self.socket.close()
        username = getattr(self, "username", None)
        if username is not None:
            self.server.quit(username, self.addr)

    @ignore
    def forever_receive(self):
        """Thread body: log in, then handle messages until the client goes away."""
        self.login()
        while self.__filter__():
            string = self._read_or_quit()
            if string is None:
                return
            if self.com.iscommand(string):
                self._send(self.com.handler(string))
            else:
                self.server.UserMessage(self.addr, self.username, string)

    def error(self, func):
        """Wrap an I/O callable so failures return ``Client.QuitError`` instead of raising.

        Any ``ConnectionError`` (aborted, reset, broken pipe) triggers
        ``quit()``. Other exceptions are logged and only reported through
        the sentinel.
        """
        def function(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except ConnectionError:
                self.quit()
            except Exception:
                logger.exception("error")
            return Client.QuitError

        return function

    def run(self):
        """Start the daemon thread that serves this client."""
        self.thread = threading(True, target=self.forever_receive)


def get_host_ip() -> str:
    """Return the local IP address used for outbound traffic.

    A UDP socket is connected to 8.8.8.8:80 and its local address is read back.

    Raises:
        OSError: If the socket cannot be created or connected.
    """
    # ``with`` closes the socket on every path. The old try/finally hit an
    # unbound ``s`` when socket() itself failed, hiding the real error.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.connect(('8.8.8.8', 80))
        return s.getsockname()[0]

if __name__ == "__main__":
    # Run the server in a child process so that Ctrl-C in this (parent)
    # process can stop it with Process.terminate().
    server = Server("127.0.0.1", 429)
    process = multiprocessing.Process(target=server.run)

    def shutdown(signum, frame):
        """SIGINT handler: terminate the server process if it is still running."""
        if process.is_alive():
            process.terminate()

    signal(SIGINT, shutdown)
    print("[i]Press Ctrl-c to shutdown the server.")
    process.start()
    # Wait for the child with a short timed join instead of spinning on
    # is_alive(): the tight loop kept one CPU core at 100%, while the timeout
    # still lets the SIGINT handler run promptly.
    while process.is_alive():
        process.join(timeout=0.1)
    logger.info("Server shutdown complete.")
    os.system("pause")

最后, 向大家公布一件事,v1.7会带来服务端的GUI界面(PyQt5)!

0 人点赞