Flink 窗口剔除器 Evictor

2021-09-08 11:13:29 浏览数 (1)

1. 简介

除了 WindowAssignerTrigger 之外,Flink 的窗口模型还允许指定一个可选的 Evictor。Evictor 提供了在使用 WindowFunction 之前或者之后从窗口中删除元素的能力。为此,Evictor 接口提供了两个方法:

代码语言:javascript复制
public interface Evictor<T, W extends Window> extends Serializable {
  // 可选的删除元素,在窗口函数之前调用
  void evictBefore(Iterable<TimestampedValue<T>> elements,
      int size, W window, EvictorContext evictorContext);

  // 可选的删除元素,在窗口函数之后调用
  void evictAfter(Iterable<TimestampedValue<T>> elements,
      int size, W window, EvictorContext evictorContext);

  interface EvictorContext {
      // 当前处理时间
      long getCurrentProcessingTime();
      MetricGroup getMetricGroup();
      // 当前Watermark
      long getCurrentWatermark();
  }
}

evictBefore() 用于在使用窗口函数之前从窗口中删除元素,而 evictAfter() 用于在使用窗口函数之后从窗口中删除元素。

2. 内置 Evictor

Flink 本身内置实现了三种 Evictor,分别是 CountEvictor、DeltaEvictor 和 TimeEvictor。

默认情况下,所有内置的 Evictors 都是在触发窗口函数之前使用。

2.1 CountEvictor

CountEvictor 用于在窗口中保留用户指定数量的元素。如果窗口中的元素超过用户指定的阈值,会从窗口头部开始删除剩余元素。

2.1.1 内部实现

CountEvictor 需要实现 Evictor 接口的 evictBefore 和 evictAfter 方法,以实现调用窗口函数之前和之后的窗口元素删除逻辑:

代码语言:javascript复制
private final boolean doEvictAfter;
@Override
public void evictBefore(
        Iterable<TimestampedValue<Object>> elements, int size, W window, EvictorContext ctx) {
    if (!doEvictAfter) {
        evict(elements, size, ctx);
    }
}

@Override
public void evictAfter(
        Iterable<TimestampedValue<Object>> elements, int size, W window, EvictorContext ctx) {
    if (doEvictAfter) {
        evict(elements, size, ctx);
    }
}

doEvictAfter 是在构造 CountEvictor 时传入的一个变量,用以指定是否在使用窗口函数之后对元素进行删除操作。如果不指定,默认为 false,即在使用窗口函数之后不对元素进行删除。从上面代码中可以看出,不论是 evictBefore,还是 evictAfter,最后都会调用 evict() 方法:

代码语言:javascript复制
private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
    if (size <= maxCount) {
        return;
    } else {
        int evictedCount = 0;
        for (Iterator<TimestampedValue<Object>> iterator = elements.iterator();
                iterator.hasNext(); ) {
            iterator.next();
            evictedCount  ;
            if (evictedCount > size - maxCount) {
                break;
            } else {
                iterator.remove();
            }
        }
    }
}

从上面可以看出,如果当前窗口元素个数小于等于用户指定的阈值则不做删除操作,否则会从窗口迭代器的头部开始删除多余的元素(size - maxCount)。

2.1.2 如何使用

如下代码所示,在触发使用窗口函数之前保留2个元素:

代码语言:javascript复制
DataStream<Tuple2<String, Long>> result = stream
    // 格式转换
    .map(tuple -> Tuple2.of(tuple.f0, tuple.f1)).returns(Types.TUPLE(Types.STRING, Types.LONG))
    // 根据key分组
    .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
        @Override
        public String getKey(Tuple2<String, Long> value) throws Exception {
            return value.f0;
        }
    })
    // 处理时间滚动窗口 滚动大小60s
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    // 在触发使用窗口函数之前保留2个元素
    .evictor(CountEvictor.of(2))
    // 窗口函数
    .process(new ProcessWindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, String, TimeWindow>() {
        @Override
        public void process(String key, Context context, Iterable<Tuple2<String, Long>> elements, Collector<Tuple2<String, Long>> out) throws Exception {
            // Watermark
            long watermark = context.currentWatermark();
            String watermarkTime = DateUtil.timeStamp2Date(watermark);
            // 窗口开始与结束时间
            TimeWindow window = context.window();
            String start = DateUtil.timeStamp2Date(window.getStart());
            String end = DateUtil.timeStamp2Date(window.getEnd());
            // 窗口中元素
            List<Long> values = Lists.newArrayList();
            for (Tuple2<String, Long> element : elements) {
                values.add(element.f1);
            }
            LOG.info("[Process] Key: {}, Watermark: [{}|{}], Window: [{}|{}, {}|{}], Values: {}",
                    key, watermarkTime, watermark, start, window.getStart(), end, window.getEnd(), values
            );
        }
    });

完整代码请查阅CountEvictorExample

假如输入流如下所示,我们一起看看输出效果:

代码语言:javascript复制
A,1,2021-08-30 12:07:20
A,2,2021-08-30 12:07:22
A,3,2021-08-30 12:07:33
A,4,2021-08-30 12:07:44
A,5,2021-08-30 12:07:55
A,6,2021-08-30 12:08:34
A,7,2021-08-30 12:08:45
A,8,2021-08-30 12:08:56
A,9,2021-08-30 12:09:30
A,10,2021-08-30 12:09:35

2.2 DeltaEvictor

根据用户自定的 DeltaFunction 函数来计算窗口中最后一个元素与其余每个元素之间的差值,如果差值大于等于用户指定的阈值就会删除该元素。

2.2.1 内部实现

DeltaEvictor 与 CountEvictor 一样,都需要实现 Evictor 接口的 evictBefore 和 evictAfter 方法,只是最终调用的 evict() 函数的内部实现逻辑不一样:

代码语言:javascript复制
private void evict(Iterable<TimestampedValue<T>> elements, int size, EvictorContext ctx) {
    // 窗口最后一个元素
    TimestampedValue<T> lastElement = Iterables.getLast(elements);
    // 遍历整个窗口,与每一个元素进行比较
    for (Iterator<TimestampedValue<T>> iterator = elements.iterator(); iterator.hasNext(); ) {
        TimestampedValue<T> element = iterator.next();
        if (deltaFunction.getDelta(element.getValue(), lastElement.getValue())
                >= this.threshold) {
            iterator.remove();
        }
    }
}

deltaFunction 函数以及 threshold 变量是在构造函数中传入的。

首先获取窗口中的最后一个元素并遍历整个窗口,然后调用用户指定的 deltaFunction 计算与每一个元素的差值。如果差值大于等于用户自定的阈值就删除该元素。

2.2.2 如何使用

如下代码所示,在触发窗口函数计算之前剔除与最后一个元素值差大于等于1的元素:

代码语言:javascript复制
DataStream<Tuple2<String, Long>> result = stream
    // 格式转换
    .map(tuple -> Tuple2.of(tuple.f0, tuple.f1)).returns(Types.TUPLE(Types.STRING, Types.LONG))
    // 根据key分组
    .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
        @Override
        public String getKey(Tuple2<String, Long> value) throws Exception {
            return value.f0;
        }
    })
    // 处理时间滚动窗口 滚动大小60s
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    // 剔除与最后一个元素值差大于1的元素
    .evictor(DeltaEvictor.of(1, new DeltaFunction<Tuple2<String, Long>>() {
        @Override
        public double getDelta(Tuple2<String, Long> oldDataPoint, Tuple2<String, Long> newDataPoint) {
            return oldDataPoint.f1 - newDataPoint.f1;
        }
    }))
    // 窗口函数
    .process(new ProcessWindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, String, TimeWindow>() {
        @Override
        public void process(String key, Context context, Iterable<Tuple2<String, Long>> elements, Collector<Tuple2<String, Long>> out) throws Exception {
            // Watermark
            long watermark = context.currentWatermark();
            String watermarkTime = DateUtil.timeStamp2Date(watermark);
            // 窗口开始与结束时间
            TimeWindow window = context.window();
            String start = DateUtil.timeStamp2Date(window.getStart());
            String end = DateUtil.timeStamp2Date(window.getEnd());
            // 窗口中元素
            List<Long> values = Lists.newArrayList();
            for (Tuple2<String, Long> element : elements) {
                values.add(element.f1);
            }
            LOG.info("[Process] Key: {}, Watermark: [{}|{}], Window: [{}|{}, {}|{}], Values: {}",
                    key, watermarkTime, watermark, start, window.getStart(), end, window.getEnd(), values
            );
        }
    });

完整代码请查阅 DeltaEvictorExample

假如输入流如下所示,我们一起看看输出效果:

代码语言:javascript复制
A,4,2021-08-30 12:07:20
A,1,2021-08-30 12:07:22
A,3,2021-08-30 12:07:33
A,6,2021-08-30 12:07:44
A,3,2021-08-30 12:07:55
A,6,2021-08-30 12:08:34
A,5,2021-08-30 12:08:45
A,1,2021-08-30 12:08:56
A,6,2021-08-30 12:09:30

2.3 TimeEvictor

以毫秒为单位的时间间隔 windowSize 作为参数,在窗口所有元素中找到最大时间戳 max_ts 并删除所有时间戳小于 max_ts - windowSize 的元素。我们可以理解为只保留最新 windowSize 毫秒内的元素。

2.3.1 内部实现

TimeEvictor 与 DeltaEvictor、CountEvictor 一样,都需要实现 Evictor 接口的 evictBefore 和 evictAfter 方法,只是最终调用的 evict() 函数的内部实现逻辑不一样:

代码语言:javascript复制
private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
    if (!hasTimestamp(elements)) {
        return;
    }
    // 最大时间戳
    long currentTime = getMaxTimestamp(elements);
    // windowSize 保留元素的时间间隔
    long evictCutoff = currentTime - windowSize;
    for (Iterator<TimestampedValue<Object>> iterator = elements.iterator();
            iterator.hasNext(); ) {
        TimestampedValue<Object> record = iterator.next();
        if (record.getTimestamp() <= evictCutoff) {
            iterator.remove();
        }
    }
}
// 第一个元素是否有时间戳
private boolean hasTimestamp(Iterable<TimestampedValue<Object>> elements) {
    Iterator<TimestampedValue<Object>> it = elements.iterator();
    if (it.hasNext()) {
        return it.next().hasTimestamp();
    }
    return false;
}
// 窗口中最大时间戳
private long getMaxTimestamp(Iterable<TimestampedValue<Object>> elements) {
    long currentTime = Long.MIN_VALUE;
    for (Iterator<TimestampedValue<Object>> iterator = elements.iterator();
            iterator.hasNext(); ) {
        TimestampedValue<Object> record = iterator.next();
        currentTime = Math.max(currentTime, record.getTimestamp());
    }
    return currentTime;
}

首先获取当前窗口中最大的时间戳,减去用户指定时间间隔 windowSize,得到一个 evictCutoff,然后遍历窗口全部元素,删除时间戳小于等于 evictCutoff 的元素。

2.3.2 如何使用

如下代码所示,在触发窗口函数计算之前只保留最近10s内的元素:

代码语言:javascript复制
DataStream<Tuple2<String, Long>> result = stream
    // 格式转换
    .map(tuple -> Tuple2.of(tuple.f0, tuple.f1)).returns(Types.TUPLE(Types.STRING, Types.LONG))
    // 根据key分组
    .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
        @Override
        public String getKey(Tuple2<String, Long> value) throws Exception {
            return value.f0;
        }
    })
    // 处理时间滚动窗口 滚动大小60s
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    // 保留窗口中最近10s内的元素
    .evictor(TimeEvictor.of(Time.seconds(10)))
    // 窗口函数
    .process(new ProcessWindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, String, TimeWindow>() {
        @Override
        public void process(String key, Context context, Iterable<Tuple2<String, Long>> elements, Collector<Tuple2<String, Long>> out) throws Exception {
            // Watermark
            long watermark = context.currentWatermark();
            String watermarkTime = DateUtil.timeStamp2Date(watermark);
            // 窗口开始与结束时间
            TimeWindow window = context.window();
            String start = DateUtil.timeStamp2Date(window.getStart());
            String end = DateUtil.timeStamp2Date(window.getEnd());
            // 窗口中元素
            List<Long> values = Lists.newArrayList();
            for (Tuple2<String, Long> element : elements) {
                values.add(element.f1);
            }
            LOG.info("[Process] Key: {}, Watermark: [{}|{}], Window: [{}|{}, {}|{}], Values: {}",
                    key, watermarkTime, watermark, start, window.getStart(), end, window.getEnd(), values
            );
        }
    });

完整代码请查阅TimeEvictorExample

假如输入流如下所示,我们一起看看输出效果:

代码语言:javascript复制
A,1,2021-08-30 12:07:20
A,2,2021-08-30 12:07:22
A,3,2021-08-30 12:07:44
A,4,2021-08-30 12:07:55
A,5,2021-08-30 12:07:54
A,6,2021-08-30 12:08:34
A,7,2021-08-30 12:08:45
A,8,2021-08-30 12:08:56
A,9,2021-08-30 12:09:30

欢迎关注我的公众号和博客:

相关推荐:

  • Flink 窗口之Window机制
  • Flink 窗口分配器 WindowAssigner
  • Flink 窗口处理函数 WindowFunction
  • Flink 窗口触发器 Trigger

参考:

  • Evictors

0 人点赞