Netty(二)之整合Marshalling传输实体类

2023-12-25 17:17:23 浏览数 (1)

pom

代码语言:xml
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>5.0.0.Alpha1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.jboss.marshalling/jboss-marshalling-serial -->
        <dependency>
            <groupId>org.jboss.marshalling</groupId>
            <artifactId>jboss-marshalling-serial</artifactId>
            <version>1.3.14.GA</version>
        </dependency>


        <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.20</version>
            <scope>provided</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.jboss.marshalling/jboss-marshalling-river -->
        <dependency>
            <groupId>org.jboss.marshalling</groupId>
            <artifactId>jboss-marshalling-river</artifactId>
            <version>2.0.5.Final</version>
        </dependency>

自定义实体类

使用 Lombok 的 @Data 注解自动生成 getter、setter、toString、equals 和 hashCode 方法;由于类中没有 final 字段,生成的构造方法就是无参构造方法。

代码语言:java
package jbossmarshalling;

import lombok.Data;

import java.io.Serializable;
import java.util.Date;

/**
 * Serializable payload exchanged between the client and the server in this example.
 *
 * <p>Lombok's {@code @Data} generates the getters, setters, {@code equals},
 * {@code hashCode} and {@code toString} for the fields below.
 *
 * @author CBeann
 * @create 2019-08-31 11:05
 */
@Data
public class Book implements Serializable {

    /**
     * Explicit serialization version so that both peers agree on the class version
     * even when they were compiled separately.
     */
    private static final long serialVersionUID = 1L;

    private Integer id;
    private String name;
    private Date date;
}

MarshallingCodeCFactory编解码工具类

写法基本固定,可以当作工具类直接复用。

代码语言:java
package jbossmarshalling;

import io.netty.handler.codec.marshalling.*;
import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;

/**
 * Factory for the Netty codec handlers that (de)serialize objects with JBoss Marshalling.
 *
 * <p>Both handlers use the {@code "serial"} marshalling protocol at version 5, so an
 * encoder built here can be paired with a decoder built here on the other peer.
 *
 * @author CBeann
 * @create 2019-08-31 14:36
 */
public final class MarshallingCodeCFactory {

    /** Name of the JBoss Marshalling protocol implementation to look up. */
    private static final String PROTOCOL_NAME = "serial";

    /** Protocol version configured on both the encoder and the decoder. */
    private static final int PROTOCOL_VERSION = 5;

    /** Value passed as the decoder's second constructor argument (maximum object size). */
    private static final int MAX_OBJECT_SIZE = 1024;

    /**
     * Creates the JBoss Marshalling decoder.
     *
     * @return a new {@link MarshallingDecoder}
     * @throws IllegalStateException if the {@code "serial"} marshaller factory cannot be found
     */
    public static MarshallingDecoder buildMarshallingDecoder() {
        UnmarshallerProvider provider =
                new DefaultUnmarshallerProvider(lookupMarshallerFactory(), newConfiguration());
        return new MarshallingDecoder(provider, MAX_OBJECT_SIZE);
    }

    /**
     * Creates the JBoss Marshalling encoder.
     *
     * @return a new {@link MarshallingEncoder}
     * @throws IllegalStateException if the {@code "serial"} marshaller factory cannot be found
     */
    public static MarshallingEncoder buildMarshallingEncoder() {
        MarshallerProvider provider =
                new DefaultMarshallerProvider(lookupMarshallerFactory(), newConfiguration());
        return new MarshallingEncoder(provider);
    }

    /** Looks up the marshaller factory and fails fast when it is not available. */
    private static MarshallerFactory lookupMarshallerFactory() {
        MarshallerFactory factory = Marshalling.getProvidedMarshallerFactory(PROTOCOL_NAME);
        if (factory == null) {
            // Without this check the missing provider only shows up later as a
            // NullPointerException inside the codec, which is easy to overlook.
            throw new IllegalStateException(
                    "No JBoss Marshalling factory found for protocol \"" + PROTOCOL_NAME
                            + "\"; check that the jboss-marshalling-serial dependency is on the classpath");
        }
        return factory;
    }

    /** Builds the marshalling configuration shared by the encoder and the decoder. */
    private static MarshallingConfiguration newConfiguration() {
        MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(PROTOCOL_VERSION);
        return configuration;
    }
}

TimeServerHandler

代码语言:java
package jbossmarshalling;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

/**
 * @author CBeann
 * @create 2019-08-27 18:31
 */
public class TimeServerHandler extends ChannelHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        //服务器读客户端发送来的数据

        Book body = (Book) msg;
        System.out.println("The TimeServer receive :"   body);


        //服务器向客户端回应请求
        Book response = new Book();
        response.setId(11);
        response.setName("hello");
        ctx.writeAndFlush(response);

    }
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //将消息发送队列中的消息写入到SocketChannel中发送给对方
        ctx.flush();
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }


}

TimeServer

代码语言:java
package jbossmarshalling;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

/**
 * Example server: listens on port 8080 and exchanges {@link Book} objects with clients
 * using the JBoss Marshalling codec.
 *
 * @author CBeann
 * @create 2019-08-27 18:22
 */
public class TimeServer {

    /**
     * Starts the server and blocks until the listening channel is closed.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if binding the port or waiting on the channel fails
     */
    public static void main(String[] args) throws Exception {
        int port = 8080;
        // Event loop groups for the server: one passed as the parent group and one as the child group.
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // Key part: the marshalling decoder/encoder are added before the
                            // business handler, which casts inbound messages to Book.
                            socketChannel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                            socketChannel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                            socketChannel.pipeline().addLast(new TimeServerHandler());
                        }
                    });
            // Bind the port and wait for the bind to complete.
            ChannelFuture f = b.bind(port).sync();
            // Block until the server's listening channel is closed.
            f.channel().closeFuture().sync();
        } finally {
            // No catch block on purpose: startup failures (e.g. port already in use)
            // propagate out of main instead of being silently discarded.
            // Shut down gracefully to release the thread pools.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

TimeClientHandler

代码语言:java
package jbossmarshalling;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;

/**
 * @author CBeann
 * @create 2019-08-27 18:47
 */
public class TimeClientHandler extends ChannelHandlerAdapter {


    //客户端读取服务器发送的数据
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {

            Book body = (Book) msg;
            System.out.println("Now is:"   body);
        } catch (Exception e) {

        } finally {
            //标配
            ReferenceCountUtil.release(msg);
        }


    }

    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //将消息发送队列中的消息写入到SocketChannel中发送给对方
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

TimeClient

代码语言:java
package jbossmarshalling;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.util.Date;

/**
 * Example client: connects to 127.0.0.1:8080, sends one {@link Book} and then waits
 * until the connection is closed.
 *
 * @author CBeann
 * @create 2019-08-27 18:43
 */
public class TimeClient {

    /**
     * Connects to the server, sends a single {@link Book} and blocks until the channel closes.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if connecting or waiting on the channel fails
     */
    public static void main(String[] args) throws Exception {
        int port = 8080;
        String host = "127.0.0.1";
        // Event loop group for the client.
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // The marshalling decoder/encoder are added before the business
                            // handler, which casts inbound messages to Book.
                            socketChannel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                            socketChannel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                            socketChannel.pipeline().addLast(new TimeClientHandler());
                        }
                    });
            // Start the connection and wait for it to be established.
            ChannelFuture f = b.connect(host, port).sync();

            Book book = new Book();
            book.setId(1);
            book.setName("十万个为什么");
            book.setDate(new Date());

            // Send the entity object itself; the encoder in the pipeline serializes it.
            f.channel().writeAndFlush(book);

            // Block until the client connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            // No catch block on purpose: connection failures (e.g. server not running)
            // propagate out of main instead of being silently discarded.
            // Shut down gracefully.
            group.shutdownGracefully();
        }
    }
}

参考

Netty之传输POJO(使用JBoss的Marshalling序列化方式)_Learning-CSDN博客_marshalling netty

出现的问题

1)代码本身没有问题,但客户端和服务器端之间没有任何交互(也看不到任何报错)

原因是依赖有问题(缺少依赖或版本不合适),改用下面的依赖后恢复正常:

代码语言:xml
 <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>5.0.0.Alpha1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.jboss.marshalling/jboss-marshalling-serial -->
        <dependency>
            <groupId>org.jboss.marshalling</groupId>
            <artifactId>jboss-marshalling-serial</artifactId>
            <version>1.3.14.GA</version>
        </dependency>


        <!-- https://mvnrepository.com/artifact/org.jboss.marshalling/jboss-marshalling-river -->
        <dependency>
            <groupId>org.jboss.marshalling</groupId>
            <artifactId>jboss-marshalling-river</artifactId>
            <version>2.0.5.Final</version>
        </dependency>

结果

亲测有效

0 人点赞