使用Python脚本实现ElasticSearch的在线数据迁移

2023-07-31 14:28:02 浏览数 (3)

该脚本的功能,类似于 elasticsearch-dump ,二者都是基于scroll来实现的(包括reindex底层也是scroll)。

依赖包

代码语言:javascript复制
# 我这里演示的ES是7.x的,如果下面的脚本运行报错,请考虑调整这里的python的elasticsearch包版本
pip install elasticsearch==7.13.1

配置文件

vim configs.py

代码语言:javascript复制
# -*- coding: utf-8 -*-

# es数据源的信息
es_source_host = ['127.0.0.1:9200']  # 支持多个节点间用逗号分隔
es_source_index = "index-test1"

# es目标库的信息
es_dest_host = ['127.0.0.1:9200']
es_dest_index = "index-test2"

# 每次取的条数
batch_size = 2000

# 每轮休眠的时间(单位秒)
sleep_time = 0

主程序

vim run.py

代码语言:javascript复制
# -*- coding: utf-8 -*-
import json
import time
import configs

from elasticsearch import Elasticsearch

src_es = Elasticsearch(hosts = configs.es_source_host,maxsize=16)
dest_es = Elasticsearch(hosts = configs.es_dest_host,maxsize=16)

start_ts = time.time()

scroll_time = '5m'  # 指定 Scroll 上下文的存活时间

src_index_name = configs.es_source_index
dest_index_name = configs.es_dest_index

def create_dest_index():
    try:
        dest_es.indices.create(
            index=configs.es_dest_index,
            body={"settings": {"index": {"number_of_shards": 4}}},
        )

    except Exception as e:
        print(str(e))

def update_dest_index_setting(time_dur,replicas):
    try:
        res = dest_es.indices.put_settings(
            index=configs.es_dest_index,
            body={"index.refresh_interval": time_dur, "number_of_replicas": replicas},
        )
        print(res)
    except Exception as e:
        print(str(e))


def update_dest_index_mapping():
    dest_mapping = src_es.indices.get_mapping(index=configs.es_source_index)[configs.es_source_index]["mappings"]
    try:
        res = dest_es.indices.put_mapping(body=dest_mapping, index=configs.es_dest_index)
        print(res)
    except Exception as e:
        print(str(e))



def migrate():
    query = {
        "query": {
            "match_all": {}  # 查询所有文档
        }
    }

    # 计数下,用于最后确认scroll的次数
    count = 0

    # 初始化 Scroll 上下文
    response = src_es.search(index=src_index_name, scroll=scroll_time, body=query,size=configs.batch_size)
    scroll_id = response['_scroll_id']
    hits = response['hits']['hits']

    # 处理第一批结果,拼装bulk需要的数据结构
    data_list1=[]
    for hit in hits:
        data1={}
        _id, _source = hit["_id"], hit["_source"]
        data1["index"]= {"index": {"_index": dest_index_name , "_id": _id }}
        data_list1.append(data1["index"])
        data_list1.append(_source)

    # 把第一次找出的数据,拼装好的结果写入目标ES
    dest_res = dest_es.bulk(index=dest_index_name, body=data_list1)
    if dest_res["errors"]:
        for item in response["items"]:
            if "error" in item["index"]:
                print(f"Failed operation: {item['index']}")
    else:
        print("Bulk operations completed successfully!")

    count  = 1

    # 滚动获取剩余结果
    while True:
        if len(hits) < 0:
            break

        response = src_es.scroll(scroll_id=scroll_id, scroll=scroll_time)
        scroll_id = response['_scroll_id']
        # print("scroll_id ---> ", scroll_id )
        hits = response['hits']['hits']

        # 拼装bulk需要的数据结构
        data_list2=[]
        for hit in hits:
            data2={}
            _id, _source = hit["_id"], hit["_source"]
            data2["index"]= {"index": {"_index": dest_index_name , "_id": _id }}
            data_list2.append(data2["index"])
            data_list2.append(_source)

        if len(data_list2) <=0:
            break
        dest_res = dest_es.bulk(index=dest_index_name, body=data_list2)
        if dest_res["errors"]:
            for item in response["items"]:
                if "error" in item["index"]:
                    print(f"Failed operation: {item['index']}")
        else:
            print("Bulk operations completed successfully!")

        time.sleep(configs.sleep_time)

        count  = 1


    stop_ts = time.time()
    print('scroll 遍历的次数: ', count, '耗时(秒):', int(stop_ts - start_ts))



if __name__ == '__main__':
    create_dest_index()  # 创建目标索引
    update_dest_index_setting("60s",0)  # 临时降低持久性,提升写入性能
    update_dest_index_mapping()  # 复制mapping
    migrate() # 数据同步
    update_dest_index_setting("1s",1)  # 提升持久性,确保数据安全性

执行

代码语言:javascript复制
python run.py

效率

代码语言:javascript复制
测试下来,速度还是很给力的。
测试数据集:
	docs: 639566
	primary size: 179.78MB

耗时:
elasticsearch-dump迁移耗时7分钟。
python脚本迁移耗时 4分钟(可能是因为我脚本里面的迁移前先调大refresh的功劳?)。

0 人点赞