Overview
Flink applications in production are long-running and produce a large volume of logs, so the logs need to be shipped to an external system where they can be searched.
Recently, on a development environment, a Flink job that failed to connect to Kafka spewed a flood of error logs and filled the disk. Since Flink has supported log rolling since 1.11, I decided to upgrade Flink to the latest 1.12.1 and configure logback's RollingFileAppender and KafkaAppender to get both log rotation and Kafka shipping.
Cluster environment
- CDH-5.16.2
- Flink-1.12.1
- Flink on YARN, per-job mode
Configuring Logback in Flink for log rolling and Kafka shipping
For the Kafka-shipping side, see the earlier article "How to send Flink application logs to Kafka"; the logback jars to add are the same as described there.
Flink logging documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/advanced/logging.html
Logback appenders reference: http://logback.qos.ch/manual/appenders.html
Full logback configuration
- AppNameLayOut is a custom layout that stamps each Flink application's own business name into every log line; a sketch of such a layout follows.
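The article does not show the AppNameLayOut source, so the following is only a minimal sketch of how such a layout could work, assuming it extends logback's PatternLayout and registers a custom %app conversion word; the app.name system property used here is a placeholder for however the real class resolves the business name.

import ch.qos.logback.classic.PatternLayout;
import ch.qos.logback.classic.pattern.ClassicConverter;
import ch.qos.logback.classic.spi.ILoggingEvent;

public class AppNameLayOut extends PatternLayout {

    public AppNameLayOut() {
        // map the custom %app conversion word used in the patterns below to the converter
        getInstanceConverterMap().put("app", AppNameConverter.class.getName());
    }

    public static class AppNameConverter extends ClassicConverter {
        @Override
        public String convert(ILoggingEvent event) {
            // placeholder: assume each job is submitted with -Dapp.name=<business name>
            return System.getProperty("app.name", "unknown-app");
        }
    }
}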
<configuration>
    <property name="LOG_PATTERN"
              value="%d{yyyy-MM-dd HH:mm:ss.SSS} %app [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n"/>

    <appender name="file" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${log.file}</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
            <fileNamePattern>${log.file}.%i</fileNamePattern>
            <minIndex>1</minIndex>
            <maxIndex>10</maxIndex>
        </rollingPolicy>
        <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
            <maxFileSize>10KB</maxFileSize>
        </triggeringPolicy>
        <encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
            <layout class="com.changan.carbond.common.logconver.AppNameLayOut">
                <pattern>${LOG_PATTERN}</pattern>
            </layout>
            <charset>UTF-8</charset>
        </encoder>
    </appender>

    <appender name="kafka" class="com.github.danielwegener.logback.kafka.KafkaAppender">
        <encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
            <layout class="com.changan.carbond.common.logconver.AppNameLayOut">
                <pattern>${LOG_PATTERN}</pattern>
            </layout>
            <charset>UTF-8</charset>
        </encoder>
        <topic>flink-app-logs</topic>
        <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy"/>
        <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy"/>
        <producerConfig>bootstrap.servers=broker1:9092,broker2:9092,broker3:9092</producerConfig>
        <producerConfig>retries=3</producerConfig>
        <producerConfig>acks=1</producerConfig>
        <producerConfig>batch.size=16384</producerConfig>
        <producerConfig>buffer.memory=33554432</producerConfig>
        <!-- wait up to 1000ms and collect log messages before sending them as a batch -->
        <producerConfig>linger.ms=1000</producerConfig>
        <!-- even if the producer buffer runs full, do not block the application but start to drop messages -->
        <producerConfig>max.block.ms=0</producerConfig>
    </appender>

    <!-- This affects logging for both user code and Flink -->
    <root level="INFO">
        <appender-ref ref="file"/>
        <appender-ref ref="kafka"/>
    </root>

    <!-- Uncomment this if you want to only change Flink's logging -->
    <!--<logger name="org.apache.flink" level="INFO">-->
    <!--    <appender-ref ref="file"/>-->
    <!--</logger>-->

    <!-- The following lines keep the log level of common libraries/connectors on
         log level INFO. The root logger does not override this. You have to manually
         change the log levels here. -->
    <logger name="akka" level="INFO">
        <appender-ref ref="file"/>
        <appender-ref ref="kafka"/>
    </logger>
    <logger name="org.apache.kafka" level="INFO">
        <appender-ref ref="file"/>
        <appender-ref ref="kafka"/>
    </logger>
    <logger name="org.apache.hadoop" level="INFO">
        <appender-ref ref="file"/>
        <appender-ref ref="kafka"/>
    </logger>
    <logger name="org.apache.zookeeper" level="INFO">
        <appender-ref ref="file"/>
        <appender-ref ref="kafka"/>
    </logger>

    <!-- Suppress the irrelevant (wrong) warnings from the Netty channel handler -->
    <logger name="org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline" level="ERROR">
        <appender-ref ref="file"/>
        <appender-ref ref="kafka"/>
    </logger>
</configuration>
Testing log rolling and log shipping
- Write a simple Flink application that logs a message inside the apply method (a sketch of such a job follows the submit command below)
- Build and package the job, then submit it to the cluster with flink run in Flink on YARN per-job mode
- Configure the file appender to roll over every 10 KB so the rolling behavior is easy to observe
flink run -m yarn-cluster -yd -ytm 2g -ys 4 -yjm 2g -ynm flink-demo测试 \
  -c com.eights.carbond.streaming.sensor.AverageSensorReadings \
  ./flink-demo-1.0-SNAPSHOT.jar
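For reference, here is a minimal sketch of the kind of test job used above. The real AverageSensorReadings class is not included in the article, so the source, window size, and computation are placeholder assumptions; the only point is the LOG.info call inside apply(), which feeds the rolling-file and Kafka appenders.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AverageSensorReadings {

    private static final Logger LOG = LoggerFactory.getLogger(AverageSensorReadings.class);

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // placeholder source: a slow counter standing in for sensor readings
        DataStream<Long> readings = env.addSource(new SourceFunction<Long>() {
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<Long> ctx) throws Exception {
                long value = 0;
                while (running) {
                    ctx.collect(value++);
                    Thread.sleep(100); // throttle so the demo emits a steady trickle of records
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        });

        readings
            .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
            .apply(new AllWindowFunction<Long, Double, TimeWindow>() {
                @Override
                public void apply(TimeWindow window, Iterable<Long> values, Collector<Double> out) {
                    long sum = 0;
                    long count = 0;
                    for (Long v : values) {
                        sum += v;
                        count++;
                    }
                    double avg = count == 0 ? 0.0 : (double) sum / count;
                    // log inside apply() so the file and Kafka appenders receive traffic
                    LOG.info("window [{} - {}] average reading: {}", window.getStart(), window.getEnd(), avg);
                    out.collect(avg);
                }
            })
            .print();

        env.execute("flink-demo logging test");
    }
}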
- The per-application business name appears in the log lines
- The log files roll over as expected
- Elasticsearch shows the Flink logs arriving
Log collection works end to end: log rolling with logback and log shipping to Kafka are both in place for Flink 1.12.