实时新增类指标标准化处理方案的一些经验。
实时新增类指标
大体上可以将实时新增类指标以以下两种维度进行分类。
- identity id 类型维度
identity id 类型 | 备注 |
---|---|
number(long) 类型 identity id | 数值类型 identity id 的好处在于可以使用 Bitmap 类组件做到精确去重。 |
字符类型 identity id | 字符类型 identity id 去重相对复杂,有两种方式,在误差允许范围之内使用 BloomFilter 进行去重,或者使用 key-value 组件进行精确去重。 |
- 产出数据类型
产出数据类型 | 备注 |
---|---|
明细类数据 | 此类数据一般是要求将新增的数据明细产出,uv 的含义是做过滤,产出的明细数据中的 identity id 不会有重复。输出明细数据的好处在于,我们可以在下游使用 OLAP 引擎对明细数据进行各种维度的聚合计算,从而很方便的产出不同维度下的 uv 数据。 |
聚合类数据 | 将一个时间窗口内的 uv 进行聚合,并且可以计算出分维度的 uv,其产出数据一般都是[维度 uv_count],但是这里的维度一般情况下是都是固定维度。如果需要拓展则需要改动源码。 |
计算链路
新增产出的链路多数就是以上两种维度因子的相互组合。明细类数据和聚合类数据一般情况下都是时间窗口聚合算子的不同,可以在单张图片中进行标注,所以下述分类按照 identity id 类型进行区分。
- number(long) 类型 identity id
其中 RoaringBitmap 存储在 Flink State 中,占用内存小,并且是本地内存访问,效率很高。但是需要注意,频繁更新 Flink State 也是非常耗费资源的,因此建议可以在去重之前攒一批数据进行批量去重,常用方法就是开一个窗口,在窗口操作之后批量去重。
RoaringBitmap 代码示例:
代码语言:javascript复制public interface RoaringBitmapDuplicateable<Model> {
long DEFAULT_DUPLICATE_MILLS = 24 * 3600 * 1000L;
BiPredicate<Long, Long> ROARING_BIT_MAP_CLEAR_BI_PREDICATE =
(start, end) -> end - start >= DEFAULT_DUPLICATE_MILLS;
// 初始化
default ValueState<Tuple2<Long, Roaring64NavigableMap>> getBitMapValueState(String name) {
return this.getRuntimeContext().getState(
new ValueStateDescriptor<>(name, TypeInformation.of(
new TypeHint<Tuple2<Long, Roaring64NavigableMap>>() { }))
);
}
RuntimeContext getRuntimeContext();
long getLongId(Model model);
Optional<Logger> getLogger();
default BiPredicate<Long, Long> roaringBitMapClearBiPredicate() {
return ROARING_BIT_MAP_CLEAR_BI_PREDICATE;
}
default List<Model> duplicateAndGet(List<Model> models, long windowStartTimestamp
, ValueState<Tuple2<Long, Roaring64NavigableMap>> bitMapValueState) throws IOException {
Tuple2<Long, Roaring64NavigableMap> bitMap = checkAndGetState(windowStartTimestamp, bitMapValueState);
Map<Long, Model> idModelsMap = models
.stream()
.collect(Collectors.toMap(this::getLongId, Function.identity(), (oldOne, newOne) -> oldOne));
Set<Long> ids = idModelsMap.keySet();
List<Model> newModels = Lists.newLinkedList();
for (Long id : ids) {
if (!bitMap.f1.contains(id)) {
if (idModelsMap.containsKey(id)) {
newModels.add(idModelsMap.get(id));
}
}
}
newModels.stream()
.map(this::getLongId)
.forEach(bitMap.f1::add);
bitMapValueState.update(bitMap);
return newModels;
}
default long duplicateAndCount(List<Model> models, long windowStartTimestamp
, ValueState<Tuple2<Long, Roaring64NavigableMap>> bitMapValueState) throws IOException {
Tuple2<Long, Roaring64NavigableMap> bitMap = checkAndGetState(windowStartTimestamp, bitMapValueState);
Set<Long> ids = models
.stream()
.map(this::getLongId)
.collect(Collectors.toSet());
List<Long> newIds = Lists.newLinkedList();
int count = 0;
for (Long id : ids) {
if (!bitMap.f1.contains(id)) {
newIds.add(id);
count ;
}
}
newIds.forEach(bitMap.f1::add);
bitMapValueState.update(bitMap);
return count;
}
default Tuple2<Long, Roaring64NavigableMap> checkAndGetState(long windowStartTimestamp
, ValueState<Tuple2<Long, Roaring64NavigableMap>> bitMapValueState) throws IOException {
Tuple2<Long, Roaring64NavigableMap> bitmap = bitMapValueState.value();
if (null == bitmap) {
this.getLogger().ifPresent(logger ->
logger.info("New RoaringBitMapValueState Timestamp={}", windowStartTimestamp));
Tuple2<Long, Roaring64NavigableMap> newBitMap = Tuple2.of(windowStartTimestamp, new Roaring64NavigableMap());
bitMapValueState.update(newBitMap);
return newBitMap;
} else if (this.roaringBitMapClearBiPredicate().test(bitmap.f0, windowStartTimestamp)) {
this.getLogger().ifPresent(logger ->
logger.info("Clear RoaringBitMapValueState, from start={} to end={}", bitmap.f0, windowStartTimestamp));
bitMapValueState.clear();
bitmap.f1.clear();
Tuple2<Long, Roaring64NavigableMap> newBitMap = Tuple2.of(windowStartTimestamp, new Roaring64NavigableMap());
bitMapValueState.update(newBitMap);
return newBitMap;
} else {
return bitmap;
}
}
}
- 字符类型 identity id
1.使用 Flink state
2.使用 key-value 外存
如果选用的是 Redis 作为 key-value 过滤,那么这里会有一个巧用 Redis bit 特性的优化。举一个一般场景下的方案与使用 Redis bit 特性的方案做对比:
场景:
- 假如需要同一天有几十场活动,并且都希望计算出这几十场活动的 uv;
- 计算不同维度下的新增类指标,这就需要我们按照所有不同的维度进行 uv 统计;
上述两类场景下,对于不同的活动或者不同维度下的指标,identity id 的范围都相同。
通常方案:
这种场景下,如果有 1 亿用户,需要同时计算 50 个活动或者 50 个不同维度下的 uv。那么理论上最大 key 数量为 1 亿 * 50 = 50 亿个 key。
根据 http://www.redis.cn/redis_memory/ 网站计算,所占用 Redis 内存大概 434GB。
如果我们利用 Redis bit 特性,那么可以按照如下方案进行设计。
Redis bit 方案:
这样做的一个优点,就是这几十场活动的 uv 计算都使用了相同的 Redis key 来计算,可以大幅度减少 Redis 的容量占用。使用此方案的话,以上述通常方案下相同的用户和活动场数来计算,理论上最大 key 数量仅仅为 1 亿,只是 value 数量会占用 50 个 bit。
根据 http://www.redis.cn/redis_memory/ 网站计算,所占用 Redis 内存不到15GB,此处为了计算方便,直接将 50 bit 映射成为 50 字节进行计算。
综上我们可以看出,使用 Redis bit 特性会将存储成本变为原来的 4% 不到。可以极大的节约我们的存储成本。