本篇文章大概4833字,阅读时间大约13分钟
产线环境上的Flink应用是长时运行的应用,日志量较大,通过flink web页面查看任务日志会导致浏览器卡死,通过日志排查问题非常不便。因此,需要将flink应用的日志发送到外部系统,方便进行日志检索
集群环境
- CDH-5.16.2
- Flink-1.10.1
- flink on yarn per job模式
Flink应用日志搜集方案
ELK全家桶是比较成熟的开源日志检索方案,flink日志搜集要做的就是将日志打到kafka,剩余的工作交由ELK完成即可。整个数据流向如下:
- flink应用集成logback进行日志打点,通过logback-kafka-appender将日志发送到kafka
- logstash消费kafka的日志消息送入es中,通过kibana进行检索
核心问题
- 如何在topic中区分出指定的应用日志
- 需要在flink的日志中拼上业务应用名称的app name列进行应用区分
- 通过logback自定义layout的方式打上flink业务应用名称
- 独立的flink应用名称通过在conf/flink-conf.yaml中配置java opts进行实现,在每个任务提交前先进行job.name参数的替换和设置
- 需要在flink的日志中拼上业务应用名称的app name列进行应用区分
#========================================================================
# Job Conf
#========================================================================
env.java.opts: -Djob.name=FLINK日志DEMO
日志搜集方案实现
Flink集成logback
Flink-1.10.1中提供了log4j和logback的配置文件,默认情况下使用log4j,这里采用logback进行日志打点。
- 上传logback相关依赖到flink的lib目录下
- logback-access-1.2.3.jar
- logback-classic-1.2.3.jar
- logback-core-1.2.3.jar
- logback-kafka-appender-0.2.0-RC2.jar
- 自定义logback的converter和layout
/**
* 获取flink应用的java环境变量传递的应用名称并添加到日志中
*
* @author Eights
*/
public class AppNameConvert extends ClassicConverter {
private static final String JOB_NAME = "job.name";
private static String appName = "应用默认名称";
//应用名称,这里就可以获取yarn application id, 运行机器之类的指标打到日志上
static {
String jobName = System.getProperty(JOB_NAME);
if (!StringUtils.isNullOrWhitespaceOnly(jobName)) {
appName = jobName;
}
}
@Override
public String convert(ILoggingEvent iLoggingEvent) {
return appName;
}
}
/**
* 自定义应用名称layout
* @author Eights
*/
public class AppNameLayOut extends PatternLayout {
static {
defaultConverterMap.put("app", AppNameConvert.class.getName());
}
}
- 配置集群上的flink/conf/logback.xml文件
- kafka-appender:
https://github.com/danielwegener/logback-kafka-appender
代码语言:javascript复制<configuration>
<property name="LOG_PATTERN"
value="%d{yyyy-MM-dd HH:mm:ss.SSS} %app [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n"/>
<appender name="file" class="ch.qos.logback.core.FileAppender">
<file>${log.file}</file>
<append>false</append>
<encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
<layout class="com.eights.carbond.common.logconver.AppNameLayOut">
<pattern>${LOG_PATTERN}</pattern>
</layout>
<charset>UTF-8</charset>
</encoder>
</appender>
<appender name="kafkaAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
<encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
<layout class="com.eights.carbond.common.logconver.AppNameLayOut">
<pattern>${LOG_PATTERN}</pattern>
</layout>
<charset>UTF-8</charset>
</encoder>
<topic>flink-app-logs</topic>
<keyingStrategy class="com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy"/>
<deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy"/>
<producerConfig>bootstrap.servers=dn2.eights.com:9092,dn3.eights.com:9092,dn4.eights.com:9092</producerConfig>
<producerConfig>retries=3</producerConfig>
<producerConfig>acks=1</producerConfig>
<producerConfig>batch-size=16384</producerConfig>
<producerConfig>buffer-memory=33554432</producerConfig>
<producerConfig>properties.max.request.size==2097152</producerConfig>
<producerConfig>linger.ms=1000</producerConfig>
<producerConfig>max.block.ms=0</producerConfig>
</appender>
<!-- This affects logging for both user code and Flink -->
<root level="INFO">
<appender-ref ref="file"/>
<appender-ref ref="kafkaAppender"/>
</root>
<!-- Uncomment this if you want to only change Flink's logging -->
<!--<logger name="org.apache.flink" level="INFO">-->
<!--<appender-ref ref="file"/>-->
<!--</logger>-->
<!-- The following lines keep the log level of common libraries/connectors on
log level INFO. The root logger does not override this. You have to manually
change the log levels here. -->
<logger name="akka" level="INFO">
<appender-ref ref="file"/>
<appender-ref ref="kafkaAppender"/>
</logger>
<logger name="org.apache.kafka" level="INFO">
<appender-ref ref="file"/>
<appender-ref ref="kafkaAppender"/>
</logger>
<logger name="org.apache.hadoop" level="INFO">
<appender-ref ref="file"/>
<appender-ref ref="kafkaAppender"/>
</logger>
<logger name="org.apache.zookeeper" level="INFO">
<appender-ref ref="file"/>
<appender-ref ref="kafkaAppender"/>
</logger>
<!-- Suppress the irrelevant (wrong) warnings from the Netty channel handler -->
<logger name="org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline" level="ERROR">
<appender-ref ref="file"/>
<appender-ref ref="kafkaAppender"/>
</logger>
</configuration>
Flink日志发送到kafka测试
编写一个简单的flink-demo应用,在窗口的apply方法中打一波日志
编译打包,运行一下命令提交到yarn集群,采用yarn per job模式
代码语言:javascript复制flink run -m yarn-cluster -yd -ytm 2g -ys 4 -yjm 2g -ynm flink-demo测试
-c com.eights.carbond.streaming.sensor.AverageSensorReadings -j
./flink-demo-1.0-SNAPSHOT.jar
在flink的web ui上可以查看自定义的业务名称
消费kafka的topic命令如下:
代码语言:javascript复制kafka-console-consumer --bootstrap-server dn2.eights.com:9092
--topic flink-app-logs --from-beginning --group eights
--consumer-property enable.auto.commit=false
可以发现自定义的Flink业务应用名称已经打到了日志上,kafka中的日志显示正常,flink应用日志发送到kafka测试完成。