Oceanus 在腾讯微视数据的实践-统计某时间段内的uv、pv

2021-11-02 12:21:32 浏览数 (1)

导语

在实时计算中,经常会遇到需要计算某个时间段内的pv、uv这类需求,完成该类需求有多种方式,本文以微视数据端内计算启动数据的pv、uv为应用场景,来介绍常用的两种实现方式。

业务背景

为了实时监控微视端内app启动以及启动方式的情况,需要实时的统计每10分钟及每小时pv、uv。这里pv,每收到一条启动日志即 1,uv则需要依据启动的唯一标识qimei来做去重处理。

实现介绍

实现pv、uv的统计主要微视数据尝试过两种方式,一是窗口方式:主要是使用flink window valueState,统计的结果可以直接输出;另外一种是使用redis,借用外部存储系统redis来完成,两种实现方式各有优劣吧。

窗口方式:使用窗口的方式,来计算pv、uv,即根据需求的时间段,来设定窗口的大小,例如需要计算10分钟内的pv、uv则需要开一个10分钟时长的统计窗口,对于pv不需要做去重处理,对于uv,需要借用flink自带的valueState来保存中间数据,同时需要借用set、hyperloglog或者bitmap(roaringbitmap)等数据结构来做去重。计算pv较简单,在这里不做介绍,例如下面使用hyperloglog来做去重,来计算uv,在maven中添加导入hyperloglog的依赖:

代码语言:javascript复制
<dependency>
      <groupId>com.clearspring.analytics</groupId>
      <artifactId>stream</artifactId>
      <version>2.9.8</version>
</dependency>

 程序执行主体:构造flink的执行环境、从kafka中读取数据,对数据流做map、aggregate等操作,将处理的数据写入到虫洞kafka中。

代码语言:javascript复制
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
    env.getConfig().setAutoWatermarkInterval(5000L);
    env.getConfig().setUseSnapshotCompression(true);

    //kafka配置
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", KAFKA_BROKERS);
    properties.setProperty("enable.auto.commit", "true");
    properties.setProperty("auto.commit.interval.ms", "5000");

    properties.setProperty("group.id", CONSUMER_GROUP);
    properties.setProperty("client.id", CONSUMER_GROUP);  //用于虫洞验证
    FlinkKafkaConsumer011<AppActionModel> kafkaConsumer =
        new FlinkKafkaConsumer011<>(KAFKA_TOPIC, new AppActionDeSerializer(), properties);
    kafkaConsumer.setStartFromLatest();
    kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
    DataStream<AppActionModel> dataStream = env.addSource(kafkaConsumer).setParallelism(80);

    DataStream<StatPvUv> resultStream = dataStream.map(new Dau10minMapFunction())
        .timeWindowAll(Time.minutes(10)).aggregate(new DauHllAggregateFuction());

    //写kafka配置
    Properties propsProducer = new Properties();
    propsProducer.setProperty("bootstrap.servers", "9.144.224.168:9092");
    propsProducer.put("serializer.class", "kafka.serializer.StringEncoder");
    //propsProducer.setProperty("metadata.broker.list", "9.144.224.168:9092");
    //propsProducer.setProperty("client.id", "wesee_core_kpi_10min");  //用于虫洞验证
    FlinkKafkaProducer011 flinkKafkaProducer = new FlinkKafkaProducer011("wesee_core_kpi_10min",
        new SimpleStringSchema(),propsProducer);

    resultStream.map(new MapFunction<StatPvUv, String>() {
      @Override
      public String map(StatPvUv statPvUv) throws Exception {

        String time = DateUtil.dateTo12String(new Date());
        StringBuilder sb = new StringBuilder();
        sb.append(time).append("|").append(statPvUv.getUv()).append("|")
            .append(statPvUv.getPv());
        return sb.toString();
      }
    }).addSink(flinkKafkaProducer);

    env.execute("Dau10minUvPvToKafkaTopology");

 Dau10minMapFuction主要是获取数据流中的qimei:

代码语言:javascript复制
public class Dau10minMapFunction extends RichMapFunction<AppActionModel, String> {
  @Override
  public String map(AppActionModel value) throws Exception {
    String qimei = value.getQimei();
    return qimei;
  }
}

DauHllAggregateFuction主要是借用HyperloglogPlus来做去重,该数据结构是一种模糊的去重,其原理可参见https://en.wikipedia.org/wiki/HyperLogLog,如果需要精确去重,可以将HyperloglogPlus结构替换为roaringbitmap等数据结构。采用HyperloglogPlus的优势在于,该数据结构占用内存空间减少。

代码语言:javascript复制
public class DauHllAggregateFuction implements AggregateFunction<String, AccHllBean, StatPvUv> {
  @Override
  public AccHllBean createAccumulator() {
    return new AccHllBean(0, new HyperLogLogPlus(5,18));
  }
  @Override
  public AccHllBean add(String s, AccHllBean accBean) {

    HyperLogLogPlus hll=accBean.getSets();
    hll.offer(s);
    return new AccHllBean(accBean.getPv()   1, hll);
  }
  @Override
  public StatPvUv getResult(AccHllBean accr) {
    long pv = accr.getPv();
    long uv = accr.getSets().cardinality();
    return new StatPvUv(pv, uv);
  }
  @Override
  public AccHllBean merge(AccHllBean acc1, AccHllBean acc2) {
    HyperLogLogPlus tmp = new HyperLogLogPlus(5,18);
    long resutl = acc1.getPv()   acc2.getPv();
    try {
      tmp.addAll(acc1.getSets());
      tmp.addAll(acc2.getSets());
    }catch (Exception e){
      System.out.println("hll merger erro");
    }
    return new AccHllBean(resutl, tmp);
  }
}

 AccHllBean是封装的javaBean:

代码语言:javascript复制
public class AccHllBean implements Serializable {

  private long pv;

  private HyperLogLogPlus sets;

  public long getPv() {
    return pv;
  }

  public void setPv(long pv) {
    this.pv = pv;
  }

  public HyperLogLogPlus getSets() {
    return sets;
  }

  public void setSets(HyperLogLogPlus sets) {
    this.sets = sets;
  }

  public AccHllBean(long pv, HyperLogLogPlus sets) {
    this.pv = pv;
    this.sets = sets;
  }
}

 对于数据量比较少的需求,去重结构还可以使用set集合。

借用redis:使用redis方式来计算某时间段的pv、uv,如果是需要计算任意时间段内,可以使用redis的zset结构或者是通过hash分片,都是把统计的时间窗口放在redis的key上,计算uv,可以借用redis提供的bitmap或者hyperloglog来完成。实现如下:构造flink运行环境,对数据流做简单的清洗(构造写入redis的key和value)

代码语言:javascript复制
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(60 * 1000 * 2);
    //kafka配置
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", KAFKA_BROKERS);
    properties.setProperty("enable.auto.commit", "true");
    properties.setProperty("auto.commit.interval.ms", "5000");

    properties.setProperty("group.id", CONSUMER_GROUP);
    properties.setProperty("client.id", CONSUMER_GROUP);  //用于虫洞验证
    FlinkKafkaConsumer011<AppActionModel> kafkaConsumer =
        new FlinkKafkaConsumer011<>(KAFKA_TOPIC, new AppActionDauPvUvDeSerializer(), properties);
    kafkaConsumer.setStartFromLatest();
    kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
    // 使用日志里面的 reportTime作为 eventTime
    DataStream<AppActionModel> dataStream = env.addSource(kafkaConsumer);
    dataStream.addSink(new SinkDau10minAnd1hourPvUvToRedis());
    env.execute("Dau10minUvPvToRedis");

如果计算的pv、uv需要以接口方式提供或者写入到kafka,增需要再写一个程序 ,定时读取redis。

两种方式对比:采用窗口的方式来计算pv、uv,代码实现起来更复杂一下,可以直接将统计的结果写入到kafka中,并且不需要额外的存储资源。借用redis来计算pv、uv,代码实现较简单,统计的数据,可以按照实际需要直接保存在redis中,由于构造存储统计数据的key是按照日志上报的时间,该方式具有更长的延迟数据处理能力。

转自:merlingyu(余杨),研发工程师

0 人点赞