你为什么要关心 "摄入"阶段,或具体的摄取管道?
嗯,对于我们许多客户的用例和解决方案来说,索引吞吐量是最重要的关注点之一。我们花在摄取数据上的CPU时间越多,我们能达到的吞吐量就越少。
一个集群能够维持的索引吞吐量决定了能够实际存储到磁盘上的数据量,从而使之成为可搜索的数据。摄取过程中花费的每一个额外的CPU周期都意味着更少的可用搜索数据,最终意味着用户最终需要在硬件上花费更多,才可以确保他们有足够的空间来捕获他们需要的数据。
这实际上转化成了最终技术选型上最重要的裁决点之一。
在不少相同的使用场景上,我们可以发现ClickHouse和Quickwit不仅比我们更快地获取数据,而且只需花费一小部分CPU成本。
因此,数据摄取资源的使用,以及集群的大小,将会是我们用户是否会持续选择Elasticsearch的关键点。
什么是摄入(Ingestion)
简单地说,摄入可以被定义为吸收信息的过程。这实际上是描述我们在Elasticsearch中所说的 "摄入 "过程的一种恰当方式。
我们可以认为Elasticsearch的数据摄取过程大致分为四个主要方面:
- Lucene段的合并:CPU时间花在重新计算数据结构上,如doc值和倒置索引
- 将JSON解析成Lucene文档
- 索引本身,比如写到translog,tokenization,以及flush Lucene段。这就是使数据可以被搜索的原因。
- 最后,就是今天文章的重点,摄入管道(ingest pipeline)。
为了提高搜索效率,需要对数据进行结构化和格式化,以适应schema(或在Elasticsearch中称为映射——mapping)。鉴于许多不同数据源的异质性,数据往往需要被处理和解析,以确保它包含正确的值和字段。
具体到我们的解决方案,这种处理可以在 "边缘 "完成,如Beats、Elastic Agent本身,也可以集中处理,通过使用Ingest Pipelines,由集群处理。
因此,使用摄取管道是一个架构决策,将数据处理从 "边缘 "移到 "中心",但每一个架构决策都有取舍和考虑。
从上往下看,常见的取舍和平衡是,通过摄入管道集中数据处理意味着一个更方便和更简单的架构,因为你只需要在一个地方更新管道,并且可以为数据丰富化开辟更多的选择。在操作上,这也是一个比在边缘进行数据处理要简化得多的模式。
然而,我们通过给集群增加额外的负载来为这种集中式的便利付出代价。
这与在Beats或Elastic Agent中的 "边缘 "执行处理相反,因为将处理转移到边缘会将成本分散到每个代理,减少目标集群的负载,但反过来也使我们的代理 "更重"。不过,在一些生产环境中,或者在物联网或嵌入式系统等特定领域,这可能是不可接受的。
综上所述,如果不正确理解摄入管道过程的哪些部分成本高,我们就无法把握集中式与边缘式的取舍。
深入了解ingestion
现在我们对摄入过程的现状有了更好的了解,让我们深入了解摄取管道到底是什么。
输入管道就是这样,一个管道。
一个pipeline由一系列可配置的任务组成,称为处理器(processor)。每个处理器按顺序运行,对进入的文档进行特定的修改。在处理器运行后,Elasticsearch将文档索引到目标索引或data stream中。
有许多不同的处理器,每个处理器都有不同的功能和性能特点,使一些处理器比其他处理器有更高的开销。
这个例子显示了目前通过Elastic Agent集成发送的所有文档的'最终管道'。这意味着每一个传入的文件最终都要经过这个管道的处理,确保有一个'event.ingested'存在。
这个看似无害的 "集合 "处理器最终会占到总摄取管道时间的很大一部分。
代码语言:javascript复制…
{
"grok": {
"field": "message",
"ignore_missing": true,
"pattern_definitions": {
"GREEDYMULTILINE": "(.|n)*",
"TIMESTAMP": "(?:%{TIMESTAMP_ISO8601}|%{SYSLOGTIMESTAMP})"
},
"patterns": [
"%{TIMESTAMP:system.auth.timestamp} %{SYSLOGHOST:host.hostname} %{DATA:process.name}(?:\[%{POSINT:process.pid:long}\])?: %{DATA:system.auth.ssh.event} %{DATA:system.auth.ssh.method} for (invalid user )?%{DATA:user.name} from %{IPORHOST:source.ip} port %{NUMBER:source.port:long} ssh2(: %{GREEDYDATA:system.auth.ssh.signature})?",
"%{TIMESTAMP:system.auth.timestamp} %{SYSLOGHOST:host.hostname} %{DATA:process.name}(?:\[%{POSINT:process.pid:long}\])?: %{DATA:system.auth.ssh.event} user %{DATA:user.name} from %{IPORHOST:source.ip}",
"%{TIMESTAMP:system.auth.timestamp} %{SYSLOGHOST:host.hostname} %{DATA:process.name}(?:\[%{POSINT:process.pid:long}\])?: Did not receive identification string from %{IPORHOST:system.auth.ssh.dropped_ip}",
"%{TIMESTAMP:system.auth.timestamp} %{SYSLOGHOST:host.hostname} %{DATA:process.name}(?:\[%{POSINT:process.pid:long}\])?: \s*%{DATA:user.name} :( %{DATA:system.auth.sudo.error} ;)? TTY=%{DATA:system.auth.sudo.tty} ; PWD=%{DATA:system.auth.sudo.pwd} ; USER=%{DATA:system.auth.sudo.user} ; COMMAND=%{GREEDYDATA:system.auth.sudo.command}",
"%{TIMESTAMP:system.auth.timestamp} %{SYSLOGHOST:host.hostname} %{DATA:process.name}(?:\[%{POSINT:process.pid:long}\])?: new group: name=%{DATA:group.name}, GID=%{NUMBER:group.id}",
"%{TIMESTAMP:system.auth.timestamp} %{SYSLOGHOST:host.hostname} %{DATA:process.name}(?:\[%{POSINT:process.pid:long}\])?: new user: name=%{DATA:user.name}, UID=%{NUMBER:user.id}, GID=%{NUMBER:group.id}, home=%{DATA:system.auth.useradd.home}, shell=%{DATA:system.auth.useradd.shell}$",
"%{TIMESTAMP:system.auth.timestamp} %{SYSLOGHOST:host.hostname}? %{DATA:process.name}(?:\[%{POSINT:process.pid:long}\])?: %{GREEDYMULTILINE:system.auth.message}"
]
}
}
https://www.elastic.co/guide/en/elasticsearch/reference/current/grok-processor.html
不同的处理器有不同的功能和性能特点。一些特定的处理器,更会放大对资源的消耗。
上面这个例子向我们展示了常用的 "grok "处理器。
grok处理器通过使用类似正则表达式的语法进行模式匹配,从文档中的单一文本字段中提取结构化字段。Grok有足够的感知功能,包括一些保障措施(称为看门狗-watchdog),以确保在默认情况下,执行一个grok表达式的时间不超过1s。
但是,当你每秒摄取几十万份文档时,1s是一个非常长的时间。如果能准确地知道每个管道的处理器所花费的时间,那就更方便了,特别是考虑到你的管道中也可以有多个相同类型的处理器。
输入管道的成本有多高?
Elasticsearch已经提供了一些指标,帮助我们更好地了解在摄取和索引过程中所花费的时间。Node Stats API报告了许多关于索引时间和摄入管道执行时间的指标:
代码语言:javascript复制nodes.*.indices.indexing.index_time_in_millis
执行索引操作的总时间,以毫秒计。
nodes.*.ingest.total.index_time_in_millis
花费在预处理摄入的文档上的总时间,以毫秒为单位。
例如,我们可以收集一个节点用于索引文档的总时间,以及用于摄取管道的总时间。这里没有画出更多可用的指标,使我们能够在集群、节点、管道和处理器层面上获得摄取管道花费的时间。
然而,这些数字都不是彼此的子集或超集,因此,如果不使用CPU剖析器来查看CPU在摄取管道中花费的时间的确切比例,目前不可能建立一个所谓的 "摄取 "过程中的总时间。
我们可以使用async-profiler来查看Elasticsearch正在忙于做什么(使用async-profiler项目,我们能够对CPU进行剖析,看看在特定时间段内最常执行的函数或方法,然后将输出转化为交互式火焰图)。在这个例子中,15%的CPU时间是花费在ingest/IngestService.doRun
中:
而经过一年的开发,摄取的索引时间从15%提高到26%! 是的,单从摄入的角度看,它花费了更多的开销
在该测试中,我们选择具有代表性的全观测/日志场景,用于基准CPU时间测试,在最近的测试中,结果为:
- 索引 35%
- 摄取管道 26%
- 合并 15%
- 文档解析 15%
- GC、Netty、TLS......其余的9%。
输入管道的优化与设计
如果我们要优化摄取管道26%的CPU时间,不了解哪些处理器的成本高,我们就无法把握这种取舍。因此,进行这项优化工作之前,我们需要建立一个持续的基线,能够持续评估我们的工作,帮助我们做出决定。而通过rally进行持续的,定期的夜间测试以观测随着时间的推移,性能的改善和退步情况。夜间基准是一种有效而简单的方式,可以直观地看到输入管道的性能指标。
得益于Lucene和Elasticsearch不断提高索引和合并的性能,即便在更复杂的摄入管道消耗更多的CPU的情况下,我们的吞吐仍然是持续增加的。
今年早些时候,我们开发了一个 "摄入管道 "遥测设备,允许我们在集群、节点、管道和处理器层面收集和测量摄入管道指标。
我们现在以图表形式显示每个基准中每个管道处理器花费的总时间。越低越好。
你可以很快看到,处理器性能的提高对我们每秒钟可以摄取的总数据产生了实质性的影响。而通过上图,你可以根据各个处理器的实际性能,在设计你自己的处理管道时,可在满足数据处理需求的前提下,根据基线数据选择性能最优的方案,拍脑袋选择一个自己认为的最优方案。Elastic基准测试地址
我们使用树状图将管道作为父类进行可视化,将每个处理器作为一个子类。比如下图,我们可以将摄取管道遥测设备收集的原始数据通过其他的可视化分析工具进行更进一步的分析。
这正是可以帮助我们在权衡利弊后做出决定的数据类型。