Go Elasticsearch 更新快速入门

2021-12-06 10:49:57 浏览数 (1)

文章目录

  • 1.根据 ID 修改
  • 2.根据 ID 修改(不存在则插入)
  • 3.根据条件更新
  • 4.批量更新
  • 参考文献

本文借助第三方库 olivere/elastic 完成 Go 对 ES 的更新操作。

Go 对 Elasticsearch 的增删改查参见完整博文 Go Elasticsearch 增删改查(CRUD)快速入门。

1.根据 ID 修改

可以根据文档 ID 更新对应的文档。

代码语言:javascript复制
// Update 修改文档
// param: index 索引; id 文档ID; m 待更新的字段键值结构
func Update(ctx context.Context, index, id string, doc interface{}) error {
	_, err := GetESClient().Update().Index(index).Id(id).Doc(doc).Refresh("true").Do(ctx)
	return err
}

注意:修改不存在的文档将报**elastic: Error 404 (Not Found)**错误。

比如修改文档 ID 为 1 的用户名改为 “jack”。

代码语言:javascript复制
err := Update(context.Background(), index, "1", map[string]interface{}{"username": "jack"})

对应的 RESTful api 为:

代码语言:javascript复制
POST es_index_userinfo/_update/1?refresh=true
{
   "doc": {
    "username": "jack"
  }
}

2.根据 ID 修改(不存在则插入)

如果文档不存在,作为新文档插入,则可以使用 upsert。

代码语言:javascript复制
// Upsert 修改文档(不存在则插入)
// params index 索引; id 文档ID; m 待更新的字段键值结构
func Upsert(ctx context.Context, index, id string, doc interface{}) error {
	_, err := GetESClient().Update().Index(index).Id(id).Doc(doc).Upsert(doc).Refresh("true").Do(ctx)
	return err
}

比如修改文档 ID 为 9 的部分信息,如果文档不存在则插入。

代码语言:javascript复制
ctx := context.Background()
m := map[string]interface{}{
	"id":       9,
	"username": "jerry",
	"age":      11,
}
err := Upsert(ctx, "es_index_userinfo", "9", m)

对应的 RESTful api 为:

代码语言:javascript复制
POST es_index_userinfo/_update/9?refresh=true
{
  "doc":{
		"id":       9,
		"username": "jerry",
		"age":      11
	},
	"doc_as_upsert": true
}

3.根据条件更新

我们也可以根据条件来更新符合条件的文档,即 Update by Query。

代码语言:javascript复制
// UpdateByQuery 根据条件修改文档
// param: index 索引; query 条件; script 脚本指定待更新的字段与值
func UpdateByQuery(ctx context.Context, index string, query elastic.Query, script *elastic.Script) (int64, error) {
	rsp, err := GetESClient().UpdateByQuery(index).Query(query).Script(script).Refresh("true").Do(ctx)
	if err != nil {
		return 0, err
	}
	return rsp.Updated, nil
}

注意:Refresh 只能指定 true 或 false(缺省值),不能指定 wait_for。

比如我将更新用户名为 alice,年龄小于等于 18 岁的用户昵称和祖籍。

代码语言:javascript复制
query := elastic.NewBoolQuery()
query.Filter(elastic.NewTermQuery("username", "alice"))
query.Filter(elastic.NewRangeQuery("age").Lte(18))
script := elastic.NewScriptInline("ctx._source.nickname=params.nickname;ctx._source.ancestral=params.ancestral").Params(
	map[string]interface{}{
		"nickname":  "cat",
		"ancestral": "安徽",
	})
ret, err := UpdateByQuery2ES(context.Background(), index, query, script)

对应的 RESTful api 为:

代码语言:javascript复制
POST /es_index_userinfo/_update_by_query?refresh=true
{
  "query":{
     "bool":{
       "filter":[
         {"term":{"username":"alice"}},
         {"range" : {"age" : {"lte" : 18}}}
         ]
     }
  },
  "script": {
    "source": "ctx._source['nickname'] = 'cat';ctx._source['ancestral'] ='安徽'"
  }
}

4.批量更新

同样地,借助 BulkService BulkUpdateRequest 可实现对文档的批量修改。

代码语言:javascript复制
// UpdateBulk 批量修改文档
func UpdateBulk(ctx context.Context, index string, ids []string, docs []interface{}) error {
	bulkService := GetESClient().Bulk().Index(index).Refresh("true")
	for i := range ids {
		doc := elastic.NewBulkUpdateRequest().Id(ids[i]).Doc(docs[i])
		bulkService.Add(doc)
	}
	res, err := bulkService.Do(ctx)
	if err != nil {
		return err
	}
	if len(res.Failed()) > 0 {
		return errors.New(res.Failed()[0].Error.Reason)
	}
	return nil
}

// UpsertBulk 批量修改文档(不存在则插入)
func UpsertBulk(ctx context.Context, index string, ids []string, docs []interface{}) error {
	bulkService := GetESClient().Bulk().Index(index).Refresh("true")
	for i := range ids {
		doc := elastic.NewBulkUpdateRequest().Id(ids[i]).Doc(docs[i]).Upsert(docs[i])
		bulkService.Add(doc)
	}
	res, err := bulkService.Do(ctx)
	if err != nil {
		return err
	}
	if len(res.Failed()) > 0 {
		return errors.New(res.Failed()[0].Error.Reason)
	}
	return nil
}

下面是调用示例:

代码语言:javascript复制
func main() {
	ctx := context.Background()
	// UpdateBulk 全部成功
	// id 9 和 10 均存在
	ids := []string{"9", "10"}
	docs := []interface{}{
		map[string]interface{}{
			"username": "tom",
		},
		map[string]interface{}{
			"username": "alice",
		},
	}
	err := UpdateBulk(ctx, "es_index_userinfo", ids, docs)
	fmt.Printf("UpdateBulk all success err is %vn", err)

	// UpdateBulk 部分成功
	// id 10 存在,id 11 文档不存在
	ids = []string{"10", "11"}
	docs = []interface{}{
		map[string]interface{}{
			"username": "angela",
		},
		map[string]interface{}{
			"username": "bill",
		},
	}
	err = UpdateBulk(ctx, "es_index_userinfo", ids, docs)
	fmt.Printf("UpdateBulk partial success err is %vn", err)
	
	// UpsertBulk 理论上不会部分成功
	ids = []string{"10", "11"}
	docs = []interface{}{
		map[string]interface{}{
			"id":       10,
			"username": "tony",
		},
		map[string]interface{}{
			"id":       11,
			"username": "pony",
		},
	}
	err = UpsertBulk(ctx, "es_index_userinfo", ids, docs)
	fmt.Printf("UpsertBulk all success err is %vn", err)
}

分别输出:

代码语言:javascript复制
UpdateBulk all success err is <nil>
UpdateBulk partial success err is [_doc][11]: document missing
UpsertBulk all success err is <nil>

对应的 RESTful api 为:

代码语言:javascript复制
// UpdateBulk 全部成功
POST /es_index_userinfo/_bulk
{"update":{"_id":"9"}}
{"doc":{"username":"tom"}}
{"update":{"_id":"10"}}
{"doc":{"username": "alice"}}

// UpdateBulk 部分成功
POST /es_index_userinfo/_bulk
{"update":{"_id":"10"}}
{"doc":{"username":"angela"}}
{"update":{"_id":"11"}}
{"doc":{"username": "bill"}}

// Upsert 理论上不会部分成功
POST /es_index_userinfo/_bulk
{"update":{"_id":"10"}}
{"doc":{"id":10, "username":"tony"}, "doc_as_upsert" : true}
{"update":{"_id":"11"}}
{"doc":{"id":11, "username": "pony"}, "doc_as_upsert" : true}

参考文献

elastic - pkg.dev

elastic - type UpdateService

elastic - type UpdateByQueryService

elastic - type BulkService

elastic - type BulkUpdateRequest

Elasticsearch Guide [7.15] » REST APIs » Document APIs » Update API

Elasticsearch Guide [7.13] » REST APIs » Document APIs » Update By Query API

Elasticsearch Guide [7.15] » REST APIs » Document APIs » Bulk API

0 人点赞