Website logo

Robert Chang

技術部落格

Elasticsearch - Update & Delete by query

document 的 CRUD 這篇文章中,示範了如何針對單一資源進行 CRUD。

而這篇文章則會提到如何使用 update_by_query 的方式來大量地更新 documents。

Update by query

從字面上的意義來說,就是 根據搜尋的結果進行更新,也就是會提供一個 query 的欄位,所有匹配到這個欄位的 documents 都會接受相同的更新。

來一個簡單的需求,針對所有年齡超過 20 歲的使用者,幫他們新增一個 workout 的 habit,我知道這個需求真的很奇怪,但只是在練習,不要吹毛求疵 :D

先看一下現在的資料是怎麼樣:

screen shot

在 update_by_query 之後,圖片上紅色框框的部分會希望被新增一個 habit 叫做 workout

來實際試試看:

screen shot

在請求的 payload 之中,就是由兩個 key 所組成:

  1. query:找到你想要匹配的 documents
  2. script:匹配到之後,對 document 所進行的操作

query 的部分,我們還沒有提到太多,但從字面上看應該也能看出是找出 age 大於且不等於 20 的 users。

script這篇文章 有提過該如何使用。

看看結果:

screen shot

確實兩個 age: 27 的 user 都被新增了一個 workout 的 habit。

雖然 endpoint 是叫做 update_by_query,但如果知道 script 如何使用的話,不一定只能拿來更新唷!

這些過程中,發生了什麼事?

這種涉及到大量更新的操作,背後的原理是一定要好好了解的。

📍 小複習 ! 更新單一的 document 是透過 routing 的方式,找到 primary shard 進行更新,之後才同步到 replica shard。

➡️ 複習連結

關於 update_by_query,Elasticsearch ( 以下簡稱 ES ) 所做的第一件事情就是 query。

為什麼要先 query?

這是為了之後的 snapshot 而準備的,ES 會先根據我們提供的 query 找到相對應的 documents,接著針對這些 documents 進行一次 snapshot。

為什麼要 snapshot?

試想一下,當進行 update_by_query 時,整個 ES 還是在運作,所以目標中的 document 也可能隨時被更新,這就會衍生出 上一篇文章 中所提到有關於樂觀鎖的問題。

要特別注意的是,這邊的 snapshot 並不是為了避免資料流失,而是為了這次的更新所準備的,可以想像成 ES 把這個時間點匹配到的 documents 給照起來,然後針對這些被照起來的 documents 進行更新,保證了操作的一致性以及可靠性。

這和關聯式資料庫常見的鎖的概念不一樣,鎖是為了保證同時只有一個請求可以處理單筆資料,但 ES 並不是這樣的機制。

如下圖,ES 還是在繼續處理其他的同樣對於 user_id = 1 的請求。

screen shot

update_by_query 就是針對 snapshot 上的資料進行更新,你一定會有疑問,這樣還是沒有解決衝突的問題呀?是的,我等等會解釋,但現在讓我們先把整個 update_by_query 的執行過程解釋完畢。

接著 ES 會在背景使用 Scroll API,滾動更新這些匹配的 documents。

Elastic Scroll API 參考文件

為什麼要用 scroll?

Scroll API 比想像中的還要複雜,它做了很多事情,但以今天這個 update_by_query 的例子,可以先想成因為效能的問題,需要使用 scroll 的方式去處理這些匹配的 documents,接著採用批量的方式更新,預設是 1000 筆。

而在完成 update_by_query 的請求後,回應中也會看到這個值,往上滾可以看到 batches 為 1,代表在一個 batch 中就完成任務,以此類推,當 query 的結果有 5400 筆,那 batches 就會是 6。

好的,接著 ES 開始針對這 1000 筆資料執行我們一開始所提供的 script 來更新 document,直到所有的 documents 都被更新完畢,這邊要提醒一下,滾動更新的過程預設是不會中斷的,不論發生什麼事情。

更新完畢後,ES 回傳這整個過程的結果,這樣 update_by_query 就結束了。

等等,衝突的問題呢?

就像我說的,這個更新的過程不會停下來,所以可以假設衝突 100% 會發生,是可以預期的錯誤。

這時候 ES 的做法是什麼呢?它會把這些有衝突的 document 記錄下來,然後不更新他們,接著我們可以根據 response 的結果,選擇下一步要怎麼做,像是針對這些 document 逐個更新等等。

如果你在想,它是怎麼找到衝突的?那你要回去看 這篇文章,一樣是透過 primary term 以及 sequence number 來解決問題。

下面是一個擁有 version conflict 的 response 範例:

{
  "version_conflicts": 1,
  // 省略....
  "retries": {
    "bulk": 0,
    "search": 0
  },
  // 省略...
  "failures": [
    {
      "index": "users",
      "type": "_doc",
      "id": "1",
      "cause": {
        "type": "version_conflict_engine_exception",
        "reason": "[_doc][5]: version conflict, current version [2] is different than the one provided [1]",
        "index_uuid": "c3M3h3MBGv6iNpvDroul",
        "shard": "0",
        "index": "users"
      },
      "status": 409
    }
  ]
}

Response 中有一個 retires 的值,那是做什麼的?

這是 ES 在防止一些非預期性錯誤採取的策略,和版本衝突不同的是,這些錯誤是 ES 沒辦法知道的,像是網路問題、Node 故障、資源阻塞等等。

這種時候採用重試的策略說不定可以解決問題,就和電腦有問題就重開機的道理很像。

而這個重試的預設值是 10 次,同樣的,10 次重試之後,這個更新也會被記錄到 failures 的欄位之中,好讓我們決定該怎麼做。

調整遇到衝突時的策略

我們可以在一開始請求的時候加入 conflicts: abort 的選項。

{
  // 省略
  "conflicts": "abort",
}

這會讓 ES 在執行 update_by_query 的過程中,一遇到版本衝突就停止後續所有的作業;而在這個 endpoint 的預設中,是採用 conflicts: proceed 的選項,就是一次到底。

Delete by query

如下圖所示,這個 API 是 POST /:index_name/_delete_by_query

screen shot

就是單純的把所有符合查詢的 documents 刪除。

結語

使用 update_by_query 時,由於它可能涉及大量的 I/O 操作,因此在 production 的環境中使用要特別小心。

它還可能觸發大量的 segment merging,可能對性能有短暫的影響,至於什麼是 segment?這是 ES 或是 Lucene 底層如何儲存資料的細節,目前不需要深入理解。

上一篇文章Elasticsearch - Versioning 以及樂觀鎖

下一篇文章Elasticsearch - Bulk Import