在学习大数据,最基础的入门程序就是计算wordcount,即统计每个单词出现的次数
回顾一下flink程序的基础步骤 :1、获取环境 2、配置基础环境的配置(checkpoint、并行度之类) 3、执行的业务的逻辑
代码语言:javascript复制 public static void main(String[] args) throws Exception {
// 1. 创建流式执行环境
// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(new Configuration());
// 2. 读取文本流:hadoop102表示发送端主机名、7777表示端口号
DataStreamSource<String> lineStream = env.socketTextStream("hadoop102", 7777);
// 3. 转换、分组、求和,得到统计结果
SingleOutputStreamOperator<Tuple2<String, Long>> sum =
//使用了匿名函数
lineStream.flatMap((String line, Collector<String> out) -> {
//执行清洗数据的功能 把 "hello world" 变成了 "hello" "world"
String[] words = line.split(" ");
for (String word : words) {
out.collect(word);//使用了收集器收集,并把返回的结果返回给
}
}).returns(Types.STRING)
//进行了数据的格式转换 从string格式 变成了 Tuple2<String, Long>格式
.map(word -> Tuple2.of(word,1L))
.returns(Types.TUPLE(Types.STRING, Types.LONG))//java 的泛型擦除
//按照上游传下来的Tuple2<String, Long>格式对第一个数据进行分区
.keyBy(data -> data.f0)
//按照上游传下来的Tuple2<String, Long>格式对第二个位置(从0开始读)的数据进行sum聚合
.sum(1);
// 4. 打印
sum.print();
// 5. 执行
env.execute();
}