某云平台Flink日志按Job名称收集到ES

2023-05-10 23:19:11 浏览数 (2)

为什么有这篇文章

使用ES来查询业务日志在开发中是非常常见的一种方式,典型的解决方案是ELK,已经非常成熟了。Flink是一个比较年轻的开源项目,已经发展了好几年,但是周边的生态还是不是很完善,比如日志收集其实不太友好,比如session模式想要按job收集日志就做不到,另外比较流行的是yarn和k8s模式,k8s理论上应该是比较容易收日志的,越来越多的公司大量java应用都跑在k8s里。日志收集这块也限制只能用商业产品,不好用。我们希望接到es里

配置文件

原生的log4j配置文件支持yml格式和xml格式,全托管的只支持xml,并且xml配置出来日志信息是单行文本,不能直接进es,并且日志中其实没有job的标识,收到一起也没法区分哪条日志是哪个job的,这显然不行。我们要想日志收集到es可用,就必须解决这两个问题。一是需要json格式,二是需要将job名称带到每一条日志中。

日志收集

经过不断尝试,终于解决了这两个问题,在这里分享一下,核心其实就下面几行配置,通过配置JsonLayout让日志打出来是json格式,然后就是增加时间、host和appName,appName就是job的名字,可以每个任务都自定义一下

代码语言:javascript复制
<JsonLayout properties="true" locationInfo="true" compact="true" eventEol="true"> 
              <KeyValuePair key="appName" value="LogCollectDemo"/>
              <KeyValuePair key="@timestamp" value="${date:yyyy-MM-dd HH:mm:ss.SSS}"/>
              <KeyValuePair key="host_name" value="${hostName}"/> 
</JsonLayout>  

收集到的日志长这样,hostname其实没什么用,有用的其实就4个字段,时间、jobname、message、level

代码语言:javascript复制
{
	"instant": {
		"epochSecond": 1653276334,
		"nanoOfSecond": 185000000
	},
	"thread": "main",
	"level": "INFO",
	"loggerName": "org.apache.flink.configuration.GlobalConfiguration",
	"message": "Loading configuration property: metrics.reporter.promappmgr.class, org.apache.flink.metrics.prometheus.PrometheusReporter",
	"endOfBatch": false,
	"loggerFqcn": "org.apache.logging.slf4j.Log4jLogger",
	"contextMap": {},
	"threadId": 1,
	"threadPriority": 5,
	"source": {
		"class": "org.apache.flink.configuration.GlobalConfiguration",
		"method": "loadYAMLResource",
		"file": "GlobalConfiguration.java",
		"line": 228
	},
	"appName": "LogCollectDemo",
	"@timestamp": "2023-01-01 16:45:33.376",
	"host_name": "job-abc-123456-taskmanager-1-1"
}

收集到oss之后,可以找台服务器拉下来再用filebeat收走丢到kafka,也可以走sls丢kafka,然后就随便玩了

完整的配置如下,这个配置日志是输出到oss的,sls还没买,等测试过了再写出来,但和oss的应该大同小异

代码语言:javascript复制
<?xml version="1.0" encoding="UTF-8"?>
<Configuration xmlns="http://logging.apache.org/log4j/2.0/config"
strict="true" packages="com.ververica.platform.logging.appender" status="WARN">
  <Appenders>
    <Appender name="StdOut" type="Console">
      <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT 8} %-5p %-60c %x - %m%n" type="PatternLayout"/>
    </Appender>
    <Appender name="RollingFile" type="RollingFile" fileName="${sys:log.file}" filePattern="${sys:log.file}.%i">
      <JsonLayout properties="true" locationInfo="true" compact="true" eventEol="true"> 
              <KeyValuePair key="appName" value="LogCollectDemo"/>
              <KeyValuePair key="@timestamp" value="${date:yyyy-MM-dd HH:mm:ss.SSS}"/>
              <KeyValuePair key="host_name" value="${hostName}"/> 
      </JsonLayout>   
      <Policies>
        <SizeBasedTriggeringPolicy size="20 MB"/>
      </Policies> 
      <DefaultRolloverStrategy max="4"/>
    </Appender> 
     
    <Appender name="OSS" type="OSS">
      <JsonLayout properties="true" locationInfo="true" compact="true" eventEol="true">
              <KeyValuePair key="appName" value="LogCollectDemo"/>
              <KeyValuePair key="@timestamp" value="${date:yyyy-MM-dd HH:mm:ss.SSS}"/>
              <KeyValuePair key="host_name" value="${hostName}"/>
      </JsonLayout> 
      <Property name="namespace">{{ namespace }}</Property> <!-- Do not modify this line -->
      <Property name="baseUri">oss://YOUR-BUCKET-NAME/</Property>
      <Property name="endpoint">https://YOUR-ENDPOINT</Property> 
      <Property name="flushIntervalSeconds">60</Property> 
      <Property name="flushIntervalEventCount">200</Property> 
      <Property name="rollingBytes">10000000</Property> 
    </Appender>
  </Appenders> 
  <Loggers>
    <Logger level="INFO" name="org.apache.hadoop"/> 
    <Logger level="INFO" name="org.apache.kafka"/> 
    <Logger level="INFO" name="org.apache.zookeeper"/> 
    <Logger level="INFO" name="akka"/> 
    <Logger level="ERROR" name="org.jboss.netty.channel.DefaultChannelPipeline"/> 
    <Logger level="OFF" name="org.apache.flink.runtime.rest.handler.job.JobDetailsHandler"/>
    {%- for name, level in userConfiguredLoggers -%}
      <Logger level="{{ level }}" name="{{ name }}"/>
    {%- endfor -%}
    <Root level="{{ rootLoggerLogLevel }}">
      <AppenderRef ref="StdOut"/>
      <AppenderRef ref="RollingFile"/> 
      <AppenderRef ref="OSS"/>
    </Root>
  </Loggers>
</Configuration>

0 人点赞