为什么有这篇文章
使用ES来查询业务日志在开发中是非常常见的一种方式,典型的解决方案是ELK,已经非常成熟了。Flink是一个比较年轻的开源项目,已经发展了好几年,但是周边的生态还是不是很完善,比如日志收集其实不太友好,比如session模式想要按job收集日志就做不到,另外比较流行的是yarn和k8s模式,k8s理论上应该是比较容易收日志的,越来越多的公司大量java应用都跑在k8s里。日志收集这块也限制只能用商业产品,不好用。我们希望接到es里
配置文件
原生的log4j配置文件支持yml格式和xml格式,全托管的只支持xml,并且xml配置出来日志信息是单行文本,不能直接进es,并且日志中其实没有job的标识,收到一起也没法区分哪条日志是哪个job的,这显然不行。我们要想日志收集到es可用,就必须解决这两个问题。一是需要json格式,二是需要将job名称带到每一条日志中。
日志收集
经过不断尝试,终于解决了这两个问题,在这里分享一下,核心其实就下面几行配置,通过配置JsonLayout让日志打出来是json格式,然后就是增加时间、host和appName,appName就是job的名字,可以每个任务都自定义一下
代码语言:javascript复制<JsonLayout properties="true" locationInfo="true" compact="true" eventEol="true">
<KeyValuePair key="appName" value="LogCollectDemo"/>
<KeyValuePair key="@timestamp" value="${date:yyyy-MM-dd HH:mm:ss.SSS}"/>
<KeyValuePair key="host_name" value="${hostName}"/>
</JsonLayout>
收集到的日志长这样,hostname其实没什么用,有用的其实就4个字段,时间、jobname、message、level
代码语言:javascript复制{
"instant": {
"epochSecond": 1653276334,
"nanoOfSecond": 185000000
},
"thread": "main",
"level": "INFO",
"loggerName": "org.apache.flink.configuration.GlobalConfiguration",
"message": "Loading configuration property: metrics.reporter.promappmgr.class, org.apache.flink.metrics.prometheus.PrometheusReporter",
"endOfBatch": false,
"loggerFqcn": "org.apache.logging.slf4j.Log4jLogger",
"contextMap": {},
"threadId": 1,
"threadPriority": 5,
"source": {
"class": "org.apache.flink.configuration.GlobalConfiguration",
"method": "loadYAMLResource",
"file": "GlobalConfiguration.java",
"line": 228
},
"appName": "LogCollectDemo",
"@timestamp": "2023-01-01 16:45:33.376",
"host_name": "job-abc-123456-taskmanager-1-1"
}
收集到oss之后,可以找台服务器拉下来再用filebeat收走丢到kafka,也可以走sls丢kafka,然后就随便玩了
完整的配置如下,这个配置日志是输出到oss的,sls还没买,等测试过了再写出来,但和oss的应该大同小异
代码语言:javascript复制<?xml version="1.0" encoding="UTF-8"?>
<Configuration xmlns="http://logging.apache.org/log4j/2.0/config"
strict="true" packages="com.ververica.platform.logging.appender" status="WARN">
<Appenders>
<Appender name="StdOut" type="Console">
<Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT 8} %-5p %-60c %x - %m%n" type="PatternLayout"/>
</Appender>
<Appender name="RollingFile" type="RollingFile" fileName="${sys:log.file}" filePattern="${sys:log.file}.%i">
<JsonLayout properties="true" locationInfo="true" compact="true" eventEol="true">
<KeyValuePair key="appName" value="LogCollectDemo"/>
<KeyValuePair key="@timestamp" value="${date:yyyy-MM-dd HH:mm:ss.SSS}"/>
<KeyValuePair key="host_name" value="${hostName}"/>
</JsonLayout>
<Policies>
<SizeBasedTriggeringPolicy size="20 MB"/>
</Policies>
<DefaultRolloverStrategy max="4"/>
</Appender>
<Appender name="OSS" type="OSS">
<JsonLayout properties="true" locationInfo="true" compact="true" eventEol="true">
<KeyValuePair key="appName" value="LogCollectDemo"/>
<KeyValuePair key="@timestamp" value="${date:yyyy-MM-dd HH:mm:ss.SSS}"/>
<KeyValuePair key="host_name" value="${hostName}"/>
</JsonLayout>
<Property name="namespace">{{ namespace }}</Property> <!-- Do not modify this line -->
<Property name="baseUri">oss://YOUR-BUCKET-NAME/</Property>
<Property name="endpoint">https://YOUR-ENDPOINT</Property>
<Property name="flushIntervalSeconds">60</Property>
<Property name="flushIntervalEventCount">200</Property>
<Property name="rollingBytes">10000000</Property>
</Appender>
</Appenders>
<Loggers>
<Logger level="INFO" name="org.apache.hadoop"/>
<Logger level="INFO" name="org.apache.kafka"/>
<Logger level="INFO" name="org.apache.zookeeper"/>
<Logger level="INFO" name="akka"/>
<Logger level="ERROR" name="org.jboss.netty.channel.DefaultChannelPipeline"/>
<Logger level="OFF" name="org.apache.flink.runtime.rest.handler.job.JobDetailsHandler"/>
{%- for name, level in userConfiguredLoggers -%}
<Logger level="{{ level }}" name="{{ name }}"/>
{%- endfor -%}
<Root level="{{ rootLoggerLogLevel }}">
<AppenderRef ref="StdOut"/>
<AppenderRef ref="RollingFile"/>
<AppenderRef ref="OSS"/>
</Root>
</Loggers>
</Configuration>