1.前言
本文主要是整理博主收集的 Flink、OLAP 高频面试题。之后每周都会有一篇。
博主把这一期的面试题先贴出来,大家自己感受感受。
- ⭐ Flink 任务 failover 之后,可能会重复写出数据到 Sink 中,你们公司是怎么做到端对端 exactly-once 的?
- ⭐ OLAP 引擎那么多,是因为 ClickHouse 的哪个特点促使它的性能的突出?
- ⭐ ClickHouse 不支持高并发,这是真的吗?Redis支持高并发这也是真的吗?
- ⭐ 让你使用用户心跳日志(20s 上报一次)计算同时在线用户、DAU 指标,你怎么设计链路?
- ⭐ Flink 配置 State TTL 时都有哪些配置项?每种配置项的作用?
- ⭐ Flink State TTL 是怎么做到数据过期的?
下面的答案都是博主收集小伙伴萌的答案 博主自己的理解进行的一个总结。
2.Flink 任务 failover 之后,可能会重复写出数据到 Sink 中,你们公司是怎么做到端对端 exactly-once 的?
端对端 exactly-once 有 3 个条件:
- ⭐ Source 引擎可以重新消费,比如 Kafka 可以重置 offset 进行重新消费
- ⭐ Flink 任务配置 exactly-once,保证 Flink 任务 State 的 exactly-once
- ⭐ Sink 算子支持两阶段或者可重入,保证产出结果的 exactly-once
其中前两项一般大多数引擎都支持,我们需要关注的就是第 3 项,目前有两种常用方法:
- ⭐ Sink 两阶段:由于两阶段提交是随着 Checkpoint 进行的,假设 Checkpoint 是 5min 做一次,那么数据对下游消费方的可见性延迟至少也是 5min,所以会有数据延迟等问题,目前用的比较少。
- ⭐ Sink 支持可重入:举例:
- ⭐ Sink 为 MySQL:可以按照 key update 数据
- ⭐ Sink 为 Druid:聚合类型可以选用 longMax
- ⭐ Sink 为 ClickHouse:查询时使用 longMax 或者使用 ReplacingMergeTree 表引擎将重复写入的数据去重,这里有小伙伴会担心 ReplacingMergeTree 会有性能问题,但是博主认为其实性能影响不会很大,因为 failover 导致的数据重复其实一般情况下是小概率事件,并且重复的数据量也不会很大,也只是一个 Checkpoint 周期内的数据重复,所以使用 ReplacingMergeTree 是可以接受的)
- ⭐ Sink 为 Redis:按照 key 更新数据
3.OLAP 引擎那么多,是因为 ClickHouse 的哪个特点促使它的性能的突出?
一些我们能在其他 OLAP 引擎上面见到的优化有:
- ⭐ 列存储
- ⭐ 编码压缩
- ⭐ 多索引
- ⭐ 物化视图(Cube/Rollup)
在 ClickHouse 上特别突出的有:
- ⭐ 应景优化
- ⭐ 向量化执行
- ⭐ 持续测试和持续改进
3.1.列存储
ClickHouse 采用列存储,这对于分析型请求非常高效。
行存储:从存储系统读取所有满足条件的行数据,然后在内存中过滤出需要的字段,速度较慢。比如,一个表有 10 列,我其实只查 1 列数据的话,行存储还是会把 10 列数据都扫描一遍。
1
列存储:仅从存储系统中读取必要的列数据,无用列不读取,速度非常快。相同的例子,一个表有 10 列,我其实只查 1 列数据的话,列存储就只扫描这一列数据
2
3.2. 编码压缩
由于 ClickHouse 采用列存储,相同列的数据连续存储,且底层数据在存储时是经过排序的,这样数据的局部规律性非常强,有利于获得更高的数据压缩比。
此外,ClickHouse 除了支持 LZ4、ZSTD 等通用压缩算法外,还支持 Delta、DoubleDelta、Gorilla 等专用编码算法,用于进一步提高数据压缩比。
其中 DoubleDelta、Gorilla 是 Facebook 专为时间序数据而设计的编码算法,理论上在列存储环境下,可接近专用时序存储的压缩比,详细可参考 Gorilla 论文。
3
3.3.多索引
列存用于裁剪不必要的字段读取,而索引则用于裁剪不必要的记录读取。ClickHouse 支持丰富的索引,从而在查询时尽可能的裁剪不必要的记录读取,提高查询性能。
ClickHouse 中最基础的索引是主键索引。ClickHouse 的底层数据按建表时指定的 ORDER BY 列进行排序,并按 index_granularity 参数切分成数据块,然后抽取每个数据块的第一行形成一份稀疏的排序索引。
用户在查询时,如果查询条件包含主键列,则可以基于稀疏索引进行快速的裁剪。
ClickHouse 支持更多其他的索引类型,不同索引用于不同场景下的查询裁剪,具体汇总如下,更详细的介绍参考 ClickHouse 官方文档:
4
3.4.物化视图(Cube/Rollup)
OLAP 分析领域有两个典型的方向:
- ⭐ ROLAP:通过列存、索引等各类技术手段,提升查询时性能。
- ⭐ MOLAP:通过预计算提前生成聚合后的结果数据,降低查询读取的数据量,属于计算换性能方式。
前者更为灵活,但需要的技术栈相对复杂;后者实现相对简单,但要达到的极致性能,需要生成所有常见查询对应的物化视图,消耗大量计算、存储资源。
物化视图的原理如下图所示,可以在不同维度上对原始数据进行预计算汇总,这样我们查询时就可以直接查询到聚合好的数据上面,查询效率更高:
5
3.5.场景优化
其会在不同的场景使用不同的算法。
例如,在去重函数 uniqCombined 中,会根据数据量选择不同的算法:数据量比较少的时候,会选择使用 Array 来保存;数据量中等的时候,使用 HashSet;数据量很大的时候,会使用 HyperLogLog 算法。
并且表引擎很丰富,有 20 多种,每种表引擎都做了很多的优化,这个道理就和小伙伴萌工作时为每类工作场景专门设计对应的解决方案一样,效果当然是不错的。
3.6.向量化执行
向量化执行。SIMD 被广泛地应用于文本转换、数据过滤、数据解压和 JSON 转换等场景。相对于单纯使用 CPU,利用寄存器暴力优化也算是一种降维打击,毕竟 "能用机器资源解决的问题就别手动优化"。
以商品订单数据为例,查询某个订单总价格的处理过程,由传统的按行遍历处理的过程,转换为按 Block 处理的过程。
具体如下图:
6
3.7.持续测试和持续改进
由于拥有 Yandex 的天然优势,经常会使用真实数据来进行测试,尝试使用于各个场景。也因此获得了快速的版本更新换代,基本维持在一个月一更新。
并且在业界有新的算法出现时,ClickHouse 的开发人员也会积极去测试。
4.ClickHouse 不支持高并发,这是真的吗?Redis支持高并发这也是真的吗?
其实这个问题主要是为了让大家不要陷入一个固有想法中。
举个例子:
- ClickHouse 中一个表只有 1w 行数据,ClickHouse 的并发能力不会差
- 当 Redis 中存储 200MB value 的 string 时,Redis 的并发也上不去的
只是这些引擎尝尝被用于满足对应场景的需求。
比如 ClickHouse 用于大宽表的灵活 SQL 计算,这种场景的并发肯定不会很高。Redis 常被用于小 key 小 value set,get 场景,那么这种场景的并发肯定也不会低的。
每种引擎都有对应的瓶颈处,只要你没有达到这个瓶颈阈值,并发都不会低。
5.让你使用用户心跳日志(20s 上报一次)计算同时在线用户、DAU 指标,你怎么设计链路?
参考了很多小伙伴的解决方案,大概分为几种:
- ⭐ 有提到 bitmap、hyberloglog、布隆过滤器、redis 等方法计算去重的
- ⭐ 有提到将用户上线标记为 1,下线标记为 0 的,然后将上线下线数据发到消息队列用实时计算引擎统计的
- ⭐ 有提到将用户心跳日志借助 Session Window Dynamic Gap 计算的
博主认为其中第一种方案大家基本都能答上来,第二种和第三种是相对比比较创新的,但是实现逻辑较复杂,大家可以学习对应的思想。
这里博主结合大家的想法给出答案:
首先我们使用最简单直接的方式 2 个指标分拆开来计算:
- ⭐ 同时在线用户:
- 输入:心跳日志
- 计算方法:a. SQL:1min tumble window(count distinct 实际是 MapState) b. DataStream:1min tumble window(去重可用 bitmap、hyberloglog、布隆过滤器)
- 输出:聚合结果数据
- ⭐ DAU:
- 输入:心跳日志
- 计算方法:a. SQL:1day cumulate window(count distinct 实际是 MapState) b. DataStream:1day window continous trigger(去重可用 bitmap、hyberloglog、布隆过滤器)
- 输出:聚合结果数据
上面这个方法在 90% 的场景都没有啥问题,但是如果心跳日志数据 QPS 都很大,则每个任务都去消费一遍,链路稳定性差。
这里我们可以做一次优化,我们可以发现上面这 2 个指标其实是有先后顺序关系的。
- ⭐ 同时在线用户:分钟级别去重
- ⭐ DAU:天级别去重
所以为了减少流量,其实同时在线用户可以作为 DAU 的输入。优化链路如下:
- ⭐ 同时在线用户:
- 输入:心跳日志
- 计算方法:a. SQL:row_number() over (partition by unix_time/60 order by proctime)允许一定误差,所以可以使用 proctime b. DataStream:去重可用 bitmap、hyberloglog、布隆过滤器,输出这一分钟去重后的明细输出
- 输出:同时在线明细
- ⭐ DAU:
- 输入:【同时在线用户】明细数据
- 计算方法:a. SQL:row_number() over (partition by unix_time/24/3600 order by proctime) b. DataStream:去重可用 bitmap、hyberloglog、布隆过滤器,输出这一分钟去重后的明细输出
- 输出:DAU 的明细
最终这样输出的数据无论是在来一个 ads 任务做聚合还是直接导入到 MySQL、ClickHouse、Druid 都可以,因为都只是计算 count 而已。
6.Flink 配置 State TTL 时都有哪些配置项?每种配置项的作用?
Flink 对状态做了能力扩展,即 TTL。它的能力其实和 redis 的过期策略类似,举例:
- ⭐ 支持 TTL 更新类型:更新 TTL 的时机
- ⭐ 访问到已过期数据的时的数据可见性
- ⭐ 过期时间语义:目前只支持处理时间
- ⭐ 具体过期实现:lazy,后台线程
那么首先我们看下什么场景需要用到 TTL 机制呢?举例:
比如计算 DAU 使用 Flink MapState 进行去重,到第二天的时候,第一天的 MapState 就可以删除了,就可以用 Flink State TTL 进行自动删除(当然你也可以通过代码逻辑进行手动删除)。
其实在 Flink DataStream API 中,TTL 功能还是比较少用的。Flink State TTL 在 Flink SQL 中是被大规模应用的,几乎除了窗口类、ETL(DWD 明细处理任务)类的任务之外,SQL 任务基本都会用到 State TTL。
那么我们在要怎么开启 TTL 呢?这里分 DataStream API 和 SQL API:
- ⭐ DataStream API:
private final MapStateDescriptor<String, List<Item>> mapStateDesc =
new MapStateDescriptor<>(
"itemsMap",
BasicTypeInfo.STRING_TYPE_INFO,
new ListTypeInfo<>(Item.class));
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// 使用 StateTtlConfig 开启 State TTL
mapStateDesc.enableTimeToLive(StateTtlConfig
.newBuilder(Time.milliseconds(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.cleanupInRocksdbCompactFilter(10)
.build());
}
关于 StateTtlConfig 的每个配置项的功能如下图所示:
1
- ⭐ SQL API:
StreamTableEnvironment
.getConfig()
.getConfiguration()
.setString("table.exec.state.ttl", "180 s");
注意:SQL 中 TTL 的策略不如 DataStream 那么多,SQL 中 TTL 只支持下图所示策略:
6
7.Flink State TTL 是怎么做到数据过期的?
首先我们来想想,要做到 TTL 的话,要具备什么条件呢?
想想 Redis 的 TTL 设置,如果我们要设置 TTL 则必然需要给一条数据给一个时间戳,只有这样才能判断这条数据是否过期了。
在 Flink 中设置 State TTL,就会有这样一个时间戳,具体实现时,Flink 会把时间戳字段和具体数据字段存储作为同级存储到 State 中。
举个例子,我要将一个 String 存储到 State 中时:
- ⭐ 没有设置 State TTL 时,则直接将 String 存储在 State 中
- ⭐ 如果设置 State TTL 时,则 Flink 会将 <String, Long> 存储在 State 中,其中 Long 为时间戳,用于判断是否过期。
接下来以 FileSystem 状态后端下的 MapState 作为案例来说:
- ⭐ 如果没有设置 State TTL,则生产的 MapState 的字段类型如下(可以看到生成的就是 HeapMapState 实例):
2
- ⭐ 如果设置了 State TTL,则生成的 MapState 的字段类型如下(可以看到使用到了装饰器的设计模式生成是 TtlMapState):
3
注意: 任务设置了 State TTL 和不设置 State TTL 的状态是不兼容的。这里大家在使用时一定要注意。防止出现任务从 Checkpoint 恢复不了的情况。但是你可以去修改 TTL 时长,因为修改时长并不会改变 State 存储结构。
了解了基础数据结构之后,我们再来看看 Flink 提供的 State 过期的 4 种删除策略:
- ⭐ lazy 删除策略:就是在访问 State 的时候根据时间戳判断是否过期,如果过期则主动删除 State 数据
- ⭐ full snapshot cleanup 删除策略:从状态恢复(checkpoint、savepoint)的时候采取做过期删除,但是不支持 rocksdb 增量 ck
- ⭐ incremental cleanup 删除策略:访问 state 的时候,主动去遍历一些 state 数据判断是否过期,如果过期则主动删除 State 数据
- ⭐ rocksdb compaction cleanup 删除策略:rockdb 做 compaction 的时候遍历进行删除。仅仅支持 rocksdb
7.1.lazy 删除策略
访问 State 的时候根据时间戳判断是否过期,如果过期则主动删除 State 数据。以 MapState 为例,如下图所示,在 MapState.get(key) 时会进行判断是否过期:
这个删除策略是不需要用户进行配置的,只要你打开了 State TTL 功能,就会默认执行。
4
7.2.full snapshot cleanup 删除策略
从状态恢复(checkpoint、savepoint)的时候采取做过期删除,但是不支持 rocksdb 增量 checkpoint。
代码语言:javascript复制StateTtlConfig
.newBuilder(Time.seconds(1))
.cleanupFullSnapshot()
.build()
7.3.incremental cleanup 删除策略
访问 state 的时候,主动去遍历一些 state 数据判断是否过期,如果过期则主动删除 State 数据。
代码语言:javascript复制StateTtlConfig
.newBuilder(Time.seconds(1))
// 每访问 1 此 state,遍历 1000 条进行删除
.cleanupIncrementally(1000, true)
.build()
5
注意:
- ⭐ 如果没有 state 访问,也没有处理数据,则不会清理过期数据。
- ⭐ 增量清理会增加数据处理的耗时。
- ⭐ 现在仅 Heap state backend 支持增量清除机制。在 RocksDB state backend 上启用该特性无效。
- ⭐ 因为是遍历删除 State 机制,并且每次遍历的条目数是固定的,所以可能会出现部分过期的 State 很长时间都过期不掉导致 Flink 任务 OOM。
7.4.rocksdb compaction cleanup 删除策略
仅仅支持 rocksdb。在 rockdb 做 compaction 的时候遍历进行删除。
代码语言:javascript复制StateTtlConfig
.newBuilder(Time.seconds(1))
// 做 compaction 时每隔 3 个 entry,重新更新一下时间戳(这个时间戳是 Flink 用于和数据中的时间戳来比较判断是否过期)
.cleanupInRocksdbCompactFilter(3)
.build()
注意:rocksdb compaction 时调用 TTL 过滤器会降低 compaction 速度。因为 TTL 过滤器需要解析上次访问的时间戳,并对每个将参与压缩的状态进行是否过期检查。对于集合型状态类型(比如 ListState 和 MapState),会对集合中每个元素进行检查。