文章目录
- 1.根据 ID 修改
- 2.根据 ID 修改(不存在则插入)
- 3.根据条件更新
- 4.批量更新
- 参考文献
本文借助第三方库 olivere/elastic 完成 Go 对 ES 的更新操作。
Go 对 Elasticsearch 的增删改查参见完整博文 Go Elasticsearch 增删改查(CRUD)快速入门。
1.根据 ID 修改
可以根据文档 ID 更新对应的文档。
代码语言:javascript复制// Update 修改文档
// param: index 索引; id 文档ID; m 待更新的字段键值结构
func Update(ctx context.Context, index, id string, doc interface{}) error {
_, err := GetESClient().Update().Index(index).Id(id).Doc(doc).Refresh("true").Do(ctx)
return err
}
注意:修改不存在的文档将报**elastic: Error 404 (Not Found)
**错误。
比如修改文档 ID 为 1 的用户名改为 “jack”。
代码语言:javascript复制err := Update(context.Background(), index, "1", map[string]interface{}{"username": "jack"})
对应的 RESTful api 为:
代码语言:javascript复制POST es_index_userinfo/_update/1?refresh=true
{
"doc": {
"username": "jack"
}
}
2.根据 ID 修改(不存在则插入)
如果文档不存在,作为新文档插入,则可以使用 upsert。
代码语言:javascript复制// Upsert 修改文档(不存在则插入)
// params index 索引; id 文档ID; m 待更新的字段键值结构
func Upsert(ctx context.Context, index, id string, doc interface{}) error {
_, err := GetESClient().Update().Index(index).Id(id).Doc(doc).Upsert(doc).Refresh("true").Do(ctx)
return err
}
比如修改文档 ID 为 9 的部分信息,如果文档不存在则插入。
代码语言:javascript复制ctx := context.Background()
m := map[string]interface{}{
"id": 9,
"username": "jerry",
"age": 11,
}
err := Upsert(ctx, "es_index_userinfo", "9", m)
对应的 RESTful api 为:
代码语言:javascript复制POST es_index_userinfo/_update/9?refresh=true
{
"doc":{
"id": 9,
"username": "jerry",
"age": 11
},
"doc_as_upsert": true
}
3.根据条件更新
我们也可以根据条件来更新符合条件的文档,即 Update by Query。
代码语言:javascript复制// UpdateByQuery 根据条件修改文档
// param: index 索引; query 条件; script 脚本指定待更新的字段与值
func UpdateByQuery(ctx context.Context, index string, query elastic.Query, script *elastic.Script) (int64, error) {
rsp, err := GetESClient().UpdateByQuery(index).Query(query).Script(script).Refresh("true").Do(ctx)
if err != nil {
return 0, err
}
return rsp.Updated, nil
}
注意:Refresh 只能指定 true 或 false(缺省值),不能指定 wait_for。
比如我将更新用户名为 alice,年龄小于等于 18 岁的用户昵称和祖籍。
代码语言:javascript复制query := elastic.NewBoolQuery()
query.Filter(elastic.NewTermQuery("username", "alice"))
query.Filter(elastic.NewRangeQuery("age").Lte(18))
script := elastic.NewScriptInline("ctx._source.nickname=params.nickname;ctx._source.ancestral=params.ancestral").Params(
map[string]interface{}{
"nickname": "cat",
"ancestral": "安徽",
})
ret, err := UpdateByQuery2ES(context.Background(), index, query, script)
对应的 RESTful api 为:
代码语言:javascript复制POST /es_index_userinfo/_update_by_query?refresh=true
{
"query":{
"bool":{
"filter":[
{"term":{"username":"alice"}},
{"range" : {"age" : {"lte" : 18}}}
]
}
},
"script": {
"source": "ctx._source['nickname'] = 'cat';ctx._source['ancestral'] ='安徽'"
}
}
4.批量更新
同样地,借助 BulkService BulkUpdateRequest 可实现对文档的批量修改。
代码语言:javascript复制// UpdateBulk 批量修改文档
func UpdateBulk(ctx context.Context, index string, ids []string, docs []interface{}) error {
bulkService := GetESClient().Bulk().Index(index).Refresh("true")
for i := range ids {
doc := elastic.NewBulkUpdateRequest().Id(ids[i]).Doc(docs[i])
bulkService.Add(doc)
}
res, err := bulkService.Do(ctx)
if err != nil {
return err
}
if len(res.Failed()) > 0 {
return errors.New(res.Failed()[0].Error.Reason)
}
return nil
}
// UpsertBulk 批量修改文档(不存在则插入)
func UpsertBulk(ctx context.Context, index string, ids []string, docs []interface{}) error {
bulkService := GetESClient().Bulk().Index(index).Refresh("true")
for i := range ids {
doc := elastic.NewBulkUpdateRequest().Id(ids[i]).Doc(docs[i]).Upsert(docs[i])
bulkService.Add(doc)
}
res, err := bulkService.Do(ctx)
if err != nil {
return err
}
if len(res.Failed()) > 0 {
return errors.New(res.Failed()[0].Error.Reason)
}
return nil
}
下面是调用示例:
代码语言:javascript复制func main() {
ctx := context.Background()
// UpdateBulk 全部成功
// id 9 和 10 均存在
ids := []string{"9", "10"}
docs := []interface{}{
map[string]interface{}{
"username": "tom",
},
map[string]interface{}{
"username": "alice",
},
}
err := UpdateBulk(ctx, "es_index_userinfo", ids, docs)
fmt.Printf("UpdateBulk all success err is %vn", err)
// UpdateBulk 部分成功
// id 10 存在,id 11 文档不存在
ids = []string{"10", "11"}
docs = []interface{}{
map[string]interface{}{
"username": "angela",
},
map[string]interface{}{
"username": "bill",
},
}
err = UpdateBulk(ctx, "es_index_userinfo", ids, docs)
fmt.Printf("UpdateBulk partial success err is %vn", err)
// UpsertBulk 理论上不会部分成功
ids = []string{"10", "11"}
docs = []interface{}{
map[string]interface{}{
"id": 10,
"username": "tony",
},
map[string]interface{}{
"id": 11,
"username": "pony",
},
}
err = UpsertBulk(ctx, "es_index_userinfo", ids, docs)
fmt.Printf("UpsertBulk all success err is %vn", err)
}
分别输出:
代码语言:javascript复制UpdateBulk all success err is <nil>
UpdateBulk partial success err is [_doc][11]: document missing
UpsertBulk all success err is <nil>
对应的 RESTful api 为:
代码语言:javascript复制// UpdateBulk 全部成功
POST /es_index_userinfo/_bulk
{"update":{"_id":"9"}}
{"doc":{"username":"tom"}}
{"update":{"_id":"10"}}
{"doc":{"username": "alice"}}
// UpdateBulk 部分成功
POST /es_index_userinfo/_bulk
{"update":{"_id":"10"}}
{"doc":{"username":"angela"}}
{"update":{"_id":"11"}}
{"doc":{"username": "bill"}}
// Upsert 理论上不会部分成功
POST /es_index_userinfo/_bulk
{"update":{"_id":"10"}}
{"doc":{"id":10, "username":"tony"}, "doc_as_upsert" : true}
{"update":{"_id":"11"}}
{"doc":{"id":11, "username": "pony"}, "doc_as_upsert" : true}
参考文献
elastic - pkg.dev
elastic - type UpdateService
elastic - type UpdateByQueryService
elastic - type BulkService
elastic - type BulkUpdateRequest
Elasticsearch Guide [7.15] » REST APIs » Document APIs » Update API
Elasticsearch Guide [7.13] » REST APIs » Document APIs » Update By Query API
Elasticsearch Guide [7.15] » REST APIs » Document APIs » Bulk API