这是接上文的flink之Datastream1,文章链接 https://cloud.tencent.com/developer/article/2428018?shareByChannel=link
------------------------------------------------------分割线---------------------------------------------------------
三、自定义UDF(user-defined function)函数
用户自定义函数(user-defined function,UDF),即用户可以根据自身需求,重新实现算子的逻辑。
用户自定义函数分为:函数类、匿名函数、富函数类(富函数这个后续会用到很多,涉及状态的保存以及更改需要操控open和close函数)。
1、函数类(Function Classes)
Flink暴露了所有UDF函数的接口,具体实现方式为接口或者抽象类,例如MapFunction、FilterFunction、ReduceFunction等。所以用户可以自定义一个函数类,实现对应的接口。
因此之前写过实现MapFunction、FilterFunction、ReduceFunction的自定义函数,且此类函数用处不大,这里不过多赘述。
2、匿名函数
flink的这个函数只能在某个算子里面实现, 比如之前keyBy算子,如下
代码语言:javascript复制KeyedStream<WaterSensor, String> keyedStream = stream.keyBy(e -> e.id);
就使用了匿名函数类,此函数类优点是能简化函数的编写方式,但是缺点也很明显,只能使用一次。
3、富函数类
此函数的作用效果含括了函数类,如果是实现同一个接口,富函数接口在普通函数接口上多增加了一些抽象函数的定义,比如最常用的open、close函数,因此重点介绍。
富函数类”也是DataStream API提供的一个函数类的接口,所有的Flink函数类都有其Rich版本。富函数类一般是以抽象类的形式出现的。例如:RichMapFunction、RichFilterFunction、RichReduceFunction等。
与常规函数类的不同主要在于,富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。
Rich Function有生命周期的概念。典型的生命周期方法有:
· open()方法,是Rich Function的初始化方法,也就是会开启一个算子的生命周期。当一个算子的实际工作方法例如map()或者filter()方法被调用之前,open()会首先被调用。
· close()方法,是生命周期中的最后一个调用的方法,类似于结束方法。一般用来做一些清理工作。
需要注意的是,这里的生命周期方法,对于一个并行子任务来说只会调用一次;而对应的,实际工作方法,例如RichMapFunction中的map(),在每条数据到来后都会触发一次调用(重点理解这句话!!!!)。
比如之前写过的mapFunction
代码语言:javascript复制public class WaterSensorMapFunction implements MapFunction<String, WaterSensor> {
@Override
public WaterSensor map(String s) throws Exception {
String[] datas = s.split(",");
return new WaterSensor(datas[0],Long.valueOf(datas[1]),Integer.valueOf(datas[2]));
}
}
变成RIchMapFunction只需要
1、在实现的mapFunction接口名字改成RichMapFunction接口
2、按Alt Enter键自动导包
3、按Ctrl O 选择 生成open方法和close方法
如下
代码语言:javascript复制public class WaterSensorMapFunction extends RichMapFunction<String, WaterSensor> {
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
}
@Override
public void close() throws Exception {
super.close();
}
@Override
public WaterSensor map(String s) throws Exception {
String[] datas = s.split(",");
return new WaterSensor(datas[0],Long.valueOf(datas[1]),Integer.valueOf(datas[2]));
}
}
四、物理分区算子
分区算子只介绍广播
1、广播 broadcast
这种方式其实不应该叫做“重分区”,因为经过广播之后,数据会在不同的分区都保留一份(从当前分区,往每个分区发一份重复的数据),可能进行重复处理。可以通过调用DataStream的broadcast()方法,将输入数据复制并发送到下游算子的所有并行任务中去。
代码:
代码语言:javascript复制stream.broadcast()
五、分流
1、分流
所谓“分流”,就是将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个DataStream,定义一些筛选条件,将符合条件的数据拣选出来放到对应的流里。
案例需求:读取一个整数数字流,将数据流划分为奇数流和偶数流。
代码语言:javascript复制public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
SingleOutputStreamOperator<Integer> ds = env.socketTextStream("hadoop102", 7777)
.map(Integer::valueOf);
//将ds 分为两个流 ,一个是奇数流,一个是偶数流
//使用filter 过滤两次
SingleOutputStreamOperator<Integer> ds1 = ds.filter(x -> x % 2 == 0);
SingleOutputStreamOperator<Integer> ds2 = ds.filter(x -> x % 2 == 1);
ds1.print("偶数");
ds2.print("奇数");
env.execute();
}
这种实现非常简单,但代码显得有些冗余——我们的处理逻辑对拆分出的三条流其实是一样的,却重复写了三次。而且这段代码背后的含义,是将原始数据流stream复制三份,然后对每一份分别做筛选;这明显是不够高效的。我们自然想到,能不能不用复制流,直接用一个算子就把它们都拆分开呢?
2、测流
只需要调用上下文ctx的.output()方法,就可以输出任意类型的数据了。而侧输出流的标记和提取,都离不开一个“输出标签”(OutputTag),指定了侧输出流的id和类型。
代码语言:javascript复制public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
SingleOutputStreamOperator<WaterSensor> ds = env.socketTextStream("hadoop102", 7777)
.map(new WaterSensorMapFunction());
OutputTag<WaterSensor> s1 = new OutputTag<>("s1", Types.POJO(WaterSensor.class)){};
OutputTag<WaterSensor> s2 = new OutputTag<>("s2", Types.POJO(WaterSensor.class)){};
//返回的都是主流
SingleOutputStreamOperator<WaterSensor> ds1 = ds.process(new ProcessFunction<WaterSensor, WaterSensor>()
{
@Override
public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception {
//将同一份数据进行需求进行处理,即可完成分流的操作,无需复制多份
if ("s1".equals(value.getId())) {
ctx.output(s1, value);
} else if ("s2".equals(value.getId())) {
ctx.output(s2, value);
} else {
//主流
out.collect(value);
}
}
});
ds1.print("主流,非s1,s2的传感器");
//获取侧输出流
SideOutputDataStream<WaterSensor> s1DS = ds1.getSideOutput(s1);
SideOutputDataStream<WaterSensor> s2DS = ds1.getSideOutput(s2);
//打上标识,方便辨识
s1DS.printToErr("s1");
s2DS.printToErr("s2");
env.execute();
}
六、合流
这里先不写,因为我用到的地方不多,待我用到再来总结