知识点:
Netty框架如何引导服务端监听网络端口并读写消息 Netty框架如何连接远程服务器并读写消息 Netty框架ChannelInboundHandlerAdapter部分事件使用方法 Netty框架Channel管道使用方法
前言
上一篇对Netty框架做了一个大概的介绍,并对核心部件Channel、ChannelHeadler、Future、事件从概念与作用上做了说明,另外还与Java NIO 在编码上做了一个简单的对比,引出了EventLoop。本篇使用一个小示例来了解下Netty框架怎么使用,真正的用起来。
交互图
服务端示例
代码语言:java复制public static void main(String[] args) throws InterruptedException {
//创建EventLoop
NioEventLoopGroup master = new NioEventLoopGroup();
//创建服务端引导
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(master);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
//指定使用NioServerSocketChannel作为Channel类型
bootstrap.channel(NioServerSocketChannel.class);
//注册一个ChannelInboundHandlerAdapter类型处理channel的连接
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//在channel的pipeline中加入消息处理类型为ChannelInboundHandlerAdapter
ch.pipeline().addLast(new MessageChannel());
}
});
//使用bind方法启动服务端的监听
ChannelFuture future = bootstrap.bind(30888).sync();
future.channel().closeFuture().sync();
master.shutdownGracefully().sync();
}
public static class MessageChannel extends ChannelInboundHandlerAdapter {
//处理消息
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//使用ByteBuf获取消息
ByteBuf buf = (ByteBuf) msg;
String message = buf.toString(CharsetUtil.UTF_8);
System.out.println("Client message:" message);
//服务端写消息给客户端
ctx.writeAndFlush(ctx.alloc().buffer().writeBytes(("Copy message:" message).getBytes()));
if ("bye".equalsIgnoreCase(message.replace("n", ""))) {
ctx.writeAndFlush(ctx.alloc().buffer().writeBytes("bye".getBytes()));
//关闭连接
ctx.close();
}
}
//处理连接被激活的事件
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("Client connect:" ctx.channel().remoteAddress());
super.channelActive(ctx);
//写消息给客户端
ctx.channel().writeAndFlush(Unpooled.copiedBuffer("Connect success!".getBytes()));
}
//处理异常,异常后关闭连接
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
cause.printStackTrace();
ctx.close();
}
}
首先创建ServiceBootstrap实例。因为你准备使用NIO网络模型传输数据,所以使用NioEventLoopGroup来接受和处理新的连接,并且将Channel的类型指定为NioServiceSocketChannel。在这之后使用本地地址与指定的端口将服务器绑定到这个地址上监听新的连接请求。
在请求的处理上使用特殊的类ChannelInitializer。这是一个关键的类型,当有一个新的连接被接受时会创建一个新的Channel实例,而ChannelInitializer会把MessageChannel类型的实例添加至Channel的Pipeline中;而MessageChannel将会处理入站消息并输出一些信息给客户端。(这里使用MessageChannel模拟业务处理逻辑)
- MessageChannel 实现了业务逻辑
- main方法引导服务启动
引导过程中的步骤如下:
- 创建一个ServiceBootstrap实例以引导和绑定服务器监听的地址与端口
- 创建并分配一个NioEventLoopGroup实例以进行事件的处理
- 当有新连接时使用一个ChannelInitializer实例化一个Channel
- 使用ServiceBootstrap.bind()方法启动服务器监听
客户端示例
代码语言:java复制public static void main(String[] arg) throws InterruptedException {
NioEventLoopGroup group=new NioEventLoopGroup()
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(group)
bootstrap.option(ChannelOption.SO_KEEPALIVE,true);
//绑定远程服务地址与端口
bootstrap.remoteAddress("127.0.0.1", 30888);
final MessageChannel channel = new MessageChannel();
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(channel);
}
});
//连接远程服务器
ChannelFuture channelFuture = bootstrap.connect().sync();
channelFuture.channel().closeFuture().sync();
group.shutdownGracefully();
}
@ChannelHandler.Sharable
public static class MessageChannel extends SimpleChannelInboundHandler<ByteBuf> {
//处理消息读
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
System.out.println("Client received: " msg.toString(CharsetUtil.UTF_8));
}
//处理连接信息
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("正在连接...");
//主动发送一条消息至远程服务器
ctx.channel().write(Unpooled.copiedBuffer("客户端的消息n".getBytes()));
//冲刷消息
ctx.channel().flush();
super.channelActive(ctx);
}
//处理连接关闭事件
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("连接关闭...");
super.channelInactive(ctx);
}
//处理异常,异常时关系channel
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
cause.printStackTrace();
ctx.close();
}
}
客户端的代码逻辑与服务端的逻辑基本相同,在Bootstrap类型上使用了Bootstrap类而不是ServiceBootStrap类,这和我们之前讲Java NIO时基本相同,就不多阐述了。这里可以看到不同的是MessageChannel类型客户端继承自SimpleChannelInboundHandler类型,服务端继承自ChannelInboundHandlerAdapter类型;其实没有什么区别,它们都是继承自ChannelInboundHandlerAdapter类型,只是SimpleChannelInboundHandler更方便处理。