Flink (1.12.1): Configuring Logback for Log Rolling and Sending Logs to Kafka


What this article covers

Flink applications in production are long-running and generate a large volume of logs, so the application logs need to be shipped to an external system where they can be searched.

Recently, in the development environment, a Flink job that failed to connect to Kafka flooded the disk with error logs and filled it up. Flink has supported log rolling since 1.11, so I decided to upgrade to the latest release, 1.12.1, and configure Logback's RollingFileAppender and KafkaAppender to roll the logs and ship them to Kafka.

Cluster environment

  • CDH-5.16.2
  • Flink-1.12.1
  • Flink on YARN, per-job mode

Configuring Logback in Flink for log rolling and Kafka delivery

For the Kafka delivery part, see the earlier article "How to send Flink application logs to Kafka"; the Logback jars are added exactly as described there.

Flink logging configuration (official documentation): https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/advanced/logging.html

Logback appender reference: http://logback.qos.ch/manual/appenders.html

Full Logback configuration
  • AppNameLayOut is a custom layout that stamps each Flink application's own business name into every log line (a hedged sketch of such a layout follows the configuration below)
<configuration>
    <property name="LOG_PATTERN"
              value="%d{yyyy-MM-dd HH:mm:ss.SSS} %app [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n"/>

    <appender name="file" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${log.file}</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
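            <!-- keep at most 10 rolled files, named ${log.file}.1 through ${log.file}.10 -->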
            <fileNamePattern>${log.file}.%i</fileNamePattern>
            <minIndex>1</minIndex>
            <maxIndex>10</maxIndex>
        </rollingPolicy>
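        <!-- roll once the file exceeds 10KB; deliberately small here so rolling is easy to observe in the test below (use a larger size in production) -->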
        <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
            <maxFileSize>10KB</maxFileSize>
        </triggeringPolicy>
        <encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
            <layout class="com.changan.carbond.common.logconver.AppNameLayOut">
                <pattern>${LOG_PATTERN}</pattern>
            </layout>
            <charset>UTF-8</charset>
        </encoder>
    </appender>

    <appender name="kafka" class="com.github.danielwegener.logback.kafka.KafkaAppender">

        <encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
            <layout class="com.changan.carbond.common.logconver.AppNameLayOut">
                <pattern>${LOG_PATTERN}</pattern>
            </layout>
            <charset>UTF-8</charset>
        </encoder>
        <topic>flink-app-logs</topic>
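        <!-- records are sent without a key and handed to the producer asynchronously, so logging does not block the job -->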
        <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy"/>
        <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy"/>
        <producerConfig>bootstrap.servers=broker1:9092,broker2:9092,broker3:9092</producerConfig>
        <producerConfig>retries=3</producerConfig>
        <producerConfig>acks=1</producerConfig>
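        <!-- batch.size and buffer.memory are in bytes: 16KB per batch, 32MB total producer buffer -->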
        <producerConfig>batch.size=16384</producerConfig>
        <producerConfig>buffer.memory=33554432</producerConfig>
        <!-- wait up to 1000ms and collect log messages before sending them as a batch -->
        <producerConfig>linger.ms=1000</producerConfig>
        <!-- even if the producer buffer runs full, do not block the application but start to drop messages -->
        <producerConfig>max.block.ms=0</producerConfig>
    </appender>

    <!-- This affects logging for both user code and Flink -->
    <root level="INFO">
        <appender-ref ref="file"/>
        <appender-ref ref="kafka"/>
    </root>

    <!-- Uncomment this if you want to only change Flink's logging -->
    <!--<logger name="org.apache.flink" level="INFO">-->
    <!--<appender-ref ref="file"/>-->
    <!--</logger>-->

    <!-- The following lines keep the log level of common libraries/connectors on
         log level INFO. The root logger does not override this. You have to manually
         change the log levels here. -->
    <logger name="akka" level="INFO">
        <appender-ref ref="file"/>
        <appender-ref ref="kafka"/>
    </logger>
    <logger name="org.apache.kafka" level="INFO">
        <appender-ref ref="file"/>
        <appender-ref ref="kafka"/>
    </logger>
    <logger name="org.apache.hadoop" level="INFO">
        <appender-ref ref="file"/>
        <appender-ref ref="kafka"/>
    </logger>
    <logger name="org.apache.zookeeper" level="INFO">
        <appender-ref ref="file"/>
        <appender-ref ref="kafka"/>
    </logger>

    <!-- Suppress the irrelevant (wrong) warnings from the Netty channel handler -->
    <logger name="org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline" level="ERROR">
        <appender-ref ref="file"/>
        <appender-ref ref="kafka"/>
    </logger>
</configuration>
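
The implementation of AppNameLayOut is not shown in this article. Below is a minimal sketch of what such a layout could look like, assuming it subclasses Logback's PatternLayout and registers a custom %app conversion word whose value is read from a system property; the package, class, and property names here are illustrative assumptions, not the original com.changan.carbond.common.logconver.AppNameLayOut code.

package com.example.logging; // hypothetical package standing in for the original one

import ch.qos.logback.classic.PatternLayout;
import ch.qos.logback.classic.pattern.ClassicConverter;
import ch.qos.logback.classic.spi.ILoggingEvent;

// Sketch of a layout that adds a custom %app conversion word to the standard
// PatternLayout, so the patterns in logback.xml can print a per-application name.
public class AppNameLayout extends PatternLayout {

    static {
        // Map the %app keyword to the converter below (registered globally for PatternLayout).
        defaultConverterMap.put("app", AppNameConverter.class.getName());
    }

    // Resolves %app. Assumption: the application name is passed as the system
    // property flink.app.name; the original AppNameLayOut may obtain it differently.
    public static class AppNameConverter extends ClassicConverter {
        @Override
        public String convert(ILoggingEvent event) {
            return System.getProperty("flink.app.name", "unknown-app");
        }
    }
}

How the application name reaches the JVM is likewise an assumption; one option in a Flink-on-YARN per-job setup is to pass it as a JVM option through Flink's env.java.opts configuration when submitting the job.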
Testing log rolling and log collection
  • Write a simple Flink application that logs a message inside the apply method (a minimal sketch follows the submit command below)
  • Build and package it, then submit it to the cluster with flink run in Flink-on-YARN per-job mode
  • Configure the Flink cluster to roll the log file every 10KB so the rolling behavior is easy to observe
flink run -m yarn-cluster -yd -ytm 2g -ys 4 -yjm 2g -ynm flink-demo测试 \
-c com.eights.carbond.streaming.sensor.AverageSensorReadings \
./flink-demo-1.0-SNAPSHOT.jar
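
The demo job itself is not shown in the article. The following is a minimal sketch of a job that logs inside apply(); the class name matches the submit command above, but the body is an assumption for illustration (any long-running job that logs inside apply() works for this test).

package com.eights.carbond.streaming.sensor;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Minimal long-running job: averages a generated number stream per window and
// logs the result inside apply(). That log line is what we expect to see rolled
// on disk and delivered to Kafka.
public class AverageSensorReadings {

    private static final Logger LOG = LoggerFactory.getLogger(AverageSensorReadings.class);

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // An effectively endless source so the job keeps producing log output.
        DataStream<Long> readings = env.generateSequence(0, Long.MAX_VALUE);

        readings
            .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
            .apply(new AllWindowFunction<Long, Double, TimeWindow>() {
                @Override
                public void apply(TimeWindow window, Iterable<Long> values, Collector<Double> out) {
                    long sum = 0L;
                    long count = 0L;
                    for (Long v : values) {
                        sum += v;
                        count++;
                    }
                    double avg = count == 0 ? 0.0 : (double) sum / count;
                    // The log statement exercised by this test.
                    LOG.info("window [{} - {}] average reading: {}", window.getStart(), window.getEnd(), avg);
                    out.collect(avg);
                }
            })
            .print();

        env.execute("flink-demo");
    }
}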
  • The application's business name appears in each log line
  • Log files roll as expected
  • Check Elasticsearch (ES) to confirm that the Flink logs have arrived

End-to-end log collection works as expected: with Logback configured on Flink 1.12, the logs are rolled on disk and shipped to Kafka.
