Netty入门之消息边界处理以及ByteBuffer大小分配

2023-06-28 14:26:07 浏览数 (2)

以上三篇内容主要讲了NIO的三大组件ByteBuffer文件编程阻塞非阻塞Selector等,需要了解像详情的请移步查看。

本章主要讲解如何处理在消息传递过程中的边界问题。

处理消息边界(如图)

如图所示:在实际项目中,消息有可能要比ByteBuffer长,或者比ByteBuffer短; 针对以上的几种情况,应该如何去处理呢?有两种方案:

  1. 固定消息长度,数据包大小一样,服务器按照预定长度读取,缺点是浪费带宽。
  2. 按分隔符拆分,但是效率低。
  3. TLV格式,即Type类型、Length长度、Value数据,类型和长度已知的情况下,就可以方便获取消息的大小,从而分配合适的buffer,缺点是buffer需要提前分配,如果内容过大,则会影响server的吞吐量
  • Http1.1 是TLV格式
  • Http2.0 是LTV格式

上代码(⚠️一定要注意代码中的注释):

  • server
代码语言:javascript复制
@Slf4j
public class Server {

    private static void split(ByteBuffer source) {
        source.flip();
        for (int i = 0; i < source.limit(); i  ) {
            // 找到一条完整消息
            if (source.get(i) == 'n') {
                int length = i   1 - source.position();
                // 把这条完整消息存入新的 ByteBuffer
                ByteBuffer target = ByteBuffer.allocate(length);
                // 从 source 读,向 target 写
                for (int j = 0; j < length; j  ) {
                    target.put(source.get());
                }
                debugAll(target);
            }
        }
        source.compact();
    }


    public static void main(String[] args) throws IOException {
        // 1. 创建 selector, 管理多个 channel
        Selector selector = Selector.open();
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        // 2. 建立 selector 和 channel 的联系(注册)
        // SelectionKey 就是将来事件发生后,通过它可以知道事件和哪个channel的事件
        SelectionKey sscKey = ssc.register(selector, 0, null);
        // key 只关注 accept 事件
        sscKey.interestOps(SelectionKey.OP_ACCEPT);
        log.debug("sscKey:{}", sscKey);
        ssc.bind(new InetSocketAddress(8080));
        while (true) {
            // 3. select 方法, 没有事件发生,线程阻塞,有事件,线程才会恢复运行
            // select 在事件未处理时,它不会阻塞, 事件发生后要么处理,要么取消,不能置之不理
            selector.select();
            // 4. 处理事件, selectedKeys 内部包含了所有发生的事件
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); // accept, read
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                // 处理key 时,要从 selectedKeys 集合中删除,否则下次处理就会有问题
                iter.remove();
                log.debug("key: {}", key);
                // 5. 区分事件类型
                if (key.isAcceptable()) { // 如果是 accept
                    ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                    SocketChannel sc = channel.accept();
                    sc.configureBlocking(false);
                    ByteBuffer buffer = ByteBuffer.allocate(16) // 利用 附件的方式将buffer注册关联到selectionKey上
                    SelectionKey scKey = sc.register(selector, 0, buffer);
                    scKey.interestOps(SelectionKey.OP_READ);
                    log.debug("{}", sc);
                    log.debug("scKey:{}", scKey);
                } else if (key.isReadable()) { // 如果是 read
                    try {
                        SocketChannel channel = (SocketChannel) key.channel(); // 拿到触发事件的channel
                        // 取上次注册的buffer
                        ByteBuffer buffer = (ByteBuffer)key.attachment();
                        int read = channel.read(buffer); // 如果是正常断开,read 的方法的返回值是 -1
                        if(read == -1) {
                            key.cancel();
                        } else {
                            split(buffer)
                            // 这里说明原有的buffer满了
                            if (buffer.position() == buffer.limit()) {
                                ByteBuffer newByteBuffer = ByteBuffer.allocate(buffer.capacity() * 2 )
                                buffer.flip();
                                newByteBuffer.put(buffer);
                                // 替换原有的附件buffer
                                key.attach(newByteBuffer); 
                            }
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                        key.cancel();  // 因为客户端断开了,因此需要将 key 取消(从 selector 的 keys 集合中真正删除 key)
                    }
                }
            }
        }
    }
}
  • client
代码语言:javascript复制
public class Client {
    public static void main(String[] args) throws IOException {
        SocketChannel sc = SocketChannel.open();
        sc.connect(new InetSocketAddress("localhost", 8080));
        SocketAddress address = sc.getLocalAddress();
        sc.write(Chaset.defaultCharset().encode("012345677890abcdef"))
        System.in.read();
    }
}

demo 解析: 上述server端代码用到了附件这个概念

代码语言:javascript复制
ByteBuffer buffer = ByteBuffer.allocate(16) // 利用 附>件的方式将buffer注册关联到selectionKey上
代码语言:javascript复制
SelectionKey scKey = sc.register(selector, 0, buffer);

重新关联附件buffer key.attach(newByteBuffer);

说明:

上述代码就是简单做了一个消息边界的处理,相信大家也看到了一些问题,它只能是做到自动扩容,无法自适应,也就是缩小。暂时先提前告诉大家Netty是可以做到自适应的。

如何处理消息边界问题以及ByteBuffer大小分配的问题已经说完了,接下来给大家说一下ByteBuffer的大小如何分配的注意点。

每个Channel都需要记录可能被切分的消息,因为ByteBuffer不能够被多个Channel共同使用,因此需要为每个channel维护一个独立的ByteBUffer

  • ByteBuffer不能太大,比如一个ByteBuffer1Mb的话,需要支持百万连接就要1Tb内存,因此需要设计大小可变的ByteBUffer
    • 思路一:首先分配一个较小的buffer,例如4k,如果发现数据不够,再分配8k的buffer,将4kbuffer内容拷贝至8k的buffer,优点是消息连续容易处理,缺点是数据拷贝耗费性能。
    • 思路二:用多个数组组成buffer,一个数组不够,把多出来的内容写入新的数组,与前面的区别是消息存储不连续解析复杂,优点是避免了拷贝引起的性能损耗

好了本次的文章就到这里了后续再为大家带来关于Netty的更多内容。切记:一定要好好消化上述的demo案例。

0 人点赞