一、概述
(一)什麼是 ChangStreams
ChangeStreams 功能是基于 MongoDB Oplog 實作的,Oplog 提供增量資料, ChangeStreams 在 Oplog 之上包裹一層應用,對外提供一個 API 接口,将資料進行實 時推送,推送的資料類型包括:Insert、Delete、Update、Invalidate、DDL。Invalidate: 主要适用于監控的表被删除時,監控此時沒有意義,會傳回 Invalidate 的事件;DDL 事件 是資料庫操作語言,如 Create Database、Drop Database 等。總的來說, ChangeStreams 是基于 Oplog 實作的,提供推送實時增量推送功能。
(二)支援場景
版本的要求: ChangeStreams 支援副本集和分片叢集的 MongoDB 形态,版本版本>=3.6。
支援粒度的三個次元:
- 全部 DB
- 單個 DB
- 單個表
引擎的要求:
WiredTiger 引擎。
ReadConcern 的要求:
ChangeStreams<=4.0 時,需要 majority 的 ReadConcern;
考慮到使用者對于資料實時性的要求比較強,對資料一緻性的要求比較弱,是以 ChangeStreams>=4.2 時放開了這個要求。我們建議使用者還是配置一個 Majority 的 ReadConcern 級别,不過使用者可根據自己場景的不同,适當放開這個一緻性要求。
(三)版本曆史
MongoDB 3.6 以前
MongoDB3.6 以前,使用者隻能自己從 Local.Oplog.Rs 表拉取增量變更的 Oplog 數
據。
困難點:1.⼿動設定過濾條件。2.分⽚叢集處理⾮常複雜。3.⾃⼰管理斷點續傳。
MongoDB 3.6
MongoDB ChangeStreams 正式釋出後,能夠提供一個實時吐出增量變更資訊,方 便使用者的運維處理。
支援 PostImage。資料發生變更,發生 Update 操作,那麼資料發生變更之前它是 PreImage,資料發生變更之後它是 PostImage。3.6 提供了 PostImage 的鏡像,同時它 還支援斷點續傳管理.
MongoDB 4.0-4.2
ChangeStreams 不斷進行優化,更好的支援分片叢集的場景。
4.0.7 支援 PostBatchResumeToken,使得位點能夠更好推進,防止使用者在發生 MongoDB 重新開機或用戶端重新開機導緻資料位點回退,進而引發大批量無效的資料掃描。
4.0 的多文檔事務,4.2 的分布式事務,ChangeStreams 都進行了很好地支援。
支援指定時間點啟啟動/恢複的功能。
使用者可以指定任意時間點,ChangeStreams 都可以從此時間點進行啟動 ChangeStreams 流以及恢複資料。
MongoDB 4.4+
MongoDB ChangeStreams 性能持續進行優化。
即将支援 PreImage,在性能上也得到了較大的提升。
随着 MongoDB ChangeStreams 版本不斷進行疊代,功能和性能上做了很大的優
化。
(四)使用場景
案例 1.監控
使用者需要及時擷取變更資訊(如賬戶相關的表), ChangeStreams 可以提供監控功 能,一旦相關的表資訊發生變更,就會将變更的消息實時推送出去。
案例 2.分析平台
如需要基于增量去分析使用者的一些行為,可以基于 ChangeStreams 把資料拉出來, 推到下遊的計算平台, 如說像 Flink、Spark 等計算平台等等。
案例 3.資料同步
基于 ChangeStreams,使用者可以搭建額外的 MongoDB 叢集,這個叢集是從原端的 MongoDB 拉取過來的,那麼這個叢集可以做一個熱備份,假如源端叢集發生網絡不通等 等之類的變故,備叢集就可以接管服務。
還可以做一個冷備份,如使用者基于 ChangeStreams 把資料同步到檔案,萬一雲端數 據庫發生不可服務,就可以從檔案裡恢複出完整的 MongoDB 資料庫,繼續提供服務。 (當然,此處還需要借助定期全量備份來一同完成恢複)
另外資料同步它不僅僅局限于同一地域,可以跨地域,從北京到上海甚至從中國到美國 等等。
案例 4.消息推送
假如使用者想實時了解公共汽車的資訊,那麼公共汽車的位置每次變動,都實時推送變更的信 息給想了解的使用者,使用者能夠實時收到公共汽車變更的資料,非常便捷實用。
總的來說,使用者可以于 MongoDB ChangeStreams 功能,進行平台化建構,滿足 使用者的各項需求。當然,使用者的需求可以是多樣化,不僅僅局限這幾個案例。
二、功能介紹
(一)特性
ChangeStreams 可歸納為 5 部分
持久性:
Majority-CommittedChanges
資料能夠保證持久化,不會被復原。
斷點續傳:
通過 Resume Token 進行斷點續傳的功能。
順序性:
對于副本集保證線性一緻性,對于分片叢集保證因果一緻性。
安全性:
ChangeStreams 可以進行安全控制。
靈活性:
因為 ChangeStreams 本身是基于 MongoDB Changes Aggregate 架構來實作 的,是以使用者還可以在 Aggregate 上添加一些步驟,實作過濾、計算等需求。
(二)Majority-Committed Changes 的持久化
如使用者寫請求寫到了 Primary 上,那麼這個時候 Primary 上産生一條 Oplog,此時 ChangeStreams 并不會把 Oplog 吐出來,它還會把資料寫到 Secondary 上,等 Secondary 寫成功後才把這個資料吐出來,防止使用者寫 Primary 成功,之後 Primary 發 生當機的情況,Secondary 成為新的 Primary,導緻資料被復原。
是以,ChangeStreams 吐出的資料都是持久化成功的資料。
(三)斷點續傳
舉例:如使用者從 MongoDB 去拉取一個實時的資料,此資料是根據時間戳遞增的,如 8:00、9:00、10:00,那麼如果 10:00 時間戳 MongoDB 發生當機,或者使用者本身的服務 發生意外,導緻連接配接電路斷開,這時候如果 MongoDB 或 Server 端恢複服務,希望資料 繼續接着 10:00 開始拉取。
恢複後服務端發送一條消息“請給我 10:00 以後的資料。”那麼 MongoDB 收到這個 消息後,繼續把 10:00 以後的資料源源不斷的通過 ChangeStreams 推送出來,10:00 後 是 11:00、 12:00,如此這般服務就能正常運作。
(四)順序性—如何滿足因果一緻性
假如使用者寫請求,将 Insert,a=1,寫在 Shard2 上,此時這條語句通過 ChangeStreams 吐出來了.
後來使用者又 Update 了文檔,把它從 a=1 改成 a=2,這條操作落在了 Shard2 上,此 時 ChangeStreams 會把第 2 條文檔輸出,也就是說這兩條文檔是具有前後因果性的,不 會先出第 2 條文檔再吐出第 1 條文檔,順序保持嚴格的因果性。
如使用者 Insert 了 1 條資料是 a=1,然後它落在 Shard2 上,然後又 Insert 了 1 條資料 a=2,但它落在 Shard3 上。
那麼這 2 條資料它是同時落在 2 個不同的 Shard 上的,那麼 ChangeStreams 它吐 出的順序可能是先 a=2,再 a=1,也就是說這 2 條資料順序在不滿足的條件下,那麼它輸 出的順序是不能夠保證順序性。
(五)ChangeStreams vs Oplog 拉取的對比
對接/使用成本:
Oplog 拉取是遠遠高于 ChangeStream 的,因為使用者需要自去監聽一個表,然後 Find+getMore 拉取,需要做過濾,對事務還需要進行一些額外的處理等等。
副本集支援:
Oplog 拉取和 ChangeStream 都是支援的。
DML 支援:
DDL 支援:
Oplog 拉取是全部支援的,ChangeStream 目前支援 dropCollection,dropDataba se,renameCollection 這 3 個語句,但後續官方會持續完善 DDL 語句,如 Create Index,Drop Index 等。
叢集闆支援:
Oplog 拉取必須關閉 Balancer,否則拉取出來 Oplog 不能保證因果一緻性。 另外 DDL 需要去重,在 ChangeStream 裡不需要額外去處理,成本對接很低。
事務處理:
Oplog 拉取的事務處理比較繁瑣,如事務中 5 條語句同步 2 條語句發生斷開, 重新開機後 事務處理會很繁瑣。ChangeStream 不需要額外的對接,隻當做一個普通語句。
斷點續傳:
Oplog 拉取是根據時間戳的,ChangeStream 可以根據時間戳,也可以根據 Token。
實時性/吞吐:
Oplog 拉取的實時性和吞吐比 ChangeStream 更高,ChangeStream 為了兼顧一 緻性,加上目前實作方式是通過副本集上單線程拉取,是以 ChangeStream 性能上略低 于 Oplog 拉取。
但是未來MongoDB官方将會持續進行優化,使ChangeStream的性能追上Oplog, 縮小 Gap。
權限控制 :
Oplog 拉取隻能是 2 種權限,All or Nothing,All 意思是全部的權限,Nothing 的意 思是沒有權限,是以要麼能監聽到所有表所有資料,要麼 1 條都拉不到。
ChangeStream 權限控制會更細粒度,使用者可以根據目前賬号的權限,有哪些表權限 就提供哪些表權限的拉取。
三、使用介紹
(一)MongoShell 示例
如何根據 Mongoshell 去使用 ChangeStream
1)DB 參數
DB 這個部分參數可以有 3 個參數:
- 單個 DB:db.Watch()
- 全部 DB:db.GetMongo() . Watch()
- 單個表:db.Collection.Watch()
2)Aggregate 的架構
這個參數預設可以留白,使用者如果有過濾、計算等需求可以添加到 Stage 裡。
如使用者可在$Match 比對到感興趣的一些 Field,需要 Insert 和 Uptate 操作,那麼可 以就在裡面進行一個比對。另外拉取到一個字段,使用者可能不需要這麼多字段,隻需要某幾 個字段,那麼就可以通過$Project 去進行映射,拿到自己感興趣的字段。
3)ChangeStream 的一些詳細參數
如 FullDocument 是吐出整個,預設是沒有的。
ResumeAfter 是根據 Token 進行斷點續傳,StartAfter 是根據 Token 啟動一個新 的監聽流,這 2 個差別是,如表在中間過程被 Drop 後斷開,那麼 ResumeAfter 就無法 恢複這個表了,因為今天的表本身已經 Drop。
StartAfter 是啟動一個新的監聽流。
StartAtOperationTime 是根據輸入的時間戳啟動一個監聽。
MaxAwaitTimeMS 是指逾時的時間,使用者設定逾時時間,如果該時間内沒有資料返 回,那麼連接配接将會被中斷。
BatchSize 就是 1 次傳回的 1 個 Batch 大小,就是 1 次傳回聚合 ,多少條 Event 傳回。
4)具體傳回 event 格式
ID 字段是存儲元資訊,目前元資訊隻包括 Data 字段,Data 字段意思是存儲的是 ResumeToken,ChangeStream 每次都會把 Event 包括 ResumeToken,使用者拿到 Token 後可進行存儲,下次連接配接斷開可根據 Token 進行斷點續傳。
OperationType 操作類型包括 Insert,Delete,Replace,Update, Rename, DropDatabase,Invalidate。
Ns 是操作命名空間,就是 Namespace。 它某個是 DB 下面的某個表。
To 是指用于 RenameCollection 後的一個新的命名空間。
DocumentKey 是包括了一個_ID,指目前操作的文檔的主鍵_ID 是什麼。
UpdateDescription是OperationType=Update的時候出現,相當于是增量的修改。
ClusterTime 是操作一個時間戳,相當于 Oplog 裡的 Ts 字段,是一個混合邏輯時間 時鐘。
TxnNumber 是隻在事務裡面出現,是一個事務内部遞增的序列号,
lsid 表示 Logic Session ID,是請求所在的 Session 的 ID。
5)Insert 操作
使用者 Insert 條資料,x 等于 1,就會得到如圖 Event 的格式,格式為文檔類型。
首先是_ID,裡面包括 Data,是對 Token 進行序列化後的字元串。
然後 OperationType 是 Insert 類型,表示此操作為插入,ClusterTime 是一個 64 位 時間戳,高位是一個 32 位的秒級時間戳,低位是一個計數。
FullDocument 就是整個操作的 PostImage 更新後的一個文檔,然後 Ns Docume ntKey 就是整個文檔的主鍵 ID。
6) Update 操作
Update 操作基本類似,OperationType 變成了 Replace 操作。 FullDocument 吐 出了整個 Document 操作更新後資料。
對于$Set 或$Unset 的更新來說,它吐出的内容沒有 FullDocument 的字段,意味着 這裡面沒有 PostImage。它包括是 UpdateDescription 的字段,如圖更新了一個 d 字段, 更新以後的值是 4,另外如删了一個 c 字段,那麼它是展現在 RemovedFields 裡。
如果使用者想在此時拿到整個 PostImage 就需要去設定 FullDocument=True 參數, 就可在此更新場景下拿到整個更新後的文檔。
7)Drop 操作
如監聽的某個表被“Drop”了,那麼它會先吐出 “Drop”的 OperationType,之後 會再吐出一個 Invalidate 事件,表示此表已被删掉除,繼續監聽就失去意義,是以此時連 接會被斷開。
四、原理介紹
基本原理
案例 1 副本集場景
使用者啟動一個 ChangeStreams,它就 Watch 了一個表或一個 DB,甚至所有的 DB,這 個請求會發到了一個 MongoDB 上面。此請求會建立一個 Cursor,然後使用者通過 Cursor 不斷進行 GetMore,拿到使用者所希望得到的資料。
這個機制和使用者去放的加 Find+GetMore、拉取一些表和資料,原理基本一樣。
内部實作上,ChangeStreams 在副本集裡面做了哪些操作?
案例:
第一階段
MongoDB 收到 ChangeStreams 請求後會先過濾 Oplog,也就是說他先去拉 Oplog 表,然後過濾 Oplog,根據使用者設定的參數,如使用者隻要某個表,那麼就會過濾掉 其他庫表的資料,同時它還會過濾掉本身沒用的 Oplog 資料,如 Noop Event。
第二階段
會在過濾完後,它會把 Oplog 的資料轉化成 ChangeStreams Event,因為 Oplog 和 ChangeStreams Event 格式是不一樣的,需要進行轉換。
第三階段
它會去判斷這個是否需要傳回 Invalidate,如說表被删掉了,此時就需要傳回 Invalidate。
第四階段
ChangeStreams 需要判斷是否可以恢複資料流。
如使用者指定了一個時間戳,指定一個 Token,需要去判斷是否可以進行恢複。
第五階段
如果是 Invalidate,則需要處理具體關閉 Cursor 邏輯。
最後
如果參數設定了 FullDocument=True,則會進⾏⼀次額外的 Query。
分片叢集和副本集相比, Shard 上的功能和副本集功能基本一緻,此外,Mongos 還需要去承擔“消息轉發”和“消息聚合”功能,
分片叢集和副本集相比, Mongos 需要去承擔“轉發”和“消息聚合”功能。
使用者向 Mongos 送出請求,要 10:00 以後所有 DB1 的變更資料,Mongos 收到請求 後,才會把請求發送給所有的 Shard 上,建立 3 個 Cursor,告訴每個 Shard “請給我 1 0:00 以後所有 DB1 的變更資料”。
以 Shard2 為例,Shard2 上會先去檢視 Oplog 表,拿到 10 點以後的資料,過濾掉 10 點以前的資料。Oplog 表中,第 3 條發生在 10:00,db1 資料 op=u,是一個 Update 操 作,此操作符合,是以它會傳回給 MongoS;第 4 條 10:10 符合條件,但它是 db2 不是 db1,是以這條資料會被過濾掉;第 5 條 10:20 是 db1,它本身是一個 Delete 操作,是以 這個語句也符合,它也會傳回給 MongoS;
此外,别的 Shard 也是同理,如 Shard1 有一條 op=i 的操作,Shard3 有一條 op=d 的操作,這些語句都會在 Mongos 上進行聚合,排序,傳回給使用者,通過 ChangeStrea ms 按時間順序吐出。
這個案例介紹了 Mongos 如何處理消息分發。關于聚合 Mongos 不會這麼粗暴,因為 本身 ChangeStreams 是一個實時資料流的過程,它的消息是不斷推送的,不會一次性等 待 1 個小時的資料,然後進行排序再傳回,這樣使用者的實時性會受到極大的損失。
是以 MongoDB 采用更細粒度的控制方法去解決消息如何排序,如何吐出。
下圖中,正方形方框内的數字,表示 oplog 或 event 的時間戳,
此時 Mongos 已傳回“所有時間<=2”的資料,那麼 Mongos 到 Shard 上建立不同 的 Cursor,每個 Cursor 都有 1 個隊列 DocBuffer Queue。存放着從 Shard 上拉取的 資料,如 Shard1 拉到的資料“4、6、12、13”,Shard2 拉到的資料是“5、9、11、14”, Shard3 上拉到的資料“3、7、8、10”,然後 MongoS 會根據傳回的時間戳進行聚合排 序。
總結來說就是多路歸并+小頂堆的排序算法。Mongos 會比較每個 DocBuffer 隊列頭 部的元素哪個最小,然後将資料拿出。如圖會比較 Shard1 的 4,Shard2 的 5,Shard3 的 3,發現 3 是最小的,然後将 3 拿出來。接着會比較 4,5,7,将 4 拿出來,最後依次拿到 了“3、4、5、6、7、8、9、10”,10 條 Oplog(或 Event),将這些消息聚合,排完 序以後傳回給使用者。
傳回後,MongoS 會繼續進行資料的拉取排序。
繼續檢視 DocBuffer Queue1 有“12、13”、 Queue2 有“11、14”、 Queue3 目前沒有資料(沒有資料就無法進行排序)。此時不能對 Queue1 和 Queue2 的資料排序 後傳回。因為假如先傳回了 Queue2 的 11,但很有可能能因為網絡原因,或其他原因導緻 Shard3 資料沒有立刻傳回,比如後面 Queue3 傳回了 10,比 11 還小,如果之前 11 已經 傳回,則破壞了順序性,是以此時資料不能傳回出去。
這個時候 Mongos 的處理,會繼續發送 3 條 GetMore 請求,到 3 個 Shard 上,然 後自己拉取資料,然後放到 DocBuffer Queue 裡進行緩存。
如下圖:
Shard1 裡傳回兩條資料“20、22”, Shard2 裡傳回兩條資料“21、27”, Shard3 依舊沒有資料傳回,Shard3 沒有資料不會什麼都不傳回,它會傳回 1 個承諾,這個 承諾的作用是告訴 Mongos 雖然現在沒有資料,但下次将傳回>17 的資料時間戳。
Mongos 拿到這個承諾以後就知道可以對<17 的所有資料進行排序操作,這樣使用者就不需 要等待。
然後 Mongos 會将“11、12、13、14”按時間進行排序然後傳回給使用者并更新“ MinPromisedSortKey=17”。然後下面繼續重複剛才的過程,它是實時流的過程,不斷 的請求 Shard,然後拉取資料,然後再到 DocBuffer Queue 裡進行緩存,然後進行排序 這樣一個過程。
快速掌握MongoDB核心技術幹貨目錄
電子書下載下傳:《玩轉MongoDB從入門到實戰》 | https://developer.aliyun.com/article/780915 |
走進 MongoDB | https://developer.aliyun.com/article/781079 |
MongoDB聚合架構 | https://developer.aliyun.com/article/781095 |
複制集使用及原理介紹 | https://developer.aliyun.com/article/781137 |
分片叢集使用及原理介紹 | https://developer.aliyun.com/article/781104 |
ChangeStreams 使用及原理 | https://developer.aliyun.com/article/781107 |
事務功能使用及原理介紹 | https://developer.aliyun.com/article/781111 |
MongoDB最佳實踐一 | https://developer.aliyun.com/article/781139 |
MongoDB最佳實踐二 | https://developer.aliyun.com/article/781141 |