Elasticsearch-12-批量增删改查操作

批量查询

批量查询的优点: 如果用 GET /index_name/type_name/id 这样的查询去查询100条数据的话,就要发送100次网络请求,开销比较大,如果是批量查询的话,查询100条数据只需要用1次网络请求,网络请求的性能开销会很少.

_mget批量查询

比如我们要查询test_index/test_type下面 id是1和2的两条数据
执行代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
GET /_mget
{
"docs":[
{
"_index":"test_index", // 索引名
"_type":"test_type", // 类型名
"_id":1 // id
},
{
"_index":"test_index",
"_type":"test_type",
"_id":2
}
]
}

返回值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
{
"docs": [
{
"_index": "test_index",
"_type": "test_type",
"_id": "1",
"_version": 1,
"found": true,
"_source": {
"test_content": "test content"
}
},
{
"_index": "test_index",
"_type": "test_type",
"_id": "2",
"_version": 2,
"found": true,
"_source": {
"num": 1,
"tags": []
}
}
]
}

GET请求后面直接跟_mget端点, 将参数封装在一个docs数组里面,数组中的一个元素就是一个请求条件, 执行后返回值也封装在了一个docs数组中,一个元素对应一个返回结果.

如果说查询的是同一个index下的不同type的数据,直接将请求跟在url上就可以了,请求体中值需要指明type 和 id即可

1
2
3
4
5
6
7
8
9
10
11
12
13
GET /test_index/_mget
{
"docs":[
{
"_type":"test_type", // 类型名称
"_id":1 // id
},
{
"_type":"test_type",
"_id":2
}
]
}

如果查询的是同一个index下的同一个type下的内容,那么index和type都写在url上,请求体中只需要一个ids的数组即可

1
2
3
4
GET /test_index/test_type/_mget
{
"ids":[1,2]
}

可以看出来ids不需要包在docs中, 但是返回值还是会包在docs数组中.

总结:一般来说,在进行查询的时候,如果一次要查询多条数据的话,一定要用batch批量操作的api,尽可能减少网络请求的开销,可以大大提升性能.

_bulk批量增删改

_bulk 可以操作批量增删改,语法如下:

1
2
{"action":{"metadata"}}
{"data"}

bulk api对json的语法有严格的要求,每个json不能换行,只能放一行,同时一个json和另一个json之间必须换行,每个操作需要两个json字符串,删除操作只需要一个.

举例,比如现在要创建一个document,放到bulk里面是这样的

1
2
{"index":{"_index":"test_index","_type":"test_type","_id":1}}
{"test_field":"test1","test_field2":"test2"}

那么有哪些类型的操作可以执行呢?

  1. delete:,删除一个document
  2. create:相当于 PUT /index/type/id/_create,强制创建
  3. index:普通的PUT操作,可以是创建,也可以是全量替换
  4. update:相当于 partial update操作

示例:

1
2
3
4
5
6
7
8
POST /_bulk
{"delete":{"_index":"test_index","_type":"test_type","_id":3}}
{"create":{"_index":"test_index","_type":"test_type","_id":2}}
{"test_field":"already exists"}
{"index":{"_index":"test_index","_type":"test_type","_id":1}}
{"test_field":"replace all"}
{"update":{"_index":"test_index","_type":"test_type","_id":4,"_retry_on_conflict":3}}
{"doc":{"test_content":"update id is4"}}

上面这个请求中:

  1. 删除了id为4document
  2. 强制创建一个document,id为2 (id为2的已经存在了)
  3. 将id为1的document全量替换
  4. partial update更新id为4的document,更新失败的时候重试3次

执行返回结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
{
"took": 79,
"errors": true,
"items": [
{
"delete": {
"found": true,
"_index": "test_index",
"_type": "test_type",
"_id": "3",
"_version": 2,
"result": "deleted",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"status": 200
}
},
{
"create": {
"_index": "test_index",
"_type": "test_type",
"_id": "2",
"status": 409,
"error": {
"type": "version_conflict_engine_exception",
"reason": "[test_type][2]: version conflict, document already exists (current version [2])",
"index_uuid": "qFba3qxtTO6YxlO3m7qtug",
"shard": "2",
"index": "test_index"
}
}
},
{
"index": {
"_index": "test_index",
"_type": "test_type",
"_id": "1",
"_version": 2,
"result": "updated",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"created": false,
"status": 200
}
},
{
"update": {
"_index": "test_index",
"_type": "test_type",
"_id": "4",
"_version": 2,
"result": "updated",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"status": 200
}
}
]
}

可以看到,第2个操作失败了,原因是已经存在了,但是其他操作仍然执行成功了,所以bulk操作中,任意一个操作失败,是不会影响其他的操作的,但是在返回结果里,会告诉你异常日志

bulk size最佳大小

bulk request会加载到内存里面,如果太大的话,性能反而会下降,因此需要反复尝试一个最佳的bulk size. 一般从1000~5000 条数据开始,或者数据大小在5~15MB之间,尝试逐渐增加

奇特的json格式详解

上面有说过bulk的请求体 每个json不能换行,只能放一行,同时一个json和另一个jso之间必须换行,那为什么要这样设计呢?

第一点,bulk中的操作都可能要转发到不同的node的shard去执行.
第二,如果允许任意换行,es拿到json后就得进行如下处理:

  1. 将json数组解析为JSONArray对象,这个时候,整个数据就会在内存中出现两份,一份是json文本,一份是JSONArray对象
  2. 解析json数组里的每个json,对每个请求中的document进行路由
  3. 为路由到一个shard上的多个请求,创建一个请求数组
  4. 将这个请求数组进行序列化
  5. 将序列化后的请求数组发送到对应的节点上去

所以按照这种方式的话,会耗费更多的内存,更多的jvm gc开销
上文中有说到过bulk size的最佳大小,如果说现在有100个bulk请求,每个请求10MB,那就是 1000MB的数据,每个请求的json都copy一份为JSONArray对象,此时内存中的占用就会翻倍,会占用到2000MB+的内存,占用更多的内存就可能积压其他请求的内存使用量,就可能导致其他请求的性能急速下降. 另外,占用更多的内存,gc回收次数更多,每次要回收的垃圾对象更多,耗费的时间更多,会导致es的java虚拟机停止工作线程的时间更多

使用这种不换行的格式的话

  1. 不需要转为json对象,不会在内存中copy数据,直接按照换行符切割即可
  2. 对每两个一组的json,读取metadata,进行document路由.
  3. 直接将对应的json发送到node上去

最大的优势在于,不需要将json数组解析为JSONArray对象,不需要浪费内存空间,尽可能保证性能