flink学习-DataSourse学习

2019-08-28 10:20:10 浏览数 (1)

一.什么是DataSource?

Flink 做为一款流式计算框架,它可用来做批处理,即处理静态的数据集、历史的数据集;也可以用来做流处理,即实时的处理些实时数据流,实时的产生数据流结果,只要数据源源不断的过来,Flink 就能够一直计算下去,这个 Data Sources 就是数据的来源地。

Flink 中你可以使用 StreamExecutionEnvironment.addSource(sourceFunction) 来为你的程序添加数据来源。

Flink 已经提供了若干实现好了的 source functions,当然你也可以通过实现 SourceFunction 来自定义非并行的 source 或者实现 ParallelSourceFunction 接口或者扩展 RichParallelSourceFunction 来自定义并行的 source

二.如何从StreamExecutionEnvironment中实现stream sources?

a.基于集合的数据源

  1. fromCollection(Collection)可以从java自带的一些集合中获得。collection必须相同
  2. fromCollection(Iterator, Class)从一个迭代器中创建数据流。Class 指定了该迭代器返回元素的类型。
  3. fromElements(T …)从给定的对象序列中创建数据流。所有对象类型必须相同
  4. fromParallelCollection()
  5. GenerateQueue(from,to)创建一个生成指定区间范围内的数字序列的并行数据流

示例:

代码语言:java复制
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Event> input = env.fromElements(
	new Event(1, "barfoo", 1.0),
	new Event(2, "start", 2.0),
	new Event(3, "foobar", 3.0),
	...
);

b.基于文件

  1. readFilePath(file)读取指定位置的文件
  2. readFilePath()fileInputFormat, path) 根据指定的文件输入格式读取文件(一次)
  3. readFilePath(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) 这是上面两个方法内部调用的方法。它根据给定的 fileInputFormat 和读取路径读取文件。根据提供的 watchType,这个 source 可以定期(每隔 interval 毫秒)监测给定路径的新数据(FileProcessingMode.PROCESS_CONTINUOUSLY),或者处理一次路径对应文件的数据并退出(FileProcessingMode.PROCESS_ONCE)。你可以通过 pathFilter 进一步排除掉需要处理的文件。

示例:

代码语言:javascript复制
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = env.readTextFile("file:///path/to/file");

c.基于socket

  1. socketTextStream(host,port)从指定的socket套接字中读数据,元素可以用逗号分隔。

示例:

代码语言:javascript复制
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Tuple2<String, Integer>> dataStream = env
        .socketTextStream("localhost", 9999) // 监听 localhost 的 9999 端口过来的数据
        .flatMap(new Splitter())
        .keyBy(0)
        .timeWindow(Time.seconds(5))
        .sum(1);

d.自定义addSource()

示例:

代码语言:javascript复制
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<KafkaEvent> input = env
		.addSource(
			new FlinkKafkaConsumer011<>(
				parameterTool.getRequired("input-topic"), //从参数中获取传进来的 topic 
				new KafkaEventSchema(),
				parameterTool.getProperties())
			.assignTimestampsAndWatermarks(new CustomWatermarkExtractor()));

比如你可以自定义从kafak,rabbitMQ,activeMQ,netty中读取数据。那么你就需要去了解一下 SourceFunction 接口了它是所有 stream source 的根接口,它继承自一个标记接口(空接口)Function。SourceFunction定义了两个接口方法:

  1. run():启动一个 source,即对接一个外部数据源然后 emit 元素形成 stream(大部分情况下会通过在该方法里运行一个 while 循环的形式来产生 stream。
  2. cancel():取消一个 source,也即将 run 中的循环 emit 元素的行为终止。

正常情况下,这两个方法都应对应的模板可以参考,参考一下格式就行。

自定义DataSource模板自定义DataSource模板

0 人点赞