导语
在实时计算中,经常会遇到需要计算某个时间段内的pv、uv这类需求,完成该类需求有多种方式,本文以微视数据端内计算启动数据的pv、uv为应用场景,来介绍常用的两种实现方式。
业务背景:
为了实时监控微视端内app启动以及启动方式的情况,需要实时的统计每10分钟及每小时pv、uv。这里pv,每收到一条启动日志即 1,uv则需要依据启动的唯一标识qimei来做去重处理。
实现介绍:
实现pv、uv的统计主要微视数据尝试过两种方式,一是窗口方式:主要是使用flink window valueState,统计的结果可以直接输出;另外一种是使用redis,借用外部存储系统redis来完成,两种实现方式各有优劣吧。
窗口方式:使用窗口的方式,来计算pv、uv,即根据需求的时间段,来设定窗口的大小,例如需要计算10分钟内的pv、uv则需要开一个10分钟时长的统计窗口,对于pv不需要做去重处理,对于uv,需要借用flink自带的valueState来保存中间数据,同时需要借用set、hyperloglog或者bitmap(roaringbitmap)等数据结构来做去重。计算pv较简单,在这里不做介绍,例如下面使用hyperloglog来做去重,来计算uv,在maven中添加导入hyperloglog的依赖:
代码语言:javascript复制<dependency>
<groupId>com.clearspring.analytics</groupId>
<artifactId>stream</artifactId>
<version>2.9.8</version>
</dependency>
程序执行主体:构造flink的执行环境、从kafka中读取数据,对数据流做map、aggregate等操作,将处理的数据写入到虫洞kafka中。
代码语言:javascript复制StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.getConfig().setAutoWatermarkInterval(5000L);
env.getConfig().setUseSnapshotCompression(true);
//kafka配置
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", KAFKA_BROKERS);
properties.setProperty("enable.auto.commit", "true");
properties.setProperty("auto.commit.interval.ms", "5000");
properties.setProperty("group.id", CONSUMER_GROUP);
properties.setProperty("client.id", CONSUMER_GROUP); //用于虫洞验证
FlinkKafkaConsumer011<AppActionModel> kafkaConsumer =
new FlinkKafkaConsumer011<>(KAFKA_TOPIC, new AppActionDeSerializer(), properties);
kafkaConsumer.setStartFromLatest();
kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
DataStream<AppActionModel> dataStream = env.addSource(kafkaConsumer).setParallelism(80);
DataStream<StatPvUv> resultStream = dataStream.map(new Dau10minMapFunction())
.timeWindowAll(Time.minutes(10)).aggregate(new DauHllAggregateFuction());
//写kafka配置
Properties propsProducer = new Properties();
propsProducer.setProperty("bootstrap.servers", "9.144.224.168:9092");
propsProducer.put("serializer.class", "kafka.serializer.StringEncoder");
//propsProducer.setProperty("metadata.broker.list", "9.144.224.168:9092");
//propsProducer.setProperty("client.id", "wesee_core_kpi_10min"); //用于虫洞验证
FlinkKafkaProducer011 flinkKafkaProducer = new FlinkKafkaProducer011("wesee_core_kpi_10min",
new SimpleStringSchema(),propsProducer);
resultStream.map(new MapFunction<StatPvUv, String>() {
@Override
public String map(StatPvUv statPvUv) throws Exception {
String time = DateUtil.dateTo12String(new Date());
StringBuilder sb = new StringBuilder();
sb.append(time).append("|").append(statPvUv.getUv()).append("|")
.append(statPvUv.getPv());
return sb.toString();
}
}).addSink(flinkKafkaProducer);
env.execute("Dau10minUvPvToKafkaTopology");
Dau10minMapFuction主要是获取数据流中的qimei:
代码语言:javascript复制public class Dau10minMapFunction extends RichMapFunction<AppActionModel, String> {
@Override
public String map(AppActionModel value) throws Exception {
String qimei = value.getQimei();
return qimei;
}
}
DauHllAggregateFuction主要是借用HyperloglogPlus来做去重,该数据结构是一种模糊的去重,其原理可参见https://en.wikipedia.org/wiki/HyperLogLog,如果需要精确去重,可以将HyperloglogPlus结构替换为roaringbitmap等数据结构。采用HyperloglogPlus的优势在于,该数据结构占用内存空间减少。
代码语言:javascript复制public class DauHllAggregateFuction implements AggregateFunction<String, AccHllBean, StatPvUv> {
@Override
public AccHllBean createAccumulator() {
return new AccHllBean(0, new HyperLogLogPlus(5,18));
}
@Override
public AccHllBean add(String s, AccHllBean accBean) {
HyperLogLogPlus hll=accBean.getSets();
hll.offer(s);
return new AccHllBean(accBean.getPv() 1, hll);
}
@Override
public StatPvUv getResult(AccHllBean accr) {
long pv = accr.getPv();
long uv = accr.getSets().cardinality();
return new StatPvUv(pv, uv);
}
@Override
public AccHllBean merge(AccHllBean acc1, AccHllBean acc2) {
HyperLogLogPlus tmp = new HyperLogLogPlus(5,18);
long resutl = acc1.getPv() acc2.getPv();
try {
tmp.addAll(acc1.getSets());
tmp.addAll(acc2.getSets());
}catch (Exception e){
System.out.println("hll merger erro");
}
return new AccHllBean(resutl, tmp);
}
}
AccHllBean是封装的javaBean:
代码语言:javascript复制public class AccHllBean implements Serializable {
private long pv;
private HyperLogLogPlus sets;
public long getPv() {
return pv;
}
public void setPv(long pv) {
this.pv = pv;
}
public HyperLogLogPlus getSets() {
return sets;
}
public void setSets(HyperLogLogPlus sets) {
this.sets = sets;
}
public AccHllBean(long pv, HyperLogLogPlus sets) {
this.pv = pv;
this.sets = sets;
}
}
对于数据量比较少的需求,去重结构还可以使用set集合。
借用redis:使用redis方式来计算某时间段的pv、uv,如果是需要计算任意时间段内,可以使用redis的zset结构或者是通过hash分片,都是把统计的时间窗口放在redis的key上,计算uv,可以借用redis提供的bitmap或者hyperloglog来完成。实现如下:构造flink运行环境,对数据流做简单的清洗(构造写入redis的key和value)
代码语言:javascript复制StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60 * 1000 * 2);
//kafka配置
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", KAFKA_BROKERS);
properties.setProperty("enable.auto.commit", "true");
properties.setProperty("auto.commit.interval.ms", "5000");
properties.setProperty("group.id", CONSUMER_GROUP);
properties.setProperty("client.id", CONSUMER_GROUP); //用于虫洞验证
FlinkKafkaConsumer011<AppActionModel> kafkaConsumer =
new FlinkKafkaConsumer011<>(KAFKA_TOPIC, new AppActionDauPvUvDeSerializer(), properties);
kafkaConsumer.setStartFromLatest();
kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
// 使用日志里面的 reportTime作为 eventTime
DataStream<AppActionModel> dataStream = env.addSource(kafkaConsumer);
dataStream.addSink(new SinkDau10minAnd1hourPvUvToRedis());
env.execute("Dau10minUvPvToRedis");
如果计算的pv、uv需要以接口方式提供或者写入到kafka,增需要再写一个程序 ,定时读取redis。
两种方式对比:采用窗口的方式来计算pv、uv,代码实现起来更复杂一下,可以直接将统计的结果写入到kafka中,并且不需要额外的存储资源。借用redis来计算pv、uv,代码实现较简单,统计的数据,可以按照实际需要直接保存在redis中,由于构造存储统计数据的key是按照日志上报的时间,该方式具有更长的延迟数据处理能力。
转自:merlingyu(余杨),研发工程师