Elasticsearch跨集群数据迁移

2021-02-04 10:50:52 浏览数 (1)

简介

根据业务需求,存在以下场景:

  • 迁移过程中,旧的集群可以暂时停止服务或者暂停写入,数据全部迁移到新的集群中后,业务切换到新的集群进行读取和写入
  • 迁移过程中,旧集群不能停止写入,业务不能停服

如果是第一种场景,数据迁移过程中可以停止写入,可以采用诸如elasticsearch-dump、logstash、reindex、snapshot等方式进行数据迁移。实际上这几种工具大体上可以分为两类:

  • scroll query bulk: 批量读取旧集群的数据然后再批量写入新集群,elasticsearch-dump、logstash、reindex都是采用这种方式
  • snapshot: 直接把旧集群的底层的文件进行备份,在新的集群中恢复出来,相比较scroll query bulk的方式,snapshot的方式迁移速度最快。

如果是第二种场景,数据迁移过程中旧集群不能停止写入,需要根据实际的业务场景解决数据一致性的问题:

  • 如果业务不是直接写ES, 而是把数据写入到了中间件,比如业务->kafka->logstash->es的架构,此时可以直接采用双写的策略,旧集群不停止读写,新的集群也直接写入,然后迁移旧集群的数据到新集群中去,等数据追平之后,新的集群再提供读服务;
  • 如果业务是直接写ES, 并且会进行删除doc操作;此时可以使用ES官方在6.5版本之后的CCR(跨集群复制)功能,把旧集群作为Leader, 新集群作为Follower, 旧集群不停止读写,新集群从旧集群中follow新写入的数据;另一方面使用第三方工具把存量的旧集群中的数据迁移到新集群中,存量数据迁移完毕后,业务再切换到新的集群进行读写。

离线迁移需要先停止老集群的写操作,将数据迁移完毕后在新集群上进行读写操作。适合于业务可以停服的场景。

离线迁移大概有以下几种方式:

  • elasticsearch-dump
  • snapshot
  • reindex
  • logstash

停止旧集群写入

下面介绍一下在旧集群可以停止写入的情况下进行数据迁移的几种工具的用法。

elasticsearch-dump

适用场景

适合数据量不大,迁移索引个数不多的场景

使用方式

elasticsearch-dump是一款开源的ES数据迁移工具,github地址: https://github.com/taskrabbit/elasticsearch-dump

  • 安装elasticsearch-dump

elasticsearch-dump使用node.js开发,可使用npm包管理工具直接安装:

代码语言:javascript复制
npm install elasticdump -g

主要参数说明:
--input: 源地址,可为ES集群URL、文件或stdin,可指定索引,格式为:{protocol}://{host}:{port}/{index}
--input-index: 源ES集群中的索引
--output: 目标地址,可为ES集群地址URL、文件或stdout,可指定索引,格式为:{protocol}://{host}:{port}/{index}
--output-index: 目标ES集群的索引
--type: 迁移类型,默认为data,表明只迁移数据,可选settings, analyzer, data, mapping, alias
--limit:每次向目标ES集群写入数据的条数,不可设置的过大,以免bulk队列写满
  • 迁移单索引

以下操作通过elasticdump命令将集群172.16.0.39中的companydatabase索引迁移至集群172.16.0.20。注意第一条命令先将索引的settings先迁移,如果直接迁移mapping或者data将失去原有集群中索引的配置信息如分片数量和副本数量等,当然也可以直接在目标集群中将索引创建完毕后再同步mapping与data。

代码语言:javascript复制
elasticdump --input=http://172.16.0.39:9200/companydatabase --output=http://172.16.0.20:9200/companydatabase --type=settings
elasticdump --input=http://172.16.0.39:9200/companydatabase --output=http://172.16.0.20:9200/companydatabase --type=mapping
elasticdump --input=http://172.16.0.39:9200/companydatabase --output=http://172.16.0.20:9200/companydatabase --type=data
  • 迁移多索引

以下操作通过elasticdump命令将将集群172.16.0.39中的所有索引迁移至集群172.16.0.20。 注意此操作并不能迁移索引的配置如分片数量和副本数量,必须对每个索引单独进行配置的迁移,或者直接在目标集群中将索引创建完毕后再迁移数据。

代码语言:javascript复制
elasticdump --input=http://172.16.0.39:9200 --output=http://172.16.0.20:9200

logstash

logstash支持从一个ES集群中读取数据然后写入到另一个ES集群,因此可以使用logstash进行数据迁移,具体的配置文件如下:

代码语言:javascript复制
input {
    elasticsearch {
        hosts => ["http://x.x.x.1:9200"]
        index => "*"
        docinfo => true
	}
}
output {
    elasticsearch {
        hosts => ["http://x.x.x.2:9200"]
        index => "%{[@metadata][_index]}"
    }
}

上述配置文件将源ES集群的所有索引同步到目标集群中,当然可以设置只同步指定的索引,logstash的更多功能可查阅logstash官方文档 logstash 官方文档.

reindex

reindex是Elasticsearch提供的一个api接口,可以把数据从一个集群迁移到另外一个集群。

  • 配置reindex.remote.whitelist参数

需要在目标ES集群中配置该参数,指明能够reindex的远程集群的白名单

  • 调用reindex api

以下操作表示从源ES集群中查询名为test1的索引,查询条件为title字段为elasticsearch,将结果写入当前集群的test2索引

代码语言:javascript复制
POST _reindex
{
  	"source": {
    	"remote": {
      		"host": "http://x.x.x.1:9200"
    	},
    	"index": "test1",
    	"query": {
      		"match": {
        		"title": "elasticsearch"
      		}
    	}
  	},
  	"dest": {
   		"index": "test2"
  	}
}

snapshot

适用场景

适用数据量大的场景

使用方式

snapshot api是Elasticsearch用于对数据进行备份和恢复的一组api接口,可以通过snapshot api进行跨集群的数据迁移,原理就是从源ES集群创建数据快照,然后在目标ES集群中进行恢复。需要注意ES的版本问题:

代码语言:javascript复制
目标ES集群的主版本号(如5.6.4中的5为主版本号)要大于等于源ES集群的主版本号;
1.x版本的集群创建的快照不能在5.x版本中恢复;
  • 源ES集群中创建repository

创建快照前必须先创建repository仓库,一个repository仓库可以包含多份快照文件,repository主要有一下几种类型

代码语言:javascript复制
fs: 共享文件系统,将快照文件存放于文件系统中
url: 指定文件系统的URL路径,支持协议:http,https,ftp,file,jar
s3: AWS S3对象存储,快照存放于S3中,以插件形式支持
hdfs: 快照存放于hdfs中,以插件形式支持
cos: 快照存放于腾讯云COS对象存储中,以插件形式支持

如果需要从自建ES集群迁移至腾讯云的ES集群,可以直接使用fs类型仓库,注意需要在Elasticsearch配置文件elasticsearch.yml设置仓库路径:

代码语言:javascript复制
path.repo: ["/usr/local/services/test"]

之后调用snapshot api创建repository:

代码语言:javascript复制
curl -XPUT http://172.16.0.39:9200/_snapshot/my_backup -H 'Content-Type: application/json' -d '{
    	"type": "fs",
    	"settings": {
        "location": "/usr/local/services/test" 
        "compress": true
    }
}'

如果需要从其它云厂商的ES集群迁移至腾讯云ES集群,或者腾讯云内部的ES集群迁移,可以使用对应云厂商他提供的仓库类型,如AWS的S3, 阿里云的OSS,腾讯云的COS等

代码语言:javascript复制
curl -XPUT http://172.16.0.39:9200/_snapshot/my_s3_repository
{
 	"type": "s3",
  	"settings": {
    	"bucket": "my_bucket_name",
   		"region": "us-west"
  	}
}
  • 源ES集群中创建snapshot

调用snapshot api在创建好的仓库中创建快照

代码语言:javascript复制
curl -XPUT http://172.16.0.39:9200/_snapshot/my_backup/snapshot_1?wait_for_completion=true

创建快照可以指定索引,也可以指定快照中包含哪些内容,具体的api接口参数可以查阅官方文档

  • 目标ES集群中创建repository

目标ES集群中创建仓库和在源ES集群中创建仓库类似,用户可在腾讯云上创建COS对象bucket, 将仓库将在COS的某个bucket下。

  • 移动源ES集群snapshot至目标ES集群的仓库

把源ES集群创建好的snapshot上传至目标ES集群创建好的仓库中

  • 从快照恢复
代码语言:javascript复制
curl -XPUT http://172.16.0.20:9200/_snapshot/my_backup/snapshot_1/_restore
  • 查看快照恢复状态
代码语言:javascript复制
curl http://172.16.0.20:9200/_snapshot/_status

不停止旧集群写入

如果旧集群不能停止写入,此时进行在线数据迁移,需要保证新旧集群的数据一致性。目前看来,除了官方提供的CCR功能,没有成熟的可以严格保证数据一致性的在线数据迁移方法。此时可以从业务场景出发,根据业务写入数据的特点选择合适的数据迁移方案。

一般来说,业务写入数据的特点有以下几种:

  • add only, 只追加新数据,比如日志、APM场景中,数据基本都是时序数据,只会追加,没有更新、删除数据的操作
  • add & update, 数据有追加也有更新,但是没有删除数据的操作
  • add & update & delete, 数据有追加,也有更新和删除,搜索场景比较常见

add only

在日志或者APM的场景中,数据都是时序数据,一般索引也都是按天创建的,当天的数据只会写入当前的索引中。此时,可以先把存量的不再写入的索引数据一次性同步到新集群中,然后使用logstash或者其它工具增量同步当天的索引,待数据追平后,把业务对ES的访问切换到新集群中。

具体的实现方案为:

  1. 全量迁移冷索引 因为冷的索引不再写入,可以采用elasticdump、logstash、reindex进行迁移;如果数据量比较大的情况下,可以采用snapshot方式进行迁移。
  2. 增量迁移热索引

add only的数据写入方式,可以按照数据写入的顺序(根据_doc进行排序,如果有时间戳字段也可以根据时间戳排序)批量从旧集群中拉取数据,然后再批量写入新集群中;可以通过写程序,使用用scroll api 或者search_after参数批量拉取增量数据,再使用bulk api批量写入。

使用scroll拉取增量数据:

代码语言:javascript复制
POST {my_index}/_search?scroll=1m
{
	"size":"100",
	"query": {
	   "range": {
	      "timestamp": {
	        "gte": "now-1m",
	        "lt": "now/m"
	       }
	    }
	}
}
POST _search/scroll
{
  	"scroll": "1m",
  	"scroll_id": "DnF1ZXJ5VGhlbkZldGNoBQAAAAAAHCbaFndPR3J4bDJtVDh1bnNRaW5yYWZBWncAAAAAABwm2RZ3T0dyeGwybVQ4dW5zUWlucmFmQVp3AAAAAAAcJtwWd09HcnhsMm1UOHVuc1FpbnJhZkFadwAAAAAAHCbbFndPR3J4bDJtVDh1bnNRaW5yYWZBWncAAAAAABwm3RZ3T0dyeGwybVQ4dW5zUWlucmFmQVp3"
}
	

上述操作可以每分钟执行一次,拉起前一分钟新产生的数据,所以数据在旧集群和新集群的同步延迟为一分钟。

使用search_after批量拉取增量数据:

代码语言:javascript复制
POST {my_index}/_search
{
   "size":100,
   "query": {
	    "match_all": {}
	},
	"search_after": [
	    1569556667000
	],
	"sort": "timestamp"
}

上述操作可以根据需要自定义事件间隔执行,每次执行时修改search_after参数的值,获取指定值之后的多条数据;search_after实际上相当于一个游标,每执行一次向前推进,从而获取到最新的数据。

使用scroll和search_after的区别是:

  • scroll相当于对数据做了一份快照,快照会保存在内存中,会比较消耗资源;search_after是无状态的,并不会过多的消耗内存资源。
  • scroll可以分批次执行,search_after获取到的结果只能一次拉取完,所以需要合理控制search_after参数的值以及size的大小,以免出现一次拉取过多的数据导致内存暴涨。
  • scroll执行过程中并不能获取到更新后的数据(对add only的场景并无影响),search_after每次拉取到的数据都是最新的。

另外,如果不想通过写程序迁移旧集群的增量数据到新集群的话,可以使用logstash结合scroll进行增量数据的迁移,可参考的配置文件如下:

代码语言:javascript复制
input {
      elasticsearch {
        hosts => "x.x.x.1:9200"
        index => "my_index"
        query => '{"query":{"range":{"timestamp":{"gte":"now-1m","lt":"now/m"}}}}'
        size => 100
        scroll => "1m"
        docinfo => true
        schedule => "*/1 * * * *" #定时任务,每分钟执行一次
      }
}
output {
      elasticsearch {
	     hosts => "x.x.x.2:9200"
        index => "%{[@metadata][_index]}"
        document_type => "%{[@metadata][_type]}"
        document_id => "%{[@metadata][_id]}"
      }
}

使用过程中可以根据实际业务的需求调整定时任务参数schedule以及scroll相关的参数。

add & update

业务场景如果是写入ES时既有追加,又有存量数据的更新,此时比较重要的是怎么解决update操作的数据同步问题。对于新增的数据,可以采用上述介绍的增量迁移热索引的方式同步到新集群中。对于更新的数据,此时如果索引有类似于updateTime的字段用于标记数据更新的时间,则可以通过写程序或者logstash,使用scroll api根据updateTime字段批量拉取更新的增量数据,然后再写入到新的集群中。

可参考的logstash配置文件如下:

代码语言:javascript复制
input {
      elasticsearch {
        hosts => "x.x.x.1:9200"
        index => "my_index"
        query => '{"query":{"range":{"updateTime":{"gte":"now-1m","lt":"now/m"}}}}'
        size => 100
        scroll => "1m"
        docinfo => true
        schedule => "*/1 * * * *" #定时任务,每分钟执行一次
      }
}
output {
      elasticsearch {
	     hosts => "x.x.x.2:9200"
        index => "%{[@metadata][_index]}"
        document_type => "%{[@metadata][_type]}"
        document_id => "%{[@metadata][_id]}"
      }
}

实际应用各种,同步新增(add)的数据和更新(update)的数据可以同时进行。但是如果索引中没有类似updateTime之类的字段可以标识出哪些数据是更新过的,目前看来并没有较好的同步方式,可以采用CCR来保证旧集群和新集群的数据一致性。

add & update & delete

如果业务写入ES时既有新增(add)数据,又有更新(update)和删除(delete)数据,可以采用6.5之后商业版X-pack插件中的CCR功能进行数据迁移。但是使用CCR有一些限制,必须要注意:

  • 旧集群和新集群的版本必须都在6.5及以上才能使用
  • Leader Index必须开启index.soft_deletes.enabled, 否则不能使用CCR, 该参数的意义是打开后Leader Index中的所有操作都会被暂存下来,Follower Index 可以通过pull这些操作然后进行重放,从而达到数据同步的目的;另外index.soft_deletes.enabled也只能在6.5之后的版本使用,并且只能在创建索引时开启,如果没有开启的话可以通过reindex到新的索引解决。

具体的使用方式如下:

  • 在新集群中配置旧集群的地址,注意必须为transport端口
代码语言:javascript复制
PUT /_cluster/settings
{
   "persistent" : {
	 "cluster" : {
	   "remote" : {
	     "leader" : {
	        "seeds" : [
	          "x.x.x.1:9300" 
	        ]
	      }
	    }
	  }
	}
}
  • 创建Leader Index
代码语言:javascript复制
PUT my_leader_indx
{
	"settings":{
		"index.soft_deletes.enabled": true
	}
}
  • 创建Follower Index
代码语言:javascript复制
PUT my_follower_index/_ccr/follow?wait_for_active_shards=1
{
	"remote_cluster" : "leader",
	"leader_index" : "my_leader_indx"
}
  • 查看Follower Index统计信息
代码语言:javascript复制
GET my_follower_index/_ccr/stats

使用中间件进行双写

如果业务是通过中间件如kafka把数据写入到ES, 则可以使用如下图中的方式,使用logstash消费kafka的数据到新集群中,在旧集群和新集群数据完全追平之后,可以切换到新集群进行业务的查询,之后再对旧的集群下线处理。

使用中间件进行同步双写的优点是:

  • 写入过程中丢失数据风险较低
  • 可以保证新旧集群的数据一致性

总结

  • elasticsearch-dump和logstash做跨集群数据迁移时,都要求用于执行迁移任务的机器可以同时访问到两个集群,不然网络无法连通的情况下就无法实现迁移。而使用snapshot的方式没有这个限制,因为snapshot方式是完全离线的。因此elasticsearch-dump和logstash迁移方式更适合于源ES集群和目标ES集群处于同一网络的情况下进行迁移,而需要跨云厂商的迁移,比如从阿里云ES集群迁移至腾讯云ES集群,可以选择使用snapshot的方式进行迁移,当然也可以通过打通网络实现集群互通,但是成本较高。
  • elasticsearchdump工具和mysql数据库用于做数据备份的工具mysqldump工具类似,都是逻辑备份,需要将数据一条一条导出后再执行导入,所以适合数据量小的场景下进行迁移;
  • snapshot的方式适合数据量大的场景下进行迁移。

0 人点赞