在 document 的 CRUD 這篇文章中,示範了如何針對單一資源進行 CRUD。
而這篇文章則會提到如何使用 update_by_query 的方式來大量地更新 documents。
Update by query
從字面上的意義來說,就是 根據搜尋的結果進行更新,也就是會提供一個 query 的欄位,所有匹配到這個欄位的 documents 都會接受相同的更新。
來一個簡單的需求,針對所有年齡超過 20 歲的使用者,幫他們新增一個 workout
的 habit,我知道這個需求真的很奇怪,但只是在練習,不要吹毛求疵 :D
先看一下現在的資料是怎麼樣:
在 update_by_query 之後,圖片上紅色框框的部分會希望被新增一個 habit 叫做 workout
。
來實際試試看:
在請求的 payload 之中,就是由兩個 key 所組成:
- query:找到你想要匹配的 documents
- script:匹配到之後,對 document 所進行的操作
query 的部分,我們還沒有提到太多,但從字面上看應該也能看出是找出 age
大於且不等於 20 的 users。
script 在 這篇文章 有提過該如何使用。
看看結果:
確實兩個 age: 27
的 user 都被新增了一個 workout
的 habit。
雖然 endpoint 是叫做 update_by_query,但如果知道 script 如何使用的話,不一定只能拿來更新唷!
這些過程中,發生了什麼事?
這種涉及到大量更新的操作,背後的原理是一定要好好了解的。
📍 小複習 ! 更新單一的 document 是透過 routing 的方式,找到 primary shard 進行更新,之後才同步到 replica shard。
➡️ 複習連結
關於 update_by_query,Elasticsearch ( 以下簡稱 ES ) 所做的第一件事情就是 query。
為什麼要先 query?
這是為了之後的 snapshot 而準備的,ES 會先根據我們提供的 query 找到相對應的 documents,接著針對這些 documents 進行一次 snapshot。
為什麼要 snapshot?
試想一下,當進行 update_by_query 時,整個 ES 還是在運作,所以目標中的 document 也可能隨時被更新,這就會衍生出 上一篇文章 中所提到有關於樂觀鎖的問題。
要特別注意的是,這邊的 snapshot 並不是為了避免資料流失,而是為了這次的更新所準備的,可以想像成 ES 把這個時間點匹配到的 documents 給照起來,然後針對這些被照起來的 documents 進行更新,保證了操作的一致性以及可靠性。
這和關聯式資料庫常見的鎖的概念不一樣,鎖是為了保證同時只有一個請求可以處理單筆資料,但 ES 並不是這樣的機制。
如下圖,ES 還是在繼續處理其他的同樣對於 user_id = 1
的請求。
而 update_by_query 就是針對 snapshot 上的資料進行更新,你一定會有疑問,這樣還是沒有解決衝突的問題呀?是的,我等等會解釋,但現在讓我們先把整個 update_by_query 的執行過程解釋完畢。
接著 ES 會在背景使用 Scroll API,滾動更新這些匹配的 documents。
Elastic Scroll API 參考文件為什麼要用 scroll?
Scroll API 比想像中的還要複雜,它做了很多事情,但以今天這個 update_by_query 的例子,可以先想成因為效能的問題,需要使用 scroll 的方式去處理這些匹配的 documents,接著採用批量的方式更新,預設是 1000 筆。
而在完成 update_by_query 的請求後,回應中也會看到這個值,往上滾可以看到 batches 為 1,代表在一個 batch 中就完成任務,以此類推,當 query 的結果有 5400 筆,那 batches 就會是 6。
好的,接著 ES 開始針對這 1000 筆資料執行我們一開始所提供的 script
來更新 document,直到所有的 documents 都被更新完畢,這邊要提醒一下,滾動更新的過程預設是不會中斷的,不論發生什麼事情。
更新完畢後,ES 回傳這整個過程的結果,這樣 update_by_query 就結束了。
等等,衝突的問題呢?
就像我說的,這個更新的過程不會停下來,所以可以假設衝突 100% 會發生,是可以預期的錯誤。
這時候 ES 的做法是什麼呢?它會把這些有衝突的 document 記錄下來,然後不更新他們,接著我們可以根據 response 的結果,選擇下一步要怎麼做,像是針對這些 document 逐個更新等等。
如果你在想,它是怎麼找到衝突的?那你要回去看 這篇文章,一樣是透過 primary term
以及 sequence number
來解決問題。
下面是一個擁有 version conflict 的 response 範例:
{
"version_conflicts": 1,
// 省略....
"retries": {
"bulk": 0,
"search": 0
},
// 省略...
"failures": [
{
"index": "users",
"type": "_doc",
"id": "1",
"cause": {
"type": "version_conflict_engine_exception",
"reason": "[_doc][5]: version conflict, current version [2] is different than the one provided [1]",
"index_uuid": "c3M3h3MBGv6iNpvDroul",
"shard": "0",
"index": "users"
},
"status": 409
}
]
}
Response 中有一個 retires 的值,那是做什麼的?
這是 ES 在防止一些非預期性錯誤採取的策略,和版本衝突不同的是,這些錯誤是 ES 沒辦法知道的,像是網路問題、Node 故障、資源阻塞等等。
這種時候採用重試的策略說不定可以解決問題,就和電腦有問題就重開機的道理很像。
而這個重試的預設值是 10 次,同樣的,10 次重試之後,這個更新也會被記錄到 failures
的欄位之中,好讓我們決定該怎麼做。
調整遇到衝突時的策略
我們可以在一開始請求的時候加入 conflicts: abort
的選項。
{
// 省略
"conflicts": "abort",
}
這會讓 ES 在執行 update_by_query 的過程中,一遇到版本衝突就停止後續所有的作業;而在這個 endpoint 的預設中,是採用 conflicts: proceed
的選項,就是一次到底。
Delete by query
如下圖所示,這個 API 是 POST /:index_name/_delete_by_query
就是單純的把所有符合查詢的 documents 刪除。
結語
使用 update_by_query 時,由於它可能涉及大量的 I/O 操作,因此在 production 的環境中使用要特別小心。
它還可能觸發大量的 segment merging,可能對性能有短暫的影響,至於什麼是 segment?這是 ES 或是 Lucene 底層如何儲存資料的細節,目前不需要深入理解。