Flink是下一代大数据计算平台,可处理流计算和批量计算。《Flink-1.9流计算开发:十三、min、minBy、max、maxBy函数》cosmozhu写的本系列文章的第十三篇。通过简单的DEMO来演示min、minBy、max、maxBy函数执行的效果 。
需求
本篇文章我们来区分min(max)与minBy(maxBy)之间的区别,下面案例是每10秒计算一次最近1分钟的最小值订单。
解决方案
代码语言:javascript复制 public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 添加自定义数据源,每秒发出一笔订单信息{商品名称,商品数量}
DataStreamSource<Tuple2<String, Integer>> orderSource = env
.addSource(new SourceFunction<Tuple2<String, Integer>>() {
private volatile boolean isRunning = true;
private final Random random = new Random();
@Override
public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
while (isRunning) {
TimeUnit.SECONDS.sleep(1);
Tuple2<String, Integer> t = Tuple2.of(TYPE[random.nextInt(TYPE.length)], random.nextInt(1000));
LOG.info("提交数据:" t);
ctx.collect(t);
}
}
@Override
public void cancel() {
isRunning = false;
}
}, "order-info");
orderSource
.timeWindowAll(Time.minutes(1),Time.seconds(10))
.min(1)
// .minBy(1)
.print();
env.execute("Flink Streaming Java API Skeleton");
}
执行效果
执行min函数,我们可以看出min函数确实返回了最小值,但是最小值前面对应的商品名称却对应不上。
代码语言:javascript复制15:52:20,261 INFO fun.cosmozhu.session13.StreamTest - 提交数据:(梨,542)
15:52:21,282 INFO fun.cosmozhu.session13.StreamTest - 提交数据:(梨,331)
15:52:22,282 INFO fun.cosmozhu.session13.StreamTest - 提交数据:(苹果,894)
15:52:23,282 INFO fun.cosmozhu.session13.StreamTest - 提交数据:(梨,791)
15:52:24,282 INFO fun.cosmozhu.session13.StreamTest - 提交数据:(葡萄,836)
15:52:25,282 INFO fun.cosmozhu.session13.StreamTest - 提交数据:(火龙果,198)
15:52:26,282 INFO fun.cosmozhu.session13.StreamTest - 提交数据:(苹果,221)
15:52:27,282 INFO fun.cosmozhu.session13.StreamTest - 提交数据:(西瓜,309)
15:52:28,282 INFO fun.cosmozhu.session13.StreamTest - 提交数据:(葡萄,553)
15:52:29,282 INFO fun.cosmozhu.session13.StreamTest - 提交数据:(火龙果,969)
2> (梨,198)
15:52:30,282 INFO fun.cosmozhu.session13.StreamTest - 提交数据:(梨,981)
15:52:31,282 INFO fun.cosmozhu.session13.StreamTest - 提交数据:(梨,483)
15:52:32,282 INFO fun.cosmozhu.session13.StreamTest - 提交数据:(梨,381)
15:52:33,282 INFO fun.cosmozhu.session13.StreamTest - 提交数据:(梨,982)
15:52:34,282 INFO fun.cosmozhu.session13.StreamTest - 提交数据:(火龙果,738)
15:52:35,282 INFO fun.cosmozhu.session13.StreamTest - 提交数据:(西瓜,654)
15:52:36,282 INFO fun.cosmozhu.session13.StreamTest - 提交数据:(火龙果,124)
15:52:37,282 INFO fun.cosmozhu.session13.StreamTest - 提交数据:(苹果,703)
15:52:38,282 INFO fun.cosmozhu.session13.StreamTest - 提交数据:(葡萄,996)
15:52:39,282 INFO fun.cosmozhu.session13.StreamTest - 提交数据:(西瓜,127)
3> (梨,124)
15:52:40,282 INFO fun.cosmozhu.session13.StreamTest - 提交数据:(西瓜,249)
15:52:41,282 INFO fun.cosmozhu.session13.StreamTest - 提交数据:(梨,705)
15:52:42,282 INFO fun.cosmozhu.session13.StreamTest - 提交数据:(火龙果,37)
15:52:43,282 INFO fun.cosmozhu.session13.StreamTest - 提交数据:(苹果,647)
15:52:44,282 INFO fun.cosmozhu.session13.StreamTest - 提交数据:(葡萄,842)
15:52:45,282 INFO fun.cosmozhu.session13.StreamTest - 提交数据:(葡萄,475)
15:52:46,282 INFO fun.cosmozhu.session13.StreamTest - 提交数据:(火龙果,999)
15:52:47,282 INFO fun.cosmozhu.session13.StreamTest - 提交数据:(葡萄,994)
15:52:48,282 INFO fun.cosmozhu.session13.StreamTest - 提交数据:(苹果,417)
15:52:49,282 INFO fun.cosmozhu.session13.StreamTest - 提交数据:(葡萄,830)
执行minBy函数,我们可以看出minBy返回的最小值,并且对应的商品名称也是正确的
代码语言:javascript复制15:59:06,657 INFO fun.cosmozhu.session13.StreamTest - 提交数据:(苹果,935)
15:59:07,684 INFO fun.cosmozhu.session13.StreamTest - 提交数据:(葡萄,638)
15:59:08,684 INFO fun.cosmozhu.session13.StreamTest - 提交数据:(梨,485)
15:59:09,684 INFO fun.cosmozhu.session13.StreamTest - 提交数据:(苹果,720)
4> (梨,485)
15:59:10,684 INFO fun.cosmozhu.session13.StreamTest - 提交数据:(梨,407)
15:59:11,684 INFO fun.cosmozhu.session13.StreamTest - 提交数据:(西瓜,353)
15:59:12,684 INFO fun.cosmozhu.session13.StreamTest - 提交数据:(西瓜,76)
15:59:13,684 INFO fun.cosmozhu.session13.StreamTest - 提交数据:(苹果,629)
15:59:14,684 INFO fun.cosmozhu.session13.StreamTest - 提交数据:(苹果,974)
15:59:15,684 INFO fun.cosmozhu.session13.StreamTest - 提交数据:(西瓜,263)
15:59:16,684 INFO fun.cosmozhu.session13.StreamTest - 提交数据:(西瓜,840)
15:59:17,684 INFO fun.cosmozhu.session13.StreamTest - 提交数据:(火龙果,220)
15:59:18,684 INFO fun.cosmozhu.session13.StreamTest - 提交数据:(苹果,27)
15:59:19,685 INFO fun.cosmozhu.session13.StreamTest - 提交数据:(西瓜,94)
1> (苹果,27)
15:59:20,685 INFO fun.cosmozhu.session13.StreamTest - 提交数据:(西瓜,733)
15:59:21,685 INFO fun.cosmozhu.session13.StreamTest - 提交数据:(火龙果,390)
15:59:22,685 INFO fun.cosmozhu.session13.StreamTest - 提交数据:(苹果,766)
15:59:23,685 INFO fun.cosmozhu.session13.StreamTest - 提交数据:(苹果,321)
15:59:24,685 INFO fun.cosmozhu.session13.StreamTest - 提交数据:(苹果,784)
15:59:25,685 INFO fun.cosmozhu.session13.StreamTest - 提交数据:(梨,781)
15:59:26,685 INFO fun.cosmozhu.session13.StreamTest - 提交数据:(火龙果,459)
15:59:27,685 INFO fun.cosmozhu.session13.StreamTest - 提交数据:(葡萄,481)
15:59:28,685 INFO fun.cosmozhu.session13.StreamTest - 提交数据:(西瓜,91)
15:59:29,685 INFO fun.cosmozhu.session13.StreamTest - 提交数据:(葡萄,663)
2> (苹果,27)
15:59:30,685 INFO fun.cosmozhu.session13.StreamTest - 提交数据:(葡萄,183)
小结
从上面的案例我们可以分析得出:
- min只返回计算的最小值,而最小值对应的其他数据不保证正确。
- minBy返回计算的最小值,并且最小值对应的其他数据是保证正确的。
max和maxBy与其相似
代码地址
代码语言:javascript复制https://github.com/chaoxxx/learn-flink-stream-api/blob/master/src/main/java/fun/cosmozhu/session13/StreamTest.java
作者:cosmozhu --90后的老父亲,专注于保护地球的程序员
个人网站:https://www.cosmozhu.fun
欢迎转载,转载时请注明出处。
相关文章
- Flink-1.9流计算开发:十六、intervalJoin函数
- Flink-1.9流计算开发:十五、join函数
- Flink-1.9流计算开发:十四、union函数
- Flink-1.9流计算开发:十二、apply函数
- Flink-1.9流计算开发:十一、count-window-Sliding窗口函数