flink之DataStream2

2024-06-16 20:41:56 浏览数 (2)

这是接上文的flink之Datastream1,文章链接 https://cloud.tencent.com/developer/article/2428018?shareByChannel=link

------------------------------------------------------分割线---------------------------------------------------------

三、自定义UDF(user-defined function)函数

用户自定义函数(user-defined function,UDF),即用户可以根据自身需求,重新实现算子的逻辑。

用户自定义函数分为:函数类、匿名函数、富函数类(富函数这个后续会用到很多,涉及状态的保存以及更改需要操控open和close函数)

1、函数类(Function Classes)

Flink暴露了所有UDF函数的接口,具体实现方式为接口或者抽象类,例如MapFunction、FilterFunction、ReduceFunction等。所以用户可以自定义一个函数类,实现对应的接口。

因此之前写过实现MapFunction、FilterFunction、ReduceFunction的自定义函数,且此类函数用处不大,这里不过多赘述。

2、匿名函数

flink的这个函数只能在某个算子里面实现, 比如之前keyBy算子,如下

代码语言:javascript复制
KeyedStream<WaterSensor, String> keyedStream = stream.keyBy(e -> e.id);

就使用了匿名函数类,此函数类优点是能简化函数的编写方式,但是缺点也很明显,只能使用一次。

3、富函数类

此函数的作用效果含括了函数类,如果是实现同一个接口,富函数接口在普通函数接口上多增加了一些抽象函数的定义,比如最常用的open、close函数,因此重点介绍。

富函数类”也是DataStream API提供的一个函数类的接口所有的Flink函数类都有其Rich版本富函数类一般是以抽象类的形式出现的。例如:RichMapFunction、RichFilterFunction、RichReduceFunction等

与常规函数类的不同主要在于,富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能

Rich Function有生命周期的概念。典型的生命周期方法有:

· open()方法,是Rich Function的初始化方法,也就是会开启一个算子的生命周期。当一个算子的实际工作方法例如map()或者filter()方法被调用之前,open()会首先被调用。

· close()方法,是生命周期中的最后一个调用的方法,类似于结束方法。一般用来做一些清理工作。

需要注意的是,这里的生命周期方法,对于一个并行子任务来说只会调用一次;而对应的,实际工作方法,例如RichMapFunction中的map(),在每条数据到来后都会触发一次调用(重点理解这句话!!!!)

比如之前写过的mapFunction

代码语言:javascript复制
public class WaterSensorMapFunction implements MapFunction<String, WaterSensor> {
    @Override
    public WaterSensor map(String s) throws Exception {
        String[] datas = s.split(",");

        return new WaterSensor(datas[0],Long.valueOf(datas[1]),Integer.valueOf(datas[2]));
    }
}

变成RIchMapFunction只需要

1、在实现的mapFunction接口名字改成RichMapFunction接口

2、按Alt Enter键自动导包

3、按Ctrl O 选择 生成open方法和close方法

如下

代码语言:javascript复制
public class WaterSensorMapFunction extends RichMapFunction<String, WaterSensor> {
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }
    @Override
    public WaterSensor map(String s) throws Exception {
        String[] datas = s.split(",");

        return new WaterSensor(datas[0],Long.valueOf(datas[1]),Integer.valueOf(datas[2]));
    }
}

四、物理分区算子

分区算子只介绍广播

1、广播 broadcast

这种方式其实不应该叫做“重分区”,因为经过广播之后,数据会在不同的分区都保留一份(从当前分区,往每个分区发一份重复的数据)可能进行重复处理。可以通过调用DataStream的broadcast()方法,将输入数据复制并发送到下游算子的所有并行任务中去。

代码:

代码语言:javascript复制
stream.broadcast()

五、分流

1、分流

所谓“分流”,就是将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个DataStream,定义一些筛选条件,将符合条件的数据拣选出来放到对应的流里。

案例需求:读取一个整数数字流,将数据流划分为奇数流和偶数流。

代码语言:javascript复制
public static void main(String[] args) throws Exception {


    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    SingleOutputStreamOperator<Integer> ds = env.socketTextStream("hadoop102", 7777)
            .map(Integer::valueOf);
    //将ds 分为两个流 ,一个是奇数流,一个是偶数流
    //使用filter 过滤两次
    SingleOutputStreamOperator<Integer> ds1 = ds.filter(x -> x % 2 == 0);
    SingleOutputStreamOperator<Integer> ds2 = ds.filter(x -> x % 2 == 1);

    ds1.print("偶数");
    ds2.print("奇数");

    env.execute();
}

这种实现非常简单,但代码显得有些冗余——我们的处理逻辑对拆分出的三条流其实是一样的,却重复写了三次。而且这段代码背后的含义,是将原始数据流stream复制三份,然后对每一份分别做筛选;这明显是不够高效的。我们自然想到,能不能不用复制流,直接用一个算子就把它们都拆分开呢?

2、测流

只需要调用上下文ctx的.output()方法,就可以输出任意类型的数据了。而侧输出流的标记和提取,都离不开一个“输出标签”(OutputTag),指定了侧输出流的id和类型。

代码语言:javascript复制
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    SingleOutputStreamOperator<WaterSensor> ds = env.socketTextStream("hadoop102", 7777)
            .map(new WaterSensorMapFunction());


    OutputTag<WaterSensor> s1 = new OutputTag<>("s1", Types.POJO(WaterSensor.class)){};
    OutputTag<WaterSensor> s2 = new OutputTag<>("s2", Types.POJO(WaterSensor.class)){};
    //返回的都是主流
    SingleOutputStreamOperator<WaterSensor> ds1 = ds.process(new ProcessFunction<WaterSensor, WaterSensor>()
    {
        @Override
        public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception {
            //将同一份数据进行需求进行处理,即可完成分流的操作,无需复制多份
            if ("s1".equals(value.getId())) {
                ctx.output(s1, value);
            } else if ("s2".equals(value.getId())) {
                ctx.output(s2, value);
            } else {
                //主流
                out.collect(value);
            }

        }
    });

    ds1.print("主流,非s1,s2的传感器");
    //获取侧输出流
    SideOutputDataStream<WaterSensor> s1DS = ds1.getSideOutput(s1);
    SideOutputDataStream<WaterSensor> s2DS = ds1.getSideOutput(s2);
    
    //打上标识,方便辨识
    s1DS.printToErr("s1");
    s2DS.printToErr("s2");

    env.execute();

}

六、合流

这里先不写,因为我用到的地方不多,待我用到再来总结

0 人点赞