Hello,大家好!前几期我们已经聊了 Tornado 中协程的创建和运行,本期开始聊聊 Tornado 中与网络数据读写处理相关的内容。这部分比较复杂,我们打算拆分成几期来讲。
本期我们先聊聊整体的概念,以及这部分代码的大致结构。tornado.iostream 模块提供了向非阻塞文件或 socket 句柄读写数据的工具:其中 BaseIOStream 是读写的通用接口,SSLIOStream 是 IOStream 的 SSL 封装版本,PipeIOStream 则是基于管道(pipe)的实现。
我们先看一下我整理过后的这部分的核心代码逻辑:
代码语言:python
# -*- encoding: utf-8 -*-
# !/usr/bin/python
"""
@File : BaseIOStream_Learn.py
@Time : 2020/09/12 15:24
@Author : haishiniu
@Software: PyCharm
"""
import numbers
import socket
import sys
import errno
from tornado import ioloop, stack_context
from tornado.concurrent import TracebackFuture
from tornado.iostream import UnsatisfiableReadError, StreamBufferFullError
from tornado.log import app_log, gen_log
from tornado.util import errno_from_exception
class BaseIOStream(object):
    """Condensed study copy of the read path of Tornado's ``BaseIOStream``.

    Only the pieces needed to follow ``read_bytes`` are kept.  Several
    collaborators are referenced but not defined in this excerpt and must be
    provided by a subclass or by the full implementation: ``fileno``,
    ``read_from_fd``, ``close``, ``closed``, ``reading``, ``writing``,
    ``_check_closed``, ``_is_connreset``, ``_maybe_run_close_callback`` and
    ``_run_streaming_callback``.
    """

    def __init__(self, io_loop=None, max_buffer_size=None,
                 read_chunk_size=None, max_write_buffer_size=None):
        """Set up buffer limits and the read-side bookkeeping.

        Args:
            io_loop: Event loop to register with; falls back to
                ``ioloop.IOLoop.current()`` when falsy.
            max_buffer_size: Upper bound on buffered incoming bytes;
                defaults to 104857600.
            read_chunk_size: Chunk size setting; defaults to 65536 and is
                capped at half of ``max_buffer_size``.
            max_write_buffer_size: Accepted for signature compatibility; not
                used anywhere in this excerpt.
        """
        self.io_loop = io_loop or ioloop.IOLoop.current()
        self.max_buffer_size = max_buffer_size or 104857600
        self.read_chunk_size = min(read_chunk_size or 65536,
                                   self.max_buffer_size // 2)
        self._read_buffer = bytearray()
        # Offset of the first unconsumed byte inside _read_buffer.
        self._read_buffer_pos = 0
        # Number of unconsumed bytes (not len(_read_buffer)).
        self._read_buffer_size = 0
        self._read_bytes = None
        self._read_partial = False
        self._read_callback = None
        self._read_future = None
        self._streaming_callback = None
        self._close_callback = None
        self._state = None
        # Count of callbacks scheduled on the loop but not yet started.
        self._pending_callbacks = 0

    def read_bytes(self, num_bytes, callback=None, streaming_callback=None,
                   partial=False):
        """Start a read of ``num_bytes`` bytes.

        Args:
            num_bytes: Integral number of bytes wanted.
            callback: Invoked with the data when the read completes.  When
                omitted, a future is returned instead.
            streaming_callback: Optional callback, stored after wrapping
                with ``stack_context.wrap``.
            partial: When true, the read completes as soon as any bytes are
                buffered, even if fewer than ``num_bytes``.

        Returns:
            The future created for this read, or ``None`` when ``callback``
            was supplied.
        """
        future = self._set_read_callback(callback)
        assert isinstance(num_bytes, numbers.Integral)
        self._read_bytes = num_bytes
        self._read_partial = partial
        self._streaming_callback = stack_context.wrap(streaming_callback)
        try:
            self._try_inline_read()
        except:
            # The error propagates to the caller directly, so retrieve the
            # future's exception here; upstream does this so the abandoned
            # future does not log an unexamined failure.
            if future is not None:
                future.add_done_callback(lambda f: f.exception())
            raise
        return future

    def _set_read_callback(self, callback):
        """Record how the pending read reports its result.

        Stores the wrapped ``callback`` when given, otherwise creates a
        ``TracebackFuture``.  Returns ``self._read_future`` (``None`` in the
        callback case).  Asserts that no other read is in progress.
        """
        assert self._read_callback is None, "Already reading"
        assert self._read_future is None, "Already reading"
        if callback is not None:
            self._read_callback = stack_context.wrap(callback)
        else:
            self._read_future = TracebackFuture()
        return self._read_future

    def _try_inline_read(self):
        """Try to finish the pending read without waiting for the IOLoop.

        First serves the read from already-buffered data; otherwise pulls
        from the file descriptor once.  If the read still cannot complete
        and the stream is open, registers interest in READ events.
        """
        # See if the data already buffered satisfies the read.
        self._run_streaming_callback()
        pos = self._find_read_pos()
        if pos is not None:
            self._read_from_buffer(pos)
            return
        self._check_closed()
        try:
            pos = self._read_to_buffer_loop()
        except Exception:
            # Let the close callback (if any) run before re-raising.
            self._maybe_run_close_callback()
            raise
        if pos is not None:
            self._read_from_buffer(pos)
            return
        # Nothing satisfiable yet: either report the close or wait for data.
        if self.closed():
            self._maybe_run_close_callback()
        else:
            self._add_io_state(ioloop.IOLoop.READ)

    def _find_read_pos(self):
        """Return how many buffered bytes complete the pending read.

        Returns ``None`` when no ``read_bytes`` request is pending or the
        buffer does not yet hold enough data (any data, for partial reads).
        """
        if (self._read_bytes is not None and
                (self._read_buffer_size >= self._read_bytes or
                 (self._read_partial and self._read_buffer_size > 0))):
            num_bytes = min(self._read_bytes, self._read_buffer_size)
            return num_bytes
        return None

    def _read_from_buffer(self, pos):
        """Clear the pending-read criteria and deliver ``pos`` bytes."""
        self._read_bytes = self._read_delimiter = self._read_regex = None
        self._read_partial = False
        self._run_read_callback(pos, False)

    def _run_read_callback(self, size, streaming):
        """Hand ``size`` consumed bytes to the callback or the future.

        Args:
            size: Number of bytes to take from the read buffer.
            streaming: True to deliver to the streaming callback and leave
                the final callback/future in place; False to complete the
                read.
        """
        if streaming:
            callback = self._streaming_callback
        else:
            callback = self._read_callback
            self._read_callback = self._streaming_callback = None
            if self._read_future is not None:
                assert callback is None
                # Detach the future before resolving it so the stream is no
                # longer marked as reading when the result is set.
                future = self._read_future
                self._read_future = None
                future.set_result(self._consume(size))
        if callback is not None:
            assert (self._read_future is None) or streaming
            self._run_callback(callback, self._consume(size))
        else:
            # No callback was scheduled, so _run_callback's wrapper will not
            # re-arm the error listener for us; do it now.
            self._maybe_add_error_listener()

    def _run_callback(self, callback, *args):
        """Schedule ``callback(*args)`` on the IOLoop.

        The pending-callback counter is incremented when scheduling and
        decremented when the wrapper starts.  An exception from the callback
        is logged, closes the stream and is re-raised.
        """
        def wrapper():
            self._pending_callbacks -= 1
            try:
                return callback(*args)
            except Exception:
                app_log.error("Uncaught exception, closing connection.",
                              exc_info=True)
                self.close(exc_info=True)
                raise
            finally:
                self._maybe_add_error_listener()
        # Schedule outside any stack context (the callback was wrapped
        # when it was registered).
        with stack_context.NullContext():
            self._pending_callbacks += 1
            self.io_loop.add_callback(wrapper)

    def _maybe_add_error_listener(self):
        """Re-arm close detection once no callbacks are pending.

        Does nothing while callbacks are pending.  Otherwise, if no events
        beyond ERROR are registered: runs the close path when the stream is
        closed, or listens for READ when the buffer is empty and a close
        callback is set.
        """
        if self._pending_callbacks != 0:
            return
        if self._state is None or self._state == ioloop.IOLoop.ERROR:
            if self.closed():
                self._maybe_run_close_callback()
            elif (self._read_buffer_size == 0 and
                  self._close_callback is not None):
                self._add_io_state(ioloop.IOLoop.READ)

    def _consume(self, loc):
        """Remove ``loc`` bytes from the front of the read buffer.

        Returns the removed data as ``bytes``.  Asserts that ``loc`` does
        not exceed the number of buffered bytes.
        """
        if loc == 0:
            return b""
        assert loc <= self._read_buffer_size
        # Slice through a memoryview so the only copy made is the result.
        b = (memoryview(self._read_buffer)
             [self._read_buffer_pos:self._read_buffer_pos + loc]
             ).tobytes()
        self._read_buffer_pos += loc
        self._read_buffer_size -= loc
        # Drop the consumed prefix only once it outweighs the live data, so
        # the bytearray is not shifted on every call.
        if self._read_buffer_pos > self._read_buffer_size:
            del self._read_buffer[:self._read_buffer_pos]
            self._read_buffer_pos = 0
        return b

    def _add_io_state(self, state):
        """Add ``state`` to the set of IOLoop events being listened for.

        Registers the handler on first use (always including ERROR) and
        updates it afterwards.  No-op on a closed stream.
        """
        if self.closed():
            # connection has been closed, so there can be no future events
            return
        if self._state is None:
            self._state = ioloop.IOLoop.ERROR | state
            with stack_context.NullContext():
                self.io_loop.add_handler(
                    self.fileno(), self._handle_events, self._state)
        elif not self._state & state:
            self._state = self._state | state
            self.io_loop.update_handler(self.fileno(), self._state)

    def _read_to_buffer_loop(self):
        """Read into the buffer until the pending read can be satisfied.

        Stops when ``_read_to_buffer`` returns 0, the stream is closed, or
        enough bytes have arrived.

        Returns:
            The result of ``_find_read_pos()``: a byte count, or ``None``
            when the read still cannot complete.
        """
        try:
            if self._read_bytes is not None:
                target_bytes = self._read_bytes
            else:
                target_bytes = 0
            next_find_pos = 0
            # Hold a pending-callback count for the duration of the loop so
            # _maybe_add_error_listener stays inert; the finally clause
            # releases it.
            self._pending_callbacks += 1
            while not self.closed():
                if self._read_to_buffer() == 0:
                    break
                self._run_streaming_callback()
                # Reaching target_bytes means we are certainly done.
                if (target_bytes is not None and
                        self._read_buffer_size >= target_bytes):
                    break
                # Otherwise re-check on the first read and then only when
                # the buffered size has doubled.
                if self._read_buffer_size >= next_find_pos:
                    pos = self._find_read_pos()
                    if pos is not None:
                        return pos
                    next_find_pos = self._read_buffer_size * 2
            return self._find_read_pos()
        finally:
            self._pending_callbacks -= 1

    def _handle_events(self, fd, events):
        """IOLoop handler: process READ events and refresh the event mask.

        ``UnsatisfiableReadError`` is logged and closes the stream; any
        other exception is logged, closes the stream and is re-raised.
        """
        if self.closed():
            gen_log.warning("Got events for closed stream %s", fd)
            return
        try:
            ...
            if events & self.io_loop.READ:
                self._handle_read()
            # Recompute which events we still care about.
            state = self.io_loop.ERROR
            if self.reading():
                state |= self.io_loop.READ
            if self.writing():
                state |= self.io_loop.WRITE
            if state == self.io_loop.ERROR and self._read_buffer_size == 0:
                # Otherwise idle with an empty buffer: also listen for READ.
                state |= self.io_loop.READ
            if state != self._state:
                assert self._state is not None, (
                    "shouldn't happen: _handle_events without self._state")
                self._state = state
                self.io_loop.update_handler(self.fileno(), self._state)
        except UnsatisfiableReadError as e:
            gen_log.info("Unsatisfiable read, closing connection: %s" % e)
            self.close(exc_info=True)
        except Exception:
            gen_log.error("Uncaught exception, closing connection.",
                          exc_info=True)
            self.close(exc_info=True)
            raise

    def _handle_read(self):
        """Handle a READ event: fill the buffer and finish the read if able.

        ``UnsatisfiableReadError`` propagates to the caller; any other
        exception is logged as a warning and closes the stream.
        """
        try:
            pos = self._read_to_buffer_loop()
        except UnsatisfiableReadError:
            raise
        except Exception as e:
            gen_log.warning("error on read: %s" % e)
            self.close(exc_info=True)
            return
        if pos is not None:
            self._read_from_buffer(pos)
            return
        else:
            self._maybe_run_close_callback()

    def _read_to_buffer(self):
        """Read one chunk from the file descriptor into the read buffer.

        Returns:
            The number of bytes appended; 0 when ``read_from_fd`` returned
            ``None``; ``None`` when a connection reset closed the stream.

        Raises:
            StreamBufferFullError: the buffer grew past ``max_buffer_size``.
            socket.error, IOError, OSError: any read error other than EINTR
                or a connection reset (the stream is closed first).
        """
        while True:
            try:
                chunk = self.read_from_fd()
            except (socket.error, IOError, OSError) as e:
                # Interrupted system call: simply retry.
                if errno_from_exception(e) == errno.EINTR:
                    continue
                # A connection reset is handled as a close, not an error.
                if self._is_connreset(e):
                    self.close(exc_info=True)
                    return
                self.close(exc_info=True)
                raise
            break
        if chunk is None:
            return 0
        # Append (never replace) so unconsumed data is kept and the buffer
        # stays a bytearray.
        self._read_buffer += chunk
        self._read_buffer_size += len(chunk)
        if self._read_buffer_size > self.max_buffer_size:
            gen_log.error("Reached maximum read buffer size")
            self.close()
            raise StreamBufferFullError("Reached maximum read buffer size")
        return len(chunk)
好的,本期我们先简单地介绍了 BaseIOStream 这个类的核心逻辑。大家可以先自行阅读,看看能否理解这部分逻辑;下期我们会给出这部分核心代码的批注,敬请期待。感谢大家的支持,谢谢!