nio学习之Selector选择器

2021-12-13 10:00:07 浏览数 (1)

nio学习之Selector选择器

  • Selector选择器
    • 三个相关的类
    • 如何创建选择器
    • SelectionKey选择键相关的方法
    • 选择器的使用
    • 服务器端模板代码
      • selector.select()方法阻塞问题
    • 案例
      • 服务器端代码
    • 客户端代码
    • 补充说明
      • select()方法的三个重载:
      • 停止选择的方法
      • NIO 编程步骤总结
    • socketChannel read返回值

Selector选择器

选择器提供一种选择执行已经就绪的任务的能力

selector选择器可以让单线程处理多个通道

如果程序打开了多个连接通道,每个连接的流量都比较低,可以使用Selector对通道进行管理

三个相关的类

Selector选择器类管理着被注册的通道的集合的信息和他们的就绪状态。

SelectableChannel可选择通道类,它是抽象类,是所有支持就绪检查的通道类的父类.

注意FileChannel不是SelectablChannel的子类,即FileChannel不是可选择的,可选择通道可以注册到多个选择器上,但是同一个选择器只能注册一次。

SelectionKey选择键类,封装了特定的通道与选择器之间的一种注册关系,选择键包含两个比特集,一个指示该注册关系所关心的通道操作,一个表示通道已经准备好的 操作

如何创建选择器

1.创建Selector

代码语言:javascript复制
Selector selector = Selector.open();

2.必须将通道设置为非阻塞模式才能注册到选择器上

代码语言:javascript复制
ssc.configureBlocking(false);

3.把通道注册到选择器上,会返回一个选择键

代码语言:javascript复制
   SelectionKey selectionKey = socketChannel.register(selector, SelectionKey.OP_READ);

SelectionKey的操作有:

  • SelectionKey.OP_CONNECT,指某个通道连接到服务器
  • SelectionKey.OP_ACCEPT,只有ServerSocketChannel有这个事件,查看是否有新的连接
  • SelectionKey.OP_READ,是否有可读的通道就绪
  • ​ SelectionKey.OP_WRITE,写数据的通道是否就绪

注册完成后,可以调用select()方法轮询是否有就绪的通道

代码语言:javascript复制
int count = selector.select();

select()方法,返回就绪的通道数量

SelectionKey选择键相关的方法

向Selector注册一个Channel通道时,就会返回一个SelectionKey选择键对象,这个选择键表示一个通道与一个选择器之间的注册关系

SelectionKey相关方法:

channel()方法,返回与该键对应的通道

selector()方法,返回通道注册的选择器

cancel()方法,取消这种特定的注册关系

isValid()方法,判断注册关系是否有效

interestOps()方法,返回你关心的操作,是以整数的形式进行编码的比特掩码,可以使用位运算检查所关心的操作,如:

代码语言:javascript复制
Boolean isAccept=interestops & SelectionKey.OP_ACCEPT == SelectionKey.OP_ACCEPT

readOps()方法返回通道已经就绪的操作,返回值也是一个整数,也开业使用上面相同的位操作检测通道中有那个事件或操作已经就绪,入:

代码语言:javascript复制
selectionKey.readyOps() & SelectionKey.OP_WRITE !=0 说明write操作已经就绪

除了按位与操作外,还可以使用isReadable(),isWriteable(),isConnectable(),isAcceptable()等方法检测,这些比特值,上面一行检测write就绪的操作可以使用下面这行代替:

代码语言:javascript复制
if(selectionKey.isWritable())

选择器的使用

Selector选择器维护着注册过的通道集合,并且这些注册关系都封装在了SelectionKey对象中

每个Selector对象都需要维护以下三个集合:

1.已注册的键的集合,keys()方法返回这个已注册的键的集合,这个集合不能修改

2.已选择的键的集合,selectedKeys()方法返回,该集合中的每个成员都是相关的通道被选择器判断已经准备好的,并且包含了键的interest集合中的操作,键可以从集合中移除,不能添加.

3.已取消的键的集合,这个集合包含了调用过cancel()方法的键

刚刚开始初始化Selector对象,这三个集合都是空的

Selector类的核心技就是select()选择,该方法调用时,执行以下步骤:

(1) 检查已取消的键的集合,如果该集合非空,就把该集合中的键从另外两个集合中移除,注销相关的通道,这个步骤结束后,已取消的集合应该是空的

(2)检查已经注册键的结合中所有键的interest集合,确定每个通道所关心的操作是否已经就绪

(3)select()方法返回的是从上一次调用select()方法后进入就绪状态的通道数量

通常使用以下方法来管理这些键:

1.在选择器上调用select方法

2.遍历selectedkeys方法,返回键的集合

​ 检查每个键,查看相关通道的就绪信息,并进行处理

​ 将键从已选择的集合中移除

​ 继续检查下个键

服务器端模板代码

代码语言:javascript复制
//服务器端模板代码
    public static void Server_Standard_Code_template()
    {
        try {
            ServerSocketChannel ssc=ServerSocketChannel.open();
            ssc.socket().bind(new InetSocketAddress("localhost",80));
            //只有设置为非阻塞才能注册到选择器中
            ssc.configureBlocking(false);
            //创建一个选择器
            Selector selector = Selector.open();
            //通道注册进选择器中---监听客户端连接事件
            ssc.register(selector, SelectionKey.OP_ACCEPT);

            while(true){
                //获取以及就绪的通道数量
                int select = selector.select();
                //没有通道就绪
                if(select==0)
                {
                    continue;
                }
             //获取已经就绪的
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
               while (iterator.hasNext())
               {
                   SelectionKey selectionKey = iterator.next();
                   if(selectionKey.isAcceptable())
                   {
                       //接收连接
                   }else if(selectionKey.isReadable())
                   {
                       //读取数据
                   }
                   else if(selectionKey.isWritable())
                   {
                       //写数据
                   }
                   //移除
                   iterator.remove();
               }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

selector.select()方法阻塞问题

无阻塞io是使用单线程或者只使用少量的多线程,每个连接共用一个线程,当处于等待(没有事件)的时候线程资源可以释放出来处理别的请求,通过事件驱动模型当有accept/read/write等事件发生后通知(唤醒)主线程分配资源来处理相关事件。

java.nio.channels.Selector就是在该模型中事件的观察者,可以将多个SocketChannel的事件注册到一个Selector上,当没有事件发生时Selector处于阻塞状态,当SocketChannel有accept/read/write等事件发生时唤醒Selector。

这个Selector是使用了单线程模型,主要用来描述事件驱动模型,要优化性能需要一个好的线程模型来使用,目前比较好的nio框架有Netty,apache的mina等。

阻塞后唤醒可以通过注册在selector上的socket有事件发生 或者 selector.select(timeOut)超时 或者 selector.wakeup()主动唤醒;

nio Selector 阻塞 唤醒 原理

案例

服务器端代码

代码语言:javascript复制
public class DHYSelector
{
    //服务器端口号
    public  static final  int PORT=80;
    private static ByteBuffer buffer=ByteBuffer.allocate(1024);
    private static ServerSocketChannel ssc;

    public static void main(String[] args) throws IOException, InterruptedException {
        ssc=ServerSocketChannel.open();
        //获取ServerSocketChannel管理的serverSocket
        ServerSocket serverSocket = ssc.socket();
        //绑定到指定的端口号
        serverSocket.bind(new InetSocketAddress(PORT));
        //设置为非阻塞
        ssc.configureBlocking(false);

        //创建选择器
        Selector selector = Selector.open();
        //将ServerSocketChannel注册到选择器上
        ssc.register(selector,SelectionKey.OP_ACCEPT);

        while (true)
        {
            //设置超时时间为1秒,1秒后还没有客户端连接,就唤醒当前等待的线程
            //selector.wakeup()也可以唤醒当前等待的selector线程
            int select = selector.select(1000);
            if(select==0)
            {
                Thread.sleep(2000);
                continue;
            }
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            selectionKeys.stream().forEach(selectionKey->{
                    try {
                        if(selectionKey.isAcceptable())
                        {
                        //有新的连接,获得serversocketchannel
                        //只有serversocketchannel支持accept事件
                        ServerSocketChannel channel = (ServerSocketChannel) selectionKey.channel();
                        //调用accept方法,返回到达服务器的新客户端连接
                        SocketChannel socketChannel = channel.accept();
                        //当前客户端连接也注册进选择器中
                            socketChannel.configureBlocking(false);
                            socketChannel.register(selector,SelectionKey.OP_READ|SelectionKey.OP_WRITE);
                       }
                        else if(selectionKey.isReadable())
                         {
                           //通道有数据可读,读出数据
                             read_data(selectionKey);
                         }
                        else if(selectionKey.isWritable())
                        {
                        //写入数据

                        }
                         //从集合中移除当前的key
                        selectionKeys.remove(selectionKey);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
            });
        }
    }

    //读数据
    public static void read_data(SelectionKey selectionKey) throws IOException {
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        //定义集合接收读取到的字节
        List<Byte> list=new LinkedList<>();

        while(socketChannel.read(buffer)>0)
        {
         //切换读取模式
         buffer.flip();
         while(buffer.hasRemaining())
         {
             list.add(buffer.get());
         }
         buffer.clear();
        }
        
        System.out.println("内容如下:   ");

        byte[] array = buffer.array();

        System.out.println(new String(array));

        socketChannel.close();
    }


}

客户端代码

代码语言:javascript复制
public class DHYClient
{
    //客户端
    public static void main(String[] args) throws IOException, InterruptedException {
        SocketChannel socketChannel=SocketChannel.open();
        socketChannel.connect(new InetSocketAddress(80));
        socketChannel.configureBlocking(false);

        //等待连接
        while(!socketChannel.finishConnect())
        {
            System.out.println("等待连接....");
            Thread.sleep(2000);
        }

        Scanner scanner=new Scanner(System.in);
        while(true)
        {
            System.out.print("发送的内容:  ");
            String nextLine = scanner.nextLine();
            if(nextLine.equals("quit"))
            {
                break;
            }
                ByteBuffer buffer=ByteBuffer.allocate(nextLine.getBytes().length);
                buffer.put(nextLine.getBytes());
                buffer.flip();
                while(buffer.hasRemaining())
                {
                    socketChannel.write(buffer);
                }
                buffer.clear();
        }
        socketChannel.close();
    }
}

补充说明

  • 与 Selector 一起使用时,Channel 必须处于非阻塞模式下,否则将抛出异常IllegalBlockingModeException。这意味着,FileChannel 不能与 Selector 一起使用,因为 FileChannel 不能切换到非阻塞模式,而套接字相关的所有的通道都可以。
  • 一个通道,并没有一定要支持所有的四种操作。比如服务器通道ServerSocketChannel 支持 Accept 接受操作,而 SocketChannel 客户端通道则不支持。可以通过通道上的 validOps()方法,来获取特定通道下所有支持的操作集合。

select()方法的三个重载:

  • 通过 Selector 的 select()方法,可以查询出已经就绪的通道操作,这些就绪的状态集合,存在一个元素是 SelectionKey 对象的 Set 集合中

下面是 Selector 几个重载的查询 select()方法:

  • select():阻塞到至少有一个通道在你注册的事件上就绪了。
  • select(long timeout):和 select()一样,但最长阻塞事件为 timeout 毫秒。
  • selectNow():非阻塞,只要有通道就绪就立刻返回

select()方法返回的 int 值,表示有多少通道已经就绪,更准确的说,是自前一次 select方法以来到这一次 select 方法之间的时间段上,有多少通道变成就绪状态

例如:首次调用 select()方法,如果有一个通道变成就绪状态,返回了 1,若再次调用select()方法,如果另一个通道就绪了,它会再次返回 1。如果对第一个就绪的channel 没有做任何操作,现在就有两个就绪的通道,但在每次 select()方法调用之间,只有一个通道就绪了

一旦调用 select()方法,并且返回值不为 0 时,在 Selector 中有一个 selectedKeys()方法,用来访问已选择键集合,迭代集合的每一个选择键元素,根据就绪操作的类型,完成对应的操作:

代码语言:javascript复制
Set selectedKeys = selector.selectedKeys();
Iterator keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()) {
 SelectionKey key = keyIterator.next();
 if(key.isAcceptable()) {
 // a connection was accepted by a ServerSocketChannel.
 } else if (key.isConnectable()) {
 // a connection was established with a remote server.
 } else if (key.isReadable()) {
 // a channel is ready for reading
 } else if (key.isWritable()) {
 // a channel is ready for writing
 }
 keyIterator.remove();
}

停止选择的方法

选择器执行选择的过程,系统底层会依次询问每个通道是否已经就绪,这个过程可能会造成调用线程进入阻塞状态,那么我们有以下三种方式可以唤醒在 select()方法中阻塞的线程。

wakeup()方法 :通过调用 Selector 对象的 wakeup()方法让处在阻塞状态的select()方法立刻返回

该方法使得选择器上的第一个还没有返回的选择操作立即返回。如果当前没有进行中的选择操作,那么下一次对 select()方法的一次调用将立即返回。

close()方法 :通过 close()方法关闭 Selector,该方法使得任何一个在选择操作中阻塞的线程都被唤醒(类似 wakeup()),同时 使得注册到该 Selector 的所有 Channel 被注销,所有的键将被取消,但是 Channel本身并不会关闭

NIO 编程步骤总结

第一步:创建 Selector 选择器

第二步:创建 ServerSocketChannel 通道,并绑定监听端口

第三步:设置 Channel 通道是非阻塞模式

第四步:把 Channel 注册到 Socketor 选择器上,监听连接事件

第五步:调用 Selector 的 select 方法(循环调用),监测通道的就绪状况

第六步:调用 selectKeys 方法获取就绪 channel 集合

第七步:遍历就绪 channel 集合,判断就绪事件类型,实现具体的业务操作

第八步:根据业务,决定是否需要再次注册监听事件,重复执行第三步操作

socketChannel read返回值

java nio socketChannel read返回值代表的意思

0 人点赞