Reactor线程模型-反应器线程模型
- 网络IO设计中的高性能模型
- 事件驱动(IO的读/写/接受....)
- ACCEPT与READ等功能不一的IO事件分离, 交由不同角色处理
Reactor的角色
- Reactor : 反应器负责注册事件等待与分发(IO多路复用), 解决因传统IO等待而出现的性能等问题
- Acceptor : 接收器负责首次接收accept事件的处理, 并注册新事件给Reactor, 给Reactor增加需要等待与分发的事件.
- Handler : 处理器负责实际业务的处理, 承接Reactor分发的事件的下一发加工动作
// Reactor 反应器
// - 由Selector IO多路复用选择器提供事件注册与捕获
// - 被捕获事件进行统一处理分发给下游处理
public class DefaultReactor implements Reactor {
private final static int PORT = 8080;
private final Selector selector;
private final Server server;
public DefaultReactor() throws IOException {
this(PORT);
}
public DefaultReactor(int port) throws IOException {
selector = Selector.open();//IO多路复用
server = new Server(selector, port);
}
@Override
public void run() {
while (!Thread.interrupted()) {
try {
// 获取发生的事件(阻塞)
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterable = selectionKeys.iterator();
while (iterable.hasNext()) {
// 对事件进行分发
dispatch(iterable.next());
iterable.remove();
}
} catch (IOException e) {
e.printStackTrace();
}
LockSupport.parkNanos(1000 * 1000 * 1000);
}
}
private void dispatch(SelectionKey selectionKey) {
// 获取事件的附加器
// ACCEPT 事件的附加器是 Acceptor, 故由 Acceptor 来处理 ACCEPT 事件
// READ 事件的附加器是 Handler, 故由 Handler 来处理 READ 事件
Object attachment = selectionKey.attachment();
if (attachment instanceof Acceptor) {
((Acceptor) attachment).run();
return;
}
if (attachment instanceof Handler) {
((Handler) attachment).run();
return;
}
}
}
代码语言:javascript复制// Acceptor接收器
// - 类似于ServerSocketChannel服务器
// - 专门处理ACCEPT首次访问的IO事件
// - 每次有ACCEPT访问时, 就创建新IO Channel(SocketChannel)注册到Reactor反应器的Selector中, 等待捕获
public class Server implements Acceptor{
private final ServerSocketChannel serverSocketChannel;
private final Selector selector;
public Server(Selector selector, int port) throws IOException {
this.selector = selector;
// 服务端创建 listen-socket 管道
this.serverSocketChannel = ServerSocketChannel.open();
// 绑定端口
this.serverSocketChannel.socket().bind(new InetSocketAddress(port));
// 设置为非阻塞模式
this.serverSocketChannel.configureBlocking(false);
// ACCEPT 事件的附加器是 Acceptor
this.serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, this);
}
@Override
public void run() {
try {
// 处理ACCEPT事件
// 为连接的客户端创建 client-socket 管道
SocketChannel clientSocketChannel = serverSocketChannel.accept();
// 设置为非阻塞
clientSocketChannel.configureBlocking(false);
// READ 事件的附加器是 Handler
clientSocketChannel.register(selector, SelectionKey.OP_READ,
new DefaultHandler(clientSocketChannel));
} catch (IOException e) {
e.printStackTrace();
}
}
}
代码语言:javascript复制// Handler 处理器
// - 由SocketChannel实现
// - 处理客户端发送过来的真正业务内容
public class DefaultHandler implements Handler {
private final SocketChannel clientSocketChannel;
public DefaultHandler(SocketChannel clientSocketChannel) {
this.clientSocketChannel = clientSocketChannel;
}
@Override
public void run() {
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
try {
// 读取数据
int read = clientSocketChannel.read(byteBuffer);
if (read <= 0) {
clientSocketChannel.close();
} else {
System.out.println("----" new String(byteBuffer.array()) "----");
// 响应结果 200, 模拟请求响应
String response = "HTTP/1.1 200 OKrn"
"Content-Length: 11rnrn"
"Yes, He is";
ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());// 数据存放在byte数组
while (buffer.hasRemaining()) {
// hasRemaining() 返回是否有剩余的可用长度
clientSocketChannel.write(buffer); // 非阻塞
}
}
} catch (IOException e1) {
try {
clientSocketChannel.close();
} catch (IOException e2) {
e2.printStackTrace();
}
e1.printStackTrace();
}
}
}
GITHUB源码
https://github.com/spbreak/i-netty/tree/master/02-reactor