Druid源码阅读(二):Druid Segment存储格式

2020-05-10 11:43:56 浏览数 (1)

一、Druid Segment介绍

Druid流数据摄入后会以Index形式保存在内存中,同时会定期将Index序列化成Segment文件持久化到可靠存储中(如HDFS),批数据摄入会直接通过离线任务生成Segment存储,供服务加载使用。本节先对照Druid官方文档中对Segment的描述[1],介绍下Druid Segment,然后在下一节以一个测试Segment为例,并结合Druid源码,详细说明Druid是如何存储数据的。

1. Segment核心数据结构

图一 Druid数据模型[1]图一 Druid数据模型[1]

Druid是一个列式存储的数据库,每一列数据会单独保存并管理,在查询时只会计算相关列的数据。Druid中每一行数据包含3部分:时间戳维度指标。时间戳就是该条数据产生或保存的时间,主要作用有划分Segment、按照某种时间粒度预聚合数据等。指标就是一些整数或浮点数数值(也可能是复杂数据结构如hyperUnique),用来在查询时聚合得到结果。维度相对复杂一些,因为在查询时要支持filter、group by等操作,每一列维度值会保存3个数据结构来记录:

  1. 取值字典:将每个值映射为一个ID;
  2. 每一行取值的ID;
  3. 每个取值对应的Bitmap,在Segment存储中会以concise或roaring的方式压缩。

以一个例子说明,假设某一份数据有10行,在某一列的取值分别为["A", "A", "B", "C", "B", "C", "D", "C", "D", "D"],其对应的存储情况如下:

1. 取值字典:

代码语言:json复制
"A": 0, 
"B": 1, 
"C": 2, 
"D": 3,

2. 每行取值ID:

代码语言:javascript复制
[0, 0, 1, 2, 1, 2, 3, 2, 3, 3]

3. Bitmap:

代码语言:javascript复制
0: [1, 1, 0, 0, 0, 0, 0, 0, 0, 0],
1: [0, 0, 1, 0, 1, 0, 0, 0, 0, 0],
2: [0, 0, 0, 1, 0, 1, 0, 1, 0, 0],
3: [0, 0, 0, 0, 0, 0, 1, 0, 1, 1]

可以看出,对于单值维度,每行只有一个取值,因此对应的Bitmap中每行对应只有一个1,但对于多值维度,每行就可能有多个1。

2. Segment存储

Segment包括一个index.zip文件和一个descriptor.json文件。descriptor.json中记录了Segment ID、index文件地址、维度列、指标列信息,Druid Router中看到的Segment详细信息就是来自这个文件。

index.zip是Segment数据的明细存储,文件解压后,会有version.bin、meta.smoosh、00000.smoosh和factory.json 4个文件,在第二节中就会具体说明这些文件的功能。

Druid使用Segment时会以多级缓存的方式使用,除了可靠存储中有Segment,Druid还会在内存中以及服务机器硬盘上加载Segment作为缓存。

二、Druid Segment文件解析

本节会以测试的一个Segment文件为例,具体解析Druid Segment文件中的信息。这个Segment是开启了rollup的,预聚合后有60(0x3c)行数据,不包括"__time"列共有23列,其中有12列维度,11列指标,指标有longSum和thetaSketch(基数计算)两种聚合方式。

1. version.bin

version.bin文件中保存的是Segment的索引格式版本,目前都是V9版本,其余版本都已经被标记为Legacy。对于V9版本索引,其内容是4字节0x00 0x00 0x00 0x09。

读取Segment的第一步就是检查这个version.bin文件,以决定用何种方式读取Segment文件。为了兼容老版本的Segment存储方式,若version.bin不存在,Druid还会去尝试读取index.drd文件。

代码语言:java复制
public static int getVersionFromDir(File inDir) throws IOException
{
    File versionFile = new File(inDir, "version.bin");
    if (versionFile.exists()) {
      return Ints.fromByteArray(Files.toByteArray(versionFile));
    }

    final File indexFile = new File(inDir, "index.drd");
    int version;
    if (indexFile.exists()) {
      try (InputStream in = new FileInputStream(indexFile)) {
        version = in.read();
      }
      return version;
    }

    throw new IOE("Invalid segment dir [%s]. Can't find either of version.bin or index.drd.", inDir);
}

2. meta.smoosh

meta.smoosh中保存着Segment元信息。下面是样例Segment的meta.smoosh文件。第一行有3个字段,第一个字段是"v1",这是一个固定字符串,Druid在读取meta.smoosh时会校验这个字符串;第三个字段表示xxxxx.smoosh文件个数,这里值为1,表示只有一个00000.smoosh文件。下面每一行均有4个字段,分别为列名、该列数据所在的.smoosh文件ID、该列数据在.smoosh文件中起始字节数、该列数据在.smoosh文件中终止字节数。例如第二行表示"__time"列的数据位于00000.smoosh文件中的[0, 257)字节范围内,第三行表示"after_talk_indb_user_count"列的数据位于00000.smoosh文件中的[257, 854)字节范围内。所有列的数据拼接起来组成了00000.smoosh文件,最大的终止字节数也就对应着00000.smoosh文件的大小。

meta.smoosh文件中有两行比较特别:index.drdmetadata.drd。这两行并不是列的数据,而是Segment对应的描述信息。在介绍00000.smoosh文件时会详细介绍这两部分。

代码语言:json复制
v1,2147483647,1
__time,0,0,257
after_talk_indb_user_count,0,257,854
before_click_not_indb_user_count,0,854,2603
click_wpa_count,0,2603,2833
click_wpa_user_count,0,2833,4582
corpuin,0,11938,12195
count,0,4582,4812
device_type,0,12443,12691
exit_url_count,0,4812,5042
exit_user_count,0,5042,6599
host,0,12195,12443
index.drd,0,15490,16179
is_new,0,12691,12939
metadata.drd,0,16179,18017
page_viewtime,0,6599,6829
pv,0,6829,7059
source_detail,0,15169,15490
source_type,0,14870,15169
url,0,14374,14622
url_title,0,14622,14870
uv,0,7059,9688
valid_conv_user_count,0,9688,10621
valid_recep_user_count,0,10621,11938
wpa_cate,0,14126,14374
wpa_id,0,12939,13878
wpa_type,0,13878,14126

下面这段代码的功能就是加载meta.smoosh文件,从代码中也可以看出,Druid将meta.smoosh中的信息保存为一个Map<String, Metadata>,Key为列名,Value为Metadata(文件ID、起始字节数、终止字节数)。这段代码中还有一个有趣的事情,作者将这个meta信息的Map命名为internalFiles,这是因为原本每列数据都是独立存储的。但如果每列数据都单独保存为一个文件的话,就会产生很多小文件,严重影响文件系统的读写性能。因此Druid将每个字段的数据生成好之后,会以二进制文件拼接的方式,将多个小文件拼接成一个.smoosh文件以提升性能。在持久化的Segment中数据都保存在00000.smoosh文件中,但在逻辑上,不同列的数据存储是分开的。

代码语言:java复制
public static SmooshedFileMapper load(File baseDir) throws IOException
{
    File metaFile = FileSmoosher.metaFile(baseDir);

    BufferedReader in = null;
    try {
      in = new BufferedReader(new InputStreamReader(new FileInputStream(metaFile), StandardCharsets.UTF_8));

      String line = in.readLine();
      if (line == null) {
        throw new ISE("First line should be version,maxChunkSize,numChunks, got null.");
      }

      String[] splits = line.split(",");
      if (!"v1".equals(splits[0])) {
        throw new ISE("Unknown version[%s], v1 is all I know.", splits[0]);
      }
      if (splits.length != 3) {
        throw new ISE("Wrong number of splits[%d] in line[%s]", splits.length, line);
      }
      final Integer numFiles = Integer.valueOf(splits[2]);
      List<File> outFiles = Lists.newArrayListWithExpectedSize(numFiles);

      for (int i = 0; i < numFiles;   i) {
        outFiles.add(FileSmoosher.makeChunkFile(baseDir, i));
      }

      Map<String, Metadata> internalFiles = new TreeMap<>();
      while ((line = in.readLine()) != null) {
        splits = line.split(",");

        if (splits.length != 4) {
          throw new ISE("Wrong number of splits[%d] in line[%s]", splits.length, line);
        }
        internalFiles.put(
            splits[0],
            new Metadata(Integer.parseInt(splits[1]), Integer.parseInt(splits[2]), Integer.parseInt(splits[3]))
        );
      }

      return new SmooshedFileMapper(outFiles, internalFiles);
    }
    finally {
      Closeables.close(in, false);
    }
}

3. xxxxx.smoosh

xxxxx.smoosh文件主要包括metadata.drd、index.drd、列数据三部分,下面就分别介绍这三部分数据。

  • metadata.drd

metadata.drd是一个如下所示的json,包含了指标列的聚合方式、时间维度格式、时间粒度是否开启rollup等信息。开启rollup表示生成Segment时会对数据进行预聚合,开启rollup可以大幅减少数据存储量,提升查询效率。关闭rollup则Segment中会保留写入的原始数据。

代码语言:json复制
{
  "container": {},
  "aggregators": [
    {
      "type": "thetaSketch",
      "name": "after_talk_indb_user_count",
      "fieldName": "after_talk_indb_user_count",
      "size": 1024,
      "shouldFinalize": true,
      "isInputThetaSketch": false,
      "errorBoundsStdDev": null
    },
    {
      "type": "longSum",
      "name": "click_wpa_count",
      "fieldName": "click_wpa_count",
      "expression": null
    },
    {
      "type": "thetaSketch",
      "name": "click_wpa_user_count",
      "fieldName": "click_wpa_user_count",
      "size": 16384,
      "shouldFinalize": true,
      "isInputThetaSketch": false,
      "errorBoundsStdDev": null
    },
    ......
  ],
  "timestampSpec": {
    "column": "timestamp",
    "format": "auto",
    "missingValue": null
  },
  "queryGranularity": {
    "type": "period",
    "period": "PT1H",
    "timeZone": "Asia/Shanghai",
    "origin": null
  },
  "rollup": true
}
  • index.drd

index.drd包含所有列名(GenericIndex结构)、所有维度名(GenericIndex结构)、Segment开始时间(long 8 bytes)、Segment结束时间(long 8 bytes)和Bitmap类型({"type": "concise"} or {"type": "roaring"})。下面这段代码的功能就是解析index.drd这部分数据。

代码语言:javascript复制
ByteBuffer indexBuffer = smooshedFiles.mapFile("index.drd");
/**
* Index.drd should consist of the segment version, the columns and dimensions of the segment as generic
* indexes, the interval start and end millis as longs (in 16 bytes), and a bitmap index type.
*/
final GenericIndexed<String> cols = GenericIndexed.read(
    indexBuffer,
    GenericIndexed.STRING_STRATEGY,
    smooshedFiles
);
final GenericIndexed<String> dims = GenericIndexed.read(
    indexBuffer,
    GenericIndexed.STRING_STRATEGY,
    smooshedFiles
);
final Interval dataInterval = Intervals.utc(indexBuffer.getLong(), indexBuffer.getLong());
final BitmapSerdeFactory segmentBitmapSerdeFactory;

/**
* This is a workaround for the fact that in v8 segments, we have no information about the type of bitmap
* index to use. Since we cannot very cleanly build v9 segments directly, we are using a workaround where
* this information is appended to the end of index.drd.
*/
if (indexBuffer.hasRemaining()) {
  segmentBitmapSerdeFactory = mapper.readValue(SERIALIZER_UTILS.readString(indexBuffer), BitmapSerdeFactory.class);
} else {
  segmentBitmapSerdeFactory = new BitmapSerde.LegacyBitmapSerdeFactory();
}

这段代码中出现了GenericIndex这个数据结构,Segment中很多字段的保存都使用到这个数据结构,不妨在这里详细介绍下。如下图二和图三分别是GenericIndex的两个不同版本的二进制格式。可以看出GenericIndex实际上保存的是一个元素列表,元素的具体内容是二进制序列化后的byte数组。图三中V2格式的GenericIndex只是展示了头部,元素的具体值会根据numOfElementsPerValueFile和numElements去对应的文件块中读取。

V2版本的GenericIndex适用于元素个数较多的情况,由于示例的Segment比较小(只有60行),其中使用的都是V1版本的GenericIndex。

图二 GenericIndex V1格式图二 GenericIndex V1格式
图三 GenericIndex V2格式图三 GenericIndex V2格式
  • Columns

Column的数据开始是一个json格式的ColumnDescriptor,描述该列的类型,这里类型有complex、float、long、double、stringDictionary、floatV2、doubleV2、longV2。其中维度列使用stringDictionary,指标列使用其他类型。complex类型是指保存的字符数组表示复杂数据结构(由typeName决定)序列化后的二进制buffer而非数值,如thetaSketch、hyperUnique等类型。

代码语言:json复制
{
  "valueType": "COMPLEX",
  "hasMultipleValues": false,
  "parts": [
    {
      "type": "complex",
      "typeName": "thetaSketch"
    }
  ]
}

下面以几种常用数据类型为例,分析下其中都保存了哪些信息。

1. stringDictionary类型

Druid使用stringDictionary类型来保存维度列,在ColumnDescriptor之后,共有3部分数据,分别对应第一节中维度列保存的3个数据结构:取值字典每行取值ID每个取值对应的Bitmap

图四展示了某维度列在Segment中的二进制存储。其中红色框中的数据为该列的取值字典(GenericIndex结构),可以看到该列有26(0x1a)个取值分别为["199", "206", "220"......],它们分别映射为ID 0-25,注意这里所有维度的取值均为字符串类型;蓝色框中的数据为每行取值ID(GenericIndex结构),其中只有一个元素,该元素中用一个压缩后的数据结构来保存每行取值ID,由于ID值为0-25,每个ID需要8字节(1 byte)表示(这里的bit数只有[1, 2, 4, 8, 16, 32, 64]这几种,会选取最小可用的bit数),即这里是使用了60byte表示60行对应的ID(0x00 0x01 0x03 0x05 0x06 0x09 ...... 0x0e 0x18),可以看出第1个byte、第17个byte和第36个byte为0x00,对应取值为"199";绿色框中的数据为每个取值对应的Bitmap(GenericIndex结构),可以看到共有26(0x1a)个Bitmap,其中第一个维度值"199"对应的Bitmap为concise Bitmap小端序的0x80010001 80000010,由concise Bitmap的解码方式[2]可以解出如下的Bitmap,可以看出第1行、第17行和第36行取值为1,和每行取值ID的数据是一致的。

代码语言:javascript复制
000 0000 0000 0000
0000 0000 0001 0000
000 0000 0000 0001
0000 0000 0000 0001 
图四 stringDictionary类型数据示例图四 stringDictionary类型数据示例

这里Bitmap是有concise和roaring两种压缩方式的,因为concise Bitmap压缩解压方式比较简单,所以以concise Bitmap为例说明。但官方文档给出的建议是,尽量使用roaring Bitmap,一般情况下roaring Bitmap都会比concise Bitmap有更好的压缩比。有兴趣的同学也可以再深入了解下这两种Bitmap压缩方式,这里就不展开了。

2. 数值类型(long、float、double)

数值类型是Druid存储指标列最常用的类型,可以用来计数、求和、求最大最小值等统计信息。这里以long类型为例说明数值类型是如何存储的。long类型数值存储有3种编码方式:DELTA format保存所有数值中最小的值,以及每个数值相对于最小值的差值,这样相比于直接保存所有数值可以减少存储空间(因为差值可能不需要8 byte保存),这种方式适用于数据值比较集中的情况;TABLE format将所有取值保存为一个列表,以及每行对应的取值ID,这种方式类似于stringDictionary的保存方式,只不过没有Bitmap的数据,这种方式适用于数值基数较小的情况下;LONGS format直接保存每一行的取值。数据保存时,会检查数据的最大值、最小值、基数等统计信息,然后用压缩比最大的编码方式保存。float和double没有多种编码方式,只会直接保存每一行的取值。

图五展示了Segment中某指标列的存储。该列使用Table format存储:红色框中的GenericIndex是取值列表,可以看出该列共有7个取值,分别为[2, 14, 4, 16, 6, 8, 18];绿色框中的GenericIndex表示每行对应的ID,由于只有7个取值,每行对应的取值ID只需要用4 bit表示(这里的bit数只有[1, 2, 4, 8, 16, 32, 64]这几种,会选取最小可用的bit数),因此只需要30byte表示60行数据中每行的取值[0, 0, 1, 2, 2, 3, 4, 0, ......., 2, 0, 2, 2, 0, 2, 2, 2]。

图五 long类型数据示例(Table format)图五 long类型数据示例(Table format)
  • complex类型

Druid使用Complex类型做一些基于概率的统计算法,比较常用的有hyperUnique、thetaSketch基数算法,算法的详细信息可以参考Druid官网的说明[3]。

图六 Complex数据类型图六 Complex数据类型

下面以thetaSketch为例,看看Druid Segment中是如何保存这种数据的。图七展示了Segment中某thetaSketch列的存储,可以看到ColumnDescriptor之后只有红色框中的一个GenericIndex结构,其中元素个数为60(0x3c),即该列的每一行都是一个Sketch序列化后的二进制数组,数组大小根据列数据的特性可能有所不同,对于示例的这一列,每个Sketch只占4字节大小。

图七 thetaSketch类型数据示例图七 thetaSketch类型数据示例

三、总结与展望

通过本文的介绍,相信读者对Druid Segment中存储的信息以及Druid支持的数据类型有所了解。目前Druid在实际应用中遇到的最大问题是缺少支持精确的基数计算的数据结构。关于这个问题,社区中有人提出了用Bitmap结构做精确去重[4],但目前应该还没有合入Druid项目主干。希望Druid社区能够早日解决这个问题吧。

参考文献

[1] https://druid.apache.org/docs/latest/design/segments.html

[2] bitmap压缩算法对比,https://bug1874.com/03-22-2017/bitmap-compress-algorithm.html

[3] https://druid.apache.org/docs/latest/querying/aggregations.html#approximate-aggregations

[4] Druid精确去重的设计与实现,https://zhuanlan.zhihu.com/p/60097605

0 人点赞