🔥码云GVP开源项目 12k star Uniapp+ElementUI 功能强大 支持多语言、二开方便! 广告
# 通过查询 API 更新 _update_by_query 最简单的用法只是基于索引来执行每个文档的更新,而不更改源。这是非常有用快捷的,在完成一些  map 的变化也得以表现。这里是 API: ``` POST twitter/_update_by_query?conflicts=proceed ``` 返回值类似于这样: ``` { "took" : 147, "timed_out": false, "updated": 120, "deleted": 0, "batches": 1, "version_conflicts": 0, "noops": 0, "retries": { "bulk": 0, "search": 0 }, "throttled_millis": 0, "requests_per_second": -1.0, "throttled_until_millis": 0, "total": 120, "failures" : [ ] } ``` `_update_by_query `获取索引的快照,当它开始和进行索引时发现使用的是 `internal `的版本。这意味着如果在获取快照时或者进行索引请求时文档发生改变,那么将会发生版本冲突。当索引请求被处理的时间之间变化。当版本匹配文档被更新时候,版本号递增。 注意:因为 `internal `版本不支持的值 0 作为一个有效的版本号,与版本等于 0 时候,文档无法使用 _update_by_query 更新,请求将会失败。 所有的更新和查询失败导致 `_update_by_query `中止,并在返回错误的响应。正在执行的更新将会继续下去。换句话说,该方法不支持回滚,仅中止。虽然第一次失败将导致中止,由失败的请求返回所有故障将在 `failures `元素体现。因此,有可能存在一些失败的实体。 如果你想简单地统计版本冲突不会导致 `_update_by_query` 中止你可以在 URL 中设置 `conflicts=proceed `或在请求中设置 `"conflicts": "proceed"` 。第一个例子做到这一点因为它只是试图获取一个当前的 mapping 变化,这里发生的版本冲突仅仅意味着冲突的文档在 `_update_by_query` 开始的时间和文件尝试更新时间之间被更新了。这样做具有一定的优点,因为更新将获取当前 mapping 更新。 回到API格式,您可以限制 `_update_by_query `到一个单一类型。这将只从 `twitter `索引中更新 `tweet 文件`: ``` POST twitter/tweet/_update_by_query?conflicts=proceed ``` 您也可以限制 `_update_by_query `使用 。这将更新从 `twitter 索引中更新所有的文档`: ``` POST twitter/_update_by_query?conflicts=proceed { "query": { (1) "term": { "user": "kimchy" } } } ``` (1)查询必须作为一个值传递 `query `键,就像在 Search API 中同样的方式。还可以使用的 `q` 参数。、 到目前为止,我们只是一直在更新文档,而无需更改其来源。`_update_by_query `支持脚本对象更新文档。这将在所有 kimchy tweet 中增加 `likes` : ``` POST twitter/_update_by_query { "script": { "inline": "ctx._source.likes++", "lang": "painless" }, "query": { "term": { "user": "kimchy" } } } ``` 正如在 Update API 中可以设置 `ctx.op `来改变所执行的操作: noop                设置 ctx.op = "noop" 。如果你的脚本并没有做任何更改。这将导致 _update_by_query 从其更新处省略该文件。这将在响应的 noop 中被展示。 delete                设置ctx.op = "delete",如果你的脚本如此设定,该文件必须被删除。这将在响应的 deleted 中被展示。 设置 `ctx.op `到别的地方是错误的。设置任何其它领域中 `ctx 也`是错误的。 注意,当我们指定 `conflicts=proceed 时,`我们希望版本冲突中的一个中止进程,以至于我们可以处理失败。 此 API 不允许移动文件本身,只需修改其源。这是有意而为之,我们并没有获得从其原始位置删除该文件的权限。 通过多索引也可以完成所有的事情,就像搜索API: ``` POST twitter,blog/tweet,post/_update_by_query ``` 如果提供 `routing ,`则将被复制到滚动查询,限制到分片的进程来匹配 `routing 值`: ``` POST twitter/_update_by_query?routing=1 ``` 在默认情况下 `_update_by_query `使用 1000 批次的回滚。可以更改与批量大小的 `scroll_size URL`参数: ``` POST twitter/_update_by_query?scroll_size=100 ``` `_update_by_query `也可以使用 Ingest Node 的特点,通过指定一个 `pipeline`: ``` PUT _ingest/pipeline/set-foo { "description" : "sets foo", "processors" : [ { "set" : { "field": "foo", "value": "bar" } } ] } POST twitter/_update_by_query?pipeline=set-foo ``` ## URL 参数 除了标准的参数,如 `pretty`,通过查询 API 还支持 `refresh`,`wait_for_completion`,`wait_for_active_shards `和 `timeout`。 当请求完成时,发送 `refresh `将更新索引中的所有分片。这与索引 API 的 `refresh` 参数不同,这会使得仅仅分片获取到最新的数据来作为索引。 如果请求包含 `wait_for_completion=false 那么 `Elasticsearch 将执行一些预检,启动请求,然后返回一个 `task` 可与用于任务的 API 取消或获取任务的状态。Elasticsearch 还将创建此任务的记录,在文档 `.tasks/task/${taskId}中`。可以保留或删除认为合适的。当你进行删除时候,Elasticsearch 可以收回其使用的空间。 `wait_for_active_shards 控制着在请求时一个分片有多少个副本需要时活跃的`。`timeout `控制每个写请求的等待时间从不可用到可用。这两个同时运行就像在 Bulk API 中一样。 `requests_per_second `可以被设置为任何正十进制数(`1.4,``6`, `1000`等)并且禁用每秒字节流,或者它可以被设置为-1 来禁用字节流。禁用之后的等待时间,可以控制回滚延迟。等待时间不用于批处理完成时间和 `requests_per_second * requests_in_the_batch 的时间。由于该批次并没有分成多个批量传输的块,大的数据块将会导致 Elasticsearch 创建更多的请求导致较长时间的等待。这是突发的,不是平稳的。默认值为 -1。` ## 响应 Json 响应如下: ``` { "took" : 639, "updated": 0, "batches": 1, "version_conflicts": 2, "retries": { "bulk": 0, "search": 0 } "throttled_millis": 0, "failures" : [ ] } ``` took         从开始的毫秒数来结束整个操作过程。 updated         已成功更新的文件数量。 batches         通过查询请求返回响应的数目。 version_conflicts         版本冲突的次数。 retries         更新请求重试次数。bulk 是 buck action 重试的次数,search 是 search action 重试的次数。 throttled_millis         符合 `requests_per_second 的毫秒数。` failures         所有索引失败的集合。如果是非空的那么因为这些失败请求失效。请参阅 `conflicts `如何防止从版本冲突中停止运行。 ## Works With the Task API 你可以通过 Task API 获取所有正在运行的更新请求状态: ``` GET _tasks?detailed=true&actions=*byquery ``` 响应如下: ``` { "nodes" : { "r1A2WoRbTwKZ516z6NEs5A" : { "name" : "r1A2WoR", "transport_address" : "127.0.0.1:9300", "host" : "127.0.0.1", "ip" : "127.0.0.1:9300", "attributes" : { "testattr" : "test", "portsfile" : "true" }, "tasks" : { "r1A2WoRbTwKZ516z6NEs5A:36619" : { "node" : "r1A2WoRbTwKZ516z6NEs5A", "id" : 36619, "type" : "transport", "action" : "indices:data/write/update/byquery", "status" : {(1) "total" : 6154, "updated" : 3500, "created" : 0, "deleted" : 0, "batches" : 4, "version_conflicts" : 0, "noops" : 0, "retries": { "bulk": 0, "search": 0 } "throttled_millis": 0 }, "description" : "" } } } } } ``` (1)这个对象的实际状态。它就像响应中 json 用做  `total 的`重要补充。`total `是操作任务的总数重新索引预计执行。您可以添加 `updated`,`created `以及 `deleted 等多个`域。当它们之和等于 `total `字段值时该请求将完成。 通过 task id 可以直接获得 task: ``` GET /_tasks/taskId:1 ``` 此 API 的优点是,它具有集成 `wait_for_completion=false` 到返回完成任务的状态。如果任务完成,并 `wait_for_completion=false `设置,将会返回 `results `或 `error `。此功能的成本,该文件 `wait_for_completion=false`创建的`.tasks/task/${taskId}`。它是由你来删除该文档。 ## Works With the Cancel Task API 通过 Task Cancle API 可以取消任何通过请求的更新: ``` POST _tasks / TASK_ID:1 / _cancel ``` 利用上述 task API 可以获取到 task id。 取消任务很快就会执行,通常需要几秒钟。上述的任务状态 API 将任务以列表形式展示直到它被取消。 ## Rethrottling `requests_per_second 的值`可以在运行更新中可以通过使用 `_rethrottle `的 API 来改变: ``` POST _update_by_query/task_id:1/_rethrottle?requests_per_second=-1 ``` 利用上述 task API 可以获取到 task id。 在设置上,它就像 `_update_by_query `API 一样,`requests_per_second` 可以设置成 `-1 来`禁用字节流和非整数。Rethrottling 是加快查询需要,但是会减慢查询在完成当前批后立即生效的效果,这可以防止回滚超时。 ## Manually slicing 通过请求的更新支持分片回滚,允许更加简单的手动多线程操作: ``` POST twitter/_update_by_query { "slice": { "id": 0, "max": 2 }, "script": { "inline": "ctx._source['extra'] = 'test'" } } POST twitter/_update_by_query { "slice": { "id": 1, "max": 2 }, "script": { "inline": "ctx._source['extra'] = 'test'" } } ``` 你可以通过如下来验证任务: ``` GET _refresh POST twitter/_search?size=0&q=extra:test&filter_path=hits.total ``` 结果 total 如下: ``` { "hits": { "total": 120 } } ``` ## 一个新的属性 假设你没有创建动态映射索引,那么用数据填充它,然后添加一个映射值获取数据字段: ``` {PUT test { "mappings": { "test": { "dynamic": false,(1) "properties": { "text": {"type": "text"} } } } } POST test/test?refresh { "text": "words words", "flag": "bar" } POST test/test?refresh { "text": "words words", "flag": "foo" } PUT test/_mapping/test(2) { "properties": { "text": {"type": "text"}, "flag": {"type": "text", "analyzer": "keyword"} } } ``` (1)这意味着,新的字段将不会被索引,只是存储 `_source`。 (2)这将更新映射并添加新的 `flag `。添加了新的字段,你必须针对其重新索引。 搜索数据将不会得到任何结果: ``` POST test/_search?filter_path=hits.total { "query": { "match": { "flag": "foo" } } } ``` ``` { "hits" : { "total" : 0 } } ``` 但是你可以发出一个 `_update_by_query `请求,获取新的映射: ``` POST test/_update_by_query?refresh&conflicts=proceed POST test/_search?filter_path=hits.total { "query": { "match": { "flag": "foo" } } } ``` ``` { "hits" : { "total" : 1 } } ``` 添加一个字段或者添加到多字段时,可以做同样的事情。