Druid和ES查询结果通用解析方法

2022-11-30 17:10:42 浏览数 (1)

做数据的同学相信大家对Druid和Es都不陌生,Druid可以说是一款基于时序的查询引擎,支持数据实时摄入,在数据摄入前指定维度和指标,提供基于时间层面的预聚合,Druid会把一个数据点当做一个实际发生的事实,在数据摄入后就不能修改。常被应用于一些实时的场景,比如对数据实时分时间段分组聚合。ES同样是一款高效的查询引擎,支持数据的批量导入,同样支持数据实时的摄入,也支持数据批量导入,相比于Druid不仅对聚合高度支持,同时兼顾强大的搜索能力,ES主要是基于对摄入数据进行分词,同时构建索引增加查询聚合的速度。通常我一般将ES用作一些离线的场景,对离线场景支持指标的快速查询和聚合。

Druid实践

        Druid提供良好的Rest风格的访问方式,方便开发者快速上手,其提供的查询与聚合的方式多种多样,一般我们最常用的查询是select,聚合方式是groupBy,具体使用方式大家可以上网百度,这里主要介绍对于查询结果的解析。Druid返回的数据格式一般是一个JSON格式的数组,数组的每一个元素都是一个时间点的数据,如下图:

代码语言:javascript复制
[
    {
        "version":"v1",
        "timestamp":"2018-07-08T08:05:00.000Z",
        "event":{
            "dim1":"test1",
            "dim2":"test2",
            "sumMetric1":12345,
            "sumMetric2":14567
        }
    }
]

查询结果主要组成部分有两个,一个是timestamp,另一个是event,timestamp代表事实发生的时间,event主要包含聚合的维度和指标。如上图dim1和dim2是聚合的维度,metric是聚合的指标。显然druid的查询结果是平铺展示的,不论是普通的select还是groupby,但是这样的展示形式不适合于groupby的展示方式,比如dim1的组成值有“d11”和“d12”,而dim2的组成值有“d21”和“d22”,那么查询结果在同一个时间点有四条展示数据,[d11 d21,d11 d22,d12 d21,d12 d22] 这样显然不太便于查看,我们更希望的展示结果可能如下图,在查询结果中,按照聚合查询的结构展示,这样更方便看清数据,重点在于一个时间点,在数组中只有一个数据点。

代码语言:javascript复制
[
      {
          "timestamp":"2018-07-08T08:05:00.000Z",
          "event":{
              "d11":{
                 "d21":{
                    "metric":123
                 },
                 "d22":{
                    "metric":234
                 }
              },
              "d12":{
                 "d21":{
                    "metric":235
                 },
                 "d22":{
                    "metric":345
                 }
              }
          }
      }
]

为了将平铺的结果处理成为结果化的结果,需要在构建查询的时候,就把聚合方式记录下来,我以一个简单的例子来,解释聚合的存储,如下图一个简单的聚合,对dim1和dim2分组,组内进行metric1和metric2的SUM聚合,这样的聚合方式可以使用一个树来存储整个聚合方式,如图所示,顶层聚合是group by dim1,其子聚合是group by dim2,接下来的子聚合分别是sum metric1和sum metric2,所以聚合可以用一个类来表示--Aggregation,聚合类型(aggType)可以是group by ,sum,max,min,count,avg,聚合字段(aggField)为了表明在哪个字段上聚合的,同时为了支持给聚合起别名,增加alias字段,最终要的就是利用一个list存储子聚合。

代码语言:javascript复制
SELECT 
      SUM(metric1),
      SUM(metric2)
FROM test
GROUP BY
      dim1,dim2
代码语言:javascript复制
/**
**  表示聚合的类
**/
public class Aggregation{
 
     private String aggType;//聚合类型

     private String aggField;//聚合字段

     private String alias;//聚合名字

     private List<Aggregation> subAggs = new LinkedList<>();//子聚合

     public static Aggregation buildAggregation(String aggType,String aggField,String alias){
         return new Aggregation(aggType,aggField,alias);
     }

     private Aggregation(String aggType,String aggField,String alias){
          this.aggType = aggType;
          this.aggField = aggField;
          this.alias = alias;
     }

     public void subAggs(Aggregation ... subAggs){
         for(Aggregation aggregation:subAggs){
             this.subAggs.add(subAggs);
         }
     }

     setters/getters...
}

有了能够代表聚合的类,那么上图在查询的时候,构建聚合的代码可以像下面这样写(如果封装得好的话,这些代码可以埋在构建实际查询的代码中):

代码语言:javascript复制
Aggregation groupByDim1 = Aggregation.buildAggregation("groupBy","dim1","groupByDim1");
Aggregation groupByDim2 = Aggregation.buildAggregation("groupBy","dim2","groupByDim2");
Aggregation sumMetric1 = Aggregation.buildAggregation("sum","metric1","sumMetric1");
Aggregation sumMetric2 = Aggregation.buildAggregation("sum","metric2","sumMetric2");
groupByDim1.subAggs(groupByDim2);
groupByDim2.subAggs(sumMetric1,sumMetic2);

那么接下来就是查询结果的解析了,有了上面的构建的聚合,就方便对查询结果做解析了,下面我大概写个解析思路:

代码语言:javascript复制
public Map<String,JSONObject> parseDruidResult(Aggregation agg,List<JSONObject> searchResult){
     Map<String,JSONObject> tempResult = new TreeMap<>();
     for(JSONObject eventInfo:searchResult){
         String timestamp = eventInfo.getString("timestamp");
         JSONObject structedResult = null;   
         if(!tempResult.containsKey(timestamp)){
             structedResult = new JSONObject();     
             tempResult.put(timestamp,structedResult);
         }else{
             structedResult = tempResult.get(timestamp);
         }
         assembleResult(agg,structedResult,eventInfo.getJSONObject("event"));
     }
代码语言:javascript复制
   return tempResult;

}

代码语言:javascript复制
public void assembleResult(Aggregation aggregation,JSONObject structedResult,JSONObject event)){
    if(!isLeaf(aggregation.getAggType)){//isLeaf 判断是否是聚合的叶子节点,这里sum,count,max,min,avg没有子聚合肯定是叶子节点,其他都是非叶节点
        JSONObject subStructedResult = new JSONObject();
        structedResult.put(event.get(aggregation.getAggField()),subStructedResult);
        for(Aggregation subAgg:aggregation.getSubAggs){
           assembleResult(subAgg,subStructedResult,event);
        }
    }else{
        structedResult.put(aggregation.getAlias(),event.get(aggregation.getAlias()));
    }
}

其实解析的思路就是,根据树形的聚合结果来解析平铺的查询结果,以满足结构化查询的需求。以上代码值得说明的有两点,1,在Druid的查询结果中,维度是以field的名称放在event中,指标之一alias的名称放在event中,而对维度的聚合对应非叶节点的聚合,对指标的聚合对应叶节点的聚合,所以在代码中,对应取不同的数据。2,查询结果为了方便处理以map来存放解析结果的,key是timestamp,value是这个时间点的结构化结果,为了转换成为我们想要数组形式,可以遍历map,为了时间有序,可以用TreeMap存放中间结果。

ES实践

      ES对外也提供良好的RestApi查询方式,并且新版client不需要我们拼接json去查询或解析查询结果,可以使用java Api方便解析,这里我们就是使用新版的java Api来查询ES,ES相对于Druid,聚合结果不是平铺的,而是结果化的,但是这样的结构化结果,甚至比平铺的结果还复杂,需要我们通过java代码一层层解析出来。假如执行一个复杂的聚合,结果解析可能非常复杂,甚至难以排查出现的错误,举个例子,假如一个复杂的聚合(其实实际当中也不算复杂)如下图:

图中dim1和dim2为两个维度,dim2是比dim1更低的一个维度,分别对这两个维度进行sum,聚合两个指标metric1和metric2,这个例子很简单,比如dim1代表年,dim2代表月份,metric1是点击量,metric2是阅读量,那么上面的聚合树就代表,分别按照月份和年份统计点击量和阅读量。构建上面聚合树的代码可以是下面这样的:

代码语言:javascript复制
Aggregation sumMetric1 = Aggregation.buildAggregation("sum","metric1","sumMetric1");
Aggregation sumMetric2 = Aggregation.buildAggregation("sum","metric2","sumMetric2");
Aggregation groupByDim1 = Aggregation.buildAggregation("groupBy","dim1","groupByDim1");
Aggregation groupByDim2 = Aggregation.buildAggregation("groupBy","dim2","groupByDim2");
groupByDim1.subAggs(groupByDim2,sumMetric1,sumMetric2);
groupByDim2.subAggs(sumMetric1,sumMetric2);

通用的解析方法还要依赖于聚合树,我的思路如下:

代码语言:javascript复制
public void parseEsResult(Aggregation agg,Aggregations esOriginAggs,JSONObject result){
    if(isLeaf(agg.getAggType())){
        //叶节点的解析方式
        switch(agg.getAggType){
           case "sum":
              ParsedSum parsedSum = esOriginAggs.get(agg.getAlias());
              result.put(agg.getAlias(),parsedSum.getValue());
            break;
           case "count":
              ParsedValueCount parsedValueCount = esOriginAggs.get(agg.getAlias());
              result.put(parsedValueCount.getValue())
           case "max":
              ParsedMax parsedMax = esOriginAggs.get(agg.getAlias());
              result.put(agg.getAlias(),parseMax.getValue());
           ....//还有多种指标聚合方式,只写几个常用的,其他可以自己发挥
        }
    }else{
        //非叶节点的解析方式
        switch(agg.getAggType()){
           case "groupBy":
              //注意es中使用Terms来实现group by
              ParsedTerms parsedTerms = esOriginAggs.get(agg.getAlias());
              for(Bucket termsBucket:parsedTerms.getBuckets()){
                  JSONObject subResult = new JSONObject();
                  for(Aggregation subAgg:agg.getSubAggs()){
                     parseEsResult(subAgg,termsBucket.getAggregations(),subResult);
                  }
                  result.put(termsBucket.getKey(),subResult)
              }
           break;//还有多种解析方式,只写个常用的,其他可以自己发挥
        }
    }
}

这样的解析方式,假设dim1由d11,d12组成,dim2由d21,d22组成,那么上述通用代码的解析的结果如下,这样一套通用的代码可以防止重复实现解析es结果的代码,造成代码冗余。

代码语言:javascript复制
{
   "d11":{
      "d21":{
         "sumMetric1":1234,
         "sumMetric2":3456
      },
      "d22":{
         "sumMetric1":345,
         "sumMetric2":456
      },
      "sumMetric1":678,
      "sumMetric2":7890
    },
   "d12":{
      "d21":{
         "sumMetric1":1234,
         "sumMetric2":3456
      },
      "d22":{
         "sumMetric1":345,
         "sumMetric2":456
      },
      "sumMetric1":678,
      "sumMetric2":7890
    }
}

0 人点赞