flink任务本身提供了各种类型的指标监控,细化到了每一个Operator的流入/流出量、速率、Watermark值等,通常在实际应用中需要对接入数据做格式化例如转json,符合要求的数据会向下流动,不符合要求或者格式化异常称为脏数据会被过滤掉,现在目标实现一个通用化方式能够对正常数据与脏数据进行指标统计。 实现思路:
- flink metric类型分为Counter、Gauge、Histogram、Meter,需要统计的是一个累加值因此选取Counter类型的metirc
- 由于是对任务的流入监控,因此需要在Source端进行处理,通常对接的数据源是kafka, 而flink本身已经提供了kakfa connector,并且开放了数据反序列化的接口DeserializationSchema与抽象类AbstractDeserializationSchema,实现该接口或者继承抽象类可以完成数据的反序列化与格式化,由于每一条数据都需要进过反序列化处理,那么可以在反序列化的同时进行指标统计
- 在flink中自定义Metric入口是RuntimeContext, 但是在反序列化抽象类中并没有提供访问RuntimeContext的接口,一般是在RichFunction中,与其相关只有FlinkKafkaConsumer,那么就可以在FlinkKafkaConsumer中将获取到的RuntimeContext传给AbstractDeserializationSchema
实现步骤:
- 自定义一个继承AbstractDeserializationSchema的抽象类AbsDeserialization,里面包含RuntimeContext与两个统计的Counter,并且包含一个初始化Counter的方法initMetric
- 自定义一个继承FlinkKafkaConsumer010的抽象类,里面包含AbsDeserialization属性、构造化方法,并且重写run方法,在run方法里面给AbsDeserialization设置RuntimeContex对象并且调用其initMetric, 最后调用父类run方法
代码如下:
代码语言:javascript复制public abstract class AbsDeserialization<T> extends AbstractDeserializationSchema<T> {
private RuntimeContext runtimeContext;
private String DIRTY_DATA_NAME="dirtyDataNum";
private String NORMAL_DATA_NAME="normalDataNum";
protected transient Counter dirtyDataNum;
protected transient Counter normalDataNum;
public RuntimeContext getRuntimeContext() {
return runtimeContext;
}
public void setRuntimeContext(RuntimeContext runtimeContext) {
this.runtimeContext = runtimeContext;
}
public void initMetric()
{
dirtyDataNum=runtimeContext.getMetricGroup().counter(DIRTY_DATA_NAME);
normalDataNum=runtimeContext.getMetricGroup().counter(NORMAL_DATA_NAME);
}
}
代码语言:javascript复制public class CustomerKafkaConsumer<T> extends FlinkKafkaConsumer010<T> {
private AbsDeserialization<T> valueDeserializer;
public CustomerKafkaConsumer(String topic, AbsDeserialization<T> valueDeserializer, Properties props) {
super(topic, valueDeserializer, props);
this.valueDeserializer=valueDeserializer;
}
@Override public void run(SourceContext<T> sourceContext) throws Exception {
valueDeserializer.setRuntimeContext(getRuntimeContext());
valueDeserializer.initMetric();
super.run(sourceContext);
}
}
使用案例,只要定义一个继承AbsDeserialization类即可,
代码语言:javascript复制class ParseDeserialization extends AbsDeserialization[RawData] {
override def deserialize(message: Array[Byte]): RawData = {
try {
val msg = new String(message)
val rawData = JSON.parseObject(msg, classOf[RawData])
normalDataNum.inc() //正常数据指标
rawData
} catch {
case e:Exception=>{
dirtyDataNum.inc() //脏数据指标
null
}
}
}
}
source使用方式:
代码语言:javascript复制val consumer: CustomerKafkaConsumer[RawData] = new CustomerKafkaConsumer[RawData](topic, new ParseDeserialization, kafkaPro)
那么在任务运行中,可以在flink web的监控界面查看到normalDataNum 、dirtyDataNum 两个指标值,另外在AbsDeserialization里面也可以定义一些流入速率等监控。