flink join,Flink是下一代大数据计算平台,可处理流计算和批量计算。《Flink-1.9流计算开发:十五、join函数》cosmozhu写的本系列文章的第十五篇。通过简单的DEMO来演示join函数执行的效果 。
需求
当前有一个订单流(每秒1个订单),一个人民币-美元汇率流(每10秒发布一个新汇率)。实时计算一个汇率窗口期(10秒)内,订单的外汇金额。
解决方案
代码语言:javascript复制public class StreamTest {
private static final Logger LOG = LoggerFactory.getLogger(StreamTest.class);
@SuppressWarnings("deprecation")
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//CNY -> USD 汇率流
SingleOutputStreamOperator<ExchangeRateInfo> usdToCny = env.addSource(new ExchangeRateDataSource(CurrencyType.CNY, CurrencyType.USD, 7, 6),"USD-CNY")
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<ExchangeRateInfo>(Time.milliseconds(100)) {
@Override
public long extractTimestamp(ExchangeRateInfo element) {
return element.getTimeStamp().getTime();
}
});
//订单流
SingleOutputStreamOperator<OrderInfo> orderDs = env.addSource(new OrderDataSource())
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<OrderInfo>(Time.milliseconds(100)) {
@Override
public long extractTimestamp(OrderInfo element) {
return element.getTimeStamp().getTime();
}
});
//订单流inner join 汇率流
orderDs.join(usdToCny)
.where(new KeySelector<OrderInfo, CurrencyType>() {
private static final long serialVersionUID = 1L;
@Override
public CurrencyType getKey(OrderInfo value) throws Exception {
return value.getCurrencyType();
}
})
.equalTo(new KeySelector<ExchangeRateInfo, CurrencyType>() {
private static final long serialVersionUID = 1L;
@Override
public CurrencyType getKey(ExchangeRateInfo value) throws Exception {
return value.getFrom();
}
})
//转换20s窗口期内的订单金额为美元
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
//将订单金额人民币转换为美元
.apply(new JoinFunction<OrderInfo, ExchangeRateInfo, String>() {
private static final long serialVersionUID = 1L;
@Override
public String join(OrderInfo first, ExchangeRateInfo second) throws Exception {
return "$" first.getTotalAmt().divide(second.getCoefficient(), 2,BigDecimal.ROUND_HALF_UP).toPlainString();
}
})
.print();
env.execute("Flink Streaming Java API Skeleton");
}
}
执行效果
代码语言:javascript复制3> $125.05
3> $625.79
3> $95.79
3> $587.56
3> $387.35
3> $278.63
3> $40.45
3> $578.36
3> $15.48
3> $236.96
小结
Flink的join和传统数据库的join有相似之处。上面代码如果用SQL语句可以表示为:
代码语言:javascript复制select * from OrderData a,ExchangeRate b where a.CurrencyType = b.From;
本例在窗口期(10秒)中对金额做转换的流程如下图所示:
代码地址
代码语言:javascript复制https://github.com/chaoxxx/learn-flink-stream-api/blob/master/src/main/java/fun/cosmozhu/session15/main/StreamTest.java
作者:cosmozhu --90后的老父亲,专注于保护地球的程序员
个人网站:https://www.cosmozhu.fun
欢迎转载,转载时请注明出处。
相关文章
- Flink-1.9流计算开发:十六、intervalJoin函数
- Flink-1.9流计算开发:十四、union函数
- Flink-1.9流计算开发:十三、min、minBy、max、maxBy函数
- Flink-1.9流计算开发:十二、apply函数
- Flink-1.9流计算开发:十一、count-window-Sliding窗口函数