Flink学习随笔-2021-02

2021-04-09 11:32:46 浏览数 (1)

Flink学习笔记

一、Flink运行架构

1、 Flink 运行时的组件

代码语言:javascript复制
`作业管理器(JobManager)`
`资源管理器(ResourceManager)`
`任务管理器(TaskManager)`
`以及分发器(Dispatcher)`

作业管理器(JobManager)

代码语言:javascript复制
控制一个应用程序执行的主进程,也就是说,每个应用程序都会被一个不同的JobManager 所控制执行。JobManager 会先接收到要执行的应用程序,这个应用程序会包括:作业图(JobGraph)、逻辑数据流图(logical dataflow graph)和打包了所有的类、库和其它资源的 JAR 包。JobManager 会把 JobGraph 转换成一个物理层面的数据流图,这个图被叫做“执行图”(ExecutionGraph),包含了所有可以并发执行的任务。JobManager 会向资源管理器(ResourceManager)请求执行任务必要的资源,也就是任务管理器(TaskManager)上的插槽(slot)。一旦它获取到了足够的资源,就会将执行图分发到真正运行它们的TaskManager 上。而在运行过程中,JobManager 会负责所有需要中央协调的操作,比如说检查点(checkpoints)的协调。

资源管理器(ResourceManager)

代码语言:javascript复制
主要负责管理任务管理器(TaskManager)的插槽(slot),TaskManger 插槽是 Flink 中定义的处理资源单元。Flink 为不同的环境和资源管理工具提供了不同资源管理器,比如YARN、Mesos、K8s,以及 standalone 部署。当 JobManager 申请插槽资源时,ResourceManager会将有空闲插槽的 TaskManager 分配给 JobManager。如果 ResourceManager 没有足够的插槽来满足 JobManager 的请求,它还可以向资源提供平台发起会话,以提供启动 TaskManager进程的容器。另外,ResourceManager 还负责终止空闲的 TaskManager,释放计算资源。

任务管理器(TaskManager)

代码语言:javascript复制
Flink 中的工作进程。通常在 Flink 中会有多个TaskManager运行,每一个TaskManager都包含了一定数量的插槽(slots)。插槽的数量限制了TaskManager 能够执行的任务数量。启动之后,TaskManager 会向资源管理器注册它的插槽;收到资源管理器的指令后,TaskManager 就会将一个或者多个插槽提供给JobManager调用。JobManager就可以向插槽分配任务(tasks)来执行了。在执行过程中,一个TaskManager可以跟其它运行同一应用程序的TaskManagerx交换数据。

分发器(Dispatcher)

代码语言:javascript复制
可以跨作业运行,它为应用提交提供了 REST 接口。当一个应用被提交执行时,分发器就会启动并将应用移交给一个 JobManager。由于是 REST 接口,所以 Dispatcher 可以作为集群的一个 HTTP 接入点,这样就能够不受防火墙阻挡。Dispatcher 也会启动一个 Web UI,用来方便地展示和监控作业执行的信息。Dispatcher 在架构中可能并不是必需的,这取决于应用提交运行的方式。

二、开发

1、Source

1.1从集合读取数据
代码语言:javascript复制
public class SourceFromList {
    public static void main(String[] args) throws Exception {
        
//        创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
//        ①、从集合中读取文件
        DataStreamSource<SensorReading> data1 = env.fromCollection(Arrays.asList(
                new SensorReading("sensor_1", 1547718199L, 35.8),
                new SensorReading("sensor_6", 1547718201L, 15.4),
                new SensorReading("sensor_10", 1547718205L, 38.1)
                ));
        
//        ②、直接读取传入参数  setParallelism为设置并行度
        DataStreamSource<? extends Serializable> data2 = env.fromElements("sensor_16, 1547218201, 27.3","sensor_18, 1547358286, 36.5").setParallelism(1);

//        打印输出
        data1.print("data1");
        data2.print("data2");

//        execute中传参为JobName
        env.execute("Demo1");
        
    }
}
1.2从文件中读取数据
代码语言:javascript复制
public class SourceFromFile {
    public static void main(String[] args) throws Exception {
//        创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

//       设置并行度
        env.setParallelism(1);

//        从文件中读取内容
        DataStreamSource<String> dataSource = env.readTextFile("E:\IDEA2019_3Projects\FlinkDemo\src\main\resources\sensor.txt");
        dataSource.print("dataSource");

//        执行
        env.execute();
    }
}
1.3从kafka中读取数据

需要引入连接器jar包

代码语言:javascript复制
<!-- 0.11为kafka版本,2.12为scala版本,Flink是依赖于scala的。1.10.1是连接器的版本,和Flink版本一致 -->
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-connector-kafka-0.11_2.12</artifactId>
	<version>1.10.1</version>
</dependency>
代码语言:javascript复制
public class SourceFromKafka {
    public static void main(String[] args) throws Exception {
//        创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        设置并行度
        env.setParallelism(1);
//        配置参数
        Properties prop = new Properties();
//        集群信息
        prop.setProperty("bootstrap.servers", "master:9092");
//        消费者组
        prop.setProperty("group.id", "consumer-group");
//        序列化
        prop.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
//        反序列化
        prop.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
//        消费偏移量
        prop.setProperty("auto.offset.reset", "latest");

//        从kafka中读取数据  addSource()
        DataStreamSource<String> dataStreamSource = env.addSource(new FlinkKafkaConsumer011<String>("sensor", new SimpleStringSchema(), prop));

//        打印输出
        dataStreamSource.print("kafka");

//        执行
        env.execute();
    }
}
1.4自定义数据源
代码语言:javascript复制
public class SourceFromCustom {
    public static void main(String[] args) throws Exception {
//        创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        设置并行度
        env.setParallelism(1);
        DataStreamSource<SensorReading> dataStreamSource = env.addSource(new MySource());
        dataStreamSource.print();
//        执行
        env.execute();
}

    /**
     *
     *自定义SourceFunction
     */
    public static class MySource implements SourceFunction<SensorReading> {
//        定义一个标识位,用来控制循环
        private boolean running = true;

        @Override
        public void run(SourceContext<SensorReading> ctx) throws Exception {
//          为了模拟真实数据变化,定义一个随机数发生器
            Random random = new Random();
//          设置是个传感器的初始温度值
            HashMap<String, Double> sensorTempMap = new HashMap<String,Double>();
            for (int i = 1; i < 11; i  ) {
                sensorTempMap.put("sensor_"   i, 60 random.nextGaussian()*20);
            }

            while (running){
               for (String sensorId:sensorTempMap.keySet()){
                   Double newTemp = sensorTempMap.get(sensorId) random.nextGaussian();
                   sensorTempMap.put(sensorId, newTemp);
                   ctx.collect(new SensorReading(sensorId,System.currentTimeMillis(),newTemp));
                }
             }
            Thread.sleep(1000L);
          }
        
        @Override
        public void cancel() {
            running = false;
        }
    }
}

2、Transform转换算子

读数据

代码语言:javascript复制
//        创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        设置并行度
        env.setParallelism(1);
//        读数据
DataStreamSource<String> dataStreamSource = env.readTextFile("E:\IDEA2019_3Projects\FlinkDemo\src\main\resources\sensor.txt");
2.1Map
代码语言:javascript复制
//        1、map,把String转换成长度输出
        SingleOutputStreamOperator<Integer> map = dataStreamSource.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String value) throws Exception {
                return value.length();
            }
        });
2.2flatMap
代码语言:javascript复制
SingleOutputStreamOperator<String> flatMap = dataStreamSource.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {

                String[] fields = value.split(",");
                for (String field : fields){
                    out.collect(field);
                }
            }
        });
        flatMap.print("flatMap");
2.3、filter
代码语言:javascript复制
//        3、filter,筛选温度为37.1的数据
        SingleOutputStreamOperator<String> filter = dataStreamSource.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                return value.contains("37.1");
            }
        });
        filter.print("filter");
//       filter,筛选以senser_1开头的
value.startsWith("sensor_1");
2.4、KeyBy
代码语言:javascript复制
//        转换成SensorReading
//        SingleOutputStreamOperator<SensorReading> map = dataStreamSource.map(new MapFunction<String, SensorReading>() {
//            @Override
//            public SensorReading map(String value) throws Exception {
//                String[] fields = value.split(",");
//                return new SensorReading(fields[0],new Long(fields[1]),new Double(fields[2]));
//            }
//        });

//        Lambda表达式格式
        SingleOutputStreamOperator<SensorReading> dataStream = dataStreamSource.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

//        分组
        KeyedStream<SensorReading, Tuple> keyedStream = dataStream.keyBy("id");
//        滚动聚合
        SingleOutputStreamOperator<SensorReading> temperature = keyedStream.maxBy("temperature");
        temperature.print("temp");
2.4、Reduce
代码语言:javascript复制
//        分组
        KeyedStream<SensorReading, Tuple> keyedStream = dataStream.keyBy("id");
//        Reduce聚合,取最大的温度值以及当前最新的时间戳
        SingleOutputStreamOperator<SensorReading> reduce = keyedStream.reduce(new ReduceFunction<SensorReading>() {
            @Override
            public SensorReading reduce(SensorReading value1, SensorReading value2) throws Exception {
                return new SensorReading(value1.getId(), value2.getTimestamp(), Math.max(value1.getTemperature(), value2.getTemperature()));
            }
        });
reduce.print("reducce");
2.5、分流split、select
代码语言:javascript复制
//        读数据
        DataStreamSource<String> dataStreamSource = env.readTextFile("E:\IDEA2019_3Projects\FlinkDemo\src\main\resources\sensor.txt");
//       转换为POJO类
        SingleOutputStreamOperator<SensorReading> map = dataStreamSource.map(new MapFunction<String, SensorReading>() {
            @Override
            public SensorReading map(String value) throws Exception {
                String[] fields = value.split(",");
                return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
            }
        });

//      分流操作,按照30度为临界值分成两个流
        SplitStream<SensorReading> multiplieStream = map.split(new OutputSelector<SensorReading>() {
            @Override
            public Iterable<String> select(SensorReading value) {
                return (value.getTemperature() > 30) ? Collections.singletonList("high") : Collections.singletonList("low");
            }
        });
        DataStream<SensorReading> high = multiplieStream.select("high");
        DataStream<SensorReading> low = multiplieStream.select("low");
        DataStream<SensorReading> all = multiplieStream.select("high", "low");

        high.print("high");
        low.print("low");
        all.print("all");

//        执行
        env.execute();
    }
2.6、合并流

Connect CoMap

代码语言:javascript复制
        /**
         *    合流操作,将高温流转换成二元组类型,与低温流连接合并之后,输出状态信息,高温报警低温正常
         *
         */

        SingleOutputStreamOperator<Tuple2<String, Double>> highStream = high.map(new MapFunction<SensorReading, Tuple2<String, Double>>() {
            @Override
            public Tuple2<String, Double> map(SensorReading value) throws Exception {
                return new Tuple2<>(value.getId(),value.getTemperature());
            }
        });

        ConnectedStreams<Tuple2<String, Double>, SensorReading> connectedStreams = highStream.connect(low);
        SingleOutputStreamOperator<Object> result = connectedStreams.map(new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>() {
            @Override
            public Object map1(Tuple2<String, Double> value) throws Exception {
                return new Tuple3<>(value.f0, value.f1, "高温报警");
            }

            @Override
            public Object map2(SensorReading value) throws Exception {
                return new Tuple2<>(value.getId(), "正常");
            }
        });

        result.print();

Union

代码语言:javascript复制
DataStream<SensorReading> unionStream = highTempStream.union(lowTempStream);

Connect 与 Union区别

代码语言:javascript复制
1.Union之前两个流的类型必须是一样,Connect可以不一样,在之后的coMap中再去调整成为一样的。
2. Connect只能操作两个流,Union 可以操作多个

2.7、自定义UDF函数

自定义函数并可以传参

代码语言:javascript复制
//        创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        设置并行度
        env.setParallelism(1);

//        读数据
        DataStreamSource<String> dataStreamSource = env.readTextFile("E:\IDEA2019_3Projects\FlinkDemo\src\main\resources\sensor.txt");
//        自定义keyFilter函数
SingleOutputStreamOperator<String> sensorFilter = dataStreamSource.filter(new keyFilter("sensor_1"));
sensorFilter.print("result");

//        实现keyFilter函数并传参
    public static class keyFilter implements FilterFunction<String>{
        private String key;
        public keyFilter(String key) {
            this.key = key;
        }

        @Override
        public boolean filter(String value) throws Exception {
            return value.contains(this.key);
        }
    }

3、Sink

3.1Sink到Kafka
代码语言:javascript复制
public class SinkToKafka {
    public static <IN> void main(String[] args) {
//        环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
//       读取文件
        DataStreamSource<String> inputStream = env.readTextFile("E:\IDEA2019_3Projects\FlinkDemo\src\main\resources\sensor.txt");
//       转换成SensorReading类型
        SingleOutputStreamOperator<String> datastream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[1])).toString();
        });
        DataStreamSink<String> test = datastream.addSink(new FlinkKafkaProducer011<String>("master:9092", "test", new SimpleStringSchema()));
    }
}
3.2Sink到MySql
代码语言:javascript复制
public class SinkToMysql {
    public static void main(String[] args) throws Exception {
//        环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

//       读取文件
        DataStreamSource<String> inputStream = env.readTextFile("E:\IDEA2019_3Projects\FlinkDemo\src\main\resources\sensor.txt");
//       转换成SensorReading类型
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[1]));
        });

        dataStream.addSink(new MyJDBCSink());
        env.execute();
    }
    
    public static class MyJDBCSink extends RichSinkFunction<SensorReading> {
        Connection conn = null;
        PreparedStatement insertStmt = null;
        PreparedStatement updateStmt = null;

        @Override
        public void open(Configuration parameters) throws Exception {
//            连接
            conn=DriverManager.getConnection("jdbc:mysql://master:3306/student","root", "root");

//            创建预编译器,有占位符,可传入参数
            insertStmt=conn.prepareStatement("INSERT INTO sensor (id, dept) VALUES(?, ?)");
            updateStmt = conn.prepareStatement("UPDATE sensor SET dept = ? WHERE id = ?");

        }
        @Override
        public void invoke(SensorReading value, Context context) throws Exception {
//            直接执行更新语句,如果没有执行成功则执行插入操作
            updateStmt.setDouble(1, value.getTemperature());
            updateStmt.setString(2, value.getId());
            updateStmt.execute();
            if( updateStmt.getUpdateCount() == 0 ){
                insertStmt.setString(1, value.getId());
                insertStmt.setDouble(2, value.getTemperature());
                insertStmt.execute();
            }
        }
        @Override
        public void close() throws Exception {
            insertStmt.close();
            updateStmt.close();
            conn.close();
        }

    }
}

4、Window

4.1Window概述
4.1.1概述
代码语言:javascript复制
streaming流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而 window是一种切割无限数据为有限块进行处理的手段。 
Window 是无限数据流处理的核心,Window 将一个无限的 stream 拆分成有限大小的”buckets”桶,我们可以在这些桶上做计算操作。
4.1.2Window类型
代码语言:javascript复制
Window 可以分成两类:
➢ CountWindow:按照指定的数据条数生成一个 Window,与时间无关。
➢ TimeWindow:按照时间生成 Window。

对于 TimeWindow,可以根据窗口实现原理的不同分成三类:滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)和会话窗口(Session Window)。

1、滚动窗口(Tumbling Windows) 将数据依据固定的窗口长度对数据进行切片。 ==特点:==时间对齐,窗口长度固定,没有重叠。 滚动窗口分配器将每个元素分配到一个指定窗口大小的窗口中,滚动窗口有一个固定的大小,并且不会出现重叠。 ==适用场景:==适合做 BI 统计等(做每个时间段的聚合计算)。 **2、滑动窗口(Sliding Windows) ** 滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口长度和滑动 间隔组成。 ==特点:==时间对齐,窗口长度固定,可以有重叠。 滑动窗口分配器将元素分配到固定长度的窗口中,与滚动窗口类似,窗口的大小由窗口大小参数来配置,另一个窗口滑动参数控制滑动窗口开始的频率。因此,滑动窗口如果滑动参数小于窗口大小的话,窗口是可以重叠的,在这种情况下元素会被分配到多个窗口中。 例如,你有 10 分钟的窗口和 5 分钟的滑动,那么每个窗口中 5 分钟的窗口里包 含着上个 10 分钟产生的数据 ==适用场景:==对最近一个时间段内的统计(求某接口最近 5min 的失败率来决定是否要报警)。 3. 会话窗口(Session Windows) 由一系列事件组合一个指定时间长度的 timeout 间隙组成,类似于 web 应用的session,也就是一段时间没有接收到新数据就会生成新的窗口。 ==特点:==时间无对齐。 session 窗口分配器通过 session 活动来对元素进行分组,session 窗口跟滚动窗口和滑动窗口相比,不会有重叠和固定的开始时间和结束时间的情况,相反,当它在一个固定的时间周期内不再收到元素,即非活动间隔产生,那个这个窗口就会关闭。一个 session 窗口通过一个 session 间隔来配置,这个 session 间隔定义了非活跃周期的长度,当这个非活跃周期产生,那么当前的 session 将关闭并且后续的元素将被分配到新的 session 窗口中去。

4.2API

0 人点赞