目前基于ELK架构的日志系统,通过filebeat收集上来的日志都会发送到同一个kafka topic中,然后再由Logstash消费处理写入Elasticsearch中,这种方式导致该topic包含所有业务日志,那么各个业务去做实时统计分析就会造成重复消费,使得流量成本的浪费;对于离线分析的日志来源是通过在应用服务端定时上传的方式,对于日志量比较大的业务,一方面上传时会对应用服务器造成比较大的压力,另一方面这种上传方式对于后续小时或者分钟级别分析造成一定延时。
本文将会介绍基于Flink的日志采集平台来解决这些问题。
采集架构
•拆分:最上层Kafka A表示由filebeat收集上来的应用日志,然后通过Flink程序对Kafka topic进行拆分,根据不同的业务拆分到到下游Kafka B不同的topic中,那么对于下游的实时处理任务只需要消费对应的业务日志即可,避免了重复消费;
•转储:对于发送到Kafka B不同的业务日志,通过Flink程序转储写入到HDFS上,生成小时分区文件,供后续的离线日志分析
拆分实现
避免重复消费:为了避免对大topic的重复消费,对于同一个topic只会消费一次,也就是只会启动一个Flink任务,按照一定的规则对数据进行拆分,常见的规则就是应用名称、类型、日志文件名称等,在filebeat收集的时候这些信息都会被带上,作为拆分的依据;
可配置化:为了满足业务方能够快速获取自己的业务日志,就必须提供可配置规则的可视化界面,提供填写拆分应用标识、目标Kafka topic等,将这些规则信息保存在数据库中,然后拆分的Flink任务定时加载规则信息;
日志格式:在实践中规定日志格式是非常有必要的,为了保证拆分任务能够准确的拆分出对应的业务日志,就必须按照指定的日志格式进行打点
转储实现
通用实现:对于不同的业务日志,其日志的具体内容肯定各不相同,对于我们来说不可能每一个业务都去写一套转储的程序,希望一套程序能够处理所有的业务日志,因此对于我们来说不管任何日志对于我们来说其所代表的含义就是一个data字段对应的数据,那么就只需要把这个data字段写入到对应的hdfs目录文件即可;
数据分区:默认分区字段根据日志中一个固定的时间字段进行分区,在实践中对于老的日志并没有按照规范日志进行打点或者分区的时间字段不是通用的一个字段,需要按照日志中一个特殊的字段解析进行分区,如果将这个解析直接放在程序里面根据业务判断,最终的结果会造成代码很难维护,解决方式就是将DataStream处理转换为Table/SQL 的处理,将数据流注册成表,然后通过udf去解析出来需要的分区字段,同样这个udf无法通用,那么就必须支持不同的udf,但是对于处理却是通用的,例如: select data,udf(data) from tbl , 是一个固定的模板,只需要对于不同的转储程序加载不同的udf即可,通过Calcite 做sql语法解析,解析出使用的udf, 然后将其注册即可;
可配置化:同样需要提供界面让业务只需要通过配置一些规则即可完成日志的收集,配置消费的topic、写入数据位置、自定义分区语句支持(上面提到的自定义udf)等,在后台自动完成日志的收集开启;
其他几点:日志压缩与小文件合并可参考:StreamingFileSink压缩与合并小文件; 在实现过程中可能会存在集群迁移的场景,即将数据写入到另外的一个集群中,对于bulk的文件写入方式,其文件的滚动会在每次checkpoint使文件滚动,使用的滚动策略实现是OnCheckpointRollingPolicy,因此可以直接将hdfs文件copy到另外一个集群中,重新消费kafka的offset与生成的文件是同步的,但是存在另外一个问题,在hdfs上文件名称的生成规则是part-subtask-index,此时切换集群任务没有从checkpoint恢复index重新从0开始递增,存在覆盖以前文件的风险,因此对文件生成规则进行自定义,例如加上集群标识等。
总结
本篇主要介绍了基于Flink的采集架构以及一些关键的实现点,欢迎交流。