实时新增类指标标准化处理方案

2022-04-04 10:29:14 浏览数 (1)

实时新增类指标标准化处理方案的一些经验。

实时新增类指标

大体上可以将实时新增类指标以以下两种维度进行分类。

  • identity id 类型维度

identity id 类型

备注

number(long) 类型 identity id

数值类型 identity id 的好处在于可以使用 Bitmap 类组件做到精确去重。

字符类型 identity id

字符类型 identity id 去重相对复杂,有两种方式,在误差允许范围之内使用 BloomFilter 进行去重,或者使用 key-value 组件进行精确去重。

  • 产出数据类型

产出数据类型

备注

明细类数据

此类数据一般是要求将新增的数据明细产出,uv 的含义是做过滤,产出的明细数据中的 identity id 不会有重复。输出明细数据的好处在于,我们可以在下游使用 OLAP 引擎对明细数据进行各种维度的聚合计算,从而很方便的产出不同维度下的 uv 数据。

聚合类数据

将一个时间窗口内的 uv 进行聚合,并且可以计算出分维度的 uv,其产出数据一般都是[维度 uv_count],但是这里的维度一般情况下是都是固定维度。如果需要拓展则需要改动源码。

计算链路

新增产出的链路多数就是以上两种维度因子的相互组合。明细类数据和聚合类数据一般情况下都是时间窗口聚合算子的不同,可以在单张图片中进行标注,所以下述分类按照 identity id 类型进行区分。

  • number(long) 类型 identity id

其中 RoaringBitmap 存储在 Flink State 中,占用内存小,并且是本地内存访问,效率很高。但是需要注意,频繁更新 Flink State 也是非常耗费资源的,因此建议可以在去重之前攒一批数据进行批量去重,常用方法就是开一个窗口,在窗口操作之后批量去重。

RoaringBitmap 代码示例:

代码语言:javascript复制
public interface RoaringBitmapDuplicateable<Model> {

    long DEFAULT_DUPLICATE_MILLS = 24 * 3600 * 1000L;

    BiPredicate<Long, Long> ROARING_BIT_MAP_CLEAR_BI_PREDICATE =
            (start, end) -> end - start >= DEFAULT_DUPLICATE_MILLS;

    // 初始化
    default ValueState<Tuple2<Long, Roaring64NavigableMap>> getBitMapValueState(String name) {
        return this.getRuntimeContext().getState(
                new ValueStateDescriptor<>(name, TypeInformation.of(
                        new TypeHint<Tuple2<Long, Roaring64NavigableMap>>() { }))
        );
    }

    RuntimeContext getRuntimeContext();

    long getLongId(Model model);

    Optional<Logger> getLogger();

    default BiPredicate<Long, Long> roaringBitMapClearBiPredicate() {
        return ROARING_BIT_MAP_CLEAR_BI_PREDICATE;
    }

    default List<Model> duplicateAndGet(List<Model> models, long windowStartTimestamp
            , ValueState<Tuple2<Long, Roaring64NavigableMap>> bitMapValueState) throws IOException {

        Tuple2<Long, Roaring64NavigableMap> bitMap = checkAndGetState(windowStartTimestamp, bitMapValueState);

        Map<Long, Model> idModelsMap = models
                .stream()
                .collect(Collectors.toMap(this::getLongId, Function.identity(), (oldOne, newOne) -> oldOne));

        Set<Long> ids = idModelsMap.keySet();

        List<Model> newModels = Lists.newLinkedList();
        for (Long id : ids) {
            if (!bitMap.f1.contains(id)) {
                if (idModelsMap.containsKey(id)) {
                    newModels.add(idModelsMap.get(id));
                }
            }
        }

        newModels.stream()
                .map(this::getLongId)
                .forEach(bitMap.f1::add);
        bitMapValueState.update(bitMap);
        return newModels;
    }

    default long duplicateAndCount(List<Model> models, long windowStartTimestamp
            , ValueState<Tuple2<Long, Roaring64NavigableMap>> bitMapValueState) throws IOException {

        Tuple2<Long, Roaring64NavigableMap> bitMap = checkAndGetState(windowStartTimestamp, bitMapValueState);

        Set<Long> ids = models
                .stream()
                .map(this::getLongId)
                .collect(Collectors.toSet());

        List<Long> newIds = Lists.newLinkedList();
        int count = 0;
        for (Long id : ids) {
            if (!bitMap.f1.contains(id)) {
                newIds.add(id);
                count  ;
            }
        }

        newIds.forEach(bitMap.f1::add);
        bitMapValueState.update(bitMap);
        return count;
    }

    default Tuple2<Long, Roaring64NavigableMap> checkAndGetState(long windowStartTimestamp
            , ValueState<Tuple2<Long, Roaring64NavigableMap>> bitMapValueState) throws IOException {

        Tuple2<Long, Roaring64NavigableMap> bitmap = bitMapValueState.value();

        if (null == bitmap) {

            this.getLogger().ifPresent(logger ->
                    logger.info("New RoaringBitMapValueState Timestamp={}", windowStartTimestamp));
            Tuple2<Long, Roaring64NavigableMap> newBitMap = Tuple2.of(windowStartTimestamp, new Roaring64NavigableMap());
            bitMapValueState.update(newBitMap);
            return newBitMap;

        } else if (this.roaringBitMapClearBiPredicate().test(bitmap.f0, windowStartTimestamp)) {

            this.getLogger().ifPresent(logger ->
                    logger.info("Clear RoaringBitMapValueState, from start={} to end={}", bitmap.f0, windowStartTimestamp));

            bitMapValueState.clear();
            bitmap.f1.clear();
            Tuple2<Long, Roaring64NavigableMap> newBitMap = Tuple2.of(windowStartTimestamp, new Roaring64NavigableMap());
            bitMapValueState.update(newBitMap);
            return newBitMap;

        } else {
            return bitmap;
        }
    }
}
  • 字符类型 identity id
1.使用 Flink state
2.使用 key-value 外存

如果选用的是 Redis 作为 key-value 过滤,那么这里会有一个巧用 Redis bit 特性的优化。举一个一般场景下的方案与使用 Redis bit 特性的方案做对比:

场景:

  • 假如需要同一天有几十场活动,并且都希望计算出这几十场活动的 uv;
  • 计算不同维度下的新增类指标,这就需要我们按照所有不同的维度进行 uv 统计;

上述两类场景下,对于不同的活动或者不同维度下的指标,identity id 的范围都相同。

通常方案:

这种场景下,如果有 1 亿用户,需要同时计算 50 个活动或者 50 个不同维度下的 uv。那么理论上最大 key 数量为 1 亿 * 50 = 50 亿个 key。

根据 http://www.redis.cn/redis_memory/ 网站计算,所占用 Redis 内存大概 434GB。

如果我们利用 Redis bit 特性,那么可以按照如下方案进行设计。

Redis bit 方案:

这样做的一个优点,就是这几十场活动的 uv 计算都使用了相同的 Redis key 来计算,可以大幅度减少 Redis 的容量占用。使用此方案的话,以上述通常方案下相同的用户和活动场数来计算,理论上最大 key 数量仅仅为 1 亿,只是 value 数量会占用 50 个 bit。

根据 http://www.redis.cn/redis_memory/ 网站计算,所占用 Redis 内存不到15GB,此处为了计算方便,直接将 50 bit 映射成为 50 字节进行计算。

综上我们可以看出,使用 Redis bit 特性会将存储成本变为原来的 4% 不到。可以极大的节约我们的存储成本。

0 人点赞