天天看點

MongoDB 4.2 核心解析 - Change Stream

MongoDB 4.2 核心解析 - Change Stream

鏡像下載下傳、域名解析、時間同步請點選

阿裡巴巴開源鏡像站

作者:張友東

MongoDB 從3.6版本開始支援了 Change Stream 能力(4.0、4.2 版本在能力上做了很多增強),用于訂閱 MongoDB 内部的修改操作,change stream 可用于 MongoDB 之間的增量資料遷移、同步,也可以将 MongoDB 的增量訂閱應用到其他的關聯系統;比如電商場景裡,MongoDB 裡存儲新的訂單資訊,業務需要根據新增的訂單資訊去通知庫存管理系統發貨。

一、Change Stream 與 Tailing Oplog 對比

在 change stream 功能之前,如果要擷取 MongoDB 增量的修改,可以通過不斷

tailing oplog

  的方式來

拉取增量的 oplog

,然後針對拉取到的 oplog 集合,來過濾滿足條件的 oplog。這種方式也能滿足絕大部分場景的需求,但存在如下的不足。

  1. 使用門檻較高,使用者需要針對 oplog 集合,打開特殊選項的的 tailable cursor  ("tailable": true, "awaitData" : true)。
  2. 使用者需要自己管理增量續傳,當拉取應用 crash 時,使用者需要記錄上一條拉取oplog的 ts、h 等字段,在下一次先定位到指定 oplog 再繼續拉取。
  3. 結果過濾必須在拉取側完成,但隻需要訂閱部分 oplog 時,比如針對某個 DB、某個 Collection、或某種類型的操作,必須要把左右的 oplog 拉取到再進行過濾。
  4. 對于 update 操作,oplog 隻包含操作的部分内容,比如

    {$set: {x: 1}}

    ,而應用經常需要擷取到完整的文檔内容。
  5. 不支援 Sharded Cluster 的訂閱,使用者必須針對每個 shard 進行 tailing oplog,并且這個過程中不能有 moveChunk 操作,否則結果可能亂序。

MongoDB Change Stream 解決了 Tailing oplog 存在的不足,具有以下特點。

  1. 簡單易用,提供統一的 Change Stream API,一次 API 調用,即可從 MongoDB Server 側擷取增量修改。
  2. 統一的進度管理,通過 resume token 來辨別拉取位置,隻需在 API 調用時,帶上上次結果的 resume token,即可從上次的位置接着訂閱。
  3. 支援對結果在 Server 端進行 pipeline 過濾,減少網絡傳輸,支援針對 DB、Collection、OperationType 等次元進行結果過濾。
  4. 支援 fullDocument: "updateLookup" 選項,對于 update,傳回當時對應文檔的完整内容。
  5. 支援 Sharded Cluster 的修改訂閱,相同的 API 請求發到 mongos ,即可擷取叢集次元全局有序的修改。

二、Change Stream 實戰

以 Mongo shell 為例,使用 Change Stream 非常簡單,mongo shell 封裝了針對整個執行個體、DB、Collection 級别的訂閱操作。

db.getMongo().watch()    訂閱整個執行個體的修改
db.watch()               訂閱指定DB的修改
db.collection.watch()    訂閱指定Collection的修改           

1.建立連接配接1發起訂閱操作

mytest:PRIMARY>db.coll.watch([], {maxAwaitTimeMS: 60000})  最多阻塞等待 1分鐘           

2.建立連接配接2寫入新資料

mytest:PRIMARY> db.coll.insert({x: 100})
WriteResult({ "nInserted" : 1 })
mytest:PRIMARY> db.coll.insert({x: 101})
WriteResult({ "nInserted" : 1 })
mytest:PRIMARY> db.coll.insert({x: 102})
WriteResult({ "nInserted" : 1 })           

3.連接配接1上收到 Change Stream 更新

mytest:PRIMARY> db.watch([], {maxAwaitTimeMS: 60000})
{ "_id" : { "_data" : "825E0D5E35000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E353BE5C36D695042C90004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934389, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e353be5c36d695042c9"), "x" : 100 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e353be5c36d695042c9") } }
{ "_id" : { "_data" : "825E0D5E37000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E373BE5C36D695042CA0004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934391, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e373be5c36d695042ca"), "x" : 101 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e373be5c36d695042ca") } }
{ "_id" : { "_data" : "825E0D5E39000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E393BE5C36D695042CB0004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934393, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e393be5c36d695042cb"), "x" : 102 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e393be5c36d695042cb") } }           

4.上述 ChangeStream 結果裡,_id 字段的内容即為 resume token,辨別着 oplog 的某個位置,如果想從某個位置繼續訂閱,在 watch 時,通過 resumeAfter 指定即可。比如每個應用訂閱了上述3條修改,但隻有第一條已經成功消費了,下次訂閱時指定第一條的 resume token 即可再次訂閱到接下來的2條。

mytest:PRIMARY> db.coll.watch([], {maxAwaitTimeMS: 60000, resumeAfter: { "_data" : "825E0D5E35000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E353BE5C36D695042C90004" }})
{ "_id" : { "_data" : "825E0D5E37000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E373BE5C36D695042CA0004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934391, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e373be5c36d695042ca"), "x" : 101 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e373be5c36d695042ca") } }
{ "_id" : { "_data" : "825E0D5E39000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E393BE5C36D695042CB0004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934393, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e393be5c36d695042cb"), "x" : 102 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e393be5c36d695042cb") } }           

三、Change Stream 内部實作

1. watch() wrapper

db.watch() 實際上是一個 API wrapper,實際上 Change Stream 在 MongoDB 内部實際上是一個 aggregation 指令,隻是加了一個特殊的

$changestream

  階段,在發起 change stream 訂閱操作後,可通過 db.currentOp() 看到對應的 aggregation/getMore 操作的詳細參數。

{
      "op" : "getmore",
      "ns" : "test.coll",
      "command" : {
        "getMore" : NumberLong("233479991942333714"),
        "collection" : "coll",
        "maxTimeMS" : 50000,
        "lsid" : {
          "id" : UUID("e4fffa71-e168-4527-be61-f0918849d107")
        },
      },
      "planSummary" : "COLLSCAN",
      "cursor" : {
        "cursorId" : NumberLong("233479991942333714"),
        "createdDate" : ISODate("2019-12-31T06:35:52.479Z"),
        "lastAccessDate" : ISODate("2019-12-31T06:36:09.988Z"),
        "nDocsReturned" : NumberLong(1),
        "nBatchesReturned" : NumberLong(1),
        "noCursorTimeout" : false,
        "tailable" : true,
        "awaitData" : true,
        "originatingCommand" : {
          "aggregate" : "coll",
          "pipeline" : [
            {
              "$changeStream" : {
                "fullDocument" : "default"
              }
            }
          ],
          "cursor" : {
          },
          "lsid" : {
            "id" : UUID("e4fffa71-e168-4527-be61-f0918849d107")
          },
          "$clusterTime" : {
            "clusterTime" : Timestamp(1577774144, 1),
            "signature" : {
              "hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
              "keyId" : NumberLong(0)
            }
          },
          "$db" : "test"
        },
        "operationUsingCursorId" : NumberLong(7019500)
      },
      "numYields" : 2,
      "locks" : {
      }
    }           

2. resume token

resume token 用來描述一個訂閱點,本質上是 oplog 資訊的一個封裝,包含 clusterTime、uuid、documentKey等資訊,當訂閱 API 帶上 resume token 時,MongoDB Server 會将 token 轉換為對應的資訊,并定位到 oplog 起點繼續訂閱操作。

struct ResumeTokenData {
    Timestamp clusterTime;
    int version = 0;
    size_t applyOpsIndex = 0;
    Value documentKey;
    boost::optional<UUID> uuid;
};           

ResumeTokenData 結構裡包含 version 資訊,在 4.0.7 以前的版本,version 均為0; 4.0.7 引入了一種新的 resume token 格式,version 為 1; 另外在 3.6 版本裡,Resume Token 的編碼與 4.0 也有所不同;是以在版本更新後,有可能出現不同版本 token 無法識别的問題,是以盡量要讓 MongoDB Server 所有元件(Replica Set 各個成員,ConfigServer、Mongos)都保持相同的核心版本。

更詳細的資訊,參考

https://docs.mongodb.com/manual/reference/method/Mongo.watch/#resumability

3. updateLookup

Change Stream 支援針對 update 操作,擷取目前的文檔完整内容,而不是僅更新操作本身,比如

mytest:PRIMARY> db.coll.find({_id: 101})
{ "_id" : 101, "name" : "jack", "age" : 18 }
mytest:PRIMARY> db.coll.update({_id: 101}, {$set: {age: 20}})
WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })           

上面的 update 操作,預設情況下,change stream 會收到

 {_id: 101}, {$set: {age: 20}

  的内容,而并不會包含這個文檔其他未更新字段的資訊;而加上 fullDocument: "updateLookup" 選項後,Change Stream 會根據文檔 _id 去查找文檔目前的内容并傳回。

需要注意的是,updateLookup 選項隻能保證最終一緻性,比如針對上述文檔,如果連續更新100次,update 的 change stream 并不會按順序收到中間每一次的更新,因為每次都是去查找文檔目前的内容,而目前的内容可能已經被後續的修改覆寫。

4. Sharded cluster

Change Stream 支援針對 sharded cluster 進行訂閱,會保證全局有序的傳回結果;為了達到全局有序這個目标,mongos 需要從每個 shard 都傳回訂閱結果按時間戳進行排序合并傳回。

在極端情況下,如果某些 shard 寫入量很少或者沒有寫入,change stream 的傳回延時會受到影響,因為需要等到所有 shard 都傳回訂閱結果;預設情況下,mongod server 每10s會産生一條 Noop 的特殊oplog,這個機制會間接驅動 sharded cluster 在寫入量不高的情況下也能持續運轉下去。

由于需要全局排序,在 sharded cluster 寫入量很高時,Change Stream 的性能很可能跟不上;如果對性能要求非常高,可以考慮關閉 Balancer,在每個 shard 上各自建立 Change Stream。

四、參考資料

提供全面,高效和穩定的系統鏡像、應用軟體下載下傳、域名解析和時間同步服務。”