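I have been learning Netty recently, and to put some of it into practice I wrote a small demo: encrypted data transfer between a client and a server, with protobuf used to wrap the data. The code is simple, but it should be of some help to beginners learning Netty, so I am recording and sharing it here.
Let's start with the Server. Like nearly all Netty sample code, it builds a Netty ServerBootstrap.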
package xyz.xindoo.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;

public class Server {
    public static void main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 100)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline p = ch.pipeline();
                        // Layer 1: length-prefixed framing with a 2-byte length field
                        p.addLast(new LengthFieldBasedFrameDecoder(10240, 0, 2, 0, 2));
                        p.addLast(new LengthFieldPrepender(2));
                        // Layer 2: AES decryption and encryption
                        p.addLast(new DecryptHandler());
                        p.addLast(new EncryptHandler());
                        // Layer 3: protobuf deserialization and serialization
                        p.addLast(new MessageDecoder());
                        p.addLast(new MessageEncoder());
                        // Business logic
                        p.addLast(new ServerHandler());
                    }
                });
        try {
            ChannelFuture f = b.bind(9999).sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
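This example actually implements three layers of encoding and decoding. The first layer, the LengthFieldXXX handlers, is Netty's variable-length (length-prefixed) framing. The second layer, DecryptHandler and EncryptHandler, encrypts and decrypts the data. The third layer, MessageDecoder and MessageEncoder, serializes and deserializes the data with protobuf. The final ServerHandler processes the data; there is not much logic here, it simply replies with the length of the message text it received.
Overall, an inbound message passes through LengthFieldBasedFrameDecoder, DecryptHandler and MessageDecoder before it reaches ServerHandler, and an outbound message passes through MessageEncoder, EncryptHandler and LengthFieldPrepender before it is written to the socket.
Next, let's look at the Client implementation.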
package xyz.xindoo.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;

/**
 * @author zhangweibin005
 * @date 2022/7/12
 */
public class Client {
    public static void main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        Bootstrap bootstrap = new Bootstrap();
        // No SO_BACKLOG here: that option only applies to listening (server) sockets.
        bootstrap.group(bossGroup)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline p = ch.pipeline();
                        p.addLast(new LengthFieldBasedFrameDecoder(10240, 0, 2, 0, 2));
                        p.addLast(new LengthFieldPrepender(2));
                        p.addLast(new DecryptHandler());
                        p.addLast(new EncryptHandler());
                        p.addLast(new MessageDecoder());
                        p.addLast(new MessageEncoder());
                        p.addLast(new ClientHandler());
                    }
                });
        try {
            Channel c = bootstrap.connect("127.0.0.1", 9999).sync().channel();
            ClientHandler clientHandler = c.pipeline().get(ClientHandler.class);
            clientHandler.start();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
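Both pipelines begin with the same length-field framing. As a rough illustration of what LengthFieldPrepender(2) and LengthFieldBasedFrameDecoder(10240, 0, 2, 0, 2) do on the wire, here is a minimal JDK-only sketch. The class FrameSketch and its methods are hypothetical names made up for this illustration; they only mimic the wire format and are not how Netty implements it.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Netty or the demo: it only mimics the wire format.
final class FrameSketch {
    static final int MAX_FRAME_LENGTH = 10240;

    // Like LengthFieldPrepender(2): a 2-byte big-endian length, then the payload.
    static byte[] prependLength(byte[] payload) {
        if (payload.length > 0xFFFF) {
            throw new IllegalArgumentException("payload does not fit in a 2-byte length field");
        }
        return ByteBuffer.allocate(2 + payload.length)
                .putShort((short) payload.length)
                .put(payload)
                .array();
    }

    // Like LengthFieldBasedFrameDecoder(10240, 0, 2, 0, 2): read the 2-byte length at
    // offset 0, strip those 2 bytes, and emit the payload once it has fully arrived.
    // An incomplete frame is left in the buffer for a later call.
    static List<byte[]> decodeFrames(ByteBuffer in) {
        List<byte[]> frames = new ArrayList<>();
        while (in.remaining() >= 2) {
            in.mark();
            int length = in.getShort() & 0xFFFF;
            if (2 + length > MAX_FRAME_LENGTH) {
                throw new IllegalStateException("frame too long: " + (2 + length));
            }
            if (in.remaining() < length) {
                in.reset(); // rewind to the start of the length field
                break;
            }
            byte[] payload = new byte[length];
            in.get(payload);
            frames.add(payload);
        }
        return frames;
    }

    public static void main(String[] args) {
        byte[] first = prependLength("hello".getBytes(StandardCharsets.UTF_8));
        byte[] second = prependLength("netty".getBytes(StandardCharsets.UTF_8));
        // Simulate TCP delivering one full frame plus only part of the next one.
        ByteBuffer in = ByteBuffer.allocate(first.length + 3);
        in.put(first).put(second, 0, 3).flip();
        for (byte[] frame : decodeFrames(in)) {
            System.out.println(new String(frame, StandardCharsets.UTF_8)); // prints "hello"
        }
        System.out.println(in.remaining() + " bytes still waiting for the rest of the frame");
    }
}
```

Because every frame carries its own length, the layers after it (decryption and protobuf parsing) always receive exactly one complete message, no matter how TCP splits or merges the bytes.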
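At first glance the Client looks almost identical to the Server, but a few details differ. Most notably, the Client uses Bootstrap rather than ServerBootstrap.
One point deserves special mention. When we write a Server or a Client, the codecs usually come in pairs and are usually of the same kind, which leads to a misconception: that within the Server (or within the Client) an Encoder and a Decoder must appear as a matched pair, for example that using StringEncoder means you must also use StringDecoder. This is not quite right. The pairing is between the two ends: whatever one end encodes, the other end must decode with the corresponding Decoder, and vice versa. Within a single end, the Encoder and the Decoder can be of different kinds.
Next, let's look at the remaining classes. LengthFieldBasedFrameDecoder and LengthFieldPrepender are provided by Netty, so they are not shown here. We start with the encryption part, DecryptHandler and EncryptHandler.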
package xyz.xindoo.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import lombok.extern.slf4j.Slf4j;

import javax.crypto.Cipher;
import javax.crypto.spec.SecretKeySpec;
import java.security.MessageDigest;
import java.util.Arrays;
import java.util.List;

@Slf4j
public class EncryptHandler extends MessageToMessageEncoder<ByteBuf> {
    private static SecretKeySpec secretKey;
    private static byte[] key = Constants.SK.getBytes();
    private static Cipher cipher;

    static {
        MessageDigest sha = null;
        try {
            // Derive a 128-bit AES key: SHA-1 of the shared secret, truncated to 16 bytes
            sha = MessageDigest.getInstance("SHA-1");
            key = sha.digest(key);
            key = Arrays.copyOf(key, 16);
            secretKey = new SecretKeySpec(key, "AES");
            cipher = Cipher.getInstance("AES/ECB/PKCS5Padding");
            cipher.init(Cipher.ENCRYPT_MODE, secretKey);
        } catch (Exception e) {
            log.error("", e);
        }
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
        byte[] array = new byte[msg.readableBytes()];
        msg.getBytes(msg.readerIndex(), array);
        // Encrypt the data through the Cipher API
        out.add(Unpooled.copiedBuffer(cipher.doFinal(array)));
    }
}
package xyz.xindoo.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import lombok.extern.slf4j.Slf4j;

import javax.crypto.Cipher;
import javax.crypto.spec.SecretKeySpec;
import java.security.MessageDigest;
import java.util.Arrays;
import java.util.List;

@Slf4j
public class DecryptHandler extends MessageToMessageDecoder<ByteBuf> {
    private static SecretKeySpec secretKey;
    private static byte[] key = Constants.SK.getBytes();
    private static Cipher cipher;

    static {
        MessageDigest sha = null;
        try {
            // Same key derivation as EncryptHandler, so both ends share one AES key
            sha = MessageDigest.getInstance("SHA-1");
            key = sha.digest(key);
            key = Arrays.copyOf(key, 16);
            secretKey = new SecretKeySpec(key, "AES");
            cipher = Cipher.getInstance("AES/ECB/PKCS5Padding");
            cipher.init(Cipher.DECRYPT_MODE, secretKey);
        } catch (Exception e) {
            log.error("", e);
        }
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
        int length = msg.readableBytes();
        byte[] array = new byte[length];
        msg.getBytes(msg.readerIndex(), array);
        // Decrypt the data with the cipher
        out.add(Unpooled.copiedBuffer(cipher.doFinal(array)));
    }
}
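The key derivation and cipher setup used by the two handlers above can be tried in isolation, without Netty. The following is a minimal sketch under a few assumptions: AesCodecSketch is a hypothetical class name, the secret "demo-secret" is a placeholder (Constants.SK is not shown in this post), and the secret is read as UTF-8 rather than the platform default charset. Unlike the handlers, each instance here owns its own pair of Cipher objects instead of sharing a static one.

```java
import javax.crypto.Cipher;
import javax.crypto.spec.SecretKeySpec;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import java.util.Arrays;

// Hypothetical helper, not part of the demo: each instance owns its own Cipher objects.
final class AesCodecSketch {
    private final Cipher encryptCipher;
    private final Cipher decryptCipher;

    AesCodecSketch(String secret) throws GeneralSecurityException {
        // Same derivation as the handlers: SHA-1 of the secret, truncated to a 128-bit key.
        byte[] digest = MessageDigest.getInstance("SHA-1")
                .digest(secret.getBytes(StandardCharsets.UTF_8));
        SecretKeySpec key = new SecretKeySpec(Arrays.copyOf(digest, 16), "AES");
        encryptCipher = Cipher.getInstance("AES/ECB/PKCS5Padding");
        encryptCipher.init(Cipher.ENCRYPT_MODE, key);
        decryptCipher = Cipher.getInstance("AES/ECB/PKCS5Padding");
        decryptCipher.init(Cipher.DECRYPT_MODE, key);
    }

    byte[] encrypt(byte[] plain) throws GeneralSecurityException {
        return encryptCipher.doFinal(plain);
    }

    byte[] decrypt(byte[] encrypted) throws GeneralSecurityException {
        return decryptCipher.doFinal(encrypted);
    }

    public static void main(String[] args) throws GeneralSecurityException {
        AesCodecSketch codec = new AesCodecSketch("demo-secret"); // placeholder secret
        byte[] encrypted = codec.encrypt("hello netty".getBytes(StandardCharsets.UTF_8));
        System.out.println(encrypted.length); // 16: one AES block after PKCS5 padding
        System.out.println(new String(codec.decrypt(encrypted), StandardCharsets.UTF_8)); // hello netty
    }
}
```

In a handler, the equivalent would be to make the cipher an instance field, so that every channel's handler has a Cipher of its own.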
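Here I use the javax.crypto.Cipher class with AES to encrypt and decrypt the data. Note that Cipher is not thread-safe, so multiple threads must not use the same Cipher object at the same time. Netty handles all events of a given channel on a single EventLoop thread, so this demo, which has only one connection, is fine. Because the cipher field is static, however, it is shared by every instance of the handler; on a server whose connections are spread across several EventLoop threads, each handler should hold its own Cipher instead.
Next, let's look at the protobuf part. I defined the message format in protobuf beforehand; it has just three simple fields, as follows: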
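syntax = "proto3";

// Assumed addition: place the generated class in the same package as the handlers
option java_package = "xyz.xindoo.netty";
option java_outer_classname = "MessageProto";

message Message {
    int64 ts = 1;
    string name = 2;
    string msg = 3;
}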
I generated MessageProto.java with protoc. The generated Java code is too long to paste here; the command is:
protoc message.proto --java_out=.
MessageProto.java can then be copied into the project and used. For how to use MessageProto, see MessageDecoder and MessageEncoder.
package xyz.xindoo.netty;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;

import java.util.List;

public class MessageDecoder extends MessageToMessageDecoder<ByteBuf> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
        byte[] array = new byte[msg.readableBytes()];
        msg.getBytes(msg.readerIndex(), array);
        MessageProto.Message message = MessageProto.Message.parseFrom(array);
        out.add(message);
    }
}
package xyz.xindoo.netty;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;

import java.util.List;

public class MessageEncoder extends MessageToMessageEncoder<MessageProto.Message> {
    @Override
    protected void encode(ChannelHandlerContext ctx, MessageProto.Message msg, List<Object> out) throws Exception {
        byte[] bytes = msg.toByteArray();
        out.add(Unpooled.copiedBuffer(bytes));
    }
}
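For some intuition about the bytes that toByteArray() hands to the encryption layer, the protobuf wire format of this three-field Message can be reproduced by hand. The sketch below is for illustration only: WireFormatSketch is a hypothetical class, it covers only this one message shape, and real code should keep using the generated MessageProto. Each field is written as a tag, (field number << 3) | wire type, followed by a varint value or a length-delimited UTF-8 string, and proto3 skips fields that hold their default value.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical hand-rolled encoder for illustration; it is not the generated MessageProto.
final class WireFormatSketch {
    static byte[] encodeMessage(long ts, String name, String msg) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        if (ts != 0) {                       // proto3 omits fields holding their default value
            writeVarint(out, (1 << 3) | 0);  // field 1 (ts), wire type 0 = varint
            writeVarint(out, ts);
        }
        writeString(out, 2, name);           // field 2 (name)
        writeString(out, 3, msg);            // field 3 (msg)
        return out.toByteArray();
    }

    private static void writeString(ByteArrayOutputStream out, int fieldNumber, String value) {
        if (value.isEmpty()) {
            return;                          // default value, not written
        }
        byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
        writeVarint(out, (fieldNumber << 3) | 2); // wire type 2 = length-delimited
        writeVarint(out, utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    // Base-128 varint: 7 bits per byte, high bit set on every byte except the last.
    private static void writeVarint(ByteArrayOutputStream out, long value) {
        while ((value & ~0x7FL) != 0) {
            out.write((int) ((value & 0x7F) | 0x80));
            value >>>= 7;
        }
        out.write((int) value);
    }

    public static void main(String[] args) {
        StringBuilder hex = new StringBuilder();
        for (byte b : encodeMessage(1L, "a", "hi")) {
            hex.append(String.format("%02x ", b));
        }
        System.out.println(hex.toString().trim()); // 08 01 12 01 61 1a 02 68 69
    }
}
```

The encoding carries no overall length or end marker of its own, which is one reason the pipeline needs the length-field framing layer around it.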
Finally, let's look at the implementations of ServerHandler and ClientHandler.
package xyz.xindoo.netty;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class ServerHandler extends SimpleChannelInboundHandler<MessageProto.Message> {
    @Override
    public void channelRead0(ChannelHandlerContext ctx, MessageProto.Message msg) throws Exception {
        System.out.println(msg.getMsg());
        // Reply with the length of the received message text
        MessageProto.Message resp = MessageProto.Message.newBuilder()
                .setTs(System.currentTimeMillis())
                .setMsg("Message length: " + msg.getMsg().length())
                .build();
        ctx.writeAndFlush(resp);
    }
}