Map
输入DataStream返回DataStream。
接收一个元素,产生一个元素。下面是转换为双倍值的MapFunction
代码语言:javascript复制DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
@Override
public Integer map(Integer value) throws Exception {
return 2 * value;
}
});
FlatMap
输入DataStream返回DataStream。
接收一个元素,产出0个,1个,或者更多的元素。下面是一个字符串拆分为多个字符串的FlatMap
代码语言:javascript复制dataStream.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out)
throws Exception {
for(String word: value.split(" ")){
out.collect(word);
}
}
});
Filter
输入DataStream返回DataStream。
过滤数据,function返回为true的会被保留,为false的会被排除。
代码语言:javascript复制dataStream.filter(new FilterFunction<Integer>() {
@Override
public boolean filter(Integer value) throws Exception {
return value != 0;
}
});
KeyBy
输入DataStream返回KeyedStream。
按照key将数据拆分为不同的集合,具有相同key的数据放到同一个集合,内部使用hashCode来判断是否属于同一个key。
代码语言:javascript复制dataStream.keyBy(value -> value.getSomeKey());
dataStream.keyBy(value -> value.f0);
Reduce
输入 KeyedStream 返回 DataStream。
将按照key拆分的集合滚动处理。合并当前元素和最后一次合并的结果,然后返回一个新的值。
代码语言:javascript复制keyedStream.reduce(new ReduceFunction<Integer>() {
@Override
public Integer reduce(Integer value1, Integer value2)
throws Exception {
return value1 value2;
}
});
Window
输入 KeyedStream 返回 WindowedStream。
窗口可以定义在KeyedStreams上,窗口可以将每个key的数据按照某种特征分组,点击链接 windows 查看完整的针对窗口的描述
代码语言:javascript复制dataStrea。m
.keyBy(value -> value.f0)
.window(TumblingEventTimeWindows.of(Time.seconds(5)));
WindowAll
输入 DataStream 返回 AllWindowedStream。
窗口可以定义在常规的DataStream上。窗口可以将数据按照某种特征分组,点击链接 windows 查看完整的针对窗口的描述
代码语言:javascript复制dataStream
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)));
Window Apply
输入 WindowedStream或者AllWindowedStream 输出 DataStream 。
应用一个函数reduce到窗口。点击链接 windows 查看完整的针对窗口的描述
代码语言:javascript复制windowedStream.apply(new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() {
public void apply (Tuple tuple,
Window window,
Iterable<Tuple2<String, Integer>> values,
Collector<Integer> out) throws Exception {
int sum = 0;
for (value t: values) {
sum = t.f1;
}
out.collect (new Integer(sum));
}
});
代码语言:javascript复制WindowFunction 类型参数: – 输入值的类型。 – 输出值的类型。 – 密钥的类型。 – 可以应用此窗口函数的Window类型。
// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>, Integer, Window>() {
public void apply (Window window,
Iterable<Tuple2<String, Integer>> values,
Collector<Integer> out) throws Exception {
int sum = 0;
for (value t: values) {
sum = t.f1;
}
out.collect (new Integer(sum));
}
});
AllWindowFunction类型参数: – 输入值的类型。 – 输出值的类型。 – 可以应用此窗口函数的Window类型。
WindowReduce
输入 WindowedStream 输出 DataStream 。
应用一个Reduct函数到窗口,并返回合并后的值。
代码语言:javascript复制windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() {
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
return new Tuple2<String,Integer>(value1.f0, value1.f1 value2.f1);
}
});
Union
合并两个或更多的流,返回新的流包含所有流中的元素。包含重复的。
代码语言:javascript复制dataStream.union(otherStream1, otherStream2, ...);
Window Join
基于指定的key和共同窗口join两个数据流,返回一个新的数据流。
代码语言:javascript复制dataStream.join(otherStream)
.where(<key selector>).equalTo(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply (new JoinFunction () {...});
Interval Join
输入 KeyedStream,返回一个数据流。
基于在指定时间间隔内的共同key,Join 两个KeyedStream的流。
代码语言:javascript复制// this will join the two streams so that
// key1 == key2 && leftTs - 2 < rightTs < leftTs 2
keyedStream.intervalJoin(otherKeyedStream)
.between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound
.upperBoundExclusive(true) // optional
.lowerBoundExclusive(true) // optional
.process(new IntervalJoinFunction() {...});
Window CoGroup
输入两个数据流,返回一个数据流。
将两个流按照指定key和公共窗口合并,某些键可能只包含在两个原始数据集之一中。 在这种情况下,对于不包含具有该特定键的元素的数据集一侧,将使用空输入调用 CoGroup 函数。 CoGroupFunction类型参数:
代码语言:javascript复制– 第一个输入数据集的数据类型。 – 第二个输入数据集的数据类型。 – 返回元素的数据类型。
dataStream.coGroup(otherStream)
.where(0).equalTo(1)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply (new CoGroupFunction () {...});
Connect
输入 DataStream,DataStream ,返回ConnectedStream。
连接两个数据流保持原有类型。连接允许两个流之间共享状态。
代码语言:javascript复制DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...
ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);
CoMap,CoFlatMap
输入 ConnectedStream 输出 DataStream 。
类似于已关联数据流上的map和flatMap。
代码语言:javascript复制connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
@Override
public Boolean map1(Integer value) {
return true;
}
@Override
public Boolean map2(String value) {
return false;
}
});
connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {
@Override
public void flatMap1(Integer value, Collector<String> out) {
out.collect(value.toString());
}
@Override
public void flatMap2(String value, Collector<String> out) {
for (String word: value.split(" ")) {
out.collect(word);
}
}
});
Iterate
通过将一个操作符的输出重定向到之前的某个操作符,在流中创建一个“反馈”循环。这对于定义不断更新模型的算法尤其有用。下面的代码从一个流开始,并不断地应用迭代体。大于0的元素被发送回反馈通道,其余的元素被向下转发。
代码语言:javascript复制IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map (/*do something*/);
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
@Override
public boolean filter(Long value) throws Exception {
return value > 0;
}
});
iteration.closeWith(feedback);
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
@Override
public boolean filter(Long value) throws Exception {
return value <= 0;
}
});