本文总结了Flink Streaming的算子操作,统统简单实现一次算子操作类型,更加熟悉了Flink带来的便利,有时间可以浏览一次,理解一次,后面具体使用的时候,可以进行查看
Operators将一个或多个DataStream转换为新的DataStream。Flink程序可以将多种转换组合成复杂的数据流拓扑。
版本:Flink 1.10.0 语言:Scala
以下实现都使用了Scala语言,有需要Java版本的,可以直接官网查看
下面包含三部分,分别为
a. DataStream Transformations
b. Physical partitioning
c. Task chaining and resource groups
1. DataStream Transformations
Map DataStream → DataStream
取一个元素并产生一个元素。一个映射函数,将输入流的值加倍:
dataStream.map { x => x * 2 }
FlatMap DataStream → DataStream
取一个元素并产生零个,一个或多个元素。平面图功能,可将句子拆分为单词:
代码语言:javascript复制dataStream.flatMap { str => str.split(" ") }
Filter DataStream → DataStream
为每个元素评估一个布尔函数,并保留该函数返回true的布尔函数。过滤出零值的过滤器:
代码语言:javascript复制dataStream.filter { _ != 0 }
KeyBy DataStream → KeyedStream
在逻辑上将流划分为不相交的分区,每个分区都包含同一键的元素。在内部,这是通过哈希分区实现的。此转换返回KeyedStream
代码语言:javascript复制dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple
Reduce KeyedStream → DataStream
对键控数据流进行“滚动”压缩。将当前元素与最后一个减小的值合并并发出新值。一个reduce函数,用于创建部分和流
代码语言:javascript复制keyedStream.reduce { _ _ }
Fold KeyedStream → DataStream
带有初始值的键控数据流上的“滚动”折叠。将当前元素与上一个折叠值组合在一起并发出新值。折叠函数,应用于序列(1,2,3,4,5)时,会发出序列“ start-1”,“ start-1-2”,“ start-1-2-3”,...根据相同的Key进行不断的折叠,新的key会进行新的折叠
代码语言:javascript复制val result: DataStream[String] =
keyedStream.fold("start")((str, i) => { str "-" i })
Aggregations KeyedStream → DataStream
在键控数据流上滚动聚合。min和minBy之间的区别是min返回最小值,而minBy返回该字段中具有最小值的元素(与max和maxBy相同).
代码语言:javascript复制keyedStream.sum(0)
keyedStream.sum("key")
keyedStream.min(0)
keyedStream.min("key")
keyedStream.max(0)
keyedStream.max("key")
keyedStream.minBy(0)
keyedStream.minBy("key")
keyedStream.maxBy(0)
keyedStream.maxBy("key")
Window KeyedStream → WindowedStream
可以在已经分区的KeyedStreams上定义Windows。Windows根据某些特征将每个键中的数据分组(例如,最近5秒钟内到达的数据).
代码语言:javascript复制dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
WindowAll DataStream → AllWindowedStream
Windows可以在常规DataStreams上定义。Windows会根据某些特征(例如,最近5秒钟内到达的数据)对所有流事件进行分组。警告:*在许多情况下,这是非并行*转换。所有记录将被收集到windowAll运算符的一项任务中.
代码语言:javascript复制dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
Window Apply WindowedStream → DataStream AllWindowedream → DataStream
将一般功能应用于整个窗口。下面是一个手动求和窗口元素的函数。注意:如果使用windowAll转换,则需要使用AllWindowFunction代替.
代码语言:javascript复制windowedStream.apply { WindowFunction }
// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply { AllWindowFunction }
Window Reduce WindowedStream → DataStream
将功能化约简函数应用于窗口并返回缩减后的值.
代码语言:javascript复制windowedStream.reduce { _ _ }
Window Fold WindowedStream → DataStream
Applies a functional fold function to the window and returns the folded value. The example function, when applied on the sequence (1,2,3,4,5), folds the sequence into the string "start-1-2-3-4-5":
代码语言:javascript复制val result: DataStream[String] =
windowedStream.fold("start", (str, i) => { str "-" i })
Aggregations on windows WindowedStream → DataStream
聚合窗口的内容。min和minBy之间的区别是min返回最小值,而minBy返回该字段中具有最小值的元素(与max和maxBy相同).
代码语言:javascript复制windowedStream.sum(0)
windowedStream.sum("key")
windowedStream.min(0)
windowedStream.min("key")
windowedStream.max(0)
windowedStream.max("key")
windowedStream.minBy(0)
windowedStream.minBy("key")
windowedStream.maxBy(0)
windowedStream.maxBy("key")
Union DataStream* → DataStream
两个或多个数据流的并集,创建一个包含所有流中所有元素的新流。注意:如果您将数据流与其自身合并,则在结果流中每个元素将获得两次.
代码语言:javascript复制dataStream.union(otherStream1, otherStream2, ...)
Window Join DataStream,DataStream → DataStream
在给定键和公共窗口上连接两个数据流
代码语言:javascript复制dataStream.join(otherStream)
.where(<key selector>).equalTo(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply { ... }
Window CoGroup DataStream,DataStream → DataStream
代码语言:javascript复制dataStream.coGroup(otherStream)
.where(0).equalTo(1)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply {}
Connect DataStream,DataStream → ConnectedStreams
“连接”两个保持其类型的数据流,从而允许两个流之间共享状态.注意1. Union之前两个流的类型必须是一样,Connect可以不一样,在之后的coMap中再去调整成为一样的。2. Connect只能操作两个流,Union可以操作多个。
代码语言:javascript复制someStream : DataStream[Int] = ...
otherStream : DataStream[String] = ...
val connectedStreams = someStream.connect(otherStream)
CoMap, CoFlatMap ConnectedStreams → DataStream
与连接的数据流上的map和flatMap相似
代码语言:javascript复制connectedStreams.map(
(_ : Int) => true,
(_ : String) => false
)
connectedStreams.flatMap(
(_ : Int) => true,
(_ : String) => false
)
Split DataStream → SplitStream
Split the stream into two or more streams according to some criterion.
代码语言:javascript复制val split = someDataStream.split(
(num: Int) =>
(num % 2) match {
case 0 => List("even")
case 1 => List("odd")
}
)
Select SplitStream → DataStream
从拆分流中选择一个或多个流.
代码语言:javascript复制val even = split select "even"
val odd = split select "odd"
val all = split.select("even","odd")
Iterate DataStream → IterativeStream → DataStream
通过将一个运算符的输出重定向到某个先前的运算符,在流中创建“反馈”循环。这对于定义不断更新模型的算法特别有用。以下代码从流开始,并连续应用迭代主体。大于0的元素将被发送回反馈通道,其余元素将被转发到下游.
代码语言:javascript复制initialStream.iterate {
iteration => {
val iterationBody = iteration.map {/*do something*/}
(iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
}
}
Extract Timestamps DataStream → DataStream
从记录中提取时间戳,以便与使用事件时间语义的窗口一起使用。参见事件时间.
代码语言:javascript复制stream.assignTimestamps { timestampExtractor }
2. Physical partitioning
Custom partitioning DataStream → DataStream
使用用户定义的分区程序为每个元素选择目标任务.
代码语言:javascript复制dataStream.partitionCustom(partitioner, "someKey")
dataStream.partitionCustom(partitioner, 0)
Random partitioning DataStream → DataStream
根据均匀分布对元素进行随机划分.
代码语言:javascript复制dataStream.shuffle()
Rebalancing (Round-robin partitioning) DataStream → DataStream
分区元素轮循,每个分区创建相等的负载。在存在数据偏斜的情况下对性能优化有用.
代码语言:javascript复制dataStream.rebalance()
Rescaling DataStream → DataStream
将元素循环地分区到下游操作的子集。如果您希望拥有管道,例如,从源的每个并行实例散开到几个映射器的子集以分配负载,但又不希望 rebalance() 引起完全的重新平衡,则这很有用。这将仅需要本地数据传输,而不需要通过网络传输数据,这取决于其他配置值,例如TaskManager的插槽数。上游操作向其发送元素的下游操作的子集取决于两个上游操作的并行度和下游操作。例如,如果上游操作具有并行性2,而下游操作具有并行性4,则一个上游操作将元素分配给两个下游操作,而另一个上游操作将分配给另外两个下游操作。另一方面,如果下游操作具有并行性2而上游操作具有并行性4,则两个上游操作将分配给一个下游操作,而其他两个上游操作将分配给其他下游操作。彼此不是整数倍,一个或几个下游操作将具有与上游操作不同的输入数量。请参见此图以查看上例中的连接模式:
代码语言:javascript复制dataStream.rescale()
Broadcasting DataStream → DataStream
向每个分区广播元素.
代码语言:javascript复制dataStream.broadcast()
3. Task chaining and resource groups
Start new chain
从此运算符开始,开始新的链。两个映射器将被链接,并且过滤器将不会链接到第一个映射器.
代码语言:javascript复制someStream.filter(...).map(...).startNewChain().map(...)
Disable chaining
禁止将链路OperateChain的连接操作
代码语言:javascript复制someStream.map(...).disableChaining()
Set slot sharing group
设置操作的插槽共享组。Flink会将具有相同插槽共享组的操作放入同一插槽,同时将没有插槽共享组的操作保留在其他插槽中。这可以用来隔离插槽。如果所有输入操作都在同一插槽共享组中,则插槽共享组将从输入操作继承。默认插槽共享组的名称为“ default”,可以通过调用slotSharingGroup(“ default”)将操作显式放入该组中。.
代码语言:javascript复制someStream.filter(...).slotSharingGroup("name")
作者:Johngo