flink实战-使用自定义聚合函数统计网站TP指标

2020-09-15 14:13:57 浏览数 (1)

  • 背景
  • 自定义聚合函数
  • 实例讲解

背景

在网站性能测试中,我们经常会选择 TP50、TP95 或者 TP99 等作为性能指标。接下来我们讲讲这些指标的含义、以及在flink中如何实时统计:

  • TP50,top percent 50,即 50% 的数据都满足某一条件;
  • TP95,top percent 95,即 95% 的数据都满足某一条件;
  • TP99,top percent 99,即 99% 的数据都满足某一条件;

我们举一个例子,我们要统计网站一分钟之内的的响应时间的TP90,正常的处理逻辑就是把这一分钟之内所有的网站的响应时间从小到大排序,然后计算出总条数count,然后计算出排名在90%的响应时间是多少(count*0.9),就是我们要的值。

自定义聚合函数

这个需求很明显就是一个使用聚合函数来做的案例,Flink中提供了大量的聚合函数,比如count,max,min等等,但是对于这个需求,却无法满足,所以我们需要自定义一个聚合函数来实现我们的需求。

在前段时间,我们聊了聊flink的聚合算子,具体可参考: flink实战-聊一聊flink中的聚合算子 , 聚合算子是我们在写代码的时候用来实现一个聚合功能,聚合函数其实和聚合算子类似,只不过聚合函数用于在写sql的时候使用。

自定义聚合函数需要继承抽象类org.apache.flink.table.functions.AggregateFunction。并实现下面几个方法。

  • createAccumulator():这个方法会在一次聚合操作的开始调用一次,主要用于构造一个Accumulator,用于存储在聚合过程中的临时对象。
  • accumulate() 这个方法,每来一条数据会调用一次这个方法,我们就在这个方法里实现我们的聚合函数的具体逻辑。
  • getValue() 这个方法是在聚合结束以后,对中间结果做处理,然后将结果返回,最终sql中得到的结果数据就是这个值。

实例讲解

对于TP指标,正常的思路我们可以先创建一个临时变量,里面有一个list,每来一个数据,就放到这个list里面,在getValue方法里,进行排序,取相应的TP值。

但是这种思路会有一个问题,就是如果要聚合的时间范围内,数据过多的话。就会在list存储大量的数据,会造成checkpoint过大,时间过长,最后导致程序失败。得不到正确的结果。

所以我们需要换一个思路,既然最后我们想要的是一个有序列表,那么我们是不是可以把这个list结构优化一下,使用Treemap来存储,map的key就是指标,比如响应时间。value就是对应的指标出现的次数。这样getValue方法里,只需要将map的value值累加,就能得到总数count,然后计算出来相应的tp值的位置position,最后我们再从头累加map的value,直到累加结果大于相应的位置position,则map的key即为所求。

示例如下:我们先构建一个source,只是随机生成一个变量,网站的相应时间response_time。

代码语言:javascript复制
	String sql = "CREATE TABLE source (n"  
		             " response_time INT,n"  
		             " ts AS localtimestamp,n"  
		             " WATERMARK FOR ts AS ts,"  
		             "proctime as proctime()n"  
		             ") WITH (n"  
		             " 'connector' = 'datagen',n"  
		             " 'rows-per-second'='1000',n"  
		             " 'fields.response_time.min'='1',n"  
		             " 'fields.response_time.max'='1000'"  
		             ")";

定义一个聚合函数用的临时变量:

代码语言:javascript复制
	public static class TPAccum{
		public Integer tp;
		public Map<Integer,Integer> map = new HashMap<>();
	}

实现自定义聚合函数类

代码语言:javascript复制
	public static class TP extends AggregateFunction<Integer,TPAccum>{

		@Override
		public TPAccum createAccumulator(){
			return new TPAccum();
		}

		@Override
		public Integer getValue(TPAccum acc){
			if (acc.map.size() == 0){
				return null;
			} else {
				Map<Integer,Integer> map = new TreeMap<>(acc.map);
				int sum = map.values().stream().reduce(0, Integer::sum);

				int tp = acc.tp;
				int responseTime = 0;
				int p = 0;
				Double d = sum * (tp / 100D);
				for (Map.Entry<Integer,Integer> entry: map.entrySet()){
					p  = entry.getValue();
					int position = d.intValue() - 1;
					if (p >= position){
						responseTime = entry.getKey();
						break;
					}

				}
				return responseTime;
			}
		}

		public void accumulate(TPAccum acc, Integer iValue, Integer tp){
			acc.tp = tp;
			if (acc.map.containsKey(iValue)){
				acc.map.put(iValue, acc.map.get(iValue)   1);
			} else {
				acc.map.put(iValue, 1);
			}
		}

	}

实际的查询sql如下:

代码语言:javascript复制
	String sqlSelect =
				"select TUMBLE_START(proctime,INTERVAL '1' SECOND)  as starttime,mytp(response_time,50) from source"  
				" group by TUMBLE(proctime,INTERVAL '1' SECOND)";

完整代码请参考: https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/function/UdafTP.java

0 人点赞