java tcp粘包_socket拆包与组班

2022-09-22 14:37:29 浏览数 (1)

大家好,又见面了,我是你们的朋友全栈君。

import java.nio.ByteBuffer;

import io.netty.bootstrap.ServerBootstrap;

import io.netty.buffer.ByteBuf;

import io.netty.buffer.Unpooled;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelOption;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioServerSocketChannel;

import io.netty.handler.codec.DelimiterBasedFrameDecoder;

import io.netty.handler.codec.FixedLengthFrameDecoder;

import io.netty.handler.codec.string.StringDecoder;

import io.netty.handler.codec.string.StringEncoder;

public class Server {

public static void main(String[] args) throws Exception{

//1 创建2个线程,一个是负责接收客户端的连接。一个是负责进行数据传输的

EventLoopGroup pGroup = new NioEventLoopGroup();

EventLoopGroup cGroup = new NioEventLoopGroup();

//2 创建服务器辅助类

ServerBootstrap b = new ServerBootstrap();

b.group(pGroup, cGroup)

.channel(NioServerSocketChannel.class)

.option(ChannelOption.SO_BACKLOG, 1024)

.option(ChannelOption.SO_SNDBUF, 32*1024)

.option(ChannelOption.SO_RCVBUF, 32*1024)

.childHandler(new ChannelInitializer() {

@Override

protected void initChannel(SocketChannel sc) throws Exception {

//设置特殊分隔符

ByteBuf buf = Unpooled.copiedBuffer(“$_”.getBytes());

sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));

//设置字符串形式的解码

sc.pipeline().addLast(new StringDecoder());

sc.pipeline().addLast(new ServerHandler());

}

});

//4 绑定连接

ChannelFuture cf = b.bind(8765).sync();

//等待服务器监听端口关闭

cf.channel().closeFuture().sync();

pGroup.shutdownGracefully();

cGroup.shutdownGracefully();

}

}

import io.netty.bootstrap.Bootstrap;

import io.netty.buffer.ByteBuf;

import io.netty.buffer.Unpooled;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioSocketChannel;

import io.netty.handler.codec.DelimiterBasedFrameDecoder;

import io.netty.handler.codec.FixedLengthFrameDecoder;

import io.netty.handler.codec.string.StringDecoder;

import io.netty.handler.codec.string.StringEncoder;

public class Client {

public static void main(String[] args) throws Exception {

EventLoopGroup group = new NioEventLoopGroup();

Bootstrap b = new Bootstrap();

b.group(group)

.channel(NioSocketChannel.class)

.handler(new ChannelInitializer() {

@Override

protected void initChannel(SocketChannel sc) throws Exception {

//

ByteBuf buf = Unpooled.copiedBuffer(“$_”.getBytes());

sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));

sc.pipeline().addLast(new StringDecoder());

sc.pipeline().addLast(new ClientHandler());

}

});

ChannelFuture cf = b.connect(“127.0.0.1”, 8765).sync();

cf.channel().writeAndFlush(Unpooled.wrappedBuffer(“bbbb$_”.getBytes()));

cf.channel().writeAndFlush(Unpooled.wrappedBuffer(“cccc$_”.getBytes()));

//等待客户端端口关闭

cf.channel().closeFuture().sync();

group.shutdownGracefully();

}

}

import io.netty.buffer.Unpooled;

import io.netty.channel.ChannelHandlerAdapter;

import io.netty.channel.ChannelHandlerContext;

public class ServerHandler extends ChannelHandlerAdapter {

@Override

public void channelActive(ChannelHandlerContext ctx) throws Exception {

System.out.println(” server channel active… “);

}

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

String request = (String)msg;

System.out.println(“Server :” msg);

String response = “服务器响应:” msg “$_”;

ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()));

}

@Override

public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) throws Exception {

ctx.close();

}

}

import io.netty.channel.ChannelHandlerAdapter;

import io.netty.channel.ChannelHandlerContext;

import io.netty.util.ReferenceCountUtil;

public class ClientHandler extends ChannelHandlerAdapter{

@Override

public void channelActive(ChannelHandlerContext ctx) throws Exception {

System.out.println(“client channel active… “);

}

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

try {

String response = (String)msg;

System.out.println(“Client: ” response);

} finally {

ReferenceCountUtil.release(msg);

}

}

@Override

public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

ctx.close();

}

}

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelHandler;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelOption;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioServerSocketChannel;

import io.netty.handler.logging.LogLevel;

import io.netty.handler.logging.LoggingHandler;

import io.netty.handler.timeout.ReadTimeoutHandler;

public class Server {

public Server() {

}

public static void main(String[] args) throws Exception {

EventLoopGroup pGroup = new NioEventLoopGroup();

EventLoopGroup cGroup = new NioEventLoopGroup();

ServerBootstrap b = new ServerBootstrap();

((ServerBootstrap)((ServerBootstrap)((ServerBootstrap)b.group(pGroup, cGroup).channel(NioServerSocketChannel.class)).option(ChannelOption.SO_BACKLOG, 1024)).handler(new LoggingHandler(LogLevel.INFO))).childHandler(new ChannelInitializer() {

protected void initChannel(SocketChannel sc) throws Exception {

sc.pipeline().addLast(new ChannelHandler[]{MarshallingCodeCFactory.buildMarshallingDecoder()});

sc.pipeline().addLast(new ChannelHandler[]{MarshallingCodeCFactory.buildMarshallingEncoder()});

sc.pipeline().addLast(new ChannelHandler[]{new ReadTimeoutHandler(5)});

sc.pipeline().addLast(new ChannelHandler[]{new ServerHandler()});

}

});

ChannelFuture cf = b.bind(8765).sync();

cf.channel().closeFuture().sync();

pGroup.shutdownGracefully();

cGroup.shutdownGracefully();

}

}

import io.netty.bootstrap.Bootstrap;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelHandler;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioSocketChannel;

import io.netty.handler.logging.LogLevel;

import io.netty.handler.logging.LoggingHandler;

import io.netty.handler.timeout.ReadTimeoutHandler;

import java.util.concurrent.TimeUnit;

public class Client {

private EventLoopGroup group;

private Bootstrap b;

private ChannelFuture cf;

public static Client getInstance() {

return Client.SingletonHolder.instance;

}

private Client() {

this.group = new NioEventLoopGroup();

this.b = new Bootstrap();

((Bootstrap)((Bootstrap)((Bootstrap)this.b.group(this.group)).channel(NioSocketChannel.class)).handler(new LoggingHandler(LogLevel.INFO))).handler(new ChannelInitializer() {

protected void initChannel(SocketChannel sc) throws Exception {

sc.pipeline().addLast(new ChannelHandler[]{MarshallingCodeCFactory.buildMarshallingDecoder()});

sc.pipeline().addLast(new ChannelHandler[]{MarshallingCodeCFactory.buildMarshallingEncoder()});

sc.pipeline().addLast(new ChannelHandler[]{new ReadTimeoutHandler(5)});

sc.pipeline().addLast(new ChannelHandler[]{new ClientHandler()});

}

});

}

public void connect() {

try {

this.cf = this.b.connect(“127.0.0.1”, 8765).sync();

System.out.println(“远程服务器已经连接, 可以进行数据交换..”);

} catch (Exception var2) {

var2.printStackTrace();

}

}

public ChannelFuture getChannelFuture() {

if (this.cf == null) {

this.connect();

}

if (!this.cf.channel().isActive()) {

this.connect();

}

return this.cf;

}

public static void main(String[] args) throws Exception {

final Client c = getInstance();

ChannelFuture cf = c.getChannelFuture();

for(int i = 1; i <= 3; i) {

Request request = new Request();

request.setId(“” i);

request.setName(“pro” i);

request.setRequestMessage(“数据信息” i);

cf.channel().writeAndFlush(request);

TimeUnit.SECONDS.sleep(4L);

}

cf.channel().closeFuture().sync();

(new Thread(new Runnable() {

public void run() {

try {

System.out.println(“进入子线程…”);

ChannelFuture cf = c.getChannelFuture();

System.out.println(cf.channel().isActive());

System.out.println(cf.channel().isOpen());

Request request = new Request();

request.setId(“4”);

request.setName(“pro4”);

request.setRequestMessage(“数据信息4”);

cf.channel().writeAndFlush(request);

cf.channel().closeFuture().sync();

System.out.println(“子线程结束.”);

} catch (InterruptedException var3) {

var3.printStackTrace();

}

}

})).start();

System.out.println(“断开连接,主线程结束..”);

}

private static class SingletonHolder {

static final Client instance = new Client((Client)null);

private SingletonHolder() {

}

}

}

发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/169721.html原文链接:https://javaforall.cn

0 人点赞