在 《0基础学习PyFlink——个数滚动窗口(Tumbling Count Windows)》一文中,我们介绍了滚动窗口。本节我们要介绍滑动窗口。
滑动(Sliding)和滚动(Tumbling)的区别
正如其名,“滑动”是指这个窗口沿着一定的方向,按着一定的速度“滑行”。
而滚动窗口,则是一个个“衔接着”,而不是像上面那样交错着。
它们的相同之处就是:只有窗口内的事件数量到达窗口要求的数值时,这些窗口才会触发计算。
样例
我们只要对《0基础学习PyFlink——个数滚动窗口(Tumbling Count Windows)》中的代码做轻微的改动即可。为了简化样例,我们只看Key为E的元素的滑动。
代码语言:javascript复制word_count_data = [("E",3),("E",1),("E",4),("E",2),("E",6),("E",5)]
def word_count():
env = StreamExecutionEnvironment.get_execution_environment()
env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
# write all the data to one file
env.set_parallelism(1)
source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])
# define the source
# mappging
source = env.from_collection(word_count_data, source_type_info)
# source.print()
# keying
keyed=source.key_by(lambda i: i[0])
窗口为2,滑动距离为1
count_window会根据传入的第二参数决定是构建滚动(CountTumblingWindowAssigner)窗口还是滑动(CountSlidingWindowAssigner)窗口。
代码语言:javascript复制 def count_window(self, size: int, slide: int = 0):
"""
Windows this KeyedStream into tumbling or sliding count windows.
:param size: The size of the windows in number of elements.
:param slide: The slide interval in number of elements.
.. versionadded:: 1.16.0
"""
if slide == 0:
return WindowedStream(self, CountTumblingWindowAssigner(size))
else:
return WindowedStream(self, CountSlidingWindowAssigner(size, slide))
我们只要给count_window第二个参数传递一个不为0的值,即可达到滑动效果。
代码语言:javascript复制 # reducing
windows_size = 2
sliding_size = 1
reduced=keyed.count_window(windows_size, sliding_size)
.apply(SumWindowFunction(),
Types.TUPLE([Types.STRING(), Types.INT()]))
# # define the sink
reduced.print()
# submit for execution
env.execute()
(E,2) (E,2) (E,2) (E,2) (E,2)
窗口为3,滑动距离为1
代码语言:javascript复制 # reducing
windows_size = 3
sliding_size = 1
reduced=keyed.count_window(windows_size, sliding_size)
.apply(SumWindowFunction(),
Types.TUPLE([Types.STRING(), Types.INT()]))
(E,3) (E,3) (E,3) (E,3)
窗口为3,滑动距离为2
代码语言:javascript复制 # reducing
windows_size = 3
sliding_size = 2
reduced=keyed.count_window(windows_size, sliding_size)
.apply(SumWindowFunction(),
Types.TUPLE([Types.STRING(), Types.INT()]))
(E,3) (E,3)
窗口为3,滑动距离为3
这个就等效于滚动窗口了,因为“滑”过了窗口大小。
代码语言:javascript复制 # reducing
windows_size = 3
sliding_size = 3
reduced=keyed.count_window(windows_size, sliding_size)
.apply(SumWindowFunction(),
Types.TUPLE([Types.STRING(), Types.INT()]))
(E,3) (E,3)
完整代码
代码语言:javascript复制from typing import Iterable
from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, WindowFunction
from pyflink.datastream.window import CountWindow
class SumWindowFunction(WindowFunction[tuple, tuple, str, CountWindow]):
def apply(self, key: str, window: CountWindow, inputs: Iterable[tuple]):
return [(key, len([e for e in inputs]))]
word_count_data = [("E",3),("E",1),("E",4),("E",2),("E",6),("E",5)]
def word_count():
env = StreamExecutionEnvironment.get_execution_environment()
env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
# write all the data to one file
env.set_parallelism(1)
source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])
# define the source
# mappging
source = env.from_collection(word_count_data, source_type_info)
# source.print()
# keying
keyed=source.key_by(lambda i: i[0])
# reducing
windows_size = 3
sliding_size = 1
reduced=keyed.count_window(windows_size, sliding_size)
.apply(SumWindowFunction(),
Types.TUPLE([Types.STRING(), Types.INT()]))
# # define the sink
reduced.print()
# submit for execution
env.execute()
if __name__ == '__main__':
word_count()
参考资料
- https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/learn-flink/streaming_analytics/