首先拉取 Flink 的 quickstart 样例工程：
mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=1.7.2 \
    -DarchetypeCatalog=local
实现从文件读取的批处理
新建一个 hello.txt，文件内容如下（两行）：
hello world welcome
hello welcome
统计词频
代码语言:javascript复制public class BatchJavaApp {
public static void main(String[] args) throws Exception {
String input = "/Users/admin/Downloads/flink/data";
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSource<String> text = env.readTextFile(input);
text.print();
text.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
String[] tokens = value.toLowerCase().split(" ");
for (String token : tokens) {
collector.collect(new Tuple2<>(token,1));
}
}
}).groupBy(0).sum(1).print();
}
}
运行结果（日志省略）。前两行是 text.print() 打印的原始文本行，后三行是词频统计结果：
hello welcome
hello world welcome
(world,1)
(hello,2)
(welcome,2)
纯Java实现
文件读取类
public class FileOperation {
    /**
     * Reads the file {@code filename} and appends every word it contains to {@code words}.
     *
     * <p>A word is a maximal run of characters for which {@link Character#isLetter(char)} is
     * true. Each word is lower-cased (locale-independently) before being added. Every other
     * character (spaces, punctuation, digits, line breaks) acts as a separator.
     *
     * @param filename path of the file to read; its bytes are decoded as UTF-8
     * @param words    list that receives the words in file order; existing entries are kept
     * @return {@code true} if the file was read (even if it contained no words);
     *         {@code false} if an argument is null, the file does not exist, or it cannot be
     *         opened (for example because it is a directory)
     */
    public static boolean readFile(String filename, List<String> words) {
        if (filename == null || words == null) {
            System.out.println("filename为空或words为空");
            return false;
        }
        File file = new File(filename);
        if (!file.exists()) {
            return false;
        }
        // try-with-resources closes the Scanner and, through it, the underlying FileInputStream.
        try (Scanner scanner = new Scanner(new BufferedInputStream(new FileInputStream(file)), "UTF-8")) {
            scanner.useLocale(Locale.ENGLISH);
            if (scanner.hasNextLine()) {
                // "\\A" matches only at the beginning of the input, so next() returns the whole file.
                String contents = scanner.useDelimiter("\\A").next();
                tokenize(contents, words);
            }
        } catch (FileNotFoundException e) {
            System.out.println("无法打开" + filename);
            return false;
        }
        return true;
    }

    /** Appends each maximal run of letters in {@code contents}, lower-cased, to {@code words}. */
    private static void tokenize(String contents, List<String> words) {
        int start = firstCharacterIndex(contents, 0);
        // If there is no letter at all, start == length and the loop body never runs.
        for (int i = start + 1; i <= contents.length(); ) {
            if (i == contents.length() || !Character.isLetter(contents.charAt(i))) {
                words.add(contents.substring(start, i).toLowerCase(Locale.ROOT));
                start = firstCharacterIndex(contents, i);
                i = start + 1;
            } else {
                i++;
            }
        }
    }

    /** Returns the index of the first letter in {@code s} at or after {@code start}, or {@code s.length()} if none. */
    private static int firstCharacterIndex(String s, int start) {
        for (int i = start; i < s.length(); i++) {
            if (Character.isLetter(s.charAt(i))) {
                return i;
            }
        }
        return s.length();
    }
}
代码语言:javascript复制public class BatchJavaOnly {
@SuppressWarnings("unchecked")
public static void main(String[] args) {
String input = "/Users/admin/Downloads/flink/data/hello.txt";
List<String> list = new ArrayList<>();
FileOperation.readFile(input,list);
System.out.println(list);
Map<String,Integer> map = new HashMap<>();
list.forEach(token -> {
if (map.containsKey(token)) {
map.put(token,map.get(token) 1);
}else {
map.put(token,1);
}
});
map.entrySet().stream().map(entry -> new Tuple2(entry.getKey(),entry.getValue()))
.forEach(System.out::println);
}
}
运行结果
[hello, world, welcome, hello, welcome]
(world,1)
(hello,2)
(welcome,2)
从网络（socket）读取数据的流式处理
代码语言:javascript复制public class StreamingJavaApp {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> text = env.socketTextStream("127.0.0.1",9999);
text.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
String[] tokens = value.toLowerCase().split(" ");
for (String token : tokens) {
collector.collect(new Tuple2<>(token,1));
}
}
}).keyBy(0).timeWindow(Time.seconds(5))
.sum(1).print();
env.execute("StreamingJavaApp");
}
}
运行前先用 nc 在 9999 端口启动监听：
nc -lk 9999
然后运行代码，在 nc 中输入 a a c d b c e e f a
运行结果（日志省略；每行开头的 `n>` 是输出该条结果的 print 子任务编号）：
1> (e,2)
9> (d,1)
11> (a,3)
3> (b,1)
4> (f,1)
8> (c,2)
现在我们将 Tuple2 元组改成实体类（POJO）：
代码语言:javascript复制public class StreamObjJavaApp {
@AllArgsConstructor
@Data
@ToString
@NoArgsConstructor
public static class WordCount {
private String word;
private int count;
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> text = env.socketTextStream("127.0.0.1",9999);
text.flatMap(new FlatMapFunction<String, WordCount>() {
@Override
public void flatMap(String value, Collector<WordCount> collector) throws Exception {
String[] tokens = value.toLowerCase().split(" ");
for (String token : tokens) {
collector.collect(new WordCount(token,1));
}
}
}).keyBy("word").timeWindow(Time.seconds(5))
.sum("count").print();
env.execute("StreamingJavaApp");
}
}
运行结果
4> StreamObjJavaApp.WordCount(word=f, count=1)
11> StreamObjJavaApp.WordCount(word=a, count=3)
8> StreamObjJavaApp.WordCount(word=c, count=2)
1> StreamObjJavaApp.WordCount(word=e, count=2)
9> StreamObjJavaApp.WordCount(word=d, count=1)
3> StreamObjJavaApp.WordCount(word=b, count=1)
当然，我们也可以用方法引用 WordCount::getWord 作为 key 选择器来写：
代码语言:javascript复制public class StreamObjJavaApp {
@AllArgsConstructor
@Data
@ToString
@NoArgsConstructor
public static class WordCount {
private String word;
private int count;
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> text = env.socketTextStream("127.0.0.1",9999);
text.flatMap(new FlatMapFunction<String, WordCount>() {
@Override
public void flatMap(String value, Collector<WordCount> collector) throws Exception {
String[] tokens = value.toLowerCase().split(" ");
for (String token : tokens) {
collector.collect(new WordCount(token,1));
}
}
}).keyBy(WordCount::getWord).timeWindow(Time.seconds(5))
.sum("count").print().setParallelism(1);
env.execute("StreamingJavaApp");
}
}
keyBy 的参数是一个函数式接口 KeySelector，其定义如下：
代码语言:javascript复制@Public
@FunctionalInterface
public interface KeySelector<IN, KEY> extends Function, Serializable {
KEY getKey(IN value) throws Exception;
}
flatMap 也可以用 lambda 表达式来写。但由于 Java 泛型擦除，Flink 无法推断 lambda 的输出类型，需要额外调用 returns(...) 显式指定。这种写法比较繁琐，不如匿名类直观：
代码语言:javascript复制public class StreamObjJavaApp {
@AllArgsConstructor
@Data
@ToString
@NoArgsConstructor
public static class WordCount {
private String word;
private int count;
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> text = env.socketTextStream("127.0.0.1",9999);
text.flatMap((FlatMapFunction<String,WordCount>)(value,collector) -> {
String[] tokens = value.toLowerCase().split(" ");
for (String token : tokens) {
collector.collect(new WordCount(token,1));
}
}).returns(WordCount.class)
.keyBy(WordCount::getWord).timeWindow(Time.seconds(5))
.sum("count").print().setParallelism(1);
env.execute("StreamingJavaApp");
}
}
flatMap 还可以传入 RichFlatMapFunction 抽象类的实现。它在 FlatMapFunction 的基础上，额外提供 open/close 等生命周期方法和运行时上下文：
代码语言:javascript复制public class StreamObjJavaApp {
@AllArgsConstructor
@Data
@ToString
@NoArgsConstructor
public static class WordCount {
private String word;
private int count;
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> text = env.socketTextStream("127.0.0.1",9999);
text.flatMap(new RichFlatMapFunction<String, WordCount>() {
@Override
public void flatMap(String value, Collector<WordCount> collector) throws Exception {
String[] tokens = value.toLowerCase().split(" ");
for (String token : tokens) {
collector.collect(new WordCount(token,1));
}
}
}).keyBy(WordCount::getWord).timeWindow(Time.seconds(5))
.sum("count").print().setParallelism(1);
env.execute("StreamingJavaApp");
}
}