关于EventTime所带来的问题

2022-04-18 13:30:01 浏览数 (1)

在Flink中,EventTime即事件时间,能够反映事件在某个时间点发生的真实情况,即使在任务重跑情况也能够被还原,计算某一段时间内的数据,那么只需要将EventTime范围的数据聚合计算即可,但是数据在上报、传输过程中难免会发生数据延时,进而造成数据乱序,就需要考虑何时去触发这个计算,Flink使用watermark来衡量当前数据进度,使用时间戳表示,在数据流中随着数据一起传输,当到watermark达用户设定的允许延时时间,就会触发计算。但是在使用EventTime的语义中,会出现一些不可预知的问题,接下来会介绍笔者在使用过程中遇到的一些问题与解决办法。

EventTime倾斜

EventTime倾斜是指在有shuffle的操作中,一个task会接受上游多个task的数据,同样也会接受上游多个watermark,但是存在其中一个task的watermark相对于其他task的watermark滞后很多的情况,根据watermark的对齐机制,会选择多个通道最小watermark值,这样就会导致下游基于EventTime操作一直无法触发或者滞后触发。

情形:在处理上游kafka中业务数据,将业务设定的唯一键作为发送kafka数据的key,那么相同键的数据被分配在相同的partition, 下游flink任务处理使用唯一键作为key进行keyBy操作,然后使用ProcessFunction处理,在ProcessFunction中会注册EventTime定时器,那么就会根据watermark触发onTimer操作。在任务上线运行良好一段时间后,收到反馈没有结果输出。

排查思路:查看任务日志,没有异常日志;查看任务消费情况,该topic的数据正常被消费;查看背压、GC指标,一切正常。最终还是要回归到任务处理逻辑本身,数据的输出由onTimer来触发,而onTimer的调用则是由watermark来决定的,只有当watermark达到注册的事件时间才会触发onTimer操作,那么出现问题的点应该就是watermark,于是查看了该处理节点的watermark值,发现其值一直都处于一个很低的水平,观察发现就算有数据流入watermark值还是未被更新,此刻想到了watermark对齐机制的处理,于是查看上游各个task的watermark情况,发现其中一个task的watermark很长时间都未被更新,查看数据流入情况也没有发现有数据在持续的流入,于是查看该task对应消费的partition监控,果然很长时间没有数据产生,那么原因也很明朗了:由于上游task一直没有数据产生导致其watermark一直未更新,根绝watermark对齐机制,在ProcessFunction节点的watermark也会一直不更新导致无法处理计算。

解决方式:在注册EventTime定时器的同时注册ProcessingTime定时器,那么ProcessingTime定时器触发是由系统时间来决定触发的,随着时间的推进一定会触发输出操作,对于EventTime触发的输出只需要做覆盖即可。

对于消费端为空的情况,例如设置的source并行度大于topic的分区数,那么就会存在source task没有数据可消费,在source端(FlinkKafkaConsuerm)会发送一个IDLE的标志,表示在下游watermark对齐机制中忽略该通道的值,就不会影响watermark的流转。但是如果针对上面的情形,刚开始有数据但是后续无数据,就会造成watermark无法更新,对此Flink在内部实现了IDLE-Timeout的策略,在指定的timeout时间范围内,没有数据输出,就会往下游发送IDLE标志,当有数据流入就会发送ACTIVE标志,重新参与watermark的对齐机制,此功能在1.11版本前处于关闭的状态,具体可看(https://issues.apache.org/jira/browse/FLINK-5018)

消费不均匀

在一个task消费多个partition的情况下,但是partition数据倾斜比较严重,对于目前KafkaConsumer还无法做到均匀的消费每一个partition,就会导致从每个partition拿到的数据不均匀: 其中一个partition拉取的数据多,另外的partition拉取的数据少,那么在下游生成watermark的时候,消费多的partition的数据会提升watermark的值,而后在去拉取消费滞后的partition数据,会判断为延时数据从而被过滤掉。

情形:在一个EventTime-Window计算的业务处理中,source-task与partition是一对多的情况,发现最终生成的数据图表呈下降趋势,查看任务numLateRecordsDropped指标(表示延时丢弃的数据量),一直呈上升趋势,同时查看到该topic的监控中,某些partition消费lag维持在几百左右,某些却在几千左右,由此判断由于消费不均匀导致watermark迅速被提升,从而导致大量数据被判断为延时数据所丢弃。

解决方式:将source-task的并行度设置为与topic parition数一致,那么对于每一个task来说消费能力相当,watermark 能够维持在一个相对平稳的状态,并且在watermark对齐过程中,会选择值最小的通道watermark值,因此能够解决消费不均匀的问题。

数据延时

只要是在Event-Time语义的数据流中,就不可避免一个问题:数据延时,通常情况下会设置一个允许数据延时的大小,也许你会想将延时设置很大,那么同样带来的问题就是增加了处理的延时性,对于处理要求实时的来说是不可取的。对于数据延时来说,通常的做法就是要么将延时数据丢弃、要么单独处理延时数据。

首先了解一下对于Window是如何处理延时数据的,在默认情况下window是会将延时数据丢弃的,如果你想进一步像设置watermark一样再给window设置一个容忍的延时程度,可以设置allowedLateness参数值(默认为0),表示窗口允许的延时,在判断是否延时数据时会将allowedLateness算进去:数据所在窗口的endTime allowedLateness<=currWatermark, 窗口的触发条件仍然是endTime<=currWatermark,但是窗口状态数据清理条件是endTime allowedLateness<=currWatermark,因此当watermark到达触发窗口条件但是未达到清理条件时,也就是在allowedLateness延时范围内,每来一条数据就会触发一次窗口的计算,同时也增加了窗口状态的保留时间,对内存会造成一定的负担。如果在设置allowedLateness后,仍然存在数据延时的情况,可设置sideOutputTag,单独获取到延时的数据而后做进一步的处理。

延时丢弃的方式是最为简单的一种方式,同时也会对数据正确性造成一定的误差,但是如果想处理延时数据,就需要考虑如何与已经输出的数据做合并计算(例如:聚合操作),由于合并过程可能会出现任务失败恢复情况,会导致重复合并,对于不允许重复合并的情况下,在这个过程中又需要考虑数据一致性的问题,可以使用Flink提供的两阶段提交帮助完成。

以上是笔者在实际中使用EventTime语义的情况下遇到的几个问题,但是笔者更加建议尽可能的去EventTime化,将实时处理的语义转换为离线处理的语义,例如对于window的聚合操作转换为对时间字段的聚合操作,尽可能减少由于乱序造成对数据正确性的偏差影响。

0 人点赞