02-Reactor线程模型-(单线程)

2023-10-21 14:30:14 浏览数 (5)

Reactor线程模型-反应器线程模型

  • 网络IO设计中的高性能模型
  • 事件驱动(IO的读/写/接受....)
  • ACCEPT与READ等功能不一的IO事件分离, 交由不同角色处理

Reactor的角色

  • Reactor : 反应器负责注册事件等待与分发(IO多路复用), 解决因传统IO等待而出现的性能等问题
  • Acceptor : 接收器负责首次接收accept事件的处理, 并注册新事件给Reactor, 给Reactor增加需要等待与分发的事件.
  • Handler : 处理器负责实际业务的处理, 承接Reactor分发的事件的下一发加工动作
Rector线程模型Rector线程模型
代码语言:javascript复制
// Reactor 反应器
// - 由Selector IO多路复用选择器提供事件注册与捕获
// - 被捕获事件进行统一处理分发给下游处理
public class DefaultReactor implements Reactor {
    
    private final static int PORT = 8080;
    
    private final Selector selector;
    private final Server server;

    public DefaultReactor() throws IOException {
        this(PORT);
    }
    
    public DefaultReactor(int port) throws IOException {
        selector = Selector.open();//IO多路复用
        server = new Server(selector, port);
    }

    @Override
    public void run() {
        while (!Thread.interrupted()) {
            try {
                // 获取发生的事件(阻塞)
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterable = selectionKeys.iterator();
                while (iterable.hasNext()) {
                    // 对事件进行分发
                    dispatch(iterable.next());
                    iterable.remove();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            LockSupport.parkNanos(1000 * 1000 * 1000);
        }
    }

    private void dispatch(SelectionKey selectionKey) {
        // 获取事件的附加器
        // ACCEPT 事件的附加器是 Acceptor, 故由 Acceptor 来处理 ACCEPT 事件
        // READ 事件的附加器是 Handler, 故由 Handler 来处理 READ 事件
        Object attachment = selectionKey.attachment();
        if (attachment instanceof Acceptor) {
            ((Acceptor) attachment).run();
            return;
        }
        if (attachment instanceof Handler) {
            ((Handler) attachment).run();
            return;
        }
    }
}
代码语言:javascript复制
// Acceptor接收器
// - 类似于ServerSocketChannel服务器
// - 专门处理ACCEPT首次访问的IO事件
// - 每次有ACCEPT访问时, 就创建新IO Channel(SocketChannel)注册到Reactor反应器的Selector中, 等待捕获
public class Server implements Acceptor{
    
    private final ServerSocketChannel serverSocketChannel;
    private final Selector selector;
    
    public Server(Selector selector, int port) throws IOException {
        this.selector = selector;
        // 服务端创建 listen-socket 管道
        this.serverSocketChannel = ServerSocketChannel.open();
        // 绑定端口
        this.serverSocketChannel.socket().bind(new InetSocketAddress(port));
        // 设置为非阻塞模式
        this.serverSocketChannel.configureBlocking(false);
        // ACCEPT 事件的附加器是 Acceptor
        this.serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, this);
    }

    @Override
    public void run() {
        try {
            // 处理ACCEPT事件
            // 为连接的客户端创建 client-socket 管道
            SocketChannel clientSocketChannel = serverSocketChannel.accept();
            // 设置为非阻塞
            clientSocketChannel.configureBlocking(false);
            // READ 事件的附加器是 Handler
            clientSocketChannel.register(selector, SelectionKey.OP_READ,
                    new DefaultHandler(clientSocketChannel));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
代码语言:javascript复制
// Handler 处理器
// - 由SocketChannel实现
// - 处理客户端发送过来的真正业务内容
public class DefaultHandler implements Handler {
    private final SocketChannel clientSocketChannel;

    public DefaultHandler(SocketChannel clientSocketChannel) {
        this.clientSocketChannel = clientSocketChannel;
    }


    @Override
    public void run() {
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        try {
            // 读取数据
            int read = clientSocketChannel.read(byteBuffer);
            if (read <= 0) {
                clientSocketChannel.close();
            } else {
                System.out.println("----"   new String(byteBuffer.array())   "----");
                // 响应结果 200, 模拟请求响应
                String response = "HTTP/1.1 200 OKrn"  
                        "Content-Length: 11rnrn"  
                        "Yes, He is";
                ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());// 数据存放在byte数组
                while (buffer.hasRemaining()) {
                    // hasRemaining() 返回是否有剩余的可用长度
                    clientSocketChannel.write(buffer); // 非阻塞
                }
            }
        } catch (IOException e1) {
            try {
                clientSocketChannel.close();
            } catch (IOException e2) {
                e2.printStackTrace();
            }
            e1.printStackTrace();
        }
    }
}

GITHUB源码

https://github.com/spbreak/i-netty/tree/master/02-reactor

0 人点赞