一.什么是DataSource?
Flink 做为一款流式计算框架,它可用来做批处理,即处理静态的数据集、历史的数据集;也可以用来做流处理,即实时的处理些实时数据流,实时的产生数据流结果,只要数据源源不断的过来,Flink 就能够一直计算下去,这个 Data Sources 就是数据的来源地。
Flink 中你可以使用 StreamExecutionEnvironment.addSource(sourceFunction)
来为你的程序添加数据来源。
Flink 已经提供了若干实现好了的 source functions,当然你也可以通过实现 SourceFunction 来自定义非并行的 source 或者实现 ParallelSourceFunction 接口或者扩展 RichParallelSourceFunction 来自定义并行的 source
二.如何从StreamExecutionEnvironment中实现stream sources?
a.基于集合的数据源
- fromCollection(Collection)可以从java自带的一些集合中获得。collection必须相同
- fromCollection(Iterator, Class)从一个迭代器中创建数据流。Class 指定了该迭代器返回元素的类型。
- fromElements(T …)从给定的对象序列中创建数据流。所有对象类型必须相同
- fromParallelCollection()
- GenerateQueue(from,to)创建一个生成指定区间范围内的数字序列的并行数据流
示例:
代码语言:java复制StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Event> input = env.fromElements(
new Event(1, "barfoo", 1.0),
new Event(2, "start", 2.0),
new Event(3, "foobar", 3.0),
...
);
b.基于文件
- readFilePath(file)读取指定位置的文件
- readFilePath()fileInputFormat, path) 根据指定的文件输入格式读取文件(一次)
- readFilePath(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) 这是上面两个方法内部调用的方法。它根据给定的 fileInputFormat 和读取路径读取文件。根据提供的 watchType,这个 source 可以定期(每隔 interval 毫秒)监测给定路径的新数据(FileProcessingMode.PROCESS_CONTINUOUSLY),或者处理一次路径对应文件的数据并退出(FileProcessingMode.PROCESS_ONCE)。你可以通过 pathFilter 进一步排除掉需要处理的文件。
示例:
代码语言:javascript复制final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.readTextFile("file:///path/to/file");
c.基于socket
- socketTextStream(host,port)从指定的socket套接字中读数据,元素可以用逗号分隔。
示例:
代码语言:javascript复制StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> dataStream = env
.socketTextStream("localhost", 9999) // 监听 localhost 的 9999 端口过来的数据
.flatMap(new Splitter())
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1);
d.自定义addSource()
示例:
代码语言:javascript复制StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<KafkaEvent> input = env
.addSource(
new FlinkKafkaConsumer011<>(
parameterTool.getRequired("input-topic"), //从参数中获取传进来的 topic
new KafkaEventSchema(),
parameterTool.getProperties())
.assignTimestampsAndWatermarks(new CustomWatermarkExtractor()));
比如你可以自定义从kafak,rabbitMQ,activeMQ,netty中读取数据。那么你就需要去了解一下 SourceFunction 接口了它是所有 stream source 的根接口,它继承自一个标记接口(空接口)Function。SourceFunction定义了两个接口方法:
- run():启动一个 source,即对接一个外部数据源然后 emit 元素形成 stream(大部分情况下会通过在该方法里运行一个 while 循环的形式来产生 stream。
- cancel():取消一个 source,也即将 run 中的循环 emit 元素的行为终止。
正常情况下,这两个方法都应对应的模板可以参考,参考一下格式就行。