TCP聊天服务器套接字v1.6
所有版本记录:
v1.0: TCP聊天服务器套接字|PyQt5 socket(TCP端口映射 端口放行) logging Thread(含日志,html) anaconda打包32位exe(3.4万字)|python高阶
v1.1: python TCP套接字服务器v1.1-新增服务端命令功能及修改bug(socket PyQt5)
v1.2: python TCP服务器v1.2 - 服务端新增用户登录注册(json, md5加密)
v1.3: python TCP服务器v1.3 - 服务器抗压测试及关闭套接字处理
v1.4: python TCP服务器v1.4 - 客户端连接服务器异常(异常情况分类)处理
v1.5: PyQt5可编辑下拉框(comboBox):editable - python TCP服务器v1.5 - 客户端连接界面增加自定义参数(设置超时, 连接地址可选)
文章目录
- | 多线程threading 与 多进程multiprocess
- | signal
- | 使用
- |全部代码
- 最后, 向大家公布一件事: v1.7会带来服务端的GUI界面(PyQt5)!
| 多线程threading 与 多进程multiprocess
线程
:所有的线程运行在同一个进程中,共享相同的运行环境。每个独立的线程有一个程序入口、顺序执行序列和程序的出口。
python对线程的支持并不是非常好(GIL),所以你可以在很多文章上看到对python多线程弊端的批评。
GIL就像是解释器的一个Bug一样的存在,不过我们也有一定的解决方法:可以用ctypes绕过解释器,但主要还是用multiprocessing来绕过多线程的瓶颈。并且退出线程很麻烦。
进程
:程序的一次执行(程序载入内存,系统分配资源运行)。每个进程有自己的内存空间、数据栈等,进程之间可以进行通讯,但是不能直接共享数据。要退出进程,可以直接调用multiprocessing模块中的Process.terminate()方法。
| signal
Python中要捕获信号,需要signal包来处理。
几个常用信号:
SIGINT 终止进程 中断进程 (control c)
SIGTERM 终止进程 软件终止信号
SIGKILL 终止进程 杀死进程
SIGALRM 闹钟信号
| 使用
from signal import SIGINT, signal
import multiprocessing
if __name__ == "__main__":
    # The Server is created here and served from a child process, so the
    # whole server can be stopped with Process.terminate().
    server = Server("127.0.0.1", 429)
    process = multiprocessing.Process(target=server.run)

    def quit(signum, frame):
        """SIGINT handler: terminate the server process if it is still alive."""
        if process.is_alive():
            process.terminate()

    signal(SIGINT, quit)
    print("[i]Press Ctrl-c to shutdown the server.")
    process.start()
    # Poll with a short join() timeout instead of spinning on is_alive():
    # the previous busy loop kept one CPU core at 100% while waiting.
    while process.is_alive():
        process.join(timeout=0.1)
    logger.info("Server shutdown complete.")
    os.system("pause")
感觉这次更新内容不怎么多
|全部代码
server.py
内个data.py就不传了
import os
import socket # 导入 socket 模块
from threading import Thread
import logging
# from color import Text, Background, Print
from signal import SIGINT, signal
import data
import multiprocessing
import time
# Program version; Command_Handler.get_version() reports it for "/info -v".
__version__ = 1.6
def threading(Daemon, **kwargs):
    """Create a ``Thread``, set its daemon flag, start it and return it.

    Args:
        Daemon: Value assigned to the thread's ``daemon`` attribute.
        **kwargs: Keyword arguments forwarded unchanged to ``Thread``.

    Returns:
        The already-started ``Thread`` object.
    """
    thread = Thread(**kwargs)
    # Thread.setDaemon() is deprecated (Python 3.10+); assign the attribute.
    thread.daemon = Daemon
    thread.start()
    return thread
def ignore(function):
    """Decorator that suppresses any ``Exception`` raised by ``function``.

    The wrapper returns whatever ``function`` returns, or ``None`` when an
    exception was suppressed. Only ``Exception`` subclasses are swallowed, so
    ``KeyboardInterrupt`` and ``SystemExit`` still propagate.

    Args:
        function: The callable to wrap.

    Returns:
        The wrapping callable, carrying ``function``'s name and docstring.
    """
    # Imported here because the module does not import functools at top level.
    from functools import wraps

    @wraps(function)
    def i(*args, **kwargs):
        try:
            return function(*args, **kwargs)
        except Exception:
            # Deliberate best-effort: the failure is dropped silently.
            return None

    return i
# Module logger: INFO and above go both to "log.txt" and to the console.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

handler = logging.FileHandler("log.txt")
console = logging.StreamHandler()
# Both sinks share the same level and record layout; the file handler is
# registered first, then the console handler.
for _sink in (handler, console):
    _sink.setLevel(logging.INFO)
    _sink.setFormatter(logging.Formatter("[%(asctime)s(%(levelname)s)]: %(message)s"))
    logger.addHandler(_sink)

# Base unit for receive sizes; Client reads up to bytecount ** 2 bytes per call.
bytecount = 1024
class Command_Handler(object):
    """Interpret "/"-prefixed chat commands on behalf of one client.

    Commands are whitespace-separated tokens looked up in a nested table,
    e.g. ``/info -id``. Every reply is a string (with HTML ``<font>`` markup)
    intended to be sent back to the client.
    """

    def __init__(self, bind):
        """Bind Client class"""
        assert isinstance(bind, Client)
        self.client = bind

    def _function(self, _list):
        """Resolve a tokenised command to its reply text.

        Args:
            _list: Command tokens, e.g. ``["/info", "-id"]``.

        Returns:
            The reply of the matched command, the "unknown command" message
            when a token is not in the table, or a "needs more arguments"
            message when the tokens stop at a command group. Tokens after a
            complete command are ignored.
        """
        # Leaves are bound methods, called only for the command actually
        # requested (previously every reply was computed on each lookup).
        commands = {"/info": {"-v": self.get_version,
                              "-id": self.get_id,
                              "-i": self.info,
                              "-h": self.help,
                              "-name": self.name},
                    }
        node = commands
        for token in _list:
            if not isinstance(node, dict):
                break
            if token not in node:
                return self.unknown(" ".join(_list))
            node = node[token]
        if isinstance(node, dict):
            return "Error:\n<font color='blue'>This command must take more arguments. Such as %s.</font>" % (
                list(node.keys()),)
        return node()

    @staticmethod
    def help():
        """Return the usage text of the ``/info`` command."""
        return """/info [-v] [-id] [-i]
-v : get version of program.
-id : get your id.
-i : get information.
-h : help.
-name : get your name
For example, <font color=red>/info -id</font>"""

    @staticmethod
    def get_version():
        """Return the program version as ``"version : <__version__>"``."""
        return "version : " + str(__version__)

    def get_id(self):
        """Return a sentence containing ``id()`` of the bound client object."""
        return "Your id is {}.".format(id(self.client))

    def name(self):
        """Return a sentence containing the bound client's ``username``."""
        return "Your name is {}.".format(self.client.username)

    def info(self):
        """Return the one-line program banner."""
        # Uses __version__ directly; get_version() already carries a
        # "version : " prefix, which made the banner read "version version : ...".
        return f"Socket Server[version {__version__}] By zmh."

    def unknown(self, s):
        """Return the "no such command" message for ``s`` followed by the help text."""
        return """Error:
No command named "%s". Please search [/info -h] to help.
%s""" % (s, self.help())

    def cut(self, string):
        """Split a raw command line into whitespace-separated tokens."""
        return string.strip().split()

    def handler(self, c):
        """Run command line ``c`` and return the echoed command plus its reply."""
        return "<font color='gray'>[command]</font><font color='brown'>%s</font>\n%s" % (
            c, str(self._function(self.cut(c))))

    def iscommand(self, i):
        """Return True when ``i``, ignoring surrounding whitespace, starts with "/"."""
        return i.strip().startswith("/")
class Server(object):
    """TCP chat server that accepts clients and relays messages between them.

    ``connect`` holds one ``Client`` per accepted connection. Messages are
    only delivered to clients whose ``__filter__()`` is true (logged in and
    socket still open).
    """

    # HTML templates for the messages broadcast to clients.
    join_message = "<font color='red'>Server></font> <font color='blue'>%s(%s)</font> 连接服务器. 当前在线人数: <font color='red'>%s</font>"
    user_message = "<font color='%s'>%s(%s)%s></font> %s"
    quit_message = "%s(%s) 下线了, %s"

    def __init__(self, addr, port, backlog=10, encode='utf8'):
        """Create the listening socket and load the user records.

        Args:
            addr: Address to bind to.
            port: TCP port to bind to.
            backlog: Value passed to ``socket.listen``.
            encode: Text encoding used for all client traffic.

        Raises:
            OSError: If the socket cannot be bound or put into listening mode.
        """
        self.address = addr, port
        self.backlog = backlog
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.socket.bind(self.address)
            self.socket.listen(backlog)
        except OSError:
            # Do not leak the descriptor when the port is unavailable.
            self.socket.close()
            raise
        self.connect = []
        self.encode = encode
        self.user_record = data.user()

    def clear_socket(self, clear_ms=500):
        """Forever drop clients that have quit or whose socket is closed.

        Never returns; ``run`` starts it on a daemon thread.

        Args:
            clear_ms: Pause between two sweeps, in milliseconds.
        """
        logger.info(f"Clear the closed socket once every {clear_ms} ms.")
        interval = clear_ms / 1000
        while True:
            # Collect first, remove afterwards: the list must not be mutated
            # while it is being iterated.
            stale = [c for c in self.connect if hasattr(c, 'Quitted') or not c.isOpen()]
            for user in stale:
                self.connect.remove(user)
            time.sleep(interval)

    def run(self):
        """Log the configuration, start the cleanup thread and accept clients."""
        logger.debug(f"pid {os.getpid()}.")
        logger.info(f"Server [{':'.join(map(str, self.address))}] on.")
        logger.info("Backlog number: " + str(self.backlog))
        logger.info('The CODEC is sent as ' + self.encode)
        threading(Daemon=True, target=self.clear_socket)
        self.accept_client()

    def _get_Clients(self) -> list:
        """Return a new list of the clients that can currently receive messages."""
        return [c for c in self.connect if c.__filter__()]

    def _get_sockets(self):  # return int
        """Return how many clients can currently receive messages."""
        return len(self._get_Clients())

    def _str_sockets(self):
        """Return the online-count text used in log and quit messages."""
        return f"当前人数 {self._get_sockets()}"

    def ServerMessage(self, mes, inc=True):
        """Send ``mes`` to every receiving client. ``inc`` is accepted but unused."""
        for user in self._get_Clients():
            user._send(mes)

    def UserMessage(self, address, _user, mes, inc=True):
        """Relay a chat message to every receiving client and log it.

        Recipients whose username equals ``_user`` get the message in brown
        with a "(我自己)" marker; everyone else gets it in red. Empty
        messages are ignored. ``inc`` is accepted but unused.

        Args:
            address: Sender address shown next to the name.
            _user: Sender username.
            mes: Message text.
        """
        if not mes:
            return
        # Iterate a snapshot: the cleanup thread may shrink self.connect
        # while the message is being delivered.
        for user in self._get_Clients():
            own = _user == user.username
            send_message = Server.user_message % ("brown" if own else "red",
                                                  _user,
                                                  address,
                                                  "(我自己)" if own else "",
                                                  mes)
            user._send(send_message)
        logger.info(f"{address}[{_user}] : {mes}")

    def error_handle(self):
        """Print and remove every client whose socket is no longer open."""
        # Build the list first; removing while lazily filtering the same
        # list skipped the element following each removed one.
        closed = [c for c in self.connect if not c.isOpen()]
        for user in closed:
            print(user)
            self.connect.remove(user)

    def accept_client(self):
        """Accept connections forever, starting a ``Client`` for each one."""
        while True:
            logger.info("The server is listening on the port.")
            client, address = self.socket.accept()  # blocks until a client connects
            NewClient = Client(client, address[0], self)
            self.connect.append(NewClient)
            NewClient.run()
            logger.info(f'The address {address[0]} is connected to the server')

    def quit(self, username, address):
        """Log that a user went offline and broadcast it to the others."""
        QuitMessage = Server.quit_message % (username, address, self._str_sockets())
        logger.info(QuitMessage)
        self.ServerMessage(QuitMessage, False)

    def login(self, username, address):
        """Log that a user logged in and broadcast the join message."""
        logger.info(f"{address}[{username}] 登录服务器 , " + self._str_sockets())
        self.ServerMessage(Server.join_message % (username, address, self._get_sockets()))
class Client(object):
    """One connected chat client, served on its own daemon thread.

    ``_recv`` and ``_send`` are per-instance callables created in
    ``__init__``: the raw socket operations wrapped by ``error``. They return
    the ``Client.QuitError`` class as a sentinel instead of raising.
    """

    class QuitError(Exception):
        """Sentinel returned (never raised) by ``_recv``/``_send`` after a failure."""

        def __init__(self, *args):
            super().__init__(*args)

    def __init__(self, socket, addr, server: Server):
        """Bind the client to its socket and owning server.

        Args:
            socket: The connected socket of this client.
            addr: The peer address shown in messages.
            server: The owning ``Server``.

        Raises:
            ValueError: If ``server`` is not a ``Server`` instance.
        """
        self.socket = socket
        self.addr = addr
        if not isinstance(server, Server):
            raise ValueError
        self.server = server
        self.encode = self.server.encode
        self.com = Command_Handler(self)
        # Route every socket operation through error() so that connection
        # failures are handled in exactly one place.
        self._recv = self.error(self._raw_recv)
        self._send = self.error(self._raw_send)

    def _raw_recv(self) -> str:
        """Read one chunk from the socket; return it decoded and stripped."""
        raw = self.socket.recv(bytecount ** 2)
        if not raw:
            # recv() returning b"" means the peer closed the connection.
            # Without this check recv() below would spin forever on "".
            raise ConnectionAbortedError("connection closed by peer")
        return raw.decode(encoding=self.encode).strip()

    def _raw_send(self, message="") -> None:
        """Encode ``message`` and send all of it."""
        self.socket.sendall(message.encode(self.encode))

    def __del__(self):
        """Close the socket when the object is collected."""
        sock = getattr(self, "socket", None)
        if sock is not None:
            sock.close()

    def isLogin(self) -> bool:
        """Return True once ``login`` has completed successfully."""
        return bool(getattr(self, "_login", False))

    def isOpen(self) -> bool:
        """Return True while the socket object is not marked closed."""
        return not getattr(self.socket, "_closed", True)

    def __filter__(self) -> bool:
        """Return whether the client is logged in and can receive messages."""
        return self.isLogin() and self.isOpen()

    def recv(self) -> str:
        """Block until a non-empty message arrives and return it.

        Returns:
            The message text, or the ``Client.QuitError`` class when the
            connection failed.
        """
        # Written without the walrus operator so the file also runs on
        # Python 3.7 (assignment expressions were added in 3.8).
        data = self._recv()
        while not data:
            data = self._recv()
        return data

    def _register_password(self):
        """Ask a new user for a password twice; return it, or None on disconnect."""
        def normal(string):
            return (4 <= len(string) <= 10) and '\n' not in string

        while True:
            self._send(
                "<font color='blue'>[i]提示: 密码需在4 ~ 10位之间, 且不能换行.</font>\n<font color='red'>请输入您的密码: (右下[send]键发送)</font>")
            p1 = self.recv()
            if p1 is Client.QuitError:
                return None
            if normal(p1):
                break
        while True:
            self._send("<font color='red'>再次输入您的密码: (右下[send]键发送)</font>")
            p2 = self.recv()
            if p2 is Client.QuitError:
                return None
            if p1 == p2:
                return p1
            self._send("<font color='red'>密码与前次不符!</font>")

    @ignore
    def login(self):
        """Run the login / registration dialogue with the client.

        The first message received is the username (cut to 8 characters).
        Known users must send their password; unknown users choose one. On
        success the client is marked logged in and the server announces it.
        On a wrong password or a lost connection the method returns without
        marking the client as logged in.
        """
        self._send(f'<font color="red">欢迎来到服务器[{self.server.address[0]}].您的ip地址为{self.socket.getpeername()[0]}')
        name = self.recv()
        if name is Client.QuitError:
            return
        self.username = name[:8]
        records = self.server.user_record
        if records.__in__(self.username):
            self._send("<font color='red'>请输入您的密码: (右下[send]键发送)</font>")
            password = self.recv()
            if password is Client.QuitError:
                return
            if records.handler(self.username, password):
                self._send(f'<font color="green">欢迎回来, {self.username}.</font>')
            else:
                self._send('<font color="red">密码错误,请重试.</font>')
                # Close and stop here: falling through used to mark the
                # client as logged in and broadcast a join message.
                self.socket.close()
                return
        else:
            password = self._register_password()
            if password is None:
                return
            records.handler(self.username, password)
            self._send(f'初来乍到, {self.username}')
        self._login = True
        self.server.login(self.username, self.addr)

    def quit(self) -> None:
        """Close the connection once and tell the server the user left.

        Repeated calls are no-ops. The server is only notified when a
        username was received, because the quit message needs one.
        """
        if hasattr(self, 'Quitted'):
            return
        self.Quitted = True
        if self.isOpen():
            self.socket.close()
        username = getattr(self, "username", None)
        if username is not None:
            self.server.quit(username, self.addr)

    @ignore
    def forever_receive(self):
        """Log the client in, then dispatch its messages until it goes away.

        Messages starting with "/" are answered by the command handler;
        everything else is relayed to all users through the server.
        """
        self.login()
        while self.__filter__():
            string = self.recv()
            if string is Client.QuitError:
                return
            if self.com.iscommand(string):
                self._send(self.com.handler(string))
            else:
                self.server.UserMessage(self.addr, self.username, string)

    def error(self, func):
        """Wrap a socket operation so that failures end the session.

        Args:
            func: The callable to protect.

        Returns:
            A callable that returns ``func``'s result, or ``Client.QuitError``
            after any exception. In the failure case ``quit()`` has been
            called, so the socket is closed and the client gets cleaned up.
        """
        def function(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except ConnectionError:
                # Aborted, reset and broken-pipe all mean the peer is gone.
                self.quit()
            except Exception:
                logger.exception("error")
                # The receive loop stops on QuitError, so close the socket
                # too instead of leaving a dead client in the server list.
                self.quit()
            return Client.QuitError
        return function

    def run(self):
        """Start serving this client on a daemon thread."""
        self.thread = threading(True, target=self.forever_receive)
def get_host_ip() -> str:
    """Return the local IP address used for outgoing traffic.

    A UDP socket is connected to 8.8.8.8:80 and the address the socket got
    bound to is read back with ``getsockname()``.

    Returns:
        The local IP address as a string.

    Raises:
        OSError: If the socket cannot be created or connected.
    """
    # The with-block closes the socket on every path. Previously a failing
    # socket() call left ``s`` unbound for the ``finally`` clause.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.connect(('8.8.8.8', 80))
        return s.getsockname()[0]
if __name__ == "__main__":
    # Server.__init__ binds the listening socket here in the parent; the
    # accept loop then runs in a child process so the whole server can be
    # stopped with Process.terminate().
    server = Server("127.0.0.1", 429)
    process = multiprocessing.Process(target=server.run)

    def quit(signum, frame):
        """SIGINT handler: terminate the server process if it is still alive."""
        if process.is_alive():
            process.terminate()

    signal(SIGINT, quit)
    print("[i]Press Ctrl-c to shutdown the server.")
    process.start()
    # Poll with a short join() timeout instead of spinning on is_alive():
    # the previous busy loop kept one CPU core at 100% while waiting.
    while process.is_alive():
        process.join(timeout=0.1)
    logger.info("Server shutdown complete.")
    os.system("pause")