flink基础之wordcount计算

2024-06-11 19:39:14 浏览数 (2)

在学习大数据,最基础的入门程序就是计算wordcount,即统计每个单词出现的次数

回顾一下flink程序的基础步骤 :1、获取环境 2、配置基础环境的配置(checkpoint、并行度之类) 3、执行的业务的逻辑

代码语言:javascript复制
 public static void main(String[] args) throws Exception {

        // 1. 创建流式执行环境
//        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(new Configuration());

        // 2. 读取文本流:hadoop102表示发送端主机名、7777表示端口号
        DataStreamSource<String> lineStream = env.socketTextStream("hadoop102", 7777);

        // 3. 转换、分组、求和,得到统计结果
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = 
                 //使用了匿名函数
                 lineStream.flatMap((String line, Collector<String> out) -> {
                    //执行清洗数据的功能  把 "hello world" 变成了 "hello" "world"
                    String[] words = line.split(" ");
                    for (String word : words) {
                        out.collect(word);//使用了收集器收集,并把返回的结果返回给
                    }
                }).returns(Types.STRING)
                //进行了数据的格式转换  从string格式 变成了 Tuple2<String, Long>格式
                .map(word -> Tuple2.of(word,1L))  
                .returns(Types.TUPLE(Types.STRING, Types.LONG))//java 的泛型擦除
                //按照上游传下来的Tuple2<String, Long>格式对第一个数据进行分区
                .keyBy(data -> data.f0)
                //按照上游传下来的Tuple2<String, Long>格式对第二个位置(从0开始读)的数据进行sum聚合
                .sum(1);

        // 4. 打印
        sum.print();

        // 5. 执行
        env.execute();
    }

0 人点赞