Netty 粘包和拆包问题及解决方案

2024-05-22 08:57:40 浏览数 (2)

持续创作,加速成长!这是我参与「掘金日新计划 · 10 月更文挑战」的第10天,点击查看活动详情

1. 粘包和拆包

产生粘包和拆包问题的主要原因是,操作系统在发送 TCP 数据的时候,底层会有一个缓冲区,例如 1024 个字节大小,如果一次请求发送的数据量比较小,没达到缓冲区大小,TCP 则会将多个请求合并为同一个请求进行发送,这就形成了粘包问题;如果一次请求发送的数据量比较大,超过了缓冲区大小,TCP 就会将其拆分为多次发送,这就是拆包,也就是将一个大的包拆分为多个小包进行发送。

上图中演示了粘包和拆包的三种情况:

  • A 和 B 两个包都刚好满足 TCP 缓冲区的大小,或者说其等待时间已经达到 TCP 等待时长,从而还是使用两个独立的包进行发送;
  • A 和 B 两次请求间隔时间内较短,并且数据包较小,因而合并为同一个包发送给服务端;
  • B 包比较大,因而将其拆分为两个包 B_1 和 B_2 进行发送,而这里由于拆分后的 B_2 比较小,其又与 A 包合并在一起发送。

2. 常见解决方案

对于粘包和拆包问题,常见的解决方案有四种:

  • 客户端在发送数据包的时候,每个包都固定长度,比如 1024 个字节大小,如果客户端发送的数据长度不足 1024 个字节,则通过补充空格的方式补全到指定长度;
  • 客户端在每个包的末尾使用固定的分隔符,例如 rn,如果一个包被拆分了,则等待下一个包发送过来之后找到其中的 rn,然后对其拆分后的头部部分与前一个包的剩余部分进行合并,这样就得到了一个完整的包;
  • 将消息分为头部和消息体,在头部中保存有当前整个消息的长度,只有在读取到足够长度的消息之后才算是读到了一个完整的消息;
  • 通过自定义协议进行粘包和拆包的处理。

3. Netty 提供的粘包拆包解决方案

3.1 FixedLengthFrameDecoder

对于使用固定长度的粘包和拆包场景,可以使用FixedLengthFrameDecoder,该解码器会每次读取固定长度的消息,如果当前读取到的消息不足指定长度,那么就会等待下一个消息到达后进行补足。其使用也比较简单,只需要在构造函数中指定每个消息的长度即可。

这里需要注意的是,FixedLengthFrameDecoder只是一个解码器,Netty 也只提供了一个解码器,这是因为对于解码是需要等待下一个包的进行补全的,代码相对复杂,而对于编码器,用户可以自行编写,因为编码时只需要将不足指定长度的部分进行补全即可。下面的示例中展示了如何使用FixedLengthFrameDecoder来进行粘包和拆包处理:

代码语言:javascript复制
public class EchoServer {
​
  public void bind(int port) throws InterruptedException {
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
      ServerBootstrap bootstrap = new ServerBootstrap();
      bootstrap.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG, 1024)
        .handler(new LoggingHandler(LogLevel.INFO))
        .childHandler(new ChannelInitializer<SocketChannel>() {
          @Override
          protected void initChannel(SocketChannel ch) throws Exception {
            
            ch.pipeline().addLast(new FixedLengthFrameDecoder(20));
            
            ch.pipeline().addLast(new StringDecoder());
            
            ch.pipeline().addLast(new FixedLengthFrameEncoder(20));
            
            ch.pipeline().addLast(new EchoServerHandler());
          }
        });
​
      ChannelFuture future = bootstrap.bind(port).sync();
      future.channel().closeFuture().sync();
    } finally {
      bossGroup.shutdownGracefully();
      workerGroup.shutdownGracefully();
    }
  }
​
  public static void main(String[] args) throws InterruptedException {
    new EchoServer().bind(8080);
  }
}

上面的 pipeline 中,对于入栈数据,这里主要添加了FixedLengthFrameDecoderStringDecoder,前面一个用于处理固定长度的消息的粘包和拆包问题,第二个则是将处理之后的消息转换为字符串。最后由EchoServerHandler处理最终得到的数据,处理完成后,将处理得到的数据交由FixedLengthFrameEncoder处理,该编码器是我们自定义的实现,主要作用是将长度不足 20 的消息进行空格补全。下面是FixedLengthFrameEncoder的实现代码:

代码语言:javascript复制
public class FixedLengthFrameEncoder extends MessageToByteEncoder<String> {
  private int length;
​
  public FixedLengthFrameEncoder(int length) {
    this.length = length;
  }
​
  @Override
  protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out)
      throws Exception {
    
    if (msg.length() > length) {
      throw new UnsupportedOperationException(
          "message length is too large, it's limited "   length);
    }
    
    if (msg.length() < length) {
      msg = addSpace(msg);
    }
​
    ctx.writeAndFlush(Unpooled.wrappedBuffer(msg.getBytes()));
  }
​
  
  private String addSpace(String msg) {
    StringBuilder builder = new StringBuilder(msg);
    for (int i = 0; i < length - msg.length(); i  ) {
      builder.append(" ");
    }
​
    return builder.toString();
  }
}

这里FixedLengthFrameEncoder实现了 decode() 方法,在该方法中,主要是将消息长度不足 20 的消息进行空格补全。EchoServerHandler的作用主要是打印接收到的消息,然后发送响应给客户端:

代码语言:javascript复制
public class EchoServerHandler extends SimpleChannelInboundHandler<String> {
​
  @Override
  protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
    System.out.println("server receives message: "   msg.trim());
    ctx.writeAndFlush("hello client!");
  }
}

对于客户端,其实现方式基本与服务端的使用方式类似,只是在最后进行消息发送的时候与服务端的处理方式不同。如下是客户端EchoClient的代码:

代码语言:javascript复制
public class EchoClient {
​
  public void connect(String host, int port) throws InterruptedException {
    EventLoopGroup group = new NioEventLoopGroup();
    try {
      Bootstrap bootstrap = new Bootstrap();
      bootstrap.group(group)
        .channel(NioSocketChannel.class)
        .option(ChannelOption.TCP_NODELAY, true)
        .handler(new ChannelInitializer<SocketChannel>() {
          @Override
          protected void initChannel(SocketChannel ch) throws Exception {
            
            ch.pipeline().addLast(new FixedLengthFrameDecoder(20));
            
            ch.pipeline().addLast(new StringDecoder());
            
            ch.pipeline().addLast(new FixedLengthFrameEncoder(20));
            
            ch.pipeline().addLast(new EchoClientHandler());
          }
        });
​
      ChannelFuture future = bootstrap.connect(host, port).sync();
      future.channel().closeFuture().sync();
    } finally {
      group.shutdownGracefully();
    }
  }
​
  public static void main(String[] args) throws InterruptedException {
    new EchoClient().connect("127.0.0.1", 8080);
  }
}

对于客户端而言,其消息的处理流程其实与服务端是相似的,对于入站消息,需要对其进行粘包和拆包处理,然后将其转码为字符串,对于出站消息,则需要将长度不足 20 的消息进行空格补全。客户端与服务端处理的主要区别在于最后的消息处理 handler 不一样,也即这里的EchoClientHandler,如下是该 handler 的源码:

代码语言:javascript复制
public class EchoClientHandler extends SimpleChannelInboundHandler<String> {
​
  @Override
  protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
    System.out.println("client receives message: "   msg.trim());
  }
​
  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
    ctx.writeAndFlush("hello server!");
  }
}

这里客户端的处理主要是重写了channelActive()channelRead0()两个方法,这两个方法的主要作用在于,channelActive()会在客户端连接上服务器时执行,也就是说,其连上服务器之后就会往服务器发送消息。而channelRead0()主要是在服务器发送响应给客户端时执行,这里主要是打印服务器的响应消息。对于服务端而言,前面我们我们可以看到,EchoServerHandler只重写了channelRead0()方法,这是因为服务器只需要等待客户端发送消息过来,然后在该方法中进行处理,处理完成后直接将响应发送给客户端。如下是分别启动服务端和客户端之后控制台打印的数据:

代码语言:javascript复制
// server
server receives message: hello server!
// client
client receives message: hello client!

3.2 LineBasedFrameDecoder 与 DelimiterBasedFrameDecoder

对于通过分隔符进行粘包和拆包问题的处理,Netty 提供了两个编解码的类,LineBasedFrameDecoderDelimiterBasedFrameDecoder。这里LineBasedFrameDecoder的作用主要是通过换行符,即n或者rn对数据进行处理;而DelimiterBasedFrameDecoder的作用则是通过用户指定的分隔符对数据进行粘包和拆包处理。同样的,这两个类都是解码器类,而对于数据的编码,也即在每个数据包最后添加换行符或者指定分割符的部分需要用户自行进行处理。这里以DelimiterBasedFrameDecoder为例进行讲解,如下是EchoServer中使用该类的代码片段,其余部分与前面的例子中的完全一致:

代码语言:javascript复制
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    String delimiter = "_$";
​
    ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,
        Unpooled.wrappedBuffer(delimiter.getBytes())));
    
    ch.pipeline().addLast(new StringDecoder());
    
    ch.pipeline().addLast(new DelimiterBasedFrameEncoder(delimiter));
    
    ch.pipeline().addLast(new EchoServerHandler());
}

上面 pipeline 的设置中,添加的解码器主要有DelimiterBasedFrameDecoderStringDecoder,经过这两个处理器处理之后,接收到的字节流就会被分隔,并且转换为字符串数据,最终交由EchoServerHandler处理。这里DelimiterBasedFrameEncoder是我们自定义的编码器,其主要作用是在返回的响应数据之后添加分隔符。如下是该编码器的源码:

代码语言:javascript复制
public class DelimiterBasedFrameEncoder extends MessageToByteEncoder<String> {
​
  private String delimiter;
​
  public DelimiterBasedFrameEncoder(String delimiter) {
    this.delimiter = delimiter;
  }
​
  @Override
  protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) 
      throws Exception {
    
    ctx.writeAndFlush(Unpooled.wrappedBuffer((msg   delimiter).getBytes()));
  }
}

对于客户端而言,这里的处理方式与服务端类似,其 pipeline 的添加方式如下:

代码语言:javascript复制
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    String delimiter = "_$";
    
    ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, 
        Unpooled.wrappedBuffer(delimiter.getBytes())));
    
    ch.pipeline().addLast(new StringDecoder());
    
    ch.pipeline().addLast(new DelimiterBasedFrameEncoder(delimiter));
    
    ch.pipeline().addLast(new EchoClientHandler());
}

这里客户端的处理方式与服务端基本一致,其与示例一中的代码完全一致,这里则不予展示。

3.3 LengthFieldBasedFrameDecoder 与 LengthFieldPrepender

这里LengthFieldBasedFrameDecoderLengthFieldPrepender需要配合起来使用,其实本质上来讲,这两者一个是解码,一个是编码的关系。它们处理粘拆包的主要思想是在生成的数据包中添加一个长度字段,用于记录当前数据包的长度。LengthFieldBasedFrameDecoder会按照参数指定的包长度偏移量数据对接收到的数据进行解码,从而得到目标消息体数据;而LengthFieldPrepender则会在响应的数据前面添加指定的字节数据,这个字节数据中保存了当前消息体的整体字节数据长度。LengthFieldBasedFrameDecoder的解码过程如下图所示:

LengthFieldPrepender的编码过程如下图所示:

关于LengthFieldBasedFrameDecoder,这里需要对其构造函数参数进行介绍:

  • maxFrameLength:指定了每个包所能传递的最大数据包大小;
  • lengthFieldOffset:指定了长度字段在字节码中的偏移量;
  • lengthFieldLength:指定了长度字段所占用的字节长度;
  • lengthAdjustment:对一些不仅包含有消息头和消息体的数据进行消息头的长度的调整,这样就可以只得到消息体的数据,这里的 lengthAdjustment 指定的就是消息头的长度;
  • initialBytesToStrip:对于长度字段在消息头中间的情况,可以通过 initialBytesToStrip 忽略掉消息头以及长度字段占用的字节。

这里我们以 json 序列化为例对LengthFieldBasedFrameDecoderLengthFieldPrepender的使用方式进行讲解。如下是EchoServer的源码:

代码语言:javascript复制
public class EchoServer {
​
  public void bind(int port) throws InterruptedException {
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
      ServerBootstrap bootstrap = new ServerBootstrap();
      bootstrap.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG, 1024)
        .handler(new LoggingHandler(LogLevel.INFO))
        .childHandler(new ChannelInitializer<SocketChannel>() {
          @Override
          protected void initChannel(SocketChannel ch) throws Exception {
            
            
            ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));
            
            ch.pipeline().addLast(new LengthFieldPrepender(2));
            
            ch.pipeline().addLast(new JsonDecoder());
            
            ch.pipeline().addLast(new JsonEncoder());
            
            ch.pipeline().addLast(new EchoServerHandler());
          }
        });
​
      ChannelFuture future = bootstrap.bind(port).sync();
      future.channel().closeFuture().sync();
    } finally {
      bossGroup.shutdownGracefully();
      workerGroup.shutdownGracefully();
    }
  }
​
  public static void main(String[] args) throws InterruptedException {
    new EchoServer().bind(8080);
  }
}

这里EchoServer主要是在 pipeline 中添加了两个编码器和两个解码器,编码器主要是负责将响应的 User 对象序列化为 json 对象,然后在其字节数组前面添加一个长度字段的字节数组;解码器主要是对接收到的数据进行长度字段的解码,然后将其反序列化为一个 User 对象。下面是JsonDecoder的源码:

代码语言:javascript复制
public class JsonDecoder extends MessageToMessageDecoder<ByteBuf> {
​
  @Override
  protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) 
      throws Exception {
    byte[] bytes = new byte[buf.readableBytes()];
    buf.readBytes(bytes);
    User user = JSON.parseObject(new String(bytes, CharsetUtil.UTF_8), User.class);
    out.add(user);
  }
}

JsonDecoder首先从接收到的数据流中读取字节数组,然后将其反序列化为一个 User 对象。下面我们看看JsonEncoder的源码:

代码语言:javascript复制
public class JsonEncoder extends MessageToByteEncoder<User> {
​
  @Override
  protected void encode(ChannelHandlerContext ctx, User user, ByteBuf buf)
      throws Exception {
    String json = JSON.toJSONString(user);
    ctx.writeAndFlush(Unpooled.wrappedBuffer(json.getBytes()));
  }
}

JsonEncoder将响应得到的 User 对象转换为一个 json 对象,然后写入响应中。对于EchoServerHandler,其主要作用就是接收客户端数据,并且进行响应,如下是其源码:

代码语言:javascript复制
public class EchoServerHandler extends SimpleChannelInboundHandler<User> {
​
  @Override
  protected void channelRead0(ChannelHandlerContext ctx, User user) throws Exception {
    System.out.println("receive from client: "   user);
    ctx.write(user);
  }
}

对于客户端,其主要逻辑与服务端的基本类似,这里主要展示其 pipeline 的添加方式,以及最后发送请求,并且对服务器响应进行处理的过程:

代码语言:javascript复制
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));
    ch.pipeline().addLast(new LengthFieldPrepender(2));
    ch.pipeline().addLast(new JsonDecoder());
    ch.pipeline().addLast(new JsonEncoder());
    ch.pipeline().addLast(new EchoClientHandler());
}
public class EchoClientHandler extends SimpleChannelInboundHandler<User> {
​
  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
    ctx.write(getUser());
  }
​
  private User getUser() {
    User user = new User();
    user.setAge(27);
    user.setName("zhangxufeng");
    return user;
  }
​
  @Override
  protected void channelRead0(ChannelHandlerContext ctx, User user) throws Exception {
    System.out.println("receive message from server: "   user);
  }
}

这里客户端首先会在连接上服务器时,往服务器发送一个 User 对象数据,然后在接收到服务器响应之后,会打印服务器响应的数据。

3.4 自定义粘包与拆包器

对于粘包与拆包问题,其实前面三种基本上已经能够满足大多数情形了,但是对于一些更加复杂的协议,可能有一些定制化的需求。对于这些场景,其实本质上,我们也不需要手动从头开始写一份粘包与拆包处理器,而是通过继承LengthFieldBasedFrameDecoderLengthFieldPrepender来实现粘包和拆包的处理。

如果用户确实需要不通过继承的方式实现自己的粘包和拆包处理器,这里可以通过实现MessageToByteEncoderByteToMessageDecoder来实现。这里MessageToByteEncoder的作用是将响应数据编码为一个 ByteBuf 对象,而ByteToMessageDecoder则是将接收到的 ByteBuf 数据转换为某个对象数据。通过实现这两个抽象类,用户就可以达到实现自定义粘包和拆包处理的目的。如下是这两个类及其抽象方法的声明:

代码语言:javascript复制
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
    protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) 
        throws Exception;
}
public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter {
    protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) 
        throws Exception;
}

0 人点赞