最近将Flink集群从1.6升级到1.8,主要是为了使用1.8的两个特性:一个是universal kafka ,另外一个是rocksdb ttl, 然后注意到1.8 提供了Influxdb 的reporter, 在最开始1.6使用的rest api方式主动请求对应的metric, 使用这种方式目前有两个弊端:
- 前期使用metric比较少,自己通过开发图表展示,但是后期需要新的metric 都需要开发一次
- 客户端使用轮询的方式去请求,如果任务比较多就会造成一定延时,并且实时平台在做高可用情况下,涉及定时的切换,给系统开发带来一定复杂性
面对这两个问题选择了influxdb grafana的方式,也应该是很多公司选择的方案,当然也有选择Prometheus 的,接下来介绍一下将flink metric 上报influxdb部署的步骤:
- 将flink安装包下面opt目录的flink-metrics-influxdb-1.8.2.jar包拷贝至lib目录下面
- 在flink-conf.yaml 中增加配置,
metrics.reporter.influxdb.class: org.apache.flink.metrics.influxdb.InfluxdbReporter
metrics.reporter.influxdb.host: localhost //influxdb服务所在的地址
metrics.reporter.influxdb.port:8086//influxdb 端口
metrics.reporter.influxdb.db: flink //influxdb库名称
metrics.reporter.influxdb.username: flink-metrics
metrics.reporter.influxdb.password: qwerty
至此部署已经完成,接下来提交一个任务到集群中去,在grafana做一些图表展示,在influxdb中会自动生成很多measurement也就是表,选择taskmanager_Status_JVM_CPU_Load_value 表查看,发现其写入的Tags只有host与tm_id的信息,查看jobmanager_Status_JVM_CPU_Load_value表,发现其写入的Tags只有host信息,
代码语言:javascript复制taskmanager_Status_JVM_CPU_Load_value{db="flink-metric",host="xx.xx.xx.xx",tm_id="container_e03_1573197073760_0134_01_000006"}
jobmanager_Status_JVM_CPU_Load_value{db="flink-metric",host="xx.xx.xx.xx"}
如果这样,那么对于flink on yarn perjob模式就没有办法区别当前指标到底属于哪一个任务,然后查看其它表例如numReocrdsIn包含了job_name/job_id 信息,导致同一个任务的不同metric写入的tags不同是为什么呢?果断看下InfluxdbReporter源码,可以发现在InfluxdbReporter的继承类AbstractReporter中的notifyOfAddedMetric方法,每一个metric在被添加时其metric也就被确定了,其tags由MeasurementInfoProvider的getTags方法来获取:
代码语言:javascript复制privatestaticMap<String,String> getTags(MetricGroup group){
// Keys are surrounded by brackets: remove them, transforming "<name>" to "name".
Map<String,String> tags =newHashMap<>();
for(Map.Entry<String,String> variable: group.getAllVariables().entrySet()){
String name = variable.getKey();
tags.put(name.substring(1, name.length()-1), variable.getValue());
}
return tags;
}
每一个metric的MetricGroup都是不同的,所以导致了上面观察到的现象,现在想每一个metric都包含job_name/job_id信息,我们可以将包含job_name/job_id 信息的提取出来添加到其他metric的tags的,需要改写一下源码,实现方式如下:
- 在AbstractReporter中定义如下变量
privatefinalString JOB_NAME_LABEL ="job_name";
privatefinalString JOB_ID_LABEL ="job_id";
protectedString jobName;
protectedString jobId;
protectedMap<String,String> jobInfo =newHashMap<>();
在AbstractReporter增加一个能够在InfluxdbReporter获取jobInfo方法
代码语言:javascript复制protectedAbstractReporter(MetricInfoProvider<MetricInfo> metricInfoProvider){
this.metricInfoProvider = metricInfoProvider;
}
在notifyOfAddedMetric方中新增获取job_name/job_id的逻辑
代码语言:javascript复制MeasurementInfo measurementInfo =((MeasurementInfo) metricInfo);
Map<String,String> tags = measurementInfo.getTags();
if(StringUtils.isBlank(jobName)){
jobId = tags.get(JOB_NAME_LABEL);
}else{
jobInfo.put(JOB_NAME_LABEL, jobName);
}
if(StringUtils.isBlank(jobId)){
jobName = tags.get(JOB_ID_LABEL);
}else{
jobInfo.put(JOB_ID_LABEL, jobId);
}
2. 看一下InfluxdbReporter的上报方法report中,其通过buildReport构造了BatchPoints,每个Point的构造又是通过MetricMapper的map方法构造,最终调用了MetricMapper的builder方法,那么就需要将jobInfo添加到tags中,改造如下: InfluxdbReporter的buildReport方法中
代码语言:javascript复制report.point(MetricMapper.map(entry.getValue(), timestamp, entry.getKey(),getJobInfo()));
那么需要在MetricMapper对应map方法中新增Map jobInfo参数,需要将这个参数传入到builder方法中
代码语言:javascript复制privatestaticPoint.Builder builder(MeasurementInfo info,Instant timestamp,Map<String,String> jobInfo){
Map<String,String> tags=info.getTags();
tags.putAll(jobInfo);
returnPoint.measurement(info.getName())
.tag(tags)
.time(timestamp.toEpochMilli(),TimeUnit.MILLISECONDS);
}
至此改造全部完成,然后重新打包上传,看一下结果
代码语言:javascript复制taskmanager_Status_JVM_CPU_Load_value{db="flink-metric",host="xx.xx.xx.xx",job_id="d9f2524eeb590a61628560d8677a1b23",job_name="test",tm_id="container_e03_1573197073760_0141_01_000003"}
已经满足我们预期的结果,接下来就可以通过配置job_name或者job_id条件筛选,查看想要的metric。