Ingest Pipelines
Elasticsearch 的 Ingest Pipelines 功能允许在数据进入索引之前进行预处理。它提供了一种在索引过程中对数据进行转换、增强、过滤等操作的机制,适用于处理结构化和非结构化数据。Ingest Pipelines 非常适合在数据写入 Elasticsearch 之前对其进行清理、格式化和增强,而不需要在客户端代码中实现这些处理逻辑。
核心概念
1. Pipeline:管道定义了一系列处理器(processors),这些处理器会按照顺序依次对文档执行操作。每个处理器可以对文档进行修改、添加字段、删除字段等操作。
2. Processor:处理器是管道中的核心单元,每个处理器都有特定的功能。例如,它可以对数据进行转换(如字符串到数字)、解析日期、提取字段等。处理器是 Ingest Pipelines 的执行逻辑的最小单位。
3. Execution:当你将文档发送到 Elasticsearch 时,如果指定了一个 Pipeline,这些文档会在处理器中被依次处理,然后写入到目标索引中。
典型使用场景
• 数据清理:从原始数据中删除不需要的字段或格式化数据,使其符合标准化格式。
• 字段增强:从现有字段中提取额外信息并生成新的字段。例如,可以从日志记录中提取地理位置或 IP 地址信息。
• 格式转换:将字段从一种格式转换为另一种格式,例如从字符串转换为日期或数值。
• 数据处理和修改:在数据写入索引之前进行修改,例如替换字段中的字符、应用脚本处理逻辑等。
步骤:
1. 创建一个 Ingest Pipeline
首先,定义一个 Pipeline,并在其中使用 script 处理数据。
代码语言:json复制PUT /_ingest/pipeline/my_pipeline
{
"description": "Process data before indexing",
"processors": [
{
"script": {
"source": """
// 假设我们处理的是字段 'user', 并想将名字大写
if (ctx.user != null) {
ctx.user.name = ctx.user.name.toUpperCase();
}
"""
}
}
]
}
在这个示例中,我们创建了一个 Pipeline,该 Pipeline 会检查文档中的 user 字段,并将 user.name 转换为大写后再写入索引。
2. 在索引数据时指定 Pipeline
在向索引写入数据时,使用刚刚创建的 Pipeline:
代码语言:json复制POST /my_index/_doc/1?pipeline=my_pipeline
{
"user": {
"name": "John",
"age": 30
}
}
通过这个操作,Elasticsearch 会在将数据写入 my_index 索引之前,先通过 my_pipeline 进行处理,最终 user.name 会被转换为 "JOHN"。
3. 验证结果
查询文档:
代码语言:json复制GET /my_index/_doc/1
返回结果为:
代码语言:json复制{
"_source": {
"user": {
"name": "JOHN", // 名字被转换为大写
"age": 30
}
}
}