Netty 私有协议粘包拆包实例

2022-03-27 15:20:58 浏览数 (2)

1. 实例说明

最近遇到一个组件,其 client 和 server 端的通信协议如下:

  • 报文头部长度:80 byte;其中前 16 byte 为字符串格式的数字,表示 body 的长度;后 64 byte 为服务名称。
  • 报文体:从第 81 byte 开始为报文体,采用 URL encode 后的字符串。
代码语言:javascript复制
0                15                                                                   79
 ---------------- --------------------------------------------------------------------- 
|   body length  |                         service name                                |
 ---------------- --------------------------------------------------------------------- 
|                                                                                      |
|                                        body                                          |
|                                                                                      |
 -------------------------------------------------------------------------------------- 

整个报文采用 UTF-8 编码,Server 端使用 C 开发的。

接下来,采用 Java Netty 模拟该组件的功能,以演示私有协议下 netty 的粘包/拆包的实现。

2. Decoder

代码语言:javascript复制
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.util.CharsetUtil;

import java.nio.charset.StandardCharsets;
import java.util.List;


public class DemoPrivateProtocolDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {

        System.out.println("ByteBuf.readableBytes "   in.readableBytes());

        while (true) {
            if (in.readableBytes() < 80) {
                break;
            }
            in.markReaderIndex();

            String strLen = in.readCharSequence(16, CharsetUtil.UTF_8).toString().trim();
            int bodyLength = Integer.parseInt(strLen);

            in.skipBytes(64);
            if (in.readableBytes() < bodyLength) {
                in.resetReaderIndex();
                break;
            }

            String body = in.readCharSequence(bodyLength, StandardCharsets.UTF_8).toString();
            out.add(body);
        }
    }
}

3. Server

代码语言:javascript复制
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class SocketServer {

    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new DemoPrivateProtocolDecoder());
                            pipeline.addLast(new DemoServerHandler());
                        }
                    });

            int port = 9000;
            ChannelFuture channelFuture = serverBootstrap.bind(port).addListener(future -> {
                if (future.isSuccess()) {
                    System.out.println("绑定端口 "   port   " 成功!");
                } else {
                    System.err.println("绑定端口 "   port   " 失败!");
                }
            }).sync();

            channelFuture.channel().closeFuture().sync();
        }
        finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}

代码语言:javascript复制
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class DemoServerHandler extends ChannelInboundHandlerAdapter {

    private static AtomicInteger count = new AtomicInteger(0);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        String body = (String)msg;

        Map<String, String> results = new HashMap<>();

        String[] resultTokens = body.split("&");
        for (String result : resultTokens) {
            String[] tokens = result.split("=");
            if (tokens.length > 0) {
                if (tokens.length == 2) {
                    results.put(tokens[0], URLDecoder.decode(tokens[1], StandardCharsets.UTF_8.name()));
                }
                else {
                    results.put(tokens[0], "");
                }
            }
        }

        System.out.println("---> received: "   count.incrementAndGet() );
        for (Map.Entry<String, String> entry : results.entrySet()) {
            System.out.println("    ( "   entry.getKey()   ", "   entry.getValue()   " )");
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

4. Client

代码语言:javascript复制
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class SocketClient {

    public static void main(String[] args) throws InterruptedException {

        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new SocketClientHandler());
                        }
                    });

            ChannelFuture channelFuture = bootstrap.connect("localhost", 9000).addListener(future -> {
                if (future.isSuccess()) {
                    System.out.println("连接服务器成功!");
                } else {
                    System.err.println("连接服务器失败!");
                }
            }).sync();

            channelFuture.channel().closeFuture().sync();
        }
        finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}

代码语言:java复制
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class SocketClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        Map<String, String> msgMap = new HashMap<>();

        for (int j = 0; j < 1000; j  ) {
            for (int i = 0; i < 1000; ) {
                System.out.println("---> send: "   (  i   3600 * j));

                msgMap.put("name", "Michael "   i);
                msgMap.put("sex", i % 2 == 0 ? "F" : "M");
                msgMap.put("age", ""   i);

                ByteBuf byteBuf = this.encode(msgMap);
                ctx.writeAndFlush(byteBuf);
            }

            TimeUnit.MILLISECONDS.sleep(5);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    private ByteBuf encode(Map<String, String> data) throws Exception {

        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> entry : data.entrySet()) {
            sb.append(entry.getKey()).append("=")
                    .append(URLEncoder.encode(entry.getValue(), CharsetUtil.UTF_8.name()))
                    .append("&");
        }

        String body = sb.toString();
        int bodyLength = body.length();

        System.out.println("---> [send packet] body length: "    bodyLength );

        ByteBuf out = Unpooled.buffer(80   bodyLength);

        out.writerIndex(0);
        out.writeCharSequence(String.valueOf(bodyLength), StandardCharsets.UTF_8);

        out.writerIndex(16);
        out.writeCharSequence("DemoService", StandardCharsets.UTF_8);

        out.writerIndex(80);
        out.writeCharSequence(body, StandardCharsets.UTF_8);

        return out;
    }
}

5. 运行结果

下面是代码运行后的截图,可以看出 TCP 报文被 Netty 正确的进行了粘包和拆包处理。

0 人点赞