概要
Elasticsearch 在文档更新时默认使用的是乐观锁方案, 而 Elasticsearch 利用文档的一些 create 限制条件, 也能达到悲观锁的效果, 我们一起来看一看.
乐观锁与悲观锁
乐观锁
ES 默认实现乐观锁, 所有的数据更新默认使用乐观锁机制. document 更新时, 必须要带上 currenct version, 更新时与 document 的 version 进行比较, 如果相同进行更新操作, 不相同表示已经被别的线程更新过了, 此时更新失败, 并且重新获取新的 version 再尝试更新.
悲观锁
我们举一个这样的例子: Elasticsearch 存储文件系统的目录, 文件名信息, 有多个线程需要对 / home/workspace/ReadMe.txt 进行追加修改, 而且是并发执行的, 有先后顺序之分, 跟之前的库存更新案例有点不一样, 此时单纯使用乐观锁, 可能会出现乱序的问题.
这种场景就需要使用悲观锁控制, 保证线程的执行顺序, 有一个线程在修改, 其他的线程只能挂起等待. 悲观锁通过 / index/lock / 实现, 只有一个线程能做修改操作, 其他线程 block 掉.
悲观锁有三种, 分别对应三种粒度, 由粗到细可为分:
全局锁: 最粗的锁, 直接锁整个索引
document 锁: 指定 id 加锁, 只锁一条数据, 类似于数据库的行锁
共享锁和排他锁: 也叫读写锁, 针对一条数据分读和写两种操作, 一般共享锁允许多个线程对同一条数据进行加锁, 排他锁只允许一个线程对数据加锁, 并且排他锁和共享锁互斥.
锁的基本操作步骤
我们使用锁的基本步骤都是一样的, 无论是关系型数据库, Redis/Memcache/Zookeeper 分布式锁, 还是今天介绍的 Elasticsearch 实现的锁机制, 都有如下三步:
上锁
执行事务方法
解锁
全局锁
假定有两个线程, 线程 1 和线程 2
线程 1 上锁命令:
- PUT /files/file/global/_create
- {
- }
files 表示索引名称.
file 为 type,6.3.1 一个索引只允许有一个 type, 选用 file 作用 type 名称.
global: 即 document 的 id, 固定写为 global 表示全局锁, 或者使用专门的索引进行加锁操作.
_create: 强制必须是创建, 如果已经存在, 那么创建失败, 报错.
线程 1 执行事务方法: 更新文件名
- POST /files/file/global/_update
- {
- "doc": {
- "name":"ReadMe.txt"
- }
- }
线程 2 尝试加锁, 失败, 此时程序进行重试阶段, 直到线程 1 释放锁
- # 请求:
- PUT /files/file/global/_create
- {}
- # 响应:
- {
- "error": {
- "root_cause": [
- {
- "type": "version_conflict_engine_exception",
- "reason": "[file][global]: version conflict, document already exists (current version [1])",
- "index_uuid": "_6E1d7BLQmy9-7gJptVp7A",
- "shard": "2",
- "index": "files"
- }
- ],
- "type": "version_conflict_engine_exception",
- "reason": "[file][global]: version conflict, document already exists (current version [1])",
- "index_uuid": "_6E1d7BLQmy9-7gJptVp7A",
- "shard": "2",
- "index": "files"
- },
- "status": 409
- }
线程 1 释放锁
DELETE files/file/global
线程 2 加锁
- PUT /files/file/global/_create
- {
- }
响应
- {
- "_index": "files",
- "_type": "file",
- "_id": "global",
- "_version": 3,
- "result": "created",
- "_shards": {
- "total": 2,
- "successful": 1,
- "failed": 0
- },
- "_seq_no": 2,
- "_primary_term": 1
- }
加锁成功, 然后执行事务方法.
优缺点
全局锁本质上是所有线程都用_create 语法来创建 id 为 global 的文档, 利用 Elasticsearch 对_create 语法的校验来实现锁的目的.
优点: 操作简单, 容易使用, 成本低.
缺点: 直接锁住整个索引, 除了加锁的那个线程, 其他所有对此索引的线程都 block 住了, 并发量较低.
适用场景: 读多写少的数据, 并且加解锁的时间非常短, 类似于数据库的表锁.
注意事项: 加锁解锁的控制必须严格在程序里定义, 因为单纯基于 doc 的锁控制, 如果 id 固定使用 global, 在有锁的情况, 任何线程执行 delete 操作都是可以成功的, 因为大家都知道 id.
document level 级别的锁
document level 级别的锁是更细粒度的锁, 以文档为单位进行锁控制.
我们新建一个索引专门用于加锁操作:
- PUT /files-lock/_mapping/lock
- {
- "properties": {
- }
- }
我们先创建一个 script 脚本, ES6.0 以后默认使用 painless 脚本:
- POST _scripts/document-lock
- {
- "script": {
- "lang": "painless",
- "source": "if ( ctx._source.process_id != params.process_id ) { Debug.explain('already locked by other thread'); } ctx.op ='noop';"
- }
- }
Debug.explain 表示抛出一个异常, 内容为 already locked by other thread.
ctx.op = 'noop'表示不执行更新.
线程 1 增加行锁, 此时传入的 process_id 为 181ab3ee-28cc-4339-ba35-69802e06fe42
- POST /files-lock/lock/1/_update
- {
- "upsert": { "process_id": "181ab3ee-28cc-4339-ba35-69802e06fe42" },
- "script": {
- "id": "document-lock",
- "params": {
- "process_id": "181ab3ee-28cc-4339-ba35-69802e06fe42"
- }
- }
- }
响应结果:
- {
- "_index": "files-lock",
- "_type": "lock",
- "_id": "1",
- "_version": 1,
- "result": "created",
- "_shards": {
- "total": 2,
- "successful": 1,
- "failed": 0
- },
- "_seq_no": 0,
- "_primary_term": 1
- }
线程 1, 线程 2 查询锁信息
- {
- "_index": "files-lock",
- "_type": "lock",
- "_id": "1",
- "_version": 1,
- "found": true,
- "_source": {
- "process_id": "181ab3ee-28cc-4339-ba35-69802e06fe42"
- }
- }
线程 2 传入的 process_id 为 181ab3ee-28cc-4339-ba35-69802e06fe42, 尝试加锁, 失败, 此时应该启动重试机制
- POST /files-lock/lock/1/_update
- {
- "upsert": { "process_id": "a6d13529-86c0-4422-b95a-aa0a453625d5" },
- "script": {
- "id": "document-lock",
- "params": {
- "process_id": "a6d13529-86c0-4422-b95a-aa0a453625d5"
- }
- }
- }
提示该文档已经被别的线程 (线程 1) 锁住了, 你不能更新了, 响应报文如下:
- {
- "error": {
- "root_cause": [
- {
- "type": "remote_transport_exception",
- "reason": "[node-1][192.168.17.137:9300][indices:data/write/update[s]]"
- }
- ],
- "type": "illegal_argument_exception",
- "reason": "failed to execute script",
- "caused_by": {
- "type": "script_exception",
- "reason": "runtime error",
- "painless_class": "java.lang.String",
- "to_string": "already locked by other thread",
- "java_class": "java.lang.String",
- "script_stack": [
- "Debug.explain('already locked by other thread'); }",
- "^---- HERE"
- ],
- "script": "judge-lock",
- "lang": "painless",
- "caused_by": {
- "type": "painless_explain_error",
- "reason": null
- }
- }
- },
- "status": 400
- }
线程 1 执行事务方法
- POST /files/file/1/_update
- {
- "doc": {
- "name":"README1.txt"
- }
- }
线程 1 的事务方法执行完成, 并通过删除 id 为 1 的文档, 相当于释放锁
DELETE /files-lock/lock/1
线程 2 在线程 1 执行事务的期间, 一直在模拟挂起, 重试的操作, 直到线程 1 完成释放锁, 然后线程 2 加锁成功
- POST /files-lock/lock/1/_update
- {
- "upsert": { "process_id": "a6d13529-86c0-4422-b95a-aa0a453625d5" },
- "script": {
- "id": "document-lock",
- "params": {
- "process_id": "a6d13529-86c0-4422-b95a-aa0a453625d5"
- }
- }
- }
结果:
- {
- "_index": "files-lock",
- "_type": "lock",
- "_id": "1",
- "_version": 3,
- "found": true,
- "_source": {
- "process_id": "a6d13529-86c0-4422-b95a-aa0a453625d5"
- }
- }
此时锁的 process_id 变成线程 2 传入的 "a6d13529-86c0-4422-b95a-aa0a453625d5"
- {
- "_index": "files-lock",
- "_type": "lock",
- "_id": "1",
- "_version": 3,
- "found": true,
- "_source": {
- "process_id": "a6d13529-86c0-4422-b95a-aa0a453625d5"
- }
- }
这样基于 ES 的行锁操作控制过程就完成了.
脚本解释
update+upsert 操作, 如果该记录没加锁(此时 document 为空), 执行 upsert 操作, 设置 process_id, 如果已加锁, 执行 script
script 内的逻辑是: 判断传入参数与当前 doc 的 process_id, 如果不相等, 说明有别的线程尝试对有锁的 doc 进行加锁操作, Debug.explain 表示抛出一个异常.
process_id 可以由 Java 应用系统里生成, 如 UUID.
如果两个 process_id 相同, 说明当前执行的线程与加锁的线程是同一个, ctx.op = 'noop'表示什么都不做, 返回成功的响应, Java 客户端拿到成功响应的报文, 就可以继续下一步的操作, 一般这里的下一步就是执行事务方法.
点评
文档级别的锁颗粒度小, 并发性高, 吞吐量大, 类似于数据库的行锁.
共享锁与排他锁
概念
共享锁: 允许多个线程获取同一条数据的共享锁进行读操作
排他锁: 同一条数据只能有一个线程获取排他锁, 然后进行增删改操作
互斥性: 共享锁与排他锁是互斥的, 如果这条数据有共享锁存在, 那么排他锁无法加上, 必须得共享锁释放完了, 排他锁才能加上.
反之也成立, 如果这条数据当前被排他锁锁信, 那么其他的排他锁不能加, 共享锁也加不上. 必须等这个排他锁释放完了, 其他锁才加得上.
有人在改数据, 就不允许别人来改, 也不让别人来读.
读写锁的分离
如果只是读数据, 每个线程都可以加一把共享锁, 此时该数据的共享锁数量一直递增, 如果这时有写数据的请求(写请求是排他锁), 由于互斥性, 必须等共享锁全部释放完, 写锁才加得上.
有人在读数据, 就不允许别人来改.
案例实验
我们先创建一个共享锁的脚本:
- # 读操作加锁脚本
- POST _scripts/rw-lock
- {
- "script": {
- "lang": "painless",
- "source": "if (ctx._source.lock_type =='exclusive') { Debug.explain('one thread is writing data, the lock is exclusive now'); } ctx._source.lock_count++"
- }
- }
- # 读操作完毕释放锁脚本
- POST _scripts/rw-unlock
- {
- "script": {
- "lang": "painless",
- "source": "if ( --ctx._source.lock_count == 0) { ctx.op ='delete'}"
- }
- }
每次有一个线程读数据时, 执行一次加锁操作
- POST /files-lock/lock/1/_update
- {
- "upsert": {
- "lock_type": "shared",
- "lock_count": 1
- },
- "script": {
- "id": "rw-lock"
- }
- }
在多个页面上尝试, 可以看到 lock_count 在逐一递增, 模拟多个线程同时读一个文档的操作.
在有线程读文档, 还未释放的情况下, 尝试对该文档加一个排他锁
- PUT /files-lock/lock/1/_create
- {
- "lock_type": "exclusive"
- }
结果肯定会报错:
- {
- "error": {
- "root_cause": [
- {
- "type": "version_conflict_engine_exception",
- "reason": "[lock][1]: version conflict, document already exists (current version [8])",
- "index_uuid": "XD7LFToWSKe_6f1EvLNoFw",
- "shard": "3",
- "index": "files-lock"
- }
- ],
- "type": "version_conflict_engine_exception",
- "reason": "[lock][1]: version conflict, document already exists (current version [8])",
- "index_uuid": "XD7LFToWSKe_6f1EvLNoFw",
- "shard": "3",
- "index": "files-lock"
- },
- "status": 409
- }
线程读数据完成后, 对共享锁进行释放, 执行释放锁的脚本
- POST /files-lock/lock/1/_update
- {
- "script": {
- "id": "rw-unlock"
- }
- }
释放 1 次 lock_count 减 1, 减到 0 时, 说明所有的共享锁已经释放完毕, 就把这个 doc 删除掉
所有共享锁释放完毕, 尝试加排他锁
- PUT /files-lock/lock/1/_create
- {
- "lock_type": "exclusive"
- }
此时能够加锁成功, 响应报文:
- {
- "_index": "files-lock",
- "_type": "lock",
- "_id": "1",
- "_version": 1,
- "found": true,
- "_source": {
- "lock_type": "exclusive"
- }
- }
有排他锁的情况, 尝试加一个共享锁, 失败信息如下:
- {
- "error": {
- "root_cause": [
- {
- "type": "remote_transport_exception",
- "reason": "[node-1][192.168.17.137:9300][indices:data/write/update[s]]"
- }
- ],
- "type": "illegal_argument_exception",
- "reason": "failed to execute script",
- "caused_by": {
- "type": "script_exception",
- "reason": "runtime error",
- "painless_class": "java.lang.String",
- "to_string": "one thread is writing data, the lock is exclusive now",
- "java_class": "java.lang.String",
- "script_stack": [
- "Debug.explain('one thread is writing data, the lock is exclusive now'); }",
- "^---- HERE"
- ],
- "script": "rw-lock",
- "lang": "painless",
- "caused_by": {
- "type": "painless_explain_error",
- "reason": null
- }
- }
- },
- "status": 400
- }
排他锁事务执行完成时, 删除文档即可对锁进行释放
DELETE /files-lock/lock/1
脚本解释
读锁的加锁脚本和释放锁脚本, 成对出现, 用来统计线程的数量.
写锁利用_create 语法来实现, 如果有线程对某一文档有读取操作, 那么对这个文档执行_create 操作肯定报错.
小结
利用 Elasticsearch 一些语法的特性, 加上 painless 脚本的配合, 也能完整的复现全局锁, 行锁, 读写锁的特性, 实现的思路还是挺有意思的, 跟使用 Redis,zookeeper 实现分布式锁有异曲同工之处, 只是生产案例上用 Redis 实现分布式锁是比较成功的实践, Elasticsearch 的对这种分布式锁的实现方式可能不是最佳实践, 但也可以了解一下.
专注 Java 高并发, 分布式架构, 更多技术干货分享与心得, 请关注公众号: Java 架构社区
可以扫左边二维码添加好友, 邀请你加入 Java 架构社区微信群共同探讨技术
来源: https://www.cnblogs.com/huangying2124/p/12806508.html