flink时间系统系列之ProcessFunction 使用分析

2022-04-18 11:21:03 浏览数 (1)

flink时间系统系列篇幅目录:

一、时间系统概述介绍

二、Processing Time源码分析

三、Event Time源码分析

四、时间系统在窗口函数中的应用分析

五、ProcessFunction 使用分析

六、实例讲解:如何做定时输出

ProcessFunction 是flink 提供面向用户low-level 层级的api,通过ProcessFunction可以访问state、注册处理时间/事件时间定时器来帮助我们完成一些比较复杂的操作,但是其有一个限制那就是只用使用在keyedStream中,是由于根据getRuntimeContext 得到的StreamingRuntimeContext 只提供了KeyedStateStore的访问权限,所以只能访问keyd state, 另外根据前面的分析可知,注册的定时器必须是与key相关,也就解释了在ProcessFunction中只能在keyedStream做定时器注册。目前在flink中,提供了ProcessFunction与KeyedProcessFunction 这两个面向用户的api,但是ProcessFunction却无法帮助我们注册定时器,透过源码(ProcessOperator)可以发现,注册时会主动抛出UnsupportedOperationException异常。今天重点在于分析KeyedProcessFunction 是如何完成定时功能。

首先以官方文档为例来了解其用法,完成单词计数,并且定时输出功能,文档里面是定义了一个继承ProcessFunction 的的类,猜想这里应该是很早之前的版本文档。

做一个简单的代码流程分析:首先得到一个Tuple2[String,String]类型的数据流,然后按照第一个位置的字段进行分组,那么相同的字段发送到下游相同的节点,后面使用继承ProcessFunction 的CountWithTimeoutFunction 这么个函数,在内部定义了一个名为myState 类型为ValueState的状态,数据处理逻辑processElement:当一条数据流入,首先判断myState中是否存在该key的数据,不存在则计数1,存在则 1, 然后更新到myState,然后通过ctx.timerService.registerEventTimeTimer注册一个事件时间往后推迟60s 的定时用,当达到触发条件(watermark大于等于注册的时间)就会触发定时任务执行onTimer 方法,然后执行判断并且输出。

接下来从源码角度去理解ProcessFunction是如何实现这个功能的,从KeyedStream 为入口查看:

我们重点所要分析的类就是KeyedProcessOperator,它继承了AbstractUdfStreamOperator并且实现了Triggerable接口,而AbstractUdfStreamOperator 又继承了AbstractStreamOperator,

该operator在初始化open中定义了一个名为user-timers 的InternalTimerService服务,然后包装在TimerService对象中,提供给ContextImpl对象与OnTimerContextImpl对象,

在看其processElement方法,将ContextImpl对象最为参数传给了用户ProcessFunction函数的processElement方法中,也就为用户api层级提供了访问时间、注册定时器的入口,

接下来看下定时器的执行逻辑,在open初始化方法中初始化InternalTimerService传入了一个Triggerable 类型的this对象,也就是当前KeyedProcessOperator对象,由之前的分析可知最后定时调用会调用onEventTime或者onProcessingTime方法,

都会调用invokeUserFunction,

最终调用到了用户ProcessFunction函数中的onTimer方法,调用时传入了OnTimerContextImpl对象,其持有IntervalTimeService服务,也可以注册定时器操作。

以上就是关于ProcessFunction 对于定时器的使用分析。

0 人点赞