做数据的同学相信大家对Druid和Es都不陌生,Druid可以说是一款基于时序的查询引擎,支持数据实时摄入,在数据摄入前指定维度和指标,提供基于时间层面的预聚合,Druid会把一个数据点当做一个实际发生的事实,在数据摄入后就不能修改。常被应用于一些实时的场景,比如对数据实时分时间段分组聚合。ES同样是一款高效的查询引擎,支持数据的批量导入,同样支持数据实时的摄入,也支持数据批量导入,相比于Druid不仅对聚合高度支持,同时兼顾强大的搜索能力,ES主要是基于对摄入数据进行分词,同时构建索引增加查询聚合的速度。通常我一般将ES用作一些离线的场景,对离线场景支持指标的快速查询和聚合。
Druid实践
Druid提供良好的Rest风格的访问方式,方便开发者快速上手,其提供的查询与聚合的方式多种多样,一般我们最常用的查询是select,聚合方式是groupBy,具体使用方式大家可以上网百度,这里主要介绍对于查询结果的解析。Druid返回的数据格式一般是一个JSON格式的数组,数组的每一个元素都是一个时间点的数据,如下图:
代码语言:javascript复制[
{
"version":"v1",
"timestamp":"2018-07-08T08:05:00.000Z",
"event":{
"dim1":"test1",
"dim2":"test2",
"sumMetric1":12345,
"sumMetric2":14567
}
}
]
查询结果主要组成部分有两个,一个是timestamp,另一个是event,timestamp代表事实发生的时间,event主要包含聚合的维度和指标。如上图dim1和dim2是聚合的维度,metric是聚合的指标。显然druid的查询结果是平铺展示的,不论是普通的select还是groupby,但是这样的展示形式不适合于groupby的展示方式,比如dim1的组成值有“d11”和“d12”,而dim2的组成值有“d21”和“d22”,那么查询结果在同一个时间点有四条展示数据,[d11 d21,d11 d22,d12 d21,d12 d22] 这样显然不太便于查看,我们更希望的展示结果可能如下图,在查询结果中,按照聚合查询的结构展示,这样更方便看清数据,重点在于一个时间点,在数组中只有一个数据点。
代码语言:javascript复制[
{
"timestamp":"2018-07-08T08:05:00.000Z",
"event":{
"d11":{
"d21":{
"metric":123
},
"d22":{
"metric":234
}
},
"d12":{
"d21":{
"metric":235
},
"d22":{
"metric":345
}
}
}
}
]
为了将平铺的结果处理成为结果化的结果,需要在构建查询的时候,就把聚合方式记录下来,我以一个简单的例子来,解释聚合的存储,如下图一个简单的聚合,对dim1和dim2分组,组内进行metric1和metric2的SUM聚合,这样的聚合方式可以使用一个树来存储整个聚合方式,如图所示,顶层聚合是group by dim1,其子聚合是group by dim2,接下来的子聚合分别是sum metric1和sum metric2,所以聚合可以用一个类来表示--Aggregation,聚合类型(aggType)可以是group by ,sum,max,min,count,avg,聚合字段(aggField)为了表明在哪个字段上聚合的,同时为了支持给聚合起别名,增加alias字段,最终要的就是利用一个list存储子聚合。
代码语言:javascript复制SELECT
SUM(metric1),
SUM(metric2)
FROM test
GROUP BY
dim1,dim2
代码语言:javascript复制/**
** 表示聚合的类
**/
public class Aggregation{
private String aggType;//聚合类型
private String aggField;//聚合字段
private String alias;//聚合名字
private List<Aggregation> subAggs = new LinkedList<>();//子聚合
public static Aggregation buildAggregation(String aggType,String aggField,String alias){
return new Aggregation(aggType,aggField,alias);
}
private Aggregation(String aggType,String aggField,String alias){
this.aggType = aggType;
this.aggField = aggField;
this.alias = alias;
}
public void subAggs(Aggregation ... subAggs){
for(Aggregation aggregation:subAggs){
this.subAggs.add(subAggs);
}
}
setters/getters...
}
有了能够代表聚合的类,那么上图在查询的时候,构建聚合的代码可以像下面这样写(如果封装得好的话,这些代码可以埋在构建实际查询的代码中):
代码语言:javascript复制Aggregation groupByDim1 = Aggregation.buildAggregation("groupBy","dim1","groupByDim1");
Aggregation groupByDim2 = Aggregation.buildAggregation("groupBy","dim2","groupByDim2");
Aggregation sumMetric1 = Aggregation.buildAggregation("sum","metric1","sumMetric1");
Aggregation sumMetric2 = Aggregation.buildAggregation("sum","metric2","sumMetric2");
groupByDim1.subAggs(groupByDim2);
groupByDim2.subAggs(sumMetric1,sumMetic2);
那么接下来就是查询结果的解析了,有了上面的构建的聚合,就方便对查询结果做解析了,下面我大概写个解析思路:
代码语言:javascript复制public Map<String,JSONObject> parseDruidResult(Aggregation agg,List<JSONObject> searchResult){
Map<String,JSONObject> tempResult = new TreeMap<>();
for(JSONObject eventInfo:searchResult){
String timestamp = eventInfo.getString("timestamp");
JSONObject structedResult = null;
if(!tempResult.containsKey(timestamp)){
structedResult = new JSONObject();
tempResult.put(timestamp,structedResult);
}else{
structedResult = tempResult.get(timestamp);
}
assembleResult(agg,structedResult,eventInfo.getJSONObject("event"));
}
代码语言:javascript复制 return tempResult;
}
代码语言:javascript复制public void assembleResult(Aggregation aggregation,JSONObject structedResult,JSONObject event)){
if(!isLeaf(aggregation.getAggType)){//isLeaf 判断是否是聚合的叶子节点,这里sum,count,max,min,avg没有子聚合肯定是叶子节点,其他都是非叶节点
JSONObject subStructedResult = new JSONObject();
structedResult.put(event.get(aggregation.getAggField()),subStructedResult);
for(Aggregation subAgg:aggregation.getSubAggs){
assembleResult(subAgg,subStructedResult,event);
}
}else{
structedResult.put(aggregation.getAlias(),event.get(aggregation.getAlias()));
}
}
其实解析的思路就是,根据树形的聚合结果来解析平铺的查询结果,以满足结构化查询的需求。以上代码值得说明的有两点,1,在Druid的查询结果中,维度是以field的名称放在event中,指标之一alias的名称放在event中,而对维度的聚合对应非叶节点的聚合,对指标的聚合对应叶节点的聚合,所以在代码中,对应取不同的数据。2,查询结果为了方便处理以map来存放解析结果的,key是timestamp,value是这个时间点的结构化结果,为了转换成为我们想要数组形式,可以遍历map,为了时间有序,可以用TreeMap存放中间结果。
ES实践
ES对外也提供良好的RestApi查询方式,并且新版client不需要我们拼接json去查询或解析查询结果,可以使用java Api方便解析,这里我们就是使用新版的java Api来查询ES,ES相对于Druid,聚合结果不是平铺的,而是结果化的,但是这样的结构化结果,甚至比平铺的结果还复杂,需要我们通过java代码一层层解析出来。假如执行一个复杂的聚合,结果解析可能非常复杂,甚至难以排查出现的错误,举个例子,假如一个复杂的聚合(其实实际当中也不算复杂)如下图:
图中dim1和dim2为两个维度,dim2是比dim1更低的一个维度,分别对这两个维度进行sum,聚合两个指标metric1和metric2,这个例子很简单,比如dim1代表年,dim2代表月份,metric1是点击量,metric2是阅读量,那么上面的聚合树就代表,分别按照月份和年份统计点击量和阅读量。构建上面聚合树的代码可以是下面这样的:
代码语言:javascript复制Aggregation sumMetric1 = Aggregation.buildAggregation("sum","metric1","sumMetric1");
Aggregation sumMetric2 = Aggregation.buildAggregation("sum","metric2","sumMetric2");
Aggregation groupByDim1 = Aggregation.buildAggregation("groupBy","dim1","groupByDim1");
Aggregation groupByDim2 = Aggregation.buildAggregation("groupBy","dim2","groupByDim2");
groupByDim1.subAggs(groupByDim2,sumMetric1,sumMetric2);
groupByDim2.subAggs(sumMetric1,sumMetric2);
通用的解析方法还要依赖于聚合树,我的思路如下:
代码语言:javascript复制public void parseEsResult(Aggregation agg,Aggregations esOriginAggs,JSONObject result){
if(isLeaf(agg.getAggType())){
//叶节点的解析方式
switch(agg.getAggType){
case "sum":
ParsedSum parsedSum = esOriginAggs.get(agg.getAlias());
result.put(agg.getAlias(),parsedSum.getValue());
break;
case "count":
ParsedValueCount parsedValueCount = esOriginAggs.get(agg.getAlias());
result.put(parsedValueCount.getValue())
case "max":
ParsedMax parsedMax = esOriginAggs.get(agg.getAlias());
result.put(agg.getAlias(),parseMax.getValue());
....//还有多种指标聚合方式,只写几个常用的,其他可以自己发挥
}
}else{
//非叶节点的解析方式
switch(agg.getAggType()){
case "groupBy":
//注意es中使用Terms来实现group by
ParsedTerms parsedTerms = esOriginAggs.get(agg.getAlias());
for(Bucket termsBucket:parsedTerms.getBuckets()){
JSONObject subResult = new JSONObject();
for(Aggregation subAgg:agg.getSubAggs()){
parseEsResult(subAgg,termsBucket.getAggregations(),subResult);
}
result.put(termsBucket.getKey(),subResult)
}
break;//还有多种解析方式,只写个常用的,其他可以自己发挥
}
}
}
这样的解析方式,假设dim1由d11,d12组成,dim2由d21,d22组成,那么上述通用代码的解析的结果如下,这样一套通用的代码可以防止重复实现解析es结果的代码,造成代码冗余。
代码语言:javascript复制{
"d11":{
"d21":{
"sumMetric1":1234,
"sumMetric2":3456
},
"d22":{
"sumMetric1":345,
"sumMetric2":456
},
"sumMetric1":678,
"sumMetric2":7890
},
"d12":{
"d21":{
"sumMetric1":1234,
"sumMetric2":3456
},
"d22":{
"sumMetric1":345,
"sumMetric2":456
},
"sumMetric1":678,
"sumMetric2":7890
}
}