Flink技术整理

2021-01-06 10:55:30 浏览数 (1)

首先使用 Maven 拉取 Flink 的样例工程(quickstart)代码

代码语言:bash
# Generate a Flink 1.7.2 Java quickstart project.
# Each line ends with "\" so the shell treats the five lines as ONE command.
# -DarchetypeCatalog=local only searches the local archetype catalog; drop it if Maven
# reports that the archetype cannot be found.
mvn archetype:generate \
      -DarchetypeGroupId=org.apache.flink \
      -DarchetypeArtifactId=flink-quickstart-java \
      -DarchetypeVersion=1.7.2 \
      -DarchetypeCatalog=local

实现从文件读取的批处理

在 /Users/admin/Downloads/flink/data 目录下建立一个 hello.txt,文件内容如下(共两行):

hello world welcome
hello welcome

使用 Flink DataSet API 统计词频

代码语言:java
public class BatchJavaApp {
    /**
     * Batch word count with the Flink DataSet API: reads the text file(s) at the input path,
     * splits each line into lower-cased words and prints (word, count) pairs.
     *
     * @param args optional; {@code args[0]} overrides the default input path
     * @throws Exception propagated from Flink job execution
     */
    public static void main(String[] args) throws Exception {
        // The default is a directory; readTextFile reads the files inside it.
        String input = args.length > 0 ? args[0] : "/Users/admin/Downloads/flink/data";
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<String> text = env.readTextFile(input);
        // Echo the raw lines. In the DataSet API print() triggers execution by itself
        // (which is why there is no env.execute() below), so each print() runs its own job.
        text.print();
        text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
                // Split on whitespace runs and skip empty tokens: split(" ") would emit ""
                // for repeated or leading spaces and count it as a word.
                for (String token : value.toLowerCase().split("\\s+")) {
                    if (!token.isEmpty()) {
                        collector.collect(new Tuple2<>(token, 1));
                    }
                }
            }
        }).groupBy(0).sum(1).print(); // group by field 0 (word), sum field 1 (count)
    }
}

运行结果(日志省略;前两行是 text.print() 打印的文件原文,后三行是词频统计结果)

代码语言:text
hello welcome
hello world welcome
(world,1)
(hello,2)
(welcome,2)

纯Java实现

文件读取类

代码语言:java
public class FileOperation {
    /**
     * 读取文件名称为filename中的内容,并将其中包含的所有词语放进words中
     * @param filename
     * @param words
     * @return
     */
    public static boolean readFile(String filename, List<String> words) {
        if (filename == null || words == null) {
            System.out.println("filename为空或words为空");
            return false;
        }
        Scanner scanner;

        try {
            File file = new File(filename);
            if (file.exists()) {
                FileInputStream fis = new FileInputStream(file);
                scanner = new Scanner(new BufferedInputStream(fis),"UTF-8");
                scanner.useLocale(Locale.ENGLISH);
            }else {
                return false;
            }
        } catch (FileNotFoundException e) {
            System.out.println("无法打开"   filename);
            return false;
        }
        //简单分词
        if (scanner.hasNextLine()) {
            String contents = scanner.useDelimiter("\A").next();
            int start = firstCharacterIndex(contents,0);
            for (int i = start   1;i <= contents.length();) {
                if (i == contents.length() || !Character.isLetter(contents.charAt(i))) {
                    String word = contents.substring(start,i).toLowerCase();
                    words.add(word);
                    start = firstCharacterIndex(contents,i);
                    i = start   1;
                }else {
                    i  ;
                }
            }
        }
        return true;

    }

    private static int firstCharacterIndex(String s,int start) {
        for (int i = start;i < s.length();i  ) {
            if (Character.isLetter(s.charAt(i))) {
                return i;
            }
        }
        return s.length();
    }
}
代码语言:java
public class BatchJavaOnly {
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        String input = "/Users/admin/Downloads/flink/data/hello.txt";
        List<String> list = new ArrayList<>();
        FileOperation.readFile(input,list);
        System.out.println(list);
        Map<String,Integer> map = new HashMap<>();
        list.forEach(token -> {
            if (map.containsKey(token)) {
                map.put(token,map.get(token)   1);
            }else {
                map.put(token,1);
            }
        });
        map.entrySet().stream().map(entry -> new Tuple2(entry.getKey(),entry.getValue()))
                .forEach(System.out::println);
    }
}

运行结果

代码语言:text
[hello, world, welcome, hello, welcome]
(world,1)
(hello,2)
(welcome,2)

从网络 Socket 读取数据的流式处理(DataStream API)

代码语言:java
public class StreamingJavaApp {
    /**
     * Streaming word count: reads text lines from a TCP socket and prints per-word counts
     * aggregated over 5-second windows.
     *
     * @param args optional; {@code args[0]} overrides the host (default {@code 127.0.0.1}),
     *             {@code args[1]} the port (default {@code 9999})
     * @throws Exception propagated from Flink job execution
     */
    public static void main(String[] args) throws Exception {
        String host = args.length > 0 ? args[0] : "127.0.0.1";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9999;
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> text = env.socketTextStream(host, port);
        text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
                // Split on whitespace runs and skip empty tokens: split(" ") would count ""
                // as a word for blank lines and for repeated or leading spaces.
                for (String token : value.toLowerCase().split("\\s+")) {
                    if (!token.isEmpty()) {
                        collector.collect(new Tuple2<>(token, 1));
                    }
                }
            }
        }).keyBy(0).timeWindow(Time.seconds(5)) // key by field 0 (the word)
                .sum(1).print();                 // sum field 1 (the count) per window
        env.execute("StreamingJavaApp");
    }
}

运行程序前,先用 nc 在本机 9999 端口启动监听(程序通过 socketTextStream 连接该端口):

nc -lk 9999

运行代码后,在 nc 窗口中输入 a a c d b c e e f a 并回车

运行结果(日志省略;行首的 N> 表示输出该条结果的并行子任务编号)

代码语言:text
1> (e,2)
9> (d,1)
11> (a,3)
3> (b,1)
4> (f,1)
8> (c,2)

现在我们将元组 Tuple2 改成实体类(POJO),并通过字段名 keyBy("word")、sum("count") 来分组和求和

代码语言:java
public class StreamObjJavaApp {
    /**
     * Word/count record. Lombok generates the constructors, getters/setters and toString.
     * The public no-arg constructor plus getters/setters are what Flink's POJO rules need
     * so that keyBy("word") and sum("count") can address fields by name.
     */
    @AllArgsConstructor
    @Data
    @ToString
    @NoArgsConstructor
    public static class WordCount {
        private String word;
        private int count;
    }

    /**
     * Streaming word count over socket input, keyed by the POJO field "word" and summed
     * on the field "count" over 5-second windows.
     *
     * @param args optional; {@code args[0]} host (default {@code 127.0.0.1}),
     *             {@code args[1]} port (default {@code 9999})
     * @throws Exception propagated from Flink job execution
     */
    public static void main(String[] args) throws Exception {
        String host = args.length > 0 ? args[0] : "127.0.0.1";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9999;
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> text = env.socketTextStream(host, port);
        text.flatMap(new FlatMapFunction<String, WordCount>() {
            @Override
            public void flatMap(String value, Collector<WordCount> collector) throws Exception {
                // Split on whitespace runs and drop empty tokens so that blank lines and
                // repeated spaces are not counted as the word "".
                for (String token : value.toLowerCase().split("\\s+")) {
                    if (!token.isEmpty()) {
                        collector.collect(new WordCount(token, 1));
                    }
                }
            }
        }).keyBy("word").timeWindow(Time.seconds(5))
                .sum("count").print();
        // Job name matches this class (it was copy-pasted as "StreamingJavaApp").
        env.execute("StreamObjJavaApp");
    }
}

运行结果

代码语言:text
4> StreamObjJavaApp.WordCount(word=f, count=1)
11> StreamObjJavaApp.WordCount(word=a, count=3)
8> StreamObjJavaApp.WordCount(word=c, count=2)
1> StreamObjJavaApp.WordCount(word=e, count=2)
9> StreamObjJavaApp.WordCount(word=d, count=1)
3> StreamObjJavaApp.WordCount(word=b, count=1)

当然,keyBy 也可以直接传入方法引用 WordCount::getWord 作为 key 选择器;另外这里对 print() 调用了 setParallelism(1),让输出只由一个子任务完成:

代码语言:java
public class StreamObjJavaApp {
    /**
     * Word/count record. Lombok generates the constructors, getters/setters and toString;
     * the generated getWord() is what the KeySelector method reference below uses.
     */
    @AllArgsConstructor
    @Data
    @ToString
    @NoArgsConstructor
    public static class WordCount {
        private String word;
        private int count;
    }

    /**
     * Streaming word count over socket input, keyed with a KeySelector method reference
     * ({@code WordCount::getWord}) and summed on "count" over 5-second windows.
     *
     * @param args optional; {@code args[0]} host (default {@code 127.0.0.1}),
     *             {@code args[1]} port (default {@code 9999})
     * @throws Exception propagated from Flink job execution
     */
    public static void main(String[] args) throws Exception {
        String host = args.length > 0 ? args[0] : "127.0.0.1";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9999;
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> text = env.socketTextStream(host, port);
        text.flatMap(new FlatMapFunction<String, WordCount>() {
            @Override
            public void flatMap(String value, Collector<WordCount> collector) throws Exception {
                // Split on whitespace runs and drop empty tokens so that blank lines and
                // repeated spaces are not counted as the word "".
                for (String token : value.toLowerCase().split("\\s+")) {
                    if (!token.isEmpty()) {
                        collector.collect(new WordCount(token, 1));
                    }
                }
            }
        }).keyBy(WordCount::getWord).timeWindow(Time.seconds(5))
                .sum("count").print().setParallelism(1); // single print subtask
        // Job name matches this class (it was copy-pasted as "StreamingJavaApp").
        env.execute("StreamObjJavaApp");
    }
}

keyBy 接收的参数是函数式接口 KeySelector,因此可以直接传入 WordCount::getWord 这样的方法引用。其定义如下:

代码语言:java
/**
 * Functional interface accepted by {@code keyBy}: extracts a key of type {@code KEY} from an
 * input element of type {@code IN}, e.g. the method reference {@code WordCount::getWord}.
 * Because it extends {@link java.io.Serializable}, implementations (including lambdas and
 * method references) must be serializable.
 *
 * @param <IN>  type of the input elements
 * @param <KEY> type of the extracted key
 */
@Public
@FunctionalInterface
public interface KeySelector<IN, KEY> extends Function, Serializable {
   /**
    * Returns the key for {@code value}.
    *
    * @param value the input element
    * @return the key extracted from {@code value}
    * @throws Exception implementations may throw any exception
    */
   KEY getKey(IN value) throws Exception;
}

flatMap 也可以用 lambda 表达式来写。但由于 Java 泛型擦除,Flink 无法推断 lambda 的输出类型,需要额外调用 returns(WordCount.class) 显式声明,所以反而比匿名类的写法繁琐:

代码语言:java
public class StreamObjJavaApp {
    /**
     * Word/count record. Lombok generates the constructors, getters/setters and toString;
     * the generated getWord() is what the KeySelector method reference below uses.
     */
    @AllArgsConstructor
    @Data
    @ToString
    @NoArgsConstructor
    public static class WordCount {
        private String word;
        private int count;
    }

    /**
     * Streaming word count over socket input, written with a lambda FlatMapFunction.
     * The result is keyed by {@code WordCount::getWord} and summed on "count" over
     * 5-second windows.
     *
     * @param args optional; {@code args[0]} host (default {@code 127.0.0.1}),
     *             {@code args[1]} port (default {@code 9999})
     * @throws Exception propagated from Flink job execution
     */
    public static void main(String[] args) throws Exception {
        String host = args.length > 0 ? args[0] : "127.0.0.1";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9999;
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> text = env.socketTextStream(host, port);
        text.flatMap((FlatMapFunction<String, WordCount>) (value, collector) -> {
            // Split on whitespace runs and drop empty tokens so that blank lines and
            // repeated spaces are not counted as the word "".
            for (String token : value.toLowerCase().split("\\s+")) {
                if (!token.isEmpty()) {
                    collector.collect(new WordCount(token, 1));
                }
            }
        }).returns(WordCount.class) // generic type erasure hides the lambda's output type
                .keyBy(WordCount::getWord).timeWindow(Time.seconds(5))
                .sum("count").print().setParallelism(1); // single print subtask
        // Job name matches this class (it was copy-pasted as "StreamingJavaApp").
        env.execute("StreamObjJavaApp");
    }
}

flatMap 还可以传入 RichFlatMapFunction 抽象类的实现(Rich 函数额外提供 open/close 生命周期方法和运行时上下文):

代码语言:java
public class StreamObjJavaApp {
    /**
     * Word/count record. Lombok generates the constructors, getters/setters and toString;
     * the generated getWord() is what the KeySelector method reference below uses.
     */
    @AllArgsConstructor
    @Data
    @ToString
    @NoArgsConstructor
    public static class WordCount {
        private String word;
        private int count;
    }

    /**
     * Streaming word count over socket input, using a RichFlatMapFunction for tokenizing.
     * The result is keyed by {@code WordCount::getWord} and summed on "count" over
     * 5-second windows.
     *
     * @param args optional; {@code args[0]} host (default {@code 127.0.0.1}),
     *             {@code args[1]} port (default {@code 9999})
     * @throws Exception propagated from Flink job execution
     */
    public static void main(String[] args) throws Exception {
        String host = args.length > 0 ? args[0] : "127.0.0.1";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9999;
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> text = env.socketTextStream(host, port);
        text.flatMap(new RichFlatMapFunction<String, WordCount>() {
            @Override
            public void flatMap(String value, Collector<WordCount> collector) throws Exception {
                // Split on whitespace runs and drop empty tokens so that blank lines and
                // repeated spaces are not counted as the word "".
                for (String token : value.toLowerCase().split("\\s+")) {
                    if (!token.isEmpty()) {
                        collector.collect(new WordCount(token, 1));
                    }
                }
            }
        }).keyBy(WordCount::getWord).timeWindow(Time.seconds(5))
                .sum("count").print().setParallelism(1); // single print subtask
        // Job name matches this class (it was copy-pasted as "StreamingJavaApp").
        env.execute("StreamObjJavaApp");
    }
}

0 人点赞