1.背景
项目中需要自定义 trigger,窗口在以下两个条件中任意一个满足时触发:1. count:即 msg 的个数,当窗口内的个数达到(大于等于)设定的阈值时触发窗口;2. time:即到达窗口的结束时间(processing time)时触发窗口。
2.代码样例
代码语言:javascript复制/**
* @author shengjk1
* @date 2019/9/4
*/
public class CountAndTimeTrigger extends Trigger<Object, TimeWindow> {
private static final long serialVersionUID = 1L;
private final long maxCount;
private final ReducingStateDescriptor<Long> stateDesc =
new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);
public CountAndTimeTrigger(long maxCount) {
super();
this.maxCount = maxCount;
}
@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
ctx.registerProcessingTimeTimer(window.maxTimestamp());
ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
count.add(1L);
if (count.get() >= maxCount) {
count.clear();
return TriggerResult.FIRE_AND_PURGE;
}
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.FIRE;
}
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public boolean canMerge() {
return false;
}
@Override
public void onMerge(TimeWindow window, OnMergeContext ctx) throws Exception {
ctx.mergePartitionedState(stateDesc);
long windowMaxTimestamp = window.maxTimestamp();
if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {
ctx.registerProcessingTimeTimer(windowMaxTimestamp);
}
}
@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
ctx.getPartitionedState(stateDesc).clear();
ctx.deleteProcessingTimeTimer(window.maxTimestamp());
}
private static class Sum implements ReduceFunction<Long> {
private static final long serialVersionUID = 1L;
@Override
public Long reduce(Long value1, Long value2) throws Exception {
return value1 value2;
}
}
}