Elasticsearch-86-基于document锁实现悲观锁并发控制

document锁

上文中的全局锁,一次性就锁上了整个index,对这个index的所有增删改操作都会被block住,如果上锁不频繁,还可以,如果上锁释放锁的操作很频繁,显然不适用

document锁,顾名思义,每次就锁需要操作的几个doc,就可以了,被锁的doc,其他线程就不能对这些doc执行增删改操作了,但是只是对一部分doc上了锁,其他线程对于其他的doc还是可以操作的

具体实现.

document是用脚本进行上锁的 es的config/script目录下,写一个名为judge-lock的groovy脚本,内容是:

1
if ( ctx._source.process_id != process_id ) { assert false }; ctx.op = 'noop';

加锁:

1
2
3
4
5
6
7
8
9
10
11
POST /fs/lock/1/_update
{
"upsert": { "process_id": 123 },
"script": {
"lang": "groovy",
"file": "judge-lock",
"params": {
"process_id": 123
}
}
}

  • fs/lock: 就是说fs下的lock type,专门用于进行上锁
  • fs/lock/id: 比如上面的1,id就是要上锁的那个doc的id,代表了某个doc对应的lock, 其实也是一个doc
  • _update: 请求里面还有个upsert,执行的是upsert操作
  • script: 要执行的脚本
  • script.param: 传入的参数
  • script.param.process_id: 是要执行增删改操作的进程的唯一id,比如说在java系统中,启动的时候,可以给每个线程都用uuid生成一个唯一id,进程也可以分配一个uuid,process_id+thread_id就代表了某个进程下的某个线程

process_id很重要,会在lock中设置对对应的doc加锁的进程的id,这样其他进程过来的时候,才知道这条数据已经被别人锁了

请求进来的话,走groovy脚本,参数就是process_id,看一下这个脚本,意思就是process_id和当前数据中的process_id不相同的话就assert false,抛出异常,如果是相同的,就ctx.op = ‘noop’,不做任何修改

如果一个document之前没有被锁,拿上面的请求举例,比如说/fs/lock/1之前不存在,也就是id是1的doc没有被别人上锁, 因为用的是upsert,那么执行index操作,创建一个/fs/lock/1,用params中的数据作为这个lock的数据,process_id被设置为123,脚本不执行,完成之后,就标识id是1的这个doc被process_id=123的进程锁上了

然后现在又有一个process_id是234的进程过来尝试加锁,请求如下

1
2
3
4
5
6
7
8
9
10
11
POST /fs/lock/1/_update
{
"upsert": { "process_id": 234 },
"script": {
"lang": "groovy",
"file": "judge-lock",
"params": {
"process_id": 234
}
}
}

这时候,这个doc已经被上一个进程锁上了,/fs/lock/1 这条数据也已经存在了, 那么这个upsert就是执行script脚本了,比对process_id,发现两个id并不相同,也就是说这个doc已经被别的进程锁上了,就直接报错了,需要一直重试,直到上锁成功

那如果说,process_id=123的这个线程,又要做一些其他的操作, 也还是过来先上锁,在执行脚本的时候,发现两个process_id是相同的,此时就会返回ctx.op= ‘noop’,什么都不做,所以这个请求是不会被 block的

最后做完全部的操作后,就要释放这个进程上的所有锁

案例

还是之前的文件系统,两个线程要同时去修改id是1的doc的文件名

第一个线程过来,尝试上锁..

1
2
3
4
5
6
7
8
9
10
11
POST /fs/lock/1/_update
{
"upsert": {"process_id": 123},
"script": {
"lang": "groovy",
"file": "judge-lock",
"params": {
"process_id": 123
}
}
}

返回值:

1
2
3
4
5
6
7
8
9
10
11
12
{
"_index": "fs",
"_type": "lock",
"_id": "1",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
}
}

process_id是123的进程上锁成功,看一下

1
GET /fs/lock/1

返回值:

1
2
3
4
5
6
7
8
9
10
{
"_index": "fs",
"_type": "lock",
"_id": "1",
"_version": 1,
"found": true,
"_source": {
"process_id": 123
}
}

这个时候第二个线程过来尝试加锁

1
2
3
4
5
6
7
8
9
10
11
POST /fs/lock/1/_update
{
"upsert": {"process_id": 234},
"script": {
"lang": "groovy",
"file": "judge-lock",
"params": {
"process_id": 234
}
}
}

返回值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
{
"error": {
"root_cause": [
{
"type": "remote_transport_exception",
"reason": "[f57uV91][127.0.0.1:9300][indices:data/write/update[s]]"
}
],
"type": "illegal_argument_exception",
"reason": "failed to execute script",
"caused_by": {
"type": "script_exception",
"reason": "error evaluating judge-lock",
"caused_by": {
"type": "power_assertion_error",
"reason": "assert false\n"
},
"script_stack": [],
"script": "",
"lang": "groovy"
}
},
"status": 400
}

直接就报错了,是加不了锁的

如果说还是第一个线程来获取锁的话会怎样呢

1
2
3
4
5
6
7
8
9
10
11
POST /fs/lock/1/_update
{
"upsert": {"process_id": 123},
"script": {
"lang": "groovy",
"file": "judge-lock",
"params": {
"process_id": 123
}
}
}

返回值:

1
2
3
4
5
6
7
8
9
10
11
12
{
"_index": "fs",
"_type": "lock",
"_id": "1",
"_version": 1,
"result": "noop",
"_shards": {
"total": 0,
"successful": 0,
"failed": 0
}
}

返回值就是noop,什么都不做,这时候,这个进程就可以对id是1的这个doc进行各种操作,比如修改文件名

1
2
3
4
5
6
POST /fs/file/1/_update
{
"doc": {
"name": "README1.txt"
}
}

返回值:

1
2
3
4
5
6
7
8
9
10
11
12
{
"_index": "fs",
"_type": "file",
"_id": "1",
"_version": 6,
"result": "updated",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
}
}

更新成功了,最后一步释放掉锁

1
POST /fs/_refresh

手动refresh一下,数据写入os cache并被打开供搜索的过程,叫做refresh

查询这个线程id,持有的所有锁,用scroll查询,或者普通的搜索都可以

1
2
3
4
5
6
7
8
9
10
GET /fs/lock/_search?scroll=1m
{
"query": {
"term": {
"process_id": {
"value": "123"
}
}
}
}

返回值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
{
"_scroll_id": "DnF1ZXJ5VGhlbkZldGNoBQAAAAAAAAyjFmY1N3VWOTF4U19HUlRRUzJIbzgxcmcAAAAAAAAMpBZmNTd1VjkxeFNfR1JUUVMySG84MXJnAAAAAAAADKUWZjU3dVY5MXhTX0dSVFFTMkhvODFyZwAAAAAAAAymFmY1N3VWOTF4U19HUlRRUzJIbzgxcmcAAAAAAAAMohZmNTd1VjkxeFNfR1JUUVMySG84MXJn",
"took": 14,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"failed": 0
},
"hits": {
"total": 1,
"max_score": 1,
"hits": [
{
"_index": "fs",
"_type": "lock",
"_id": "1",
"_score": 1,
"_source": {
"process_id": 123
}
}
]
}
}

搜索到所有的要释放的锁之后, 删除掉,这里用一个_bulk批量操作

1
2
PUT /fs/lock/_bulk
{"delete":{"_id" : 1 }}

返回值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
{
"took": 26,
"errors": false,
"items": [
{
"delete": {
"found": true,
"_index": "fs",
"_type": "lock",
"_id": "1",
"_version": 2,
"result": "deleted",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"status": 200
}
}
]
}

删除成功,锁释放掉了,这时候,process_id=234的线程尝试上锁

1
2
3
4
5
6
7
8
9
10
11
POST /fs/lock/1/_update
{
"upsert": {"process_id": 234},
"script": {
"lang": "groovy",
"file": "judge-lock",
"params": {
"process_id": 234
}
}
}

返回值:

1
2
3
4
5
6
7
8
9
10
11
12
{
"_index": "fs",
"_type": "lock",
"_id": "1",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
}
}

上锁成功,和前面的一样,做完各种操作释放掉就ok了