由 Gyula Fora 和Matyas Orhidi 撰写
介绍
我们正在继续有关在Flink的帮助下实现实时日志聚合的博客系列。在本系列的《使用Flink进行实时日志聚合:第一部分》中,我们回顾了为什么从长期运行的分布式作业中实时收集和分析日志很重要。我们还研究了一种非常简单的解决方案,仅使用可配置的附加程序将日志存储在Kafka中。提醒一下,让我们再次检查管道
在本章中,我们将研究摄取、搜索和可视化的主题。我们仍将依靠CDP堆栈中可用的标准开源组件来完成我们的流程。在我们的解决方案中使用开源组件的方法确保了管道本身可以沿着标准层进行拆分,并且可以轻松地与任何集中式日志管理系统集成。我们将在本文后面讨论一些流行的解决方案,但是现在让我们看看如何在不离开舒适的CDP环境的情况下搜索和分析已经存储在Kafka中的日志。
使用Flink将日志编入Solr
我们使用Flink和Solr构建日志获取/索引管道。Flink提供了所有必要的抽象来实现强大的日志索引器,并提供用于后期处理的其他功能,例如复杂的警报逻辑。使用检查点机制,即使在发生故障时,我们也可以确保所有日志都被提取。
完整的日志提取实现以及构建说明可以在Gi t Hub 上找到 。但是,在构建和运行它之前,让我们仔细看一下流作业本身。
摄取作业概述
该LogIndexerJob 是我们Flink流工作的切入点。
我们的提取流程非常简单:
a) 传入JSON日志的Kafka源
b) 处理窗口和索引器以将日志摄取到Solr
c) 用于日志监视和警报的任意自定义逻辑
让我们详细了解这些步骤。
Kafka JSON输入
我们管道的第一步是从Kafka访问JSON日志。我们首先通过使用FlinkKafkaConsumer 源读取原始String消息,然后将它们转换为Map <String,String> 的流以更方便地访问来实现。
FlinkKafkaConsumer<String> logSource = new FlinkKafkaConsumer<>( inputTopic, new SimpleStringSchema(), kafkaProps);
KeyedStream<Map<String, String>, String> logStream = env .addSource(logSource) .name("Kafka Log Source") .uid("Kafka Log Source") .flatMap(new LogParser()) .name("JSON parser") .keyBy(map -> map.get(LogParser.YARN_CONTAINER_ID_KEY));
我们从Kafka读取日志流作为JSON String数据,并使用Jackson库将LogParser 类中的JSON转换为Map 。同时,我们从JSON中清除了一些不必要的字段,并添加了一个从容器ID派生的附加yarnApplicationId 字段。
应用程序ID充当单个Flink作业的所有日志的顶级分组标识符,而容器ID可用于区分来自不同任务管理器的日志消息。
由于LogParser 类使用Map <String,String> 作为输出类型,因此我们在整个ResultTypeQueryable 接口中提供了额外的信息类型。通过声明我们的TypeInformation 为新的MapTypeInfo <>(String.class,String.class),我们确保尽可能高效地序列化数据。
请注意,将keyBy操作应用于Map流。原因是并行窗口操作仅在键控流上执行。我们决定选择容器ID作为键,但是我们也可以使用任何合理的键为索引步骤提供所需的并行性。
窗口日志索引逻辑
现在,我们已经有了包含要存储的数据的Map流,下一步是将其添加到Solr。
尽管Solr可以处理大量要建立索引的数据(在Solr术语中称为文档),但我们要确保Flink和Solr之间的通信不会阻塞我们的数据管道。最简单的方法是将索引请求一起批处理。
我们利用Flink的处理时间窗口机制来创建这些批次,并通过选择足够小的窗口大小(几秒钟),将端到端的延迟保持在合理的最低水平。
DataStream<UpdateResponse> logIndexResponse = logStream .timeWindow(Time.seconds(params.getLong(LOG_INDEXING_BATCH_SIZE, 10))) .apply(new SolrIndexer(params)) .name("Solr Indexer") .uid("Solr Indexer");
实际的索引逻辑发生在SolrIndexer 窗口函数内部,并且由以下3个步骤组成:
a) 操作员启动时创建Solr Client
b) 当我们收到消息窗口时,我们使用客户端对它们进行索引
c) 操作员停止时关闭Solr Client
每个作业仅执行一次步骤1.和3.,因此可以在操作员各自的生命周期方法open 和close 中实施它们。索引步骤2.将对每个进入的窗口执行,因此它是在窗口函数的apply 方法中实现的。
配置参数在函数的构造函数中传递,并与函数定义一起序列化。我们的索引器运算符采用以下必需的配置参数,这些参数应在我们的作业属性文件中指定:
solr.urls=<solr-host:port>/solrsolr.collection=flink-logs
索引逻辑的输出是UpdateResponse 对象的流,其中包含Solr是否成功接收了索引请求的信息。
5.1. 索引错误处理
在此参考实现中,我们选择了一种简单的错误处理方法,其中我们只记录索引错误而不对它们采取任何措施。
关键日志的另一种方法是确保重试索引步骤,直到它们成功为止,重试逻辑可以轻松地合并到窗口函数实现中。
5.2. 自定义日志处理逻辑
一旦将日志流连续摄取到Flink作业中,我们就可以灵活地利用它来完成许多工作,而不仅仅是将它们索引到Solr。
通过对特定领域的理解,我们可以轻松地添加一些逻辑来检测日志中的模式,否则这些模式很难在仪表板层上实现。
我们还可以使用Flink的状态处理抽象来随着时间的推移建立应用程序的健康状况,并随着时间的推移迅速发现问题。
运行Flink应用程序
在启动Flink应用程序之前,我们必须创建将用日志填充的Solr集合。我们可以使用命令行客户端简单地分两步执行此操作:
solrctl config --create flink-logs-conf schemalessTemplate -p immutable=falsesolrctl collection --create flink-logs -c flink-logs-conf
收集准备就绪后,我们可以创建solr_indexer.props文件来指定我们的应用程序参数:
# General propslog.input.topic=flink.logs# Solr propssolr.urls=<solr-host:port>/solrsolr.collection=flink-logs# Kafka propskafka.group.id=flinkkafka.bootstrap.servers=<kafka-brokers>
设置完所有内容后,我们可以使用Flink CLI在集群上执行我们的作业。
flink run -m yarn-cluster -p 2 flink-solr-log-indexer-1.0-SNAPSHOT.jar --properties.file solr_indexer.props
我们可以首先从低并行度设置开始(在这种情况下为2),然后逐渐增加以满足我们的吞吐量要求。随着并行度的增加,我们可能还必须添加更多的任务管理器和内存。
使用Hue记录仪表板
现在,我们的日志由Flink作业连续处理和索引,最后一步是通过交互式图形界面将其公开给最终用户。尽管Solr本身提供了一个用于搜索日志的Web界面,但我们可以通过为日志数据创建一些漂亮的仪表板来获得更好的见解。为此,我们将使用Hue。
Hue是基于Web的交互式查询编辑器,可让您与数据仓库进行交互。它还具有一些高级仪表板功能,使我们能够随着时间的推移监视日志。
在“仪表板”页面上,我们可以立即访问Solr集合。在屏幕的右侧,我们可以看到所有可用的记录字段,因此我们可以轻松地将其拖放以选择我们真正需要的字段。
我们还可以创建不同的图和图表来跟踪随时间变化的不同指标。
与其他日志记录解决方案比较
我们已经成功构建并部署了可以与我们的数据处理应用程序集成的日志聚合管道。
通过一些额外的调整和维护,我们可以将其变成一个体面的生产系统,该系统以低延迟收集和公开日志,同时具有可伸缩性。除了日志提取工作之外,我们还可以获得完全定制的功能,这些功能在其他任何地方都很难找到。
另一方面,有许多现成的生产级测井解决方案可以“正常工作”。让我们仔细研究一下我们的自定义解决方案与现有的一些日志聚合框架的比较以及我们的设置如何与其他工具配合使用。
这绝不是一个详尽的比较,我们的目的不是列出所有可能的解决方案,而是让您大致了解我们的立场。
ELK堆栈
Elasticsearch – Logstash – Kibana(又名ELK)堆栈通常用于收集和监视应用程序日志和指标。它应满足我们的流式应用程序具有的所有日志聚合要求。
与我们的自定义管道类似,它带有使用logstash的自己的日志提取逻辑。日志存储在elasticsearch中。Kibana作为可视化仪表板层位于Elastic之上,我们可以在其中自定义监控逻辑。
由于logstash可以配置为直接从Kafka使用日志,因此我们可以重复使用为自己的自定义解决方案配置的相同的日志附加器/收集逻辑。
如果我们的数据处理堆栈已经包含某些框架(例如,Elastic),则ELK堆栈是可行的解决方案。在这种情况下,我们只需要设置Logstash或Apache NiFi之类的工具,使其与我们所需的摄取逻辑相匹配即可。
如果我们想将整个ELK堆栈作为新技术带入我们的组织,我们必须意识到这种选择的运营开销。与任何其他系统一样,它也面临着一系列挑战和成本。
Graylog
Graylog是专门设计用于日志聚合和监视的系统。它带有自己的日志提取逻辑和自定义附加程序,可以将其配置为直接使用我们的日志。
与logstash相似,我们还可以将Graylog配置为使用来自Kafka的日志消息,无论我们使用什么下游日志堆栈,我们都将选择Kafka作为日志收集层。
Graylog本身以弹性方式存储日志消息,并使用mongodb来存储我们的配置,仪表板等的元数据。
与ELK堆栈类似,如果我们具备操作堆栈的专业知识,Grayloag可能是我们日志记录堆栈的绝佳选择。
圆满完成
在Flink的帮助下,我们构建了一个高度可扩展且可自定义的日志记录解决方案,可以满足流应用程序的特殊要求。它汇总并存储长期运行的日志,并提供简单的功能,以便于近实时进行轻松的监视和诊断。该解决方案可以直接在CDP环境中使用,也可以轻松集成到集中式日志记录和监视系统中。Flink还可以通过警报功能帮助进一步扩展我们的解决方案。
原文链接:https://blog.cloudera.com/real-time-log-aggregation-with-apache-flink-part-2/