1. 实例说明
最近遇到一个组件,其 client 和 server 端的通信协议如下:
- 报文头部长度:80 byte;其中前 16 byte 为字符串格式的数字,表示 body 的长度;后 64 byte 为服务名称。
- 报文体:从第 81 byte 开始为报文体,采用 URL encode 后的字符串。
0 15 79
---------------- ---------------------------------------------------------------------
| body length | service name |
---------------- ---------------------------------------------------------------------
| |
| body |
| |
--------------------------------------------------------------------------------------
整个报文采用 UTF-8 编码,Server 端使用 C 开发的。
接下来,采用 Java Netty 模拟该组件的功能,以演示私有协议下 netty 的粘包/拆包的实现。
2. Decoder
代码语言:javascript复制import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.util.CharsetUtil;
import java.nio.charset.StandardCharsets;
import java.util.List;
public class DemoPrivateProtocolDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println("ByteBuf.readableBytes " in.readableBytes());
while (true) {
if (in.readableBytes() < 80) {
break;
}
in.markReaderIndex();
String strLen = in.readCharSequence(16, CharsetUtil.UTF_8).toString().trim();
int bodyLength = Integer.parseInt(strLen);
in.skipBytes(64);
if (in.readableBytes() < bodyLength) {
in.resetReaderIndex();
break;
}
String body = in.readCharSequence(bodyLength, StandardCharsets.UTF_8).toString();
out.add(body);
}
}
}
3. Server
代码语言:javascript复制import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class SocketServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new DemoPrivateProtocolDecoder());
pipeline.addLast(new DemoServerHandler());
}
});
int port = 9000;
ChannelFuture channelFuture = serverBootstrap.bind(port).addListener(future -> {
if (future.isSuccess()) {
System.out.println("绑定端口 " port " 成功!");
} else {
System.err.println("绑定端口 " port " 失败!");
}
}).sync();
channelFuture.channel().closeFuture().sync();
}
finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
代码语言:javascript复制import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
public class DemoServerHandler extends ChannelInboundHandlerAdapter {
private static AtomicInteger count = new AtomicInteger(0);
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String body = (String)msg;
Map<String, String> results = new HashMap<>();
String[] resultTokens = body.split("&");
for (String result : resultTokens) {
String[] tokens = result.split("=");
if (tokens.length > 0) {
if (tokens.length == 2) {
results.put(tokens[0], URLDecoder.decode(tokens[1], StandardCharsets.UTF_8.name()));
}
else {
results.put(tokens[0], "");
}
}
}
System.out.println("---> received: " count.incrementAndGet() );
for (Map.Entry<String, String> entry : results.entrySet()) {
System.out.println(" ( " entry.getKey() ", " entry.getValue() " )");
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
4. Client
代码语言:javascript复制import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class SocketClient {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new SocketClientHandler());
}
});
ChannelFuture channelFuture = bootstrap.connect("localhost", 9000).addListener(future -> {
if (future.isSuccess()) {
System.out.println("连接服务器成功!");
} else {
System.err.println("连接服务器失败!");
}
}).sync();
channelFuture.channel().closeFuture().sync();
}
finally {
eventLoopGroup.shutdownGracefully();
}
}
}
代码语言:java复制import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class SocketClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Map<String, String> msgMap = new HashMap<>();
for (int j = 0; j < 1000; j ) {
for (int i = 0; i < 1000; ) {
System.out.println("---> send: " ( i 3600 * j));
msgMap.put("name", "Michael " i);
msgMap.put("sex", i % 2 == 0 ? "F" : "M");
msgMap.put("age", "" i);
ByteBuf byteBuf = this.encode(msgMap);
ctx.writeAndFlush(byteBuf);
}
TimeUnit.MILLISECONDS.sleep(5);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
private ByteBuf encode(Map<String, String> data) throws Exception {
StringBuilder sb = new StringBuilder();
for (Map.Entry<String, String> entry : data.entrySet()) {
sb.append(entry.getKey()).append("=")
.append(URLEncoder.encode(entry.getValue(), CharsetUtil.UTF_8.name()))
.append("&");
}
String body = sb.toString();
int bodyLength = body.length();
System.out.println("---> [send packet] body length: " bodyLength );
ByteBuf out = Unpooled.buffer(80 bodyLength);
out.writerIndex(0);
out.writeCharSequence(String.valueOf(bodyLength), StandardCharsets.UTF_8);
out.writerIndex(16);
out.writeCharSequence("DemoService", StandardCharsets.UTF_8);
out.writerIndex(80);
out.writeCharSequence(body, StandardCharsets.UTF_8);
return out;
}
}
5. 运行结果
下面是代码运行后的截图,可以看出 TCP 报文被 Netty 正确的进行了粘包和拆包处理。