Flink per-Job模式InfluxdbReporter上报JobName

2022-04-18 12:53:40 浏览数 (1)

最近将Flink集群从1.6升级到1.8,主要是为了使用1.8的两个特性:一个是universal kafka ,另外一个是rocksdb ttl, 然后注意到1.8 提供了Influxdb 的reporter, 在最开始1.6使用的rest api方式主动请求对应的metric, 使用这种方式目前有两个弊端:

  • 前期使用metric比较少,自己通过开发图表展示,但是后期需要新的metric 都需要开发一次
  • 客户端使用轮询的方式去请求,如果任务比较多就会造成一定延时,并且实时平台在做高可用情况下,涉及定时的切换,给系统开发带来一定复杂性

面对这两个问题选择了influxdb grafana的方式,也应该是很多公司选择的方案,当然也有选择Prometheus 的,接下来介绍一下将flink metric 上报influxdb部署的步骤:

  1. 将flink安装包下面opt目录的flink-metrics-influxdb-1.8.2.jar包拷贝至lib目录下面
  2. 在flink-conf.yaml 中增加配置,
代码语言:javascript复制
metrics.reporter.influxdb.class: org.apache.flink.metrics.influxdb.InfluxdbReporter

metrics.reporter.influxdb.host: localhost  //influxdb服务所在的地址

metrics.reporter.influxdb.port:8086//influxdb 端口

metrics.reporter.influxdb.db: flink   //influxdb库名称

metrics.reporter.influxdb.username: flink-metrics

metrics.reporter.influxdb.password: qwerty
 

至此部署已经完成,接下来提交一个任务到集群中去,在grafana做一些图表展示,在influxdb中会自动生成很多measurement也就是表,选择taskmanager_Status_JVM_CPU_Load_value 表查看,发现其写入的Tags只有host与tm_id的信息,查看jobmanager_Status_JVM_CPU_Load_value表,发现其写入的Tags只有host信息,

代码语言:javascript复制
taskmanager_Status_JVM_CPU_Load_value{db="flink-metric",host="xx.xx.xx.xx",tm_id="container_e03_1573197073760_0134_01_000006"}

jobmanager_Status_JVM_CPU_Load_value{db="flink-metric",host="xx.xx.xx.xx"}
 

如果这样,那么对于flink on yarn perjob模式就没有办法区别当前指标到底属于哪一个任务,然后查看其它表例如numReocrdsIn包含了job_name/job_id 信息,导致同一个任务的不同metric写入的tags不同是为什么呢?果断看下InfluxdbReporter源码,可以发现在InfluxdbReporter的继承类AbstractReporter中的notifyOfAddedMetric方法,每一个metric在被添加时其metric也就被确定了,其tags由MeasurementInfoProvider的getTags方法来获取:

代码语言:javascript复制
privatestaticMap<String,String> getTags(MetricGroup group){

// Keys are surrounded by brackets: remove them, transforming "<name>" to "name".

Map<String,String> tags =newHashMap<>();

for(Map.Entry<String,String> variable: group.getAllVariables().entrySet()){

String name = variable.getKey();

            tags.put(name.substring(1, name.length()-1), variable.getValue());

}

return tags;

}
 

每一个metric的MetricGroup都是不同的,所以导致了上面观察到的现象,现在想每一个metric都包含job_name/job_id信息,我们可以将包含job_name/job_id 信息的提取出来添加到其他metric的tags的,需要改写一下源码,实现方式如下:

  1. 在AbstractReporter中定义如下变量
代码语言:javascript复制
privatefinalString JOB_NAME_LABEL ="job_name";

privatefinalString JOB_ID_LABEL ="job_id";

protectedString jobName;

protectedString jobId;

protectedMap<String,String> jobInfo =newHashMap<>();
 

在AbstractReporter增加一个能够在InfluxdbReporter获取jobInfo方法

代码语言:javascript复制
protectedAbstractReporter(MetricInfoProvider<MetricInfo> metricInfoProvider){

this.metricInfoProvider = metricInfoProvider;

}
 

在notifyOfAddedMetric方中新增获取job_name/job_id的逻辑

代码语言:javascript复制
MeasurementInfo measurementInfo =((MeasurementInfo) metricInfo);

Map<String,String> tags = measurementInfo.getTags();

if(StringUtils.isBlank(jobName)){

            jobId = tags.get(JOB_NAME_LABEL);

}else{

            jobInfo.put(JOB_NAME_LABEL, jobName);

}

if(StringUtils.isBlank(jobId)){

            jobName = tags.get(JOB_ID_LABEL);

}else{

            jobInfo.put(JOB_ID_LABEL, jobId);

}
 

2. 看一下InfluxdbReporter的上报方法report中,其通过buildReport构造了BatchPoints,每个Point的构造又是通过MetricMapper的map方法构造,最终调用了MetricMapper的builder方法,那么就需要将jobInfo添加到tags中,改造如下: InfluxdbReporter的buildReport方法中

代码语言:javascript复制
report.point(MetricMapper.map(entry.getValue(), timestamp, entry.getKey(),getJobInfo())); 

那么需要在MetricMapper对应map方法中新增Map jobInfo参数,需要将这个参数传入到builder方法中

代码语言:javascript复制
privatestaticPoint.Builder builder(MeasurementInfo info,Instant timestamp,Map<String,String> jobInfo){

Map<String,String> tags=info.getTags();

        tags.putAll(jobInfo);

returnPoint.measurement(info.getName())

.tag(tags)

.time(timestamp.toEpochMilli(),TimeUnit.MILLISECONDS);

}
 

至此改造全部完成,然后重新打包上传,看一下结果

代码语言:javascript复制
taskmanager_Status_JVM_CPU_Load_value{db="flink-metric",host="xx.xx.xx.xx",job_id="d9f2524eeb590a61628560d8677a1b23",job_name="test",tm_id="container_e03_1573197073760_0141_01_000003"}

已经满足我们预期的结果,接下来就可以通过配置job_name或者job_id条件筛选,查看想要的metric。

0 人点赞