flume是我2015年为前公司主导开发【统一日志平台】时采用的技术(主要技术栈:flume ES Redis mongoBD Kafka Hadoop Netty ),期间也积累了不少经验(挖坑、踩坑、填坑)。
在我离开前,我们的日志平台数据量为8亿/天,高峰为8500万/小时、800万/5分钟。 flume agent单机压测15000/s数据量,未出现程序异常、资源占用过高与日志明显丢失情况。
离开前东家后,便没有再从事该类型的工作,因此当时的一些关于日志平台的想法也不再有机会去实践,暂且认为这是0.1版本吧。
本文将主要介绍我们在flume上做的一些定制开发与压测,另外时间已经过去了一年多,有一些细节难免有点忘却。
1.Flume介绍
1.1 架构介绍
image.png
agent本身是一个Java进程,运行在日志收集节点—所谓日志收集节点就是服务器节点。
agent里面包含3个核心的组件:source—->channel—–>sink,类似生产者、仓库、消费者的架构。
source:source组件是专门用来收集数据的,可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy、自定义。
channel:source组件把数据收集来以后,临时存放在channel中,即channel组件在agent中是专门用来存放临时数据的——对采集到的数据进行简单的缓存,可以存放在memory、jdbc、file等等。
sink:sink组件是用于把数据发送到目的地的组件,目的地包括hdfs、logger、avro、thrift、ipc、file、null、Hbase、solr、自定义。
event:将传输的数据进行封装,是flume传输数据的基本单位,如果是文本文件,通常是一行记录,event也是事务的基本单位。event从source,流向channel,再到sink,本身为一个字节数组,并可携带headers(头信息)信息。event代表着一个数据的最小完整单元,从外部数据源来,向外部的目的地去。
2.背景说明
我们的需求是将Java 应用的log信息进行收集,达到日志采集的目的,agent目前主要有flume、Logstash,技术选型详情在此就不表了,最终选择的flume。
由于当时公司内部推行技术组件一直有难度,且也无法借助行政手段,因此我们在设计时很多时候考虑都是尽量对应用透明,比如我们的flume source使用的是基于log文件的,而未使用应用与flume agent采用长连接的方式(该方式需要修改log4j配置,并且引入我们的jar),比如我们agent进行日志等级判断时 需要兼容各种日志格式,因为我们难以推动各个应用方统一日志格式……
sre方面,当时并没有为agent预留内存等资源,所以一旦我们的agent出现资源占用过多,都会比较敏感。
整个日志平台1.0版本的架构图如下:
image.png
可以看到我们使用kafka将log信息做转储,消息消费者主要有HDFS、ES、Queue等。
3.定制开发
从我们的实际情况出发,我们做定制开发部分主要是source和sink部分。同时我们也开发了一套agent自动部署工具。
3.1 source定制
3.1.1 dirSource
基于文件的dirSource在flume高版本里已经去除了,原生的dirSource也存在很多性能和功能上的问题,为了在我们使用的flume1.6版本里继续使用dirSource,我们就基于1.6实现了一版dirSource。
dirSource特性
- 基于NIO的WatchEvent进行log文件内容的写操作监听,同时有能动态的监听文件的创建和删除。我们丰富了这部分的匹配模式,可以实现灵活的文件监听。
- 文件的读取基于RandomAccessFile,按行读取
- 将获取内容进行处理封装Event,存入Channel。
存在的问题
- 无论是WatchEvent还是RandomAccessFile在log疯狂输出时,CPU占用会居高不下。
3.1.2 execSource
execSource为flume新版本推出的用来替代dirSource的一种实现方式,主要是通过Java执行shell命令,并且获取shell命令的输出信息,如tail、cat等。
我们在原生的execSource基础上,实现了文件的自动监听,实现了多命令模式,并且会自动回收长时间无内容产出的命令,优化了原有的线程关闭的操作及进程钩子等。
execSource特性
- 基于NIO的WatchEvent进行log文件内容的写操作监听,同时有能动态的监听文件的创建和删除。我们丰富了这部分的匹配模式,可以实现灵活的文件监听
- 多命令模式
- 自动回收长时间无内容产出的命令
- 重启时自动清理无用的shell命令
存在的问题
- flume agent进程被kill -9 时,对导致执行的shell命令无法退出,进而导致句柄得不到释放,积累下来对服务器造成影响。
3.2 sink定制
我们采用的是kafka sink,flume原生的kafka sink使用的是老版本kafka producer client,发送消息时需要手动实现批量与异步,并且是消息发送的实现上存在一些不足,在大数据量时存在明显的性能瓶颈,并且会由于集合中消息数量太多而报异常,进而丢失消息。
我们定制的版本使用的new kafka producer client ,并且对消息发送做了优化,同时对Channel参数做了大量的压测,最终确定了最优配置。
kafkaSink特性
- 使用new kafka producer client ,默认异步批量发送
- 优化了消息体序列化方式
4.压测
下文描述的压测都是在建设日志平台过程中对flume的相关测试。 测试环境都是mac book pro ,这里只关注各个测试项的对比信息。
测试一
类型 | 日志总数 | 生产频率 | cpu | cpu平均 | mem | 数据丢失 | 用时 |
---|---|---|---|---|---|---|---|
tailDirSource New kafka sink | 50万 | 2000/s | 16-27% | 20% | 230M | 几百条以内 | 280s |
tailDirSource Old kafka sink | 50万 | 2000/s | 16-27% | 19% | 230M | 较上一种丢失少 | 280s |
tailDirSource New kafka sink | 50万 | 4000/s | 34-60% | 40% | 230M | <2000 | 145s |
tailDirSource Old kafka sink | 50万 | 4000/s | 34-57% | 41% | 230M | <200 | 145s |
execSource Old kafka sink | 50万 | 4000/s | <8% | 7.5% | 230M | <200 | 145s |
execSource Old kafka sink Spillable Memory Channel | 50万 | 4000/s | 8-10% | 9.5% | 230M | <200 | 145s |
execSource Old kafka sink File Channel | 50万 | 4000/s | 40-55% | 45% | 230M | <200 | 145s |
说明:
- 类型New kafka sink为:原生sink,使用kafka旧client,只定制了从head中获取配置参数,拼接字符串
- 类型Old kafka sink为:深度定制版,使用kafka新client
结论:
- flume 资源占用从kafka发送部分目前没有太好的优化方案,且旧kafka client数据丢失更加严重。
- 因此flume kafka sink 维持不变,后续可从flume source入手优化
测试二
类型 | 日志总数 | 生产频率 | cpu占用 | cpu平均 | 内存占用 | 数据丢失 | 用时 | JVM配置 |
---|---|---|---|---|---|---|---|---|
tailDirSource kafka api sink | 50万 | 3100/s | 34-60% | 40% | 230M | <200 | 163s | 512M |
tailDirSource kafka sink | 50万 | 3100/s | 34-57% | 41% | 230M | <200 | 163s | 512M |
execSource kafka sink | 50万 | 3100/s | <8% | 7.5% | 230M | <200 | 163s | 512M |
execSource kafka sink Spillable Memory Channel | 50万 | 3100/s | 8-10% | 9.5% | 230M | <200 | 163s | 512M |
execSource kafka sink File Channel | 50万 | 3100/s | 40-55% | 45% | 230M | <200 | 163s | 512M |
execSource kafka sink MemoryChannel | 500万 | 31074/s | 30-100% | 40% | 1G | <200 | 163s | 1G |
execSource kafka sink MemoryChannel | 250万 | 15337/s | 15-20% | 18% | 450M | <200 | 163s | 1G |
execSource kafka sink MemoryChannel FastJSON | 250万 | 15337/s | 18-22% | 20% | 420M | <200 | 163s | 1G |
custom execSource kafka sink MemoryChannel FastJSON | 250万 | 15432/s | 18-25% | 21% | 420M | <200 | 162s | 1G |
custom execSource kafka sink MemoryChannel FastJSON | 125万 125万 | 7661/s 7668/s | 20-26% | 24% | 440M | <500 | 163s | 1G |
测试三
配置说明一
代码语言:javascript复制 a1.sinks.k1.batch.num.messages = 5000
a1.sinks.k1.block.on.buffer.full = true
a1.sinks.k1.buffer.memory = 167108864
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 1000
flume -Xmx256M -Xms256M
测试结果一
日志写数量 | 用时 | 线程数 | QPS | 日志文件量 | 成功发送到kafka数量 | topic个数 | CPU | 内存 | 序列化方式 | 其他 |
---|---|---|---|---|---|---|---|---|---|---|
500万 | 74s | 50 | 70000/s | 600m | 280万(单个topic) | 2 | 未统计 | 300M | fastjson | agent异常 |
配置说明二
代码语言:javascript复制 a1.sinks.k1.batch.num.messages = 5000
a1.sinks.k1.block.on.buffer.full = true
a1.sinks.k1.buffer.memory = 167108864
a1.channels.c1.type = memory
a1.channels.c1.capacity = 500000
a1.channels.c1.transactionCapacity = 500
a1.channels.c1.byteCapacity = 536870912
flume -Xmx256M -Xms256M
测试结果二
日志写数量 | 用时 | 线程数 | QPS | 日志文件量 | 成功发送到kafka数量 | topic个数 | CPU | 内存 | 序列化方式 | 其他 |
---|---|---|---|---|---|---|---|---|---|---|
500万 | 68s | 50 | 74000/s | 600m | 500万(单个topic) | 2 | 200%以上 | 320M | fastjson | 无异常 |
500万 | 68s | 50 | 74000/s | 600m | 500万(单个topic) | 1 | 100%-200% | 320M | fastjson | 无异常 |
500万 | 68s | 50 | 74000/s | 600m | 500万(单个topic) | 1 | 小于100% | 280M | StringBuild拼接 | 无异常 |
总结
- 数据量过大时,sink中kafka client 缓存被存满,kafka会报异常,设置block=true后,存入缓存会被阻塞,kafka不报异常,但是由于sink从channel中消费的速度远低于source存入channel的速度,channel会报Unable to put event on required channel,flume停止提供服务。继续写入日志,会重复发送错误。
- 该异常可通过增大channel的byteCapacity参数或者调大JVM的参数值(byteCapacity默认为JVM的80%)来提高报错的阀值,且减小transactionCapacity 的值来减缓传输到sink的数据量。
- JVM内存参数在7万每秒的压力下,设置为256M较为合适,byteCapacity设置为512M较为合适,当增加channel个数或者增大channel向sink传输的数据量时,都会导致sink消费过慢报异常(总结1中异常),单个channel内存消耗在300M左右。
- 对于数据量较大的应用,建议只发送单个topic。