Netty实战之序列化与反序列化协议
序列化与反序列化概念
序列化
客户端与服务器端通讯,不能将对象进行直接传输的。通讯的本质为流传输,所以,我们可以将对象序列化成流进行传输。
实质:将对象转化为字节的过程
反序列化
服务端接收到序列化过后的对象,需要反序列化后,才能将之转化为对象拿来使用
实质:将字节转换为对象的过程
序列化的模式(两种)
对象持久化概念:将对象转化为字节,存储到硬盘上的过程
网络传输对象概念: 将对象转化为字节的形式(序列化 ),二进制方式发送给服务端,服务端再将字节转换为对象的过程(反序列化),总的过程即为rpc远程通讯
序列化图解
什么rpc远程调用
服务器与服务器之间进行通讯
序列化的模式:
对象持久化:将对象转化为字节,存储到硬盘上的操作。
网络对象传输:客户端将对象转化为字节(序列化),再将数据传输到服务端,服务端将数据从字节转化为对象(反序列化)。
序列化有哪几种方式?
- 采用JSON序列化进行传输
- 使用XML协议进行传输
- 使用protoBuf,谷歌自定义的协议
- 使用MessagePack进行序列化传输
- 大公司开发的自定义协议
MessagePack的使用案例:
这里我们把Json序列化也加入了进来,实际的看下MessagePack和JSON的区别:
代码语言:javascript复制 User user = new User(1, "张三", 24);
// 使用json来序列化
String userJson = JSON.toJSONString(user);
System.out.println("fastJson" userJson);
// 使用MessagePack来将对象进行序列化
MessagePack messagePack = new MessagePack();
byte[] write = messagePack.write(user);
Value read = messagePack.read(write);
System.out.println("MessagePack:" read);
测试结果为:
MessagePack和JSON序列的区别:
比较项 | MessagePack | JSON |
---|---|---|
示例 | [1,“张三”,24] | {“age”:24,“id”:1,“name”:“张三”} |
体积 | 体积小,是将对象的值存入到序列化数组中 | 体积稍大,以key-value的形式 |
灵活性 | 灵活性差,client与server中属性的顺序必须保持一致,否则会出错 | 灵活性较高 |
为什么要使用序列化
网络传输采用的是流传输,在Nio和Netty中都采用的是Buffer来进行传输。所以,对象是不能直接进行传输的。需要序列化成字节或者buffer才能继续进行传输
使用MessagePack进行对象实例
添加pom依赖
代码语言:javascript复制<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>netty-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.12.Final</version>
</dependency>
<!-- 序列化组件,更加小巧 -->
<!-- https://mvnrepository.com/artifact/org.msgpack/msgpack -->
<dependency>
<groupId>org.msgpack</groupId>
<artifactId>msgpack</artifactId>
<version>0.6.12</version>
</dependency>
<!-- 序列化组件:fastJson -->
<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
</dependencies>
</project>
添加User实体类
要使用MessagePack进行序列化,需要在实体类上加入@Message注解
代码语言:javascript复制package com.itmayiedu.entity;
import org.msgpack.annotation.Message;
@Message
public class User {
private Integer id;
private String name;
private Integer age;
public User() {
}
public User(Integer id, String name, int age) {
this.id = id;
this.name = name;
this.age = age;
}
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Integer getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
@Override
public String toString() {
return "User{"
"id=" id
", name='" name '''
", age='" age '''
'}';
}
}
创建自定义编码器MessagePackEncoder
代码语言:javascript复制package com.itmayiedu.code;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.msgpack.MessagePack;
/**
* 自定义MessagePack编码器
*/
public class MessagePackEncoder extends MessageToByteEncoder {
protected void encode(ChannelHandlerContext channelHandlerContext, Object msg, ByteBuf byteBuf) throws Exception {
// messagePack序列化
MessagePack messagePack = new MessagePack();
// 采用byteBuf传输
byteBuf.writeBytes(messagePack.write(msg));
}
}
创建自定义解码器MessagePackDecoder
代码语言:javascript复制package com.itmayiedu.code;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import org.msgpack.MessagePack;
import java.util.List;
/**
* MessagePackDecoder编码器
*/
public class MessagePackDecoder extends MessageToMessageDecoder<ByteBuf> {
// 解码器
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception {
// 定义数组
final byte[] array;
// 获取读取长度
final int length = byteBuf.readableBytes();
array = new byte[length];
byteBuf.getBytes(byteBuf.readerIndex(), array, 0, length);
//mp的read方法将其反序列化为object对象
MessagePack mp = new MessagePack();
list.add(mp.read(array));
}
}
创建自定义解码器NettyServer
代码语言:javascript复制package com.itmayiedu.demo;
import com.itmayiedu.code.MessagePackDecoder;
import com.itmayiedu.code.MessagePackEncoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
* 搭建NettyServer
*
*/
public class NettyServerDemo {
static final int port = 8080;
public static void main(String[] args) {
// 1. 创建两个线程池,分别作为boss线程池和work线程池
// boss线程池
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
// work线程池
NioEventLoopGroup workGroup = new NioEventLoopGroup();
// 创建NettyServer
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 将线程池加入到serverBootStrap中 设置为服务端: NioServerSocketChannel.class
serverBootstrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class)
// 添加消息处理
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new MessagePackEncoder());
ch.pipeline().addLast(new MessagePackDecoder());
ch.pipeline().addLast(new ServerHandler());
}
});
try{
ChannelFuture channelFuture = serverBootstrap.bind(port);
System.out.println("服务端已经启动,port: " port);
channelFuture.channel().closeFuture().sync();
}catch(Exception e) {
}finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
创建ServerHandler
代码语言:javascript复制package com.itmayiedu.demo;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class ServerHandler extends SimpleChannelInboundHandler<Object> {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}
// 读取消息的方法
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("服务端接收到消息:" msg);
}
}
搭建NettyClient
代码语言:javascript复制package com.itmayiedu.demo;
import com.itmayiedu.code.MessagePackDecoder;
import com.itmayiedu.code.MessagePackEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.io.IOException;
import java.net.InetSocketAddress;
public class NettyClientDemo {
public static void main(String[] args) throws IOException {
NioEventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress("127.0.0.1", 8080))
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new MessagePackEncoder());
ch.pipeline().addLast(new MessagePackDecoder());
ch.pipeline().addLast(new ClientHandler());
}
});
try{
ChannelFuture sync = bootstrap.connect().sync();
sync.channel().closeFuture().sync();
}catch (Exception e) {
}finally {
group.shutdownGracefully();
}
}
}
创建ClientHandler
代码语言:javascript复制package com.itmayiedu.demo;
import com.itmayiedu.entity.User;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.msgpack.MessagePack;
public class ClientHandler extends SimpleChannelInboundHandler<Object> {
/**
* 发送消息的方法
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
User user = new User(1, "张三", 25);
ctx.writeAndFlush(user);
}
/**
* 获取消息的方法
* @param ctx
* @param msg
* @throws Exception
*/
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("客户端接收到消息: " msg);
}
}