Flink 的数据流算子

2021-12-07 16:05:15 浏览数 (1)

Map

输入DataStream返回DataStream。

接收一个元素,产生一个元素。下面是转换为双倍值的MapFunction

代码语言:javascript复制
DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) throws Exception {
        return 2 * value;
    }
});

FlatMap

输入DataStream返回DataStream。

接收一个元素,产出0个,1个,或者更多的元素。下面是一个字符串拆分为多个字符串的FlatMap

代码语言:javascript复制
dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out)
        throws Exception {
        for(String word: value.split(" ")){
            out.collect(word);
        }
    }
});

Filter

输入DataStream返回DataStream。

过滤数据,function返回为true的会被保留,为false的会被排除。

代码语言:javascript复制
dataStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value != 0;
    }
});

KeyBy

输入DataStream返回KeyedStream。

按照key将数据拆分为不同的集合,具有相同key的数据放到同一个集合,内部使用hashCode来判断是否属于同一个key。

代码语言:javascript复制
dataStream.keyBy(value -> value.getSomeKey());
dataStream.keyBy(value -> value.f0);

Reduce

输入 KeyedStream 返回 DataStream。

将按照key拆分的集合滚动处理。合并当前元素和最后一次合并的结果,然后返回一个新的值。

代码语言:javascript复制
keyedStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2)
    throws Exception {
        return value1   value2;
    }
});

Window

输入 KeyedStream 返回 WindowedStream。

窗口可以定义在KeyedStreams上,窗口可以将每个key的数据按照某种特征分组,点击链接 windows 查看完整的针对窗口的描述

代码语言:javascript复制
dataStrea。m
  .keyBy(value -> value.f0)
  .window(TumblingEventTimeWindows.of(Time.seconds(5))); 

WindowAll

输入 DataStream 返回 AllWindowedStream。

窗口可以定义在常规的DataStream上。窗口可以将数据按照某种特征分组,点击链接 windows 查看完整的针对窗口的描述

代码语言:javascript复制
dataStream
  .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)));
Window Apply

输入 WindowedStream或者AllWindowedStream 输出 DataStream 。

应用一个函数reduce到窗口。点击链接 windows 查看完整的针对窗口的描述

代码语言:javascript复制
windowedStream.apply(new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() {
    public void apply (Tuple tuple,
            Window window,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Integer> out) throws Exception {
        int sum = 0;
        for (value t: values) {
            sum  = t.f1;
        }
        out.collect (new Integer(sum));
    }
});

WindowFunction 类型参数: – 输入值的类型。 – 输出值的类型。 – 密钥的类型。 – 可以应用此窗口函数的Window类型。

代码语言:javascript复制
// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>, Integer, Window>() {
    public void apply (Window window,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Integer> out) throws Exception {
        int sum = 0;
        for (value t: values) {
            sum  = t.f1;
        }
        out.collect (new Integer(sum));
    }
});

AllWindowFunction类型参数: – 输入值的类型。 – 输出值的类型。 – 可以应用此窗口函数的Window类型。

WindowReduce

输入 WindowedStream 输出 DataStream 。

应用一个Reduct函数到窗口,并返回合并后的值。

代码语言:javascript复制
windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() {
    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
        return new Tuple2<String,Integer>(value1.f0, value1.f1   value2.f1);
    }
});
Union

合并两个或更多的流,返回新的流包含所有流中的元素。包含重复的。

代码语言:javascript复制
dataStream.union(otherStream1, otherStream2, ...);
Window Join

基于指定的key和共同窗口join两个数据流,返回一个新的数据流。

代码语言:javascript复制
dataStream.join(otherStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new JoinFunction () {...});
Interval Join

输入 KeyedStream,返回一个数据流。

基于在指定时间间隔内的共同key,Join 两个KeyedStream的流。

代码语言:javascript复制
// this will join the two streams so that
// key1 == key2 && leftTs - 2 < rightTs < leftTs   2
keyedStream.intervalJoin(otherKeyedStream)
    .between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound
    .upperBoundExclusive(true) // optional
    .lowerBoundExclusive(true) // optional
    .process(new IntervalJoinFunction() {...});
Window CoGroup

输入两个数据流,返回一个数据流。

将两个流按照指定key和公共窗口合并,某些键可能只包含在两个原始数据集之一中。 在这种情况下,对于不包含具有该特定键的元素的数据集一侧,将使用空输入调用 CoGroup 函数。 CoGroupFunction类型参数:

– 第一个输入数据集的数据类型。 – 第二个输入数据集的数据类型。 – 返回元素的数据类型。

代码语言:javascript复制
dataStream.coGroup(otherStream)
    .where(0).equalTo(1)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new CoGroupFunction () {...});
Connect

输入 DataStream,DataStream ,返回ConnectedStream。

连接两个数据流保持原有类型。连接允许两个流之间共享状态。

代码语言:javascript复制
DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...

ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);
CoMap,CoFlatMap

输入 ConnectedStream 输出 DataStream 。

类似于已关联数据流上的map和flatMap。

代码语言:javascript复制
connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
    @Override
    public Boolean map1(Integer value) {
        return true;
    }

    @Override
    public Boolean map2(String value) {
        return false;
    }
});
connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {

   @Override
   public void flatMap1(Integer value, Collector<String> out) {
       out.collect(value.toString());
   }

   @Override
   public void flatMap2(String value, Collector<String> out) {
       for (String word: value.split(" ")) {
         out.collect(word);
       }
   }
});
Iterate

通过将一个操作符的输出重定向到之前的某个操作符,在流中创建一个“反馈”循环。这对于定义不断更新模型的算法尤其有用。下面的代码从一个流开始,并不断地应用迭代体。大于0的元素被发送回反馈通道,其余的元素被向下转发。

代码语言:javascript复制
IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map (/*do something*/);
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
    @Override
    public boolean filter(Long value) throws Exception {
        return value > 0;
    }
});
iteration.closeWith(feedback);
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
    @Override
    public boolean filter(Long value) throws Exception {
        return value <= 0;
    }
});

0 人点赞