Netty TCP解决粘包拆包

2023-10-28 08:17:10 浏览数 (2)

1、前言

TCP(Transmission Control Protocol)是一种在计算机网络中广泛使用的协议,用于可靠的、面向连接的数据通信。

  1. 可靠性:TCP是一种可靠的协议,它确保数据从发送方传输到接收方,无丢失、无损坏、有序传递。如果发生数据包的丢失或损坏,TCP会重新发送这些数据包,直到接收方成功接收。
  2. 面向连接:TCP是一种面向连接的协议,这意味着在数据传输之前,发送方和接收方需要建立一个连接,数据在连接上传输。传输完成后,连接会被释放。
  3. 全双工通信:TCP支持全双工通信,这意味着在建立连接后,双方可以同时发送和接收数据。
  4. 流式传输:TCP提供了一种流式传输服务,数据被划分为小的数据块,以字节流的方式传输。接收方根据数据块的边界进行数据的解析。
  5. 拥塞控制:TCP包含拥塞控制机制,用于避免过多的数据流量导致网络拥塞。TCP发送方会根据网络的拥塞情况来动态调整数据传输速度。
  6. 校验和:TCP使用校验和来检测数据的完整性。接收方会验证数据的校验和,以确保数据在传输过程中没有发生错误。
  7. 有序传输:TCP保证数据包的有序传输,即使数据包到达的顺序与发送的顺序不同,接收方也会对其进行重新排序。

2、粘包与拆包现象

由于TCP是面向流式传输的一种协议,所以就像水管里的水一样,无界限的传输。然而在接收端,数据可能以不同的方式到达,就比如正常包粘包、拆包。

2.1、现象描述

假设客户端发送2个连续的数据包到服务器,数据包用packet1packet2分别表示,则服务器接收到的数据可以分为3种情况:

情况1: 服务器接收到2个数据包,没有拆包,也没有粘包问题;

情况2: 服务器只接收到一个数据包(存在粘包问题)

  • 因为tcp不会丢失数据包,因此这一个数据包就封装了2个原生数据包的信息,这种现象叫做粘包
  • 在这种情况,接收者并不知道2个原生包的界限,因此接收者很难处理;

情况3: 接收者接收到2个冗余或不完整的数据包(粘包与拆包问题同时发生)

  • 接收者接收到2个数据包,但这2个数据包要么不完整,要么掺杂了其他数据包的部分数据
  • 在这种情况下,粘包拆包同时发生。
  • 如果这2个包不被特殊处理,对于接收者来说也很难处理;

2.2、代码演示粘包拆包现象

业务场景:客户端连续发送10条消息(字符串)到服务器,查看服务器接收情况

客户端发送消息代码:

服务器接收消息代码:

接收消息的打印效果:

代码语言:javascript复制
=================================服务器收到的数据 hello server0 服务器累计收到 [1] 个消息包=================================服务器收到的数据 hello server1 服务器累计收到 [2] 个消息包=================================服务器收到的数据 hello server2hello server3hello server4hello server5hello server6 服务器累计收到 [3] 个消息包=================================服务器收到的数据 hello server7hello server8hello server9 服务器累计收到 [4] 个消息包点击复制

收起

解释:

  • 客户端发送了10条消息,服务器接收到了 4个数据包,而不是10个数据包 ;
  • 显然,发生了tcp粘包;
  • 这10条消息本来是10个数据报文,却被合并(粘)为4个数据包;
  • 问题是: 如何把这4个数据包还原为10个数据包呢 (在高并发情况下,各式各样的数据包会更多)
  • 如果无法还原,则服务器无法正确解析报文并做相应处理;

3、粘包与拆包主要原因

1、粘包原因:

发送的数据大小 小于 发送缓冲区,TCP就会把发送的数据多次写入缓冲区,此时发生粘包

接收数据方的应用层没有及时从 接收缓冲区读取数据,也会发生粘包

2、拆包原因:

发送的数据大小 大于 TCP发送缓冲区,就会发生拆包

发送的数据大小 大于 报文最大长度,也会拆包

4、粘包与拆包解决方法

解决粘包拆包的关键在于 为每一个数据包添加界限标识,一般常用的方法如下:

方法1、发送方为每一个数据包添加报文头部。头部至少包含数据包长度(类似http协议的头部length)。 通过这种方式,接收方通过读取头部的长度知道当前数据包的界限,并在界限处停止读取。

方法2、发送方以固定长度封装数据包。如果不足,则补0填充。

方法3、自定义设置数据包的界限标识,如添加特别标识(如======)。接收方通过标识可以识别不同的数据包;

5、代码实现

这里的解决方法是采用方法1,设置每个数据包的长度到报文头部;

5.1、协议数据包封装类

代码语言:javascript复制
/** * @Description 协议数据包  */public class ProtocolMessage {     private int length;    private byte[] content;     /**     * @description 构造器      */    public ProtocolMessage() {    }     public int getLength() {        return length;    }     public void setLength(int length) {        this.length = length;    }     public byte[] getContent() {        return content;    }     public void setContent(byte[] content) {        this.content = content;    }}点击复制

收起

5.2、Netty服务端

代码语言:javascript复制
public class ProtocolNettyServer89 {     public static void main(String[] args) throws InterruptedException {        EventLoopGroup bossGroup = new NioEventLoopGroup(1);        EventLoopGroup workerGroup = new NioEventLoopGroup();        try {            ServerBootstrap serverBootstrap = new ServerBootstrap();            serverBootstrap.group(bossGroup, workerGroup)                    .channel(NioServerSocketChannel.class)                    .childHandler(new ProtocolNettyServerInitializer()); // 自定义一个初始化类            // 自动服务器            ChannelFuture channelFuture = serverBootstrap.bind(8089).sync();            System.out.println("服务器启动成功");            // 监听关闭            channelFuture.channel().closeFuture().sync();        } finally {            bossGroup.shutdownGracefully();            workerGroup.shutdownGracefully();        }    }}点击复制

展开剩余的17%

初始化类 ProtocolNettyServerInitializer()

代码语言:javascript复制
public class ProtocolNettyServerInitializer extends ChannelInitializer<SocketChannel> {    @Override    protected void initChannel(SocketChannel ch) throws Exception {        ChannelPipeline pipeline = ch.pipeline();        // 添加入站解码器-把字节转为协议报文便于业务逻辑处理        pipeline.addLast(new ProtocolMessageDecoder());        // 添加出站编码器-把协议报文转为字节便于网络传输        pipeline.addLast(new ProtocolMessageEncoder());         // 添加业务逻辑handler        pipeline.addLast(new ProtocolNettyServerHandler());    }}点击复制

处理类 ProtocolNettyServerHandler()

代码语言:javascript复制
public class ProtocolNettyServerHandler extends SimpleChannelInboundHandler<ProtocolMessage> {     private int count = 0;     @Override    protected void channelRead0(ChannelHandlerContext ctx, ProtocolMessage msg) throws Exception {        // 接收到数据并处理        int length = msg.getLength();        String bodyStr = new String(msg.getContent(), StandardCharsets.UTF_8);        System.out.println("====================================");        System.out.println("服务器接收的消息如下:");        System.out.println("报文长度:"   length);        System.out.println("报文体内容: "   bodyStr);        System.out.println("服务器累计接收到的消息包数量 = "     this.count);         //  回复客户端        byte[] body = ("我是服务器"   count).getBytes(StandardCharsets.UTF_8);        int responseLen = body.length;        // 构建一个响应协议包        ProtocolMessage responseMsg = new ProtocolMessage();        responseMsg.setLength(responseLen);        responseMsg.setContent(body);        ctx.writeAndFlush(responseMsg);    }     @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        cause.printStackTrace();        ctx.close();    }}点击复制

收起

5.3、Netty客户端

代码语言:javascript复制
public class ProtocolNettyClient89 {    public static void main(String[] args) throws InterruptedException {        EventLoopGroup group = new NioEventLoopGroup();        try {            Bootstrap bootstrap = new Bootstrap();            bootstrap.group(group)                    .channel(NioSocketChannel.class)                    .handler(new ProtocolNettyClientInitializer());  // 自定义一个初始化类            // 连接服务器            ChannelFuture channelFuture = bootstrap.connect("localhost", 8089).sync();            channelFuture.channel().closeFuture().sync();        } finally {            group.shutdownGracefully();        }    }}点击复制

初始化类 ProtocolNettyClientInitializer()

代码语言:javascript复制
public class ProtocolNettyClientInitializer extends ChannelInitializer<SocketChannel> {    @Override    protected void initChannel(SocketChannel ch) throws Exception {        ChannelPipeline pipeline = ch.pipeline();        // 添加出站处理器- 协议报文转字节以便网络传输        pipeline.addLast(new ProtocolMessageEncoder());        // 添加入站解码器-把字节转为协议报文对象以便业务逻辑处理        pipeline.addLast(new ProtocolMessageDecoder());         // 添加一个自定义handler,处理业务逻辑        pipeline.addLast(new ProtocolNettyClientHandler());    }}点击复制

处理类 ProtocolNettyClientHandler()

代码语言:javascript复制
public class ProtocolNettyClientHandler extends SimpleChannelInboundHandler<ProtocolMessage> {    private int count;     @Override    protected void channelRead0(ChannelHandlerContext ctx, ProtocolMessage msg) throws Exception {        // 读取服务器响应报文        int length = msg.getLength();        byte[] body = msg.getContent();        System.out.println("=============================");        System.out.println("客户端接收的消息如下:");        System.out.println("长度 = "   length);        System.out.println("报文体 = "   new String(body, StandardCharsets.UTF_8));        System.out.println("客户端累计接收的消息包数量 = "     count);    }     @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {        // 发送10条数据到服务器        for (int i = 1; i <= 5; i  ) {            byte[] body = ("你好服务器,我是客户端张三"   i).getBytes(StandardCharsets.UTF_8);            // 创建协议包对象            ProtocolMessage message = new ProtocolMessage();            message.setContent(body);            message.setLength(body.length);            // 发送            ctx.writeAndFlush(message);        }    }     @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        cause.printStackTrace();        ctx.close();    }}点击复制

收起

5.4、编码类与解码类

解码类 ProtocolMessageDecoder():

代码语言:javascript复制
/** * @Description 协议报文解码器  */public class ProtocolMessageDecoder extends ByteToMessageDecoder {    @Override    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {        System.out.println("ProtocolMessageDecoder.decode() 被调用");        //  把字节 转为 协议报文        int length = in.readInt();        byte[] body = new byte[length];        in.readBytes(body);        // 封装成 ProtocolMessage,放入out,送入下一个 Handler处理        ProtocolMessage protocolMessage = new ProtocolMessage();        protocolMessage.setLength(length);        protocolMessage.setContent(body);        // 添加到out        out.add(protocolMessage);    }}点击复制

收起

编码类 ProtocolMessageEncoder ():

代码语言:javascript复制
/** * @Description 协议消息编码器  */public class ProtocolMessageEncoder extends MessageToByteEncoder<ProtocolMessage> {    @Override    protected void encode(ChannelHandlerContext ctx, ProtocolMessage msg, ByteBuf out) throws Exception {        System.out.println("ProtocolMessageEncoder.encode() 被调用");        out.writeInt(msg.getLength());        out.writeBytes(msg.getContent());    }}点击复制

6、再次测试

客户端发送5条消息到服务器:

服务器接收的数据包为 5个,如下(显然没有发生拆包粘包现象):

代码语言:javascript复制
ProtocolMessageDecoder.decode() 被调用====================================服务器接收的消息如下:报文长度:40报文体内容: 你好服务器,我是客户端张三1服务器累计接收到的消息包数量 = 1ProtocolMessageEncoder.encode() 被调用ProtocolMessageDecoder.decode() 被调用====================================服务器接收的消息如下:报文长度:40报文体内容: 你好服务器,我是客户端张三2服务器累计接收到的消息包数量 = 2ProtocolMessageEncoder.encode() 被调用ProtocolMessageDecoder.decode() 被调用====================================服务器接收的消息如下:报文长度:40报文体内容: 你好服务器,我是客户端张三3服务器累计接收到的消息包数量 = 3ProtocolMessageEncoder.encode() 被调用ProtocolMessageDecoder.decode() 被调用====================================服务器接收的消息如下:报文长度:40报文体内容: 你好服务器,我是客户端张三4服务器累计接收到的消息包数量 = 4ProtocolMessageEncoder.encode() 被调用ProtocolMessageDecoder.decode() 被调用====================================服务器接收的消息如下:报文长度:40报文体内容: 你好服务器,我是客户端张三5服务器累计接收到的消息包数量 = 5ProtocolMessageEncoder.encode() 被调用点击复制

收起

7、结尾

以上内容就是netty使用数据包添加报文头部的方式解决粘包拆包的现象

0 人点赞