Flink 自定义 countAndTimeTrigger

2020-05-21 10:38:34 浏览数 (1)

1.背景

项目中需要自定义 trigger,需要基于两个条件:1. count 即 msg 的个数,当个数大于某个数时触发窗口 2. time 即每个固定的时间触发窗口

2.代码样例

代码语言:javascript复制
/**
 * @author shengjk1
 * @date 2019/9/4
 */
public class CountAndTimeTrigger extends Trigger<Object, TimeWindow> {
	private static final long serialVersionUID = 1L;
	
	private final long maxCount;
	
	private final ReducingStateDescriptor<Long> stateDesc =
			new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);
	
	public CountAndTimeTrigger(long maxCount) {
		super();
		this.maxCount = maxCount;
	}
	
	
	@Override
	public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
		ctx.registerProcessingTimeTimer(window.maxTimestamp());
		
		ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
		count.add(1L);
		if (count.get() >= maxCount) {
			count.clear();
			return TriggerResult.FIRE_AND_PURGE;
		}
		return TriggerResult.CONTINUE;
	}
	
	@Override
	public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
		return TriggerResult.FIRE;
	}
	
	@Override
	public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
		return TriggerResult.CONTINUE;
	}
	
	@Override
	public boolean canMerge() {
		return false;
	}
	
	@Override
	public void onMerge(TimeWindow window, OnMergeContext ctx) throws Exception {
		ctx.mergePartitionedState(stateDesc);
		long windowMaxTimestamp = window.maxTimestamp();
		if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {
			ctx.registerProcessingTimeTimer(windowMaxTimestamp);
		}
	}
	
	@Override
	public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
		ctx.getPartitionedState(stateDesc).clear();
		ctx.deleteProcessingTimeTimer(window.maxTimestamp());
	}
	
	
	private static class Sum implements ReduceFunction<Long> {
		private static final long serialVersionUID = 1L;
		
		@Override
		public Long reduce(Long value1, Long value2) throws Exception {
			return value1   value2;
		}
	}
}

0 人点赞