zookeeper源码分析(5)-序列化协议

2020-06-22 11:39:32 浏览数 (1)

在网络传输时,传输的是二进制数据,所以发送端需要将序列化对象转变为二进制数据,也就是序列化过程。接收端需要将二进制数据转化为序列化对象,也就是反序列化过程。在序列化和反序列化过程中,需要定义一种对数据相互转变的一致性协议,也就是序列化协议。zookeeper使用Jute作为序列化组件。首先看下Jute的使用:

代码语言:javascript复制
public class RequestHeader implements Record {
  private int xid;
  private int type;
  public RequestHeader() {
  }
  public RequestHeader(
        int xid,
        int type) {
    this.xid=xid;
    this.type=type;
  }
  public void serialize(OutputArchive a_, String tag) throws java.io.IOException {
    a_.startRecord(this,tag);
//先写入xid
    a_.writeInt(xid,"xid");
    a_.writeInt(type,"type");
    a_.endRecord(this,tag);
  }
  public void deserialize(InputArchive a_, String tag) throws java.io.IOException {
    a_.startRecord(tag);
//先读出xid
    xid=a_.readInt("xid");
    type=a_.readInt("type");
    a_.endRecord(tag);
}

//测试方法
public void testJute() throws IOException {
        //实现Record接口,自定义序列化
        RequestHeader requestHeader = new RequestHeader(1, ZooDefs.OpCode.create);
        System.out.print("requestHeader:  "  requestHeader );
        //序列化
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        BinaryOutputArchive binaryOutputArchive = BinaryOutputArchive.getArchive(outputStream);
        requestHeader.serialize(binaryOutputArchive,"header");
        //通常是TCP网络通信对象
        ByteBuffer bb = ByteBuffer.wrap(outputStream.toByteArray());
        //反序列化
        RequestHeader requestHeader1 = new RequestHeader();
        ByteBufferInputStream inputStream = new ByteBufferInputStream(bb);
        BinaryInputArchive binaryInputArchive =  BinaryInputArchive.getArchive(inputStream);
        requestHeader1.deserialize(binaryInputArchive,"header");
        System.out.print("requestHeader1:  "   requestHeader1);
        outputStream.close();
        inputStream.close();
      
    }

1.定义序列化对象RequestHeader,需要实现Record接口的serializedeserialize接口 2.构建序列化器BinaryOutputArchive,调用serialize方法将对象序列化流中 3.构建反序列化器BinaryInputArchive,调用deserialize方法将流反序列化为对象

从上面的使用我们可以看出,对RequestHeader对象的序列化 就是对其成员变量xid,type的按顺序的写入序列化器BinaryOutputArchive,反序列化就是从反序列化器BinaryInputArchive按顺序的读出xid,type。 所以序列化组件Jute的实现关键就是对序列化对象序列化器反序列化器的设计。 序列化对象 所有的序列化对象都要实现Record接口,它定义了serializedeserialize方法用于子类自己实现自己的序列化和反序列方式。

代码语言:javascript复制
public interface Record {
    public void serialize(OutputArchive archive, String tag)
        throws IOException;
    public void deserialize(InputArchive archive, String tag)
        throws IOException;
}

zookeeper的org.apache.zookeeper.proto包下定义了很多用于网络通信和数据存储所需要的序列化对象。 序列化器 在zookeeper中序列化就是将Record对象变为二进制数据的过程,序列化器接口为OutputArchive

代码语言:javascript复制
public interface OutputArchive {
    public void writeByte(byte b, String tag) throws IOException;
    public void writeBool(boolean b, String tag) throws IOException;
    public void writeInt(int i, String tag) throws IOException;
    public void writeLong(long l, String tag) throws IOException;
    public void writeFloat(float f, String tag) throws IOException;
    public void writeDouble(double d, String tag) throws IOException;
    public void writeString(String s, String tag) throws IOException;
    public void writeBuffer(byte buf[], String tag)
        throws IOException;
    public void writeRecord(Record r, String tag) throws IOException;
    public void startRecord(Record r, String tag) throws IOException;
    public void endRecord(Record r, String tag) throws IOException;
    public void startVector(List<?> v, String tag) throws IOException;
    public void endVector(List<?> v, String tag) throws IOException;
    public void startMap(TreeMap<?,?> v, String tag) throws IOException;
    public void endMap(TreeMap<?,?> v, String tag) throws IOException;

}

有三种实现:BinaryOutputArchive,CsvOutputArchive和XmlOutputArchive,分别对应无特殊格式,有csv格式和有xml格式的数据序列化。 BinaryOutputArchive

代码语言:javascript复制
public class BinaryOutputArchive implements OutputArchive {
    private ByteBuffer bb = ByteBuffer.allocate(1024);

    private DataOutput out;
    
    public static BinaryOutputArchive getArchive(OutputStream strm) {
        return new BinaryOutputArchive(new DataOutputStream(strm));
    }
    
    /** Creates a new instance of BinaryOutputArchive */
    public BinaryOutputArchive(DataOutput out) {
        this.out = out;
    }
    
    public void writeByte(byte b, String tag) throws IOException {
        out.writeByte(b);
    }
    
    public void writeBool(boolean b, String tag) throws IOException {
        out.writeBoolean(b);
    }
··········省略代码·····
}

可以看到BinaryOutputArchive其实是对DataOutput out的包装,从而实现了对各种数据类型的写入,tag并不会写入进二进制数据中,而对于CsvOutputArchive和XmlOutputArchive,如果想在二进制数据中保存对应格式,就需要tag控制,如XmlOutputArchive中

代码语言:javascript复制
public void writeBool(boolean b, String tag) throws IOException {
        printBeginEnvelope(tag);
        stream.print("<boolean>");
        stream.print(b ? "1" : "0");
        stream.print("</boolean>");
        printEndEnvelope(tag);
    }
private void printBeginEnvelope(String tag) {
        if (!compoundStack.empty()) {
            String s = compoundStack.peek();
            if ("struct".equals(s)) {
                putIndent();
                stream.print("<member>n");
                addIndent();
                putIndent();
                stream.print("<name>" tag "</name>n");
                putIndent();
                stream.print("<value>");
            } else if ("vector".equals(s)) {
                stream.print("<value>");
            } else if ("map".equals(s)) {
                stream.print("<value>");
            }
        } else {
            stream.print("<value>");
        }
    }

反序列化器 在zookeeper中反序列化就是将二进制数据变为Record对象的过程,反序列化器接口为InputArchive

代码语言:javascript复制
public interface InputArchive {
    public byte readByte(String tag) throws IOException;
    public boolean readBool(String tag) throws IOException;
    public int readInt(String tag) throws IOException;
    public long readLong(String tag) throws IOException;
    public float readFloat(String tag) throws IOException;
    public double readDouble(String tag) throws IOException;
    public String readString(String tag) throws IOException;
    public byte[] readBuffer(String tag) throws IOException;
    public void readRecord(Record r, String tag) throws IOException;
    public void startRecord(String tag) throws IOException;
    public void endRecord(String tag) throws IOException;
    public Index startVector(String tag) throws IOException;
    public void endVector(String tag) throws IOException;
    public Index startMap(String tag) throws IOException;
    public void endMap(String tag) throws IOException;
}

同样有三种实现:BinaryInputArchive,CsvInputArchive和XmlInputArchive,分别对应无特殊格式,有csv格式和有xml格式的数据序列化。

代码语言:javascript复制
public class BinaryInputArchive implements InputArchive {
    private DataInput in;
    
    static public BinaryInputArchive getArchive(InputStream strm) {
        return new BinaryInputArchive(new DataInputStream(strm));
    }
   
    /** Creates a new instance of BinaryInputArchive */
    public BinaryInputArchive(DataInput in) {
        this.in = in;
    }
    
    public byte readByte(String tag) throws IOException {
        return in.readByte();
    }
    
    public boolean readBool(String tag) throws IOException {
        return in.readBoolean();
    }
    }
··········省略代码·····
}

可以看到BinaryInputArchive其实是对DataInput in的包装,从而实现了对各种数据类型的读取,tag并不会写入进二进制数据中。

实际zookeeper的客户端在向服务端发送请求时,通信协议体如下:

len为请求数据的总长度,占4位。

请求头就是事例中的RequestHeader的xid和type。xid用于记录客户端请求发起的先后顺序,占4位。type代表请求的操作类型,占4位。这样子在服务端反序列化时,就可以根据type的值来选择对应的Record来读取请求体内容。

0 人点赞