Elasticsearch 重建索引

2022-06-27 13:12:01 浏览数 (1)

1. 引言

上一篇文章中,我们介绍了如何为 ik 分词器配置我们自己的分词库。

Elasticsearch 常用分词器介绍与 ik 分词器自定义词库添加 但事实上,更加常见的场景是我们需要为一个已有大量数据的线上 ES 集群添加分词库。 这时,配置分词库只是第一步操作,因为大量的历史数据在索引时并没有使用新添加的分词库,将导致查询出现不可预期的效果。 此时,我们需要做的就是重建索引。

2. 重建索引的使用场景

至少在以下场景需要重建索引。

2.1. 生成索引方式变更

如上所述,因为新的词库的添加,导致历史数据需要按照新的索引生成方式来生成索引。 此时,重建索引就是唯一的选择了。

2.2. 切分数据

对于已有 ES 集群,数据量庞大到一定程度或因为其他业务上的原因,往往需要将已有数据按照一定的规则进行切分到多个不同的索引中。 这样的过程通过重建索引来实现是非常容易得。

2.3. mapping 字段类型、分词器等属性变更

ES 本身是不支持字段类型变更的,如果将一个 text 类型的字段错误的定义为 datetime 类型,那么将导致所有不匹配日期时间格式的文本无法插入。 此时也是不得不进行索引的删除重建工作的。

3. 重建索引的过程

对于线上业务来说,我们不能简单暴力地删除已有索引 -> 创建新索引 -> 导入数据的方式来重建索引,这样将严重影响到业务的使用。 正确的流程是:

  1. 创建新索引
  2. 批量从原索引中将数据导出到新索引中
  3. 数据导入完成后,通过 ES 别名机制进行索引切换
  4. 删除旧索引

这样就实现了索引的平滑重建。

4. 数据批量导出 — ES 的 scroll 操作

Elasticsearch 提供了 scroll 查询语句,用于数据的批量导出。 他在首次执行时创建会话并返回 _scroll_id 字段,此后通过 _scroll_id 都会直接使用上一次的会话上下文,从而实现加速查询的目的。 我们需要通过 scroll 参数设置每次返回数据量的大小:

POST /myindex/_search?scroll=1m { "size": 100, "query": { "match" : { "title" : "elasticsearch" } } }

上面的查询返回了所有 title 为 elasticsearch 的结果,假设数据量远远超过 1m。 此次查询会返回 _scroll_id,此后我们继续查询:

POST /myindex/_search/scroll { "scroll" : "1m", "scroll_id" : "DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAAAD4WYm9laVYtZndUQlNsdDcwakFMNjU1QQ==" }

直到所有数据查询完成,我们可以执行下面命令关闭会话:

DELETE /_search/scroll { "scroll_id" : "DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAAAD4WYm9laVYtZndUQlNsdDcwakFMNjU1QQ==" }

5. Elasticsearch 批量导入数据 — ES 的 bulk 操作

那么如何将批量查询出的数据批量导入新的索引呢? Elasticsearch 提供了 bulk API 允许我们使用单一请求实现批量创建、索引、更新或删除。

POST /_bulk { "delete": { "_index": "website", "_type": "blog", "_id": "123" }} { "create": { "_index": "website", "_type": "blog", "_id": "123" }} { "title": "My first blog post" } { "index": { "_index": "website", "_type": "blog" }} { "title": "My second blog post" } { "update": { "_index": "website", "_type": "blog", "_id": "123", "_retry_on_conflict" : 3} } { "doc" : {"title" : "My updated blog post"} }

每行行尾的换行符(n)都是必须的,包括最后一行。

6. 基于 python 的封装

python 的 elasticsearch 包将上述两个命令合成了一个方法来让用户方便的调用。

代码语言:javascript复制
from elasticsearch import helpers
from elasticsearch import Elasticsearch

es = Elasticsearch([{"host":"127.0.0.1","port":"9200"}], timeout=100000)
body={"query":{"match_all":{}}}

helpers.reindex(client=es, source_index='old_index', target_index='new_index',
        target_client=es, query=body, chunk_size=1000, scroll='15m')

7. 索引切换 — ES 的 alias 操作

Elasticsearch 提供了 alias 操作来为索引提供别名,通过别名机制我们可以实现快速切换索引等功能。 重建索引后,我们执行下面命令来切换索引:

POST /_aliases { "actions" : [ { "remove" : { "index" : "old_index", "alias" : "online_index" } }, { "add" : { "index" : "new_index", "alias" : "online_index" } } ] }

此后删除旧索引即可。

此处有图片 2

8. 参考资料

https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-scroll.html。 https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html。 https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-aliases.html。 https://blog.csdn.net/deardreaming/article/details/52813581。

0 人点赞