Flink学习笔记
一、Flink运行架构
1、 Flink 运行时的组件
代码语言:javascript复制`作业管理器(JobManager)`
`资源管理器(ResourceManager)`
`任务管理器(TaskManager)`
`以及分发器(Dispatcher)`
代码语言:javascript复制作业管理器(JobManager)
控制一个应用程序执行的主进程,也就是说,每个应用程序都会被一个不同的JobManager 所控制执行。JobManager 会先接收到要执行的应用程序,这个应用程序会包括:作业图(JobGraph)、逻辑数据流图(logical dataflow graph)和打包了所有的类、库和其它资源的 JAR 包。JobManager 会把 JobGraph 转换成一个物理层面的数据流图,这个图被叫做“执行图”(ExecutionGraph),包含了所有可以并发执行的任务。JobManager 会向资源管理器(ResourceManager)请求执行任务必要的资源,也就是任务管理器(TaskManager)上的插槽(slot)。一旦它获取到了足够的资源,就会将执行图分发到真正运行它们的TaskManager 上。而在运行过程中,JobManager 会负责所有需要中央协调的操作,比如说检查点(checkpoints)的协调。
代码语言:javascript复制资源管理器(ResourceManager)
主要负责管理任务管理器(TaskManager)的插槽(slot),TaskManger 插槽是 Flink 中定义的处理资源单元。Flink 为不同的环境和资源管理工具提供了不同资源管理器,比如YARN、Mesos、K8s,以及 standalone 部署。当 JobManager 申请插槽资源时,ResourceManager会将有空闲插槽的 TaskManager 分配给 JobManager。如果 ResourceManager 没有足够的插槽来满足 JobManager 的请求,它还可以向资源提供平台发起会话,以提供启动 TaskManager进程的容器。另外,ResourceManager 还负责终止空闲的 TaskManager,释放计算资源。
代码语言:javascript复制任务管理器(TaskManager)
Flink 中的工作进程。通常在 Flink 中会有多个TaskManager运行,每一个TaskManager都包含了一定数量的插槽(slots)。插槽的数量限制了TaskManager 能够执行的任务数量。启动之后,TaskManager 会向资源管理器注册它的插槽;收到资源管理器的指令后,TaskManager 就会将一个或者多个插槽提供给JobManager调用。JobManager就可以向插槽分配任务(tasks)来执行了。在执行过程中,一个TaskManager可以跟其它运行同一应用程序的TaskManagerx交换数据。
代码语言:javascript复制分发器(Dispatcher)
可以跨作业运行,它为应用提交提供了 REST 接口。当一个应用被提交执行时,分发器就会启动并将应用移交给一个 JobManager。由于是 REST 接口,所以 Dispatcher 可以作为集群的一个 HTTP 接入点,这样就能够不受防火墙阻挡。Dispatcher 也会启动一个 Web UI,用来方便地展示和监控作业执行的信息。Dispatcher 在架构中可能并不是必需的,这取决于应用提交运行的方式。
二、开发
1、Source
1.1从集合读取数据
代码语言:javascript复制public class SourceFromList {
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// ①、从集合中读取文件
DataStreamSource<SensorReading> data1 = env.fromCollection(Arrays.asList(
new SensorReading("sensor_1", 1547718199L, 35.8),
new SensorReading("sensor_6", 1547718201L, 15.4),
new SensorReading("sensor_10", 1547718205L, 38.1)
));
// ②、直接读取传入参数 setParallelism为设置并行度
DataStreamSource<? extends Serializable> data2 = env.fromElements("sensor_16, 1547218201, 27.3","sensor_18, 1547358286, 36.5").setParallelism(1);
// 打印输出
data1.print("data1");
data2.print("data2");
// execute中传参为JobName
env.execute("Demo1");
}
}
1.2从文件中读取数据
代码语言:javascript复制public class SourceFromFile {
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置并行度
env.setParallelism(1);
// 从文件中读取内容
DataStreamSource<String> dataSource = env.readTextFile("E:\IDEA2019_3Projects\FlinkDemo\src\main\resources\sensor.txt");
dataSource.print("dataSource");
// 执行
env.execute();
}
}
1.3从kafka中读取数据
代码语言:javascript复制需要引入连接器jar包
<!-- 0.11为kafka版本,2.12为scala版本,Flink是依赖于scala的。1.10.1是连接器的版本,和Flink版本一致 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.12</artifactId>
<version>1.10.1</version>
</dependency>
代码语言:javascript复制public class SourceFromKafka {
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置并行度
env.setParallelism(1);
// 配置参数
Properties prop = new Properties();
// 集群信息
prop.setProperty("bootstrap.servers", "master:9092");
// 消费者组
prop.setProperty("group.id", "consumer-group");
// 序列化
prop.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
// 反序列化
prop.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
// 消费偏移量
prop.setProperty("auto.offset.reset", "latest");
// 从kafka中读取数据 addSource()
DataStreamSource<String> dataStreamSource = env.addSource(new FlinkKafkaConsumer011<String>("sensor", new SimpleStringSchema(), prop));
// 打印输出
dataStreamSource.print("kafka");
// 执行
env.execute();
}
}
1.4自定义数据源
代码语言:javascript复制public class SourceFromCustom {
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置并行度
env.setParallelism(1);
DataStreamSource<SensorReading> dataStreamSource = env.addSource(new MySource());
dataStreamSource.print();
// 执行
env.execute();
}
/**
*
*自定义SourceFunction
*/
public static class MySource implements SourceFunction<SensorReading> {
// 定义一个标识位,用来控制循环
private boolean running = true;
@Override
public void run(SourceContext<SensorReading> ctx) throws Exception {
// 为了模拟真实数据变化,定义一个随机数发生器
Random random = new Random();
// 设置是个传感器的初始温度值
HashMap<String, Double> sensorTempMap = new HashMap<String,Double>();
for (int i = 1; i < 11; i ) {
sensorTempMap.put("sensor_" i, 60 random.nextGaussian()*20);
}
while (running){
for (String sensorId:sensorTempMap.keySet()){
Double newTemp = sensorTempMap.get(sensorId) random.nextGaussian();
sensorTempMap.put(sensorId, newTemp);
ctx.collect(new SensorReading(sensorId,System.currentTimeMillis(),newTemp));
}
}
Thread.sleep(1000L);
}
@Override
public void cancel() {
running = false;
}
}
}
2、Transform转换算子
代码语言:javascript复制读数据
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置并行度
env.setParallelism(1);
// 读数据
DataStreamSource<String> dataStreamSource = env.readTextFile("E:\IDEA2019_3Projects\FlinkDemo\src\main\resources\sensor.txt");
2.1Map
代码语言:javascript复制// 1、map,把String转换成长度输出
SingleOutputStreamOperator<Integer> map = dataStreamSource.map(new MapFunction<String, Integer>() {
@Override
public Integer map(String value) throws Exception {
return value.length();
}
});
2.2flatMap
代码语言:javascript复制SingleOutputStreamOperator<String> flatMap = dataStreamSource.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
String[] fields = value.split(",");
for (String field : fields){
out.collect(field);
}
}
});
flatMap.print("flatMap");
2.3、filter
代码语言:javascript复制// 3、filter,筛选温度为37.1的数据
SingleOutputStreamOperator<String> filter = dataStreamSource.filter(new FilterFunction<String>() {
@Override
public boolean filter(String value) throws Exception {
return value.contains("37.1");
}
});
filter.print("filter");
// filter,筛选以senser_1开头的
value.startsWith("sensor_1");
2.4、KeyBy
代码语言:javascript复制// 转换成SensorReading
// SingleOutputStreamOperator<SensorReading> map = dataStreamSource.map(new MapFunction<String, SensorReading>() {
// @Override
// public SensorReading map(String value) throws Exception {
// String[] fields = value.split(",");
// return new SensorReading(fields[0],new Long(fields[1]),new Double(fields[2]));
// }
// });
// Lambda表达式格式
SingleOutputStreamOperator<SensorReading> dataStream = dataStreamSource.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
});
// 分组
KeyedStream<SensorReading, Tuple> keyedStream = dataStream.keyBy("id");
// 滚动聚合
SingleOutputStreamOperator<SensorReading> temperature = keyedStream.maxBy("temperature");
temperature.print("temp");
2.4、Reduce
代码语言:javascript复制// 分组
KeyedStream<SensorReading, Tuple> keyedStream = dataStream.keyBy("id");
// Reduce聚合,取最大的温度值以及当前最新的时间戳
SingleOutputStreamOperator<SensorReading> reduce = keyedStream.reduce(new ReduceFunction<SensorReading>() {
@Override
public SensorReading reduce(SensorReading value1, SensorReading value2) throws Exception {
return new SensorReading(value1.getId(), value2.getTimestamp(), Math.max(value1.getTemperature(), value2.getTemperature()));
}
});
reduce.print("reducce");
2.5、分流split、select
代码语言:javascript复制// 读数据
DataStreamSource<String> dataStreamSource = env.readTextFile("E:\IDEA2019_3Projects\FlinkDemo\src\main\resources\sensor.txt");
// 转换为POJO类
SingleOutputStreamOperator<SensorReading> map = dataStreamSource.map(new MapFunction<String, SensorReading>() {
@Override
public SensorReading map(String value) throws Exception {
String[] fields = value.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
}
});
// 分流操作,按照30度为临界值分成两个流
SplitStream<SensorReading> multiplieStream = map.split(new OutputSelector<SensorReading>() {
@Override
public Iterable<String> select(SensorReading value) {
return (value.getTemperature() > 30) ? Collections.singletonList("high") : Collections.singletonList("low");
}
});
DataStream<SensorReading> high = multiplieStream.select("high");
DataStream<SensorReading> low = multiplieStream.select("low");
DataStream<SensorReading> all = multiplieStream.select("high", "low");
high.print("high");
low.print("low");
all.print("all");
// 执行
env.execute();
}
2.6、合并流
代码语言:javascript复制Connect 和 CoMap
/**
* 合流操作,将高温流转换成二元组类型,与低温流连接合并之后,输出状态信息,高温报警低温正常
*
*/
SingleOutputStreamOperator<Tuple2<String, Double>> highStream = high.map(new MapFunction<SensorReading, Tuple2<String, Double>>() {
@Override
public Tuple2<String, Double> map(SensorReading value) throws Exception {
return new Tuple2<>(value.getId(),value.getTemperature());
}
});
ConnectedStreams<Tuple2<String, Double>, SensorReading> connectedStreams = highStream.connect(low);
SingleOutputStreamOperator<Object> result = connectedStreams.map(new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>() {
@Override
public Object map1(Tuple2<String, Double> value) throws Exception {
return new Tuple3<>(value.f0, value.f1, "高温报警");
}
@Override
public Object map2(SensorReading value) throws Exception {
return new Tuple2<>(value.getId(), "正常");
}
});
result.print();
代码语言:javascript复制Union
DataStream<SensorReading> unionStream = highTempStream.union(lowTempStream);
代码语言:javascript复制Connect 与 Union区别
1.Union之前两个流的类型必须是一样,Connect可以不一样,在之后的coMap中再去调整成为一样的。
2. Connect只能操作两个流,Union 可以操作多个
2.7、自定义UDF函数
代码语言:javascript复制自定义函数并可以传参
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置并行度
env.setParallelism(1);
// 读数据
DataStreamSource<String> dataStreamSource = env.readTextFile("E:\IDEA2019_3Projects\FlinkDemo\src\main\resources\sensor.txt");
// 自定义keyFilter函数
SingleOutputStreamOperator<String> sensorFilter = dataStreamSource.filter(new keyFilter("sensor_1"));
sensorFilter.print("result");
// 实现keyFilter函数并传参
public static class keyFilter implements FilterFunction<String>{
private String key;
public keyFilter(String key) {
this.key = key;
}
@Override
public boolean filter(String value) throws Exception {
return value.contains(this.key);
}
}
3、Sink
3.1Sink到Kafka
代码语言:javascript复制public class SinkToKafka {
public static <IN> void main(String[] args) {
// 环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 读取文件
DataStreamSource<String> inputStream = env.readTextFile("E:\IDEA2019_3Projects\FlinkDemo\src\main\resources\sensor.txt");
// 转换成SensorReading类型
SingleOutputStreamOperator<String> datastream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[1])).toString();
});
DataStreamSink<String> test = datastream.addSink(new FlinkKafkaProducer011<String>("master:9092", "test", new SimpleStringSchema()));
}
}
3.2Sink到MySql
代码语言:javascript复制public class SinkToMysql {
public static void main(String[] args) throws Exception {
// 环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 读取文件
DataStreamSource<String> inputStream = env.readTextFile("E:\IDEA2019_3Projects\FlinkDemo\src\main\resources\sensor.txt");
// 转换成SensorReading类型
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[1]));
});
dataStream.addSink(new MyJDBCSink());
env.execute();
}
public static class MyJDBCSink extends RichSinkFunction<SensorReading> {
Connection conn = null;
PreparedStatement insertStmt = null;
PreparedStatement updateStmt = null;
@Override
public void open(Configuration parameters) throws Exception {
// 连接
conn=DriverManager.getConnection("jdbc:mysql://master:3306/student","root", "root");
// 创建预编译器,有占位符,可传入参数
insertStmt=conn.prepareStatement("INSERT INTO sensor (id, dept) VALUES(?, ?)");
updateStmt = conn.prepareStatement("UPDATE sensor SET dept = ? WHERE id = ?");
}
@Override
public void invoke(SensorReading value, Context context) throws Exception {
// 直接执行更新语句,如果没有执行成功则执行插入操作
updateStmt.setDouble(1, value.getTemperature());
updateStmt.setString(2, value.getId());
updateStmt.execute();
if( updateStmt.getUpdateCount() == 0 ){
insertStmt.setString(1, value.getId());
insertStmt.setDouble(2, value.getTemperature());
insertStmt.execute();
}
}
@Override
public void close() throws Exception {
insertStmt.close();
updateStmt.close();
conn.close();
}
}
}
4、Window
4.1Window概述
4.1.1概述
代码语言:javascript复制streaming流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而 window是一种切割无限数据为有限块进行处理的手段。
Window 是无限数据流处理的核心,Window 将一个无限的 stream 拆分成有限大小的”buckets”桶,我们可以在这些桶上做计算操作。
4.1.2Window类型
代码语言:javascript复制Window 可以分成两类:
➢ CountWindow:按照指定的数据条数生成一个 Window,与时间无关。
➢ TimeWindow:按照时间生成 Window。
对于 TimeWindow,可以根据窗口实现原理的不同分成三类:滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)和会话窗口(Session Window)。
1、滚动窗口(Tumbling Windows) 将数据依据固定的窗口长度对数据进行切片。 ==特点:==时间对齐,窗口长度固定,没有重叠。 滚动窗口分配器将每个元素分配到一个指定窗口大小的窗口中,滚动窗口有一个固定的大小,并且不会出现重叠。 ==适用场景:==适合做 BI 统计等(做每个时间段的聚合计算)。 **2、滑动窗口(Sliding Windows) ** 滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口长度和滑动 间隔组成。 ==特点:==时间对齐,窗口长度固定,可以有重叠。 滑动窗口分配器将元素分配到固定长度的窗口中,与滚动窗口类似,窗口的大小由窗口大小参数来配置,另一个窗口滑动参数控制滑动窗口开始的频率。因此,滑动窗口如果滑动参数小于窗口大小的话,窗口是可以重叠的,在这种情况下元素会被分配到多个窗口中。 例如,你有 10 分钟的窗口和 5 分钟的滑动,那么每个窗口中 5 分钟的窗口里包 含着上个 10 分钟产生的数据 ==适用场景:==对最近一个时间段内的统计(求某接口最近 5min 的失败率来决定是否要报警)。 3. 会话窗口(Session Windows) 由一系列事件组合一个指定时间长度的 timeout 间隙组成,类似于 web 应用的session,也就是一段时间没有接收到新数据就会生成新的窗口。 ==特点:==时间无对齐。 session 窗口分配器通过 session 活动来对元素进行分组,session 窗口跟滚动窗口和滑动窗口相比,不会有重叠和固定的开始时间和结束时间的情况,相反,当它在一个固定的时间周期内不再收到元素,即非活动间隔产生,那个这个窗口就会关闭。一个 session 窗口通过一个 session 间隔来配置,这个 session 间隔定义了非活跃周期的长度,当这个非活跃周期产生,那么当前的 session 将关闭并且后续的元素将被分配到新的 session 窗口中去。