Optimistic Concurrency Control
乐观并发控制,即乐观锁。乐观锁是一种轻量级的实现并发控制的思想,乐观锁的实现一般依靠version
版本号。比如在mysql
中,我们在建表的时候常常额外新增一个version
字段,在更新某行数据时对version
字段值进行比对,以此来判断在此期间是否有他人更新该行数据。
UPDATE table_name
SET field = ${value},
version = version 1
WHERE id = ${id}
AND version = ${version}
在6.7版本之前,Elasticsearch也是借助于version
版本号来实现乐观并发控制的,但由于Elasticsearch的分布式特性,version
版本号并不能唯一标识主本分片和其副本分片中同一文档的版本。具体地,索引中的文档都是根据路由规则分散存储到主本分片中的,而每个主本分片又会有若干副本分片,每个副本分片只会对应一个主本分片;当针对一个文档进行index
、update
或delete
操作时,主本分片会将该文档异步复制到其所有副本分片中;正常情况下,主本分片与副本分片中该文本版本是一致的;但如果主本分片意外地挂掉了,主副本复制流程没有完全完成,且主本分片所在节点在某个时刻又重新返回集群(此时它已经不再是主本分片了,因为在挂掉时,Elasticsearch会选择其中一个副本分片,将其提升为主本分片),那么此时该文档在主副本分片中的版本就是不一致的了。
从6.7版本开始,Elasticsearch使用seq_num
和primary_term
参数来实现乐观并发控制。在每个索引被创建时,seq_num
初始值为0,当面向该索引针对文档进行index
、update
或delete
操作时,seq_num
会不断递增;primary_term
用于区分新主本分片和旧主本分片,其初始值为1,同样是不断递增的,当主本分片所在节点挂掉时,Elasticsearch会在其众多副本分片中选择一个然后将其提升为主本分片,而primary_term
递增的时机就在此刻,
如何获取seq_num
和primary_term
参数值
Get API
代码语言:javascript复制GET /products/_doc/1
-----------------------
{
"_index": "products",
"_type": "_doc",
"_id": "1",
"_version": 1,
"_seq_no": 0,
"_primary_term": 1,
"found": true,
"_source": {
"product": "r2d2",
"details": "A resourceful astromech droid"
}
}
Search API
代码语言:javascript复制GET /products/_search
{
"seq_no_primary_term": true,
"query": {
"match_all": {}
}
}
-----------------------
{
"took": 766,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 1,
"relation": "eq"
},
"max_score": 1.0,
"hits": [
{
"_index": "products",
"_type": "_doc",
"_id": "1",
"_seq_no": 1,
"_primary_term": 1,
"_score": 1.0,
"_source": {
"product": "r2d2",
"details": "A resourceful astromech droid",
"tags": [
"droid"
]
}
}
]
}
}
使用if_seq_no和if_primary_term请求参数进行更新
代码语言:javascript复制PUT /products/_doc/1?if_seq_no=0&if_primary_term=2
{
"product": "r2d2",
"details": "A resourceful astromech droid",
"tags": [
"droid"
]
}
-----------------------
{
"error": {
"root_cause": [
{
"type": "version_conflict_engine_exception",
"reason": "[1]: version conflict, required seqNo [0], primary term [2]. current document has seqNo [0] and primary term [1]",
"index_uuid": "jwzozHXXSYuw88SJ2SpxdQ",
"shard": "0",
"index": "products"
}
],
"type": "version_conflict_engine_exception",
"reason": "[1]: version conflict, required seqNo [0], primary term [2]. current document has seqNo [0] and primary term [1]",
"index_uuid": "jwzozHXXSYuw88SJ2SpxdQ",
"shard": "0",
"index": "products"
},
"status": 409
}
从上面响应来看,更新失败了,这是因为传入的if_primary_term
值与当前primary_term
值并不一致,只要将其调整为1就可以成功了。
PUT /products/_doc/1?if_seq_no=0&if_primary_term=2
{
"product": "r2d2",
"details": "A resourceful astromech droid",
"tags": [
"droid"
]
}
-----------------------
{
"_index": "products",
"_type": "_doc",
"_id": "1",
"_version": 2,
"result": "updated",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 1,
"_primary_term": 1
}
参考文档
- https://www.elastic.co/guide/en/elasticsearch/reference/current/optimistic-concurrency-control.html
- https://www.elastic.co/cn/blog/elasticsearch-sequence-ids-6-0