Elasticsearch探索: Reindex API

2021-02-01 13:03:40 浏览数 (1)

简介

5.X版本后新增Reindex。Reindex可以直接在Elasticsearch集群里面对数据进行重建,如果你的mapping因为修改而需要重建,又或者索引设置修改需要重建的时候,借助Reindex可以很方便的异步进行重建,并且支持跨集群间的数据迁移。

官方文档:https://www.elastic.co/guide/en/elasticsearch/reference/7.3/docs-reindex.html

在我们开发的过程中,我们有很多时候需要用到 Reindex 接口。它可以帮我们把数据从一个 index 到另外一个 index 进行重新reindex。这个对于特别适用于我们在修改我们数据的 mapping 后,需要重新把数据从现有的 index 转到新的 index 建立新的索引,这是因为我们不能修改现有的 index 的 mapping

为了能够使用 reindex 接口,我们必须满足一下的条件:

  • _source 选项对所有的源 index 文档是启动的,也即源 index 的 source 是被存储的
  • reindex不会尝试设置目标索引。它不会复制源索引的设置信息。您应该在运行_reindex操作之前设置目标索引,包括设置映射,分片数,副本等。
代码语言:javascript复制
POST _reindex
{
  "source": {
    "index": "twitter"
  },
  "dest": {
    "index": "new_twitter"
  }
}

Reindex

  • Reindex 是一个时间点的副本
  • 就像上面返回的结果显示的那样,它是以 batch(批量)的方式来执行的。默认的批量大小为1000
  • 你也可以只拷贝源 index 其中的一部分数据

         -  通过加入 query 到 source 中

         -  通过定义 max_docs 参数

代码语言:javascript复制
POST _reindex
{
  "max_docs": 100,
  "source": {
    "index": "twitter2",
    "query": {
      "match": {
        "city": "北京"
      }
    }
  },
  "dest": {
    "index": "twitter3"
  }
}

这里,我们定义最多不超过100个文档,同时,我们只拷贝来自“北京”的 twitter 记录。

设置 op_type to create 将导致 _reindex 仅在目标索引中创建缺少的文档。 所有现有文档都会导致版本冲突,比如:

代码语言:javascript复制
POST _reindex
{
  "source": {
    "index": "twitter2"
  },
  "dest": {
    "index": "twitter3",
    "op_type": "create"
  }
}

结果:
{
    "took":2,
    "timed_out":false,
    "total":1,
    "updated":0,
    "created":0,
    "deleted":0,
    "batches":1,
    "version_conflicts":1,
    "noops":0,
    "retries":{
        "bulk":0,
        "search":0
    },
    "throttled_millis":0,
    "requests_per_second":-1,
    "throttled_until_millis":0,
    "failures":[
        {
            "index":"twitter3",
            "type":"_doc",
            "id":"1",
            "cause":{
                "type":"version_conflict_engine_exception",
                "reason":"[1]: version conflict, document already exists (current version [5])",
                "index_uuid":"ffz2LNIIQqqDx211R5f4fQ",
                "shard":"0",
                "index":"twitter3"
            },
            "status":409
        }
    ]
}

它表明我们之前的文档 id 为1的有版本上的冲突。

默认情况下,版本冲突会中止 _reindex 进程。 “conflict” 请求 body 参数可用于指示 _reindex 继续处理版本冲突的下一个文档。 请务必注意,其他错误类型的处理不受 “conflict” 参数的影响。 当 “conflict”:在请求正文中设置 “proceed” 时, _reindex 进程将继续发生版本冲突并返回遇到的版本冲突计数:

代码语言:javascript复制
POST _reindex
{
  "conflicts": "proceed",
  "source": {
    "index": "twitter"
  },
  "dest": {
    "index": "new_twitter",
    "op_type": "create"
  }
}

将version_type设置为internal,则Elasticsearch强制性的将文档转储到目标索引中,覆盖具有相同类型和ID的任何内容:

代码语言:javascript复制
POST _reindex
{
  "source": {
    "index": "twitter"
  },
  "dest": {
    "index": "new_twitter",
    "version_type": "internal"
  }
}

将version_type设置为external,则将导致Elasticsearch从源文件中保留版本,创建缺失的所有文档,并更新在目标索引中比源索引中版本更老的所有文档:

代码语言:javascript复制
POST _reindex
{
  "source": {
    "index": "twitter"
  },
  "dest": {
    "index": "new_twitter",
    "version_type": "external"
  }
}

source中的index和type都可以是一个列表,允许您在一个请求中从大量的来源进行复制。下面将从twitter和blog索引中的tweet和post类型中复制文档。它也包含twitter索引中post类型以及blog索引中的tweet类型。

代码语言:javascript复制
POST _reindex
{
  "source": {
    "index": ["twitter", "blog"],
    "type": ["tweet", "post"]
  },
  "dest": {
    "index": "all_together"
  }
} 

还可以通过设置大小限制处理的文档的数量。下面只会将单个文档从twitter复制到new_twitter:

代码语言:javascript复制
POST _reindex
{
  "size": 1,
  "source": {
    "index": "twitter"
  },
  "dest": {
    "index": "new_twitter"
  }
} 

如果你想要从twitter索引获得一个特定的文档集合你需要排序。排序使滚动效率更低,但在某些情况下它是值得的。如果可能,更喜欢更多的选择性查询size和sort。这将从twitter复制10000个文档到new_twitter:

代码语言:javascript复制
POST _reindex
{
  "size": 10000,
  "source": {
    "index": "twitter",
    "sort": { "date": "desc" }
  },
  "dest": {
    "index": "new_twitter"
  }
} 

source部分支持搜索请求中支持的所有元素。例如,只使用原始文档的一部分字段,使用源过滤如下所示:

代码语言:javascript复制
POST _reindex
{
  "source": {
    "index": "twitter",
    "_source": ["user", "tweet"]
  },
  "dest": {
    "index": "new_twitter"
  }
} 

默认情况下,_reindex批量滚动处理大小为1000.您可以在source元素中指定size字段来更改批量处理大小:

代码语言:javascript复制
POST _reindex
{
  "source": {
    "index": "source",
    "size": 100  #batch size 这里在机器资源允许的条件下,搞大点
  },
  "dest": {
    "index": "dest",
    "routing": "=cat"
  }
} 

reindex是耗费性能的。借助:scroll bulk实现。

优化建议:重索引下建议这个size设置大点

Reindex也可以使用[Ingest Node]功能来指定pipeline, 就像这样:

代码语言:javascript复制
POST _reindex
{
  "source": {
    "index": "source"
  },
  "dest": {
    "index": "dest",
    "pipeline": "some_ingest_pipeline"
  }
} 

限流Reindex

重新索引大量文档可能会使你的群集泛滥甚至崩溃。requests_per_second 限制索引操作速率。

代码语言:javascript复制
POST _reindex?requests_per_second=500 
{
  "source": {
    "index": "blogs",
    "size": 500
  },
  "dest": {
    "index": "blogs_fixed"
  }
}

远程Reindex

Reindex支持从远程Elasticsearch群集重建索引:

代码语言:javascript复制
POST _reindex
{
  "source": {
    "remote": {
      "host": "http://remote_cluster_node1:9200",
      "username": "user",
      "password": "pass"
    },
    "index": "source",
    "query": {
      "match": {
        "test": "data"
      }
    }
  },
  "dest": {
    "index": "dest"
  }
} 

host参数必须包含scheme,host和port(例如 https://remote_cluster_node1:9200)。用户名和密码参数是可选的,当它们存在时,索引将使用基本认证连接到远程Elasticsearch节点。使用基本认证时请务必使用https,密码将以纯文本格式发送。

必须在elasticsearch.yaml中使用reindex.remote.whitelist属性将远程主机明确列入白名单。它可以设置为允许的远程host和port组合的逗号分隔列表(例如otherhost:9200,another:9200,127.0.10.*:9200,localhost:*)。白名单忽略了scheme ——仅使用主机和端口。

来自远程服务器的重新索引使用默认为最大大小为100mb的堆栈缓冲区。如果远程索引包含非常大的文档,则需要使用较小的批量大小。下面的示例设置非常非常小的批量大小10。

代码语言:javascript复制
POST _reindex
{
  "source": {
    "remote": {
      "host": "http://otherhost:9200"
    },
    "index": "source",
    "size": 10,
    "query": {
      "match": {
        "test": "data"
      }
    }
  },
  "dest": {
    "index": "dest"
  }
} 

也可以使用socket_timeout字段在远程连接上设置socket的读取超时,并使用connect_timeout字段设置连接超时。两者默认为三十秒。此示例将套接字读取超时设置为一分钟,并将连接超时设置为十秒:

代码语言:javascript复制
POST _reindex
{
  "source": {
    "remote": {
      "host": "http://otherhost:9200",
      "socket_timeout": "1m",
      "connect_timeout": "10s"
    },
    "index": "source",
    "query": {
      "match": {
        "test": "data"
      }
    }
  },
  "dest": {
    "index": "dest"
  }
} 

修改Field

reindex支持修改文档的脚本,脚本允许修改文档的元数据。此示例修改了源文档的版本:

代码语言:javascript复制
POST _reindex
{
  "source": {
    "index": "twitter"
  },
  "dest": {
    "index": "new_twitter",
    "version_type": "external"
  },
  "script": {
    "inline": "if (ctx._source.foo == 'bar') {ctx._version  ; ctx._source.remove('foo')}",
    "lang": "painless"
  }
} 
代码语言:javascript复制
POST test/test/1?refresh
{
  "text": "words words",
  "flag": "foo"
} 

但是你不喜欢这个flag名称,而是要用tag替换它。 _reindex可以为您创建其他索引:

POST _reindex
{
  "source": {
    "index": "test"
  },
  "dest": {
    "index": "test2"
  },
  "script": {
    "inline": "ctx._source.tag = ctx._source.remove("flag")"
  }
} 

现在你可以得到新的文件:

GET test2/test/1 

{
  "found": true,
  "_id": "1",
  "_index": "test2",
  "_type": "test",
  "_version": 1,
  "_source": {
    "text": "words words",
    "tag": "foo"
  }
} 

可以设置ctx.op来更改在目标索引上执行的操作:

  • noop 如果您的脚本决定不必进行任何更改,请设置 ctx.op ="noop" 。这将导致_update_by_query 从其更新中忽略该文档。这个没有操作将被报告在响应体的 noop 计数器上。
  • delete 如果您的脚本决定必须删除该文档,请设置ctx.op="delete"。删除将在响应体的 deleted 计数器中报告。 将ctx.op设置为其他任何内容都是错误。在ctx中设置任何其他字段是一个错误。

修改Value

新索引需要加一个scope字段,并在reindex过程中给一个默认值。

Pipeline API实现

代码语言:javascript复制
PUT _ingest/pipeline/defaultvalue
{
  "description": "set default scope value",
  "processors": [
    {
      "set": {
        "field": "scope",
        "value": "suifang",
        "override": false
      }
    }
  ]
}
POST _reindex
{
  "source": {
    "index": "sphinx-zhuanti-20.06.29-144128"
  },
  "dest": {
    "index": "sphinx-zhuanti-21.01.29-190212",
    "pipeline": "defaultvalue"
  }
}

Script脚本实现

代码语言:javascript复制
POST _reindex
{
  "source": {
    "index": "sphinx-zhuanti-20.06.29-144128"
  },
  "dest": {
    "index": "sphinx-zhuanti-21.01.29-190921"
  },
  "script": {
    "source": "ctx._source.scope = 'suifang';"
  }
}

路由Routing

默认情况下,如果_reindex看到具有路由的文档,则路由将被保留,除非脚本被更改。您可以根据dest请求设置routing来更改:

  • keep:将批量请求的每个匹配项的路由设置为匹配上的路由。默认值。
  • discard:将批量请求的每个匹配项的路由设置为null。
  • =<某些文本>:将批量请求的每个匹配项的路由设置为`=`之后的文本。

例如,您可以使用以下请求将source索引的所有公司名称为cat的文档复制到路由设置为cat的dest索引。

代码语言:javascript复制
POST _reindex
{
  "source": {
    "index": "source",
    "query": {
      "match": {
        "company": "cat"
      }
    }
  },
  "dest": {
    "index": "dest",
    "routing": "=cat"
  }
} 

手动切片

重建索引支持滚动切片,您可以相对轻松地手动并行化处理:

代码语言:javascript复制
POST _reindex
{
  "source": {
    "index": "twitter",
    "slice": {
      "id": 0,
      "max": 2
    }
  },
  "dest": {
    "index": "new_twitter"
  }
}
POST _reindex
{
  "source": {
    "index": "twitter",
    "slice": {
      "id": 1,
      "max": 2
    }
  },
  "dest": {
    "index": "new_twitter"
  }
} 

您可以通过以下方式验证:

GET _refresh
POST new_twitter/_search?size=0&filter_path=hits.total 

其结果一个合理的total像这样:

{
  "hits": {
    "total": 120
  }
} 

自动切片

你还可以让重建索引使用切片的_uid来自动并行的滚动切片。

代码语言:javascript复制
POST _reindex?slices=5&refresh
{
  "source": {
    "index": "twitter"
  },
  "dest": {
    "index": "new_twitter"
  }
} 

您可以通过以下方式验证:

POST new_twitter/_search?size=0&filter_path=hits.total 

其结果一个合理的total像这样:

{
  "hits": {
    "total": 120
  }
} 

在这一点上,我们围绕要使用的slices数量提供了一些建议(比如手动并行化时,切片API中的max参数):

  • 不要使用大的数字,500就能造成相当大的CPU抖动。
  • 从查询性能的角度来看,在源索引中使用分片数量的一些倍数更为有效。
  • 在源索引中使用完全相同的分片是从查询性能的角度来看效率最高的。
  • 索引性能应在可用资源之间以slices数量线性扩展。
  • 索引或查询性能是否支配该流程取决于许多因素,如正在重建索引的文档和进行reindexing的集群。

Update by Query

虽然这个不在我们的 reindex 介绍范围,但是在有些情况下,我们可以可以通过 _update_by_query API 来让我们轻松地更新一个字段的值:

代码语言:javascript复制
POST blogs_fixed/_update_by_query
{
  "query": {
    "match": {
      "category.keyword": ""
    }
  },
  "script": {
    "source": """       ctx._source['category'] = "None"     """
  }
}

在上面,把 category.keyword 项为空的所有文档的 category 通过脚本设置为默认的 "None" 字符串。它和 reindex 的作用相似。

假设我们要向 twitter_new 索引的 mapping 添加一个多字段(multi-field)

  • 具体来说,假设我们要用新的方法分析 “content” 字段
代码语言:javascript复制
PUT new_new/_mapping
{
  "properties": {
    "content": {
      "type": "text",
      "fields": {
        "english": {
          "type": "text",
          "analyzer": "english"
        }
      }
    }
  }
}

在上面我们为 content 字段添加了一个新的 english 字段,并且它的 analyzer 为 english。

由于 mapping 已经发生改变,但是索引中已经有的文档没有这个新的字段 english,如果这个时候我们进行如下的搜索,将不会找到任何的结果:

代码语言:javascript复制
GET twitter_new/_search
{
  "query": {
    "match": {
      "content.english": "performance tips"
    }
  }
}

那么我们该如何使得索引中现有的文档都有 content.english 这个字段呢?运行 _update_by_query 以拥有现有文档选择新的 “content.english” 字段:

代码语言:javascript复制
POST twitter_new/_update_by_query

当我们完成上面的请求后,然后再执行如下的操作,将会在twitter_new 索引中搜索到想要的文档:

代码语言:javascript复制
GET twitter_new/_search
{
  "query": {
    "match": {
      "content.english": "performance tips"
    }
  }
}

Reindex性能优化

reindex的核心做跨索引、跨集群的数据迁移。 慢的原因及优化思路无非包括:

  • batch size值可能太小(默认是1000)
  • reindex的底层是scroll实现,借助scroll并行优化方式,提升效率
  • 跨索引、跨集群的核心是写入数据,考虑写入优化角度提升效率

提高批量写入大小值

默认情况下,_reindex使用1000进行批量操作,您可以在source中调整batch_size。

代码语言:javascript复制
POST _reindex
{
  "source": {
    "index": "source",
    "size": 5000
  },
  "dest": {
    "index": "dest",
    "routing": "=cat"
  }
}

ES副本数设置为0

如果要进行大量批量导入,请考虑通过设置index.number_of_replicas来禁用副本:0。 

主要原因在于:复制文档时,将整个文档发送到副本节点,并逐字重复索引过程。 这意味着每个副本都将执行分析,索引和潜在合并过程。 相反,如果您使用零副本进行索引,然后在提取完成时启用副本,则恢复过程本质上是逐字节的网络传输。 这比复制索引过程更有效。

代码语言:javascript复制
PUT /my_logs/_settings
{
    "number_of_replicas": 0
}

如:
PUT /regroupmembers-20.11.12-151612/_settings
{
    "number_of_replicas": 0
}

增加refresh间隔或干脆禁用掉

如果你的搜索结果不需要接近实时的准确性,考虑先不要急于索引刷新refresh。默认值是1s,在做reindex时可以将每个索引的refresh_interval到30s或禁用(-1)。 

如果正在进行大量数据导入,reindex就是此场景,先将此值设置为-1来禁用刷新。

代码语言:javascript复制
设置方法:
PUT /index_name/_settings
{ "refresh_interval": -1 }

还原方法:
PUT /index_name/_settings
{ "refresh_interval": "30s" }

Reindex实践优化

  • 索引数据量:71460992
  • 持续时间:55分钟
代码语言:javascript复制
1.设置Refresh:
PUT /regroupmembers-20.11.23-000000/_settings
{
  "refresh_interval": "30s"
}

2.设置Batch_size:
POST _reindex
{
  "source": {
    "index": "regroupmembers-20.05.28-142940",
    "size": 4000
  },
  "dest": {
    "index": "regroupmembers-20.11.23-000000"
  }
}

3.设置副本分片:0

0 人点赞