天天看點

【硬剛大資料】從零到大資料專家之Apache Doris篇

 歡迎關注部落格首頁:https://blog.csdn.net/u013411339

歡迎點贊、收藏、留言 ,歡迎留言交流!

本文由【王知無】原創,首發于 CSDN部落格!

本文首發CSDN論壇,未經過官方和本人允許,嚴禁轉載!

【硬剛大資料】從零到大資料專家之Apache Doris篇

本文是對《【硬剛大資料之學習路線篇】2021年從零到大資料專家的學習指南(全面更新版)》的面試部分補充。

硬剛大資料系列文章連結:

  • 2021年從零到大資料專家的學習指南(全面更新版)
  • 2021年從零到大資料專家之Hadoop/HDFS/Yarn篇
  • 2021年從零到大資料專家之SparkSQL篇
  • 2021年從零到大資料專家之消息隊列篇
  • 2021年從零到大資料專家之Spark篇
  • 2021年從零到大資料專家之Hbase篇
  • 2021年從零到大資料專家之Kafka篇
  • 2021年從零到大資料專家之Presto篇
  • 2021年從零到大資料專家之ClickHouse篇
  • 2021年從零到大資料專家之IceBerg篇
  • 2021年從零到大資料專家之IceBerg篇

DorisDB是由Apache Doris核心研發團隊打造的新一代企業級MPP資料庫。它繼承了Apache Doris項目十多年研發成果,累積了線上數千台伺服器穩定運作經驗,并在此基礎上,對傳統MPP資料庫進行了開創性的革新。

DorisDB重新定義了MPP分布式架構,叢集可擴充至數百節點,支援PB級資料規模,是目前唯一可以在大資料規模下進行線上彈性擴充的企業級分析型資料庫。

DorisDB還打造了全新的向量化執行引擎,單節點每秒可處理多達100億行資料,查詢速度比其他産品快10-100倍!

Doris 簡史

Doris 自第一版誕生以來,經過了 11 年的發展,中間做過無數改進。這⾥隻羅列對 Doris 發展來說⽐比較重要的關鍵節點與事件。

2008

Doris1 ,「築巢引鳳」的重要基石

在 Doris1 誕生之前,百度使用 MySQL Sharding 方式來為廣告主提供廣告報表支援。随着百度本身流量的增加,廣告流量也随之增加,已有的 MySQL Sharding 方案變得不再能夠滿足業務的需求。當時資料存儲和計算成熟的開源産品很少,Hbase 的導入性能隻有大約 2000 條/秒,不能滿足業務每小時新增的要求。而業務還在不斷增長,來自業務的壓力越來越大。在這種情況下,Doris1 誕生了,并且在 2008 年 10 月份跟随百度鳳巢系統一起正式上線。

2009

Doris2,解「百度統計」燃眉之急

2008 年的百度統計服務大約有 50-60 台 MySQL,但是業務每天有 3000 萬+條增量資料,由于 MySQL 的存儲和查詢性能無法滿足需求,對存量資料的支撐已經到了極限,問題頻出,萬般無奈之下百度統計甚至關閉了新增使用者的功能,以減少資料量的增加。

Doris1 由于當時時間緊、任務重,是以設計、實作的時候隻為了能夠滿足鳳巢的業務需求,并沒有兼顧其他的應用需求。2009 年 Doris2 研發完成後上線百度統計,并且成功支撐百度統計後續的快速增長,成功的助力百度統計成為當時國内規模最大,性能、功能最強的統計平台。

2010

Doris3 ,讓查詢再快一點

随着業務資料量的不斷增長,Doris2 系統的問題也逐漸成為業務發展的瓶頸。首先展現在 Doris2 無法滿足業務的查詢性能需求,主要是對于長時間跨度的查詢請求、以及大客戶的查詢請求。其次,Doris2 在日常運維方面基本上都需要停服後手動操作,比如 Schema Change、叢集擴縮容等,一方面使用者體驗很差,一方面還會增加叢集運維的成本。最後,Doris2 本身并不是高可用系統,機器故障等問題還是會影響服務的穩定性,并且需要人肉進行複雜的操作來恢複服務。為了解決 Doris2 的問題,團隊開始了 Doris3 設計、研發。Doris3 的主要架構中,DT(Data Transfer)負責資料導入、DS(Data Seacher)子產品負責資料查詢、DM(Data Master)子產品負責叢集中繼資料管理,資料則存儲在 Armor 分布式 Key-Value 引擎中。Doris3 依賴 ZooKeeper 存儲中繼資料,進而其他子產品依賴 ZooKeeper 做到了無狀态,進而整個系統能夠做到無故障單點。

在資料分布方面 Doris3 引入了分區的概念。

另外 Doris3 在日常運維 Schema Change,以及擴容、縮容等方面都做了針對性設計,使其能夠自動化進行,不依賴線上人工操作。

Doris3 在 2011 年完成開發後逐漸替換 Doris2 所制成的業務,并且成功的解決了大客戶查詢的問題。而公司内部後續的新需求,也都由 Doris3 來承擔支援。

2012

MySQL + Doris3 ,百度的第一個 OLAP 平台

2012 年随着 Doris3 逐漸遷移 Doris2 的同時,大資料時代悄然到來。在公司内部,随着百度業務的發展,各個業務端需要更加靈活的方式來分析已有的資料。而此時的 Doris3 仍然隻支援單表的統計分析查詢,還不能夠滿足業務進行多元分析的需求。是以,為了能夠支援業務的多元分析需求,Doris3 采用了 MySQL Storage Handler 的方式來擴充 Doris3。

2012

OLAP Engine,突破底層存儲束縛

Doris3 支援報表分析場景時,底層通用 Key-Value 存儲引擎的弊端也逐漸顯露。

為了能夠在底層存儲引擎上有所突破,OLAP Engine 項目啟動了。這個項目的發起者是當時從 Google 來的高 T,為百度帶來了當時業界最領先的底層報表引擎技術。

2013

用 PALO,玩轉 OLAP

底層技術的發展會激發上層業務的需求,而上層業務的需求同時會為底層的技術帶來新的挑戰。是以 Doris 亟需一款擁有分布式計算能力的查詢引擎。新産品的名字命名為 PALO,意為玩轉 OLAP。随着 PALO1 的正式上線,除了遷移所有 Doris3 已有的的業務外,也成功支援了當時百度内部大部分的 OLAP 分析場景。

2015

PALO 2,讓架構再簡單一點

如果說 PALO 1 是為了解決性能問題,那麼 PALO 2 主要是為了在架構上進行優化。通過 PALO2 的工作,系統架構本身變得相當簡潔,并且不需要任何依賴。

2017 and Future

Apache Doris (incubating) ,是更廣闊的世界

Palo 于 2017 年正式在 GitHub 上開源,并且在 2018 年貢獻給 Apache 社群,并将名字改為 Apache Doris(incubating)進行正式孵化。随着開源,Doris 已經在京東、美團、搜狐、小米等公司的生産環境中正式使用,也有越來越多的 Contributor 加入到 Doris 大家庭中。

整體架構 - MPP架構

彈性MPP架構-極簡架構

【硬剛大資料】從零到大資料專家之Apache Doris篇
  • FE前端節點-主要負責中繼資料的管理、查詢排程,解析sql的執行計劃給BE,
  • BE-資料的存儲和執行的引擎,這裡存儲和計算還是在一起的;
  • FE:leader 、follower(參與選舉),水準擴容
  • 對外提供了mysql相容的協定;
【硬剛大資料】從零到大資料專家之Apache Doris篇
  • 跟傳統架構的差別:
  • 通過分布式拆分成不同的task,再在中心節點彙聚;
  • druid、clickhouse都是類型;
  • MR是任務的拆分、落盤;
  • doris是MPP架構,任務之間分成task、全都在記憶體中執行和傳輸,所有任務都是流水線,沒有磁盤IO,适用于低延遲亞秒級查詢;
【硬剛大資料】從零到大資料專家之Apache Doris篇

sql查詢進入doris的過程:

  • 解析成邏輯執行計劃-A|B兩個表的scan -> Join ->聚合(group by K1 sum(V1) ),聚合操作 -> 最後再sort by sum(V1)排序 ;
  • MPP架構就是可以把執行計劃轉換成實體層面的,
  • 假設有3個節點,會把執行計劃類似fregment(有節點的組合)
  • scanB掃描B表的資料,可能通過一個brokercust、dataSink和exchange這樣的節點會把fregment串聯起來,每個fregment中會有不同的計算節點;比如資料經過廣播跟A表join,之後進行聚合操作;
  • 一個MPP就是支援兩層的聚合,每個節點做完聚合操作後最後彙總到一個節點再做一次;在doris中支援在中間做一次shuffle,shuffle完成之後在上層再做一次聚合,這樣子就不會有大單點的計算瓶頸。再推給上層去做排序。
  • 根據不同的機器每個fregment會拆成instent它執行的子單元,就可以充分發揮MPP的多基多合的能力,根據機器數量和設定的并行度,充分利用資源。

智能CBO查詢優化器

【硬剛大資料】從零到大資料專家之Apache Doris篇
  • dorisDB跟開源的apache doris有幾個改造點:
  • 在FE這邊的改造:
  • plan會根據cpu的成本預估,加入更多的統計資訊(列的基數、直方圖等等),能夠更準确的預估表的執行計劃。
  • 兩個表join時,使用brokercast join還是shuffle join還是其他join的一些方式,左右表過濾出來應該有多少行數等,哪個表作為左右表等;聚合函數用1層還是2層等等

極速向量化引擎

【硬剛大資料】從零到大資料專家之Apache Doris篇
  • BE
  • 計算+存儲,
  • 計算層: 向量化引擎,即把記憶體結構按照列的方式進行組織;跟之前按行來處理不一樣的地方是可以充分利用全新的cpu指令(單、多資料流),一條指令可以處理很多的資料。

高效的列式存儲

【硬剛大資料】從零到大資料專家之Apache Doris篇
  • 列式存儲:
  • 支援排序,選擇排序鍵,二分查找等方式。
  • 支援二級索引:bitmap、 bloom filter等
  • 會把複雜查詢推到存儲層。

現代化物化視圖加速

【硬剛大資料】從零到大資料專家之Apache Doris篇
  • 不同場景下預處理:
  • kylin的cube,doris中的物化視圖;
  • doris的物化視圖跟clickhouse的差別:clickhouse中是直接去查詢它的物化視圖,doris中會有一個路由,查詢的時候還是原表它會路由到最好的一張物化視圖中。

實時建構DWS資料

【硬剛大資料】從零到大資料專家之Apache Doris篇
  • 實時資料分析報表的場景:
  • flume-kafka-doris(進行實時資料的聚合)-BI工具的展示
  • Join優化 — colocated Join
    【硬剛大資料】從零到大資料專家之Apache Doris篇
  • doris多表關聯有一個明顯的優勢:
  • 原來的模組化傾向于寬表,一旦次元的變更就會導緻資料的重新重新整理,靈活性降低。
  • 現場關聯,秒級查詢傳回;
  • 除了高效的shuffle join外還會有一個colocate join 降低特别大的兩個表的資料傳輸量。
  • colocate join 在建表時就資料的分布方式,相同的資料可以哈希到一個桶中,所有的資料都可以在本地進行關聯操作,最後再在上層做一次資料的聚合。

極簡運維,彈性伸縮

【硬剛大資料】從零到大資料專家之Apache Doris篇

設計原理

海量分布式存儲系統Doris原理概述

Doris是一個海量分布式 KV 存儲系統,其設計目标是支援中等規模高可用可伸縮的 KV 存儲叢集。Doris可以實作海量存儲,線性伸縮、平滑擴容,自動容錯、故障轉移,高并發,且運維成本低。部署規模,建議部署4-100+台伺服器。

邏輯架構

【硬剛大資料】從零到大資料專家之Apache Doris篇

Doris采用兩層架構,Client 和 DataServer+Store。

有四個核心元件,Client、DataServer、Store、Administration。

應用程式通過Client SDK進行Doris的通路,每台伺服器上部署一個Data Sever做伺服器的管理,每台伺服器上有自己的存儲Store,整個叢集的資料存儲,每台機器獨立部署。資料通過路由選擇寫入到不同的機器中。

Administration為管理中心,提供配置、管理和監控。

config指應用程式啟動一個Data Server,在啟動時要配置管理中心的ip位址,通關管理中心。管理中心會修改配置項感覺到叢集中加了新機器,對新機器管理,擴容等。待機器處于可用狀态,将該機器的配置項通知給KV Client。進而KV Client進行新的路由選擇。擴容、下線機器等的控制台界面通過Management管理。Monitor監控機器是否正常。

KV Storage 概念模型

【硬剛大資料】從零到大資料專家之Apache Doris篇

client寫資料,綁定産品的namespace(邏輯隔離),構成新key,路由到具體機器上讀寫。

【硬剛大資料】從零到大資料專家之Apache Doris篇
  • 路由解析算法是設計的一個關鍵點,決定叢集的管理方式,也決定了叢集擴容的複雜性和難度。
  • Doris的算法類似redis,有桶的概念,key映射到1w個虛拟節點,虛拟節點在映射到實體節點。
  • 由于Doris設計時,用于4-100+規模的叢集。是以,Doris分了1w個虛拟節點,當伺服器超過100會導緻負載不均衡,1000會更差,相當于每一個叢集上有10個虛拟節點,虛拟節點會有10%的影響。
  • 擴容時,需要調節虛拟節點指向新的位置。具體過程為,暴利輪詢新節點添加後,一個伺服器上應該承載的虛拟節點個數,将超出的虛拟節點遷移到新機器即可。如上圖左圖有2個實體節點,擴容後,有3個實體節點,變為右圖。

基本通路架構

【硬剛大資料】從零到大資料專家之Apache Doris篇
  • 為了保證高可用。doris所有服務分成2個組,兩組伺服器對等。兩個group是可以有不同數量的伺服器。
  • 寫操作時,client的路由算法在兩個group分别選2個伺服器,分别(同時)寫入,兩個伺服器全部傳回後,再繼續向下進行。讀操作時,從兩個伺服器随機選一個讀。這樣,提高可用性,資料持久性,不會丢失。

監控檢測

【硬剛大資料】從零到大資料專家之Apache Doris篇

叢集管理的重要角色Config Server,有一個功能是負責發現故障伺服器。

發現故障的方式有2種:

  • ConfigServer對DataServer心跳檢測
  • Client通路時Fail報告

通常心跳檢測是慢的,幾秒進行一次心跳檢測。更多時候,是client Fail失敗報告發現無效伺服器,當寫入失敗時,Client會告訴Config Server。Config Server校驗,也通路失敗,則通知其他client。

基本原理

讀取資料流程

使用者可使用MySQL用戶端連接配接FE,執行SQL查詢, 獲得結果。

查詢流程如下:

① MySQL用戶端執行DQL SQL指令。

② FE解析, 分析, 改寫, 優化和規劃, 生成分布式執行計劃。

③ 分布式執行計劃由 若幹個可在單台be上執行的plan fragment構成, FE執行exec_plan_fragment, 将plan fragment分發給BE,指定其中一台BE為coordinator。

④ BE執行本地計算, 比如掃描資料。

⑤ 其他BE調用transimit_data将中間結果發送給BE coordinator節點彙總為最終結果。

⑥ FE調用fetch_data擷取最終結果。

⑦ FE将最終結果發送給MySQL client。

執行計劃在BE上的實際執行過程比較複雜, 采用向量化執行方式,比如一個算子産生4096個結果,輸出到下一個算子參與計算,而非batch方式或者one-tuple-at-a-time。

【硬剛大資料】從零到大資料專家之Apache Doris篇

導入資料流程

使用者建立表之後, 導入資料填充表.

  • 支援導入資料源有: 本地檔案, HDFS, Kafka和S3.
  • 支援導入方式有: 批量導入, 流式導入, 實時導入.
  • 支援的資料格式有: CSV, Parquet, ORC等.
  • 導入發起方式有: 用RESTful接口, 執行SQL指令.

資料導入的流程如下:

① 使用者選擇一台BE作為協調者, 發起資料導入請求, 傳入資料格式, 資料源和辨別此次資料導入的label, label用于避免資料重複導入. 使用者也可以向FE發起請求, FE會把請求重定向給BE.

② BE收到請求後, 向FE master節點上報, 執行loadTxnBegin, 建立全局事務。 因為導入過程中, 需要同時更新base表和物化索引的多個bucket, 為了保證資料導入的一緻性, 用事務控制本次導入的原子性.

③ BE建立事務成功後, 執行streamLoadPut調用, 從FE獲得本次資料導入的計劃. 資料導入, 可以看成是将資料分發到所涉及的全部的tablet副本上, BE從FE擷取的導入計劃包含資料的schema資訊和tablet副本資訊.

④ BE從資料源拉取資料, 根據base表和物化索引表的schema資訊, 構造内部資料格式.

⑤ BE根據分區分桶的規則和副本位置資訊, 将發往同一個BE的資料, 批量打包, 發送給BE, BE收到資料後, 将資料寫入到對應的tablet副本中.

⑥ 當BE coordinator節點完成此次資料導入, 向FE master節點執行loadTxnCommit, 送出全局事務, 發送本次資料導入的 執行情況, FE master确認所有涉及的tablet的多數副本都成功完成, 則釋出本次資料導入使資料對外可見, 否則, 導入失敗, 資料不可見, 背景負責清理掉不一緻的資料.

【硬剛大資料】從零到大資料專家之Apache Doris篇

修改中繼資料流程

更改中繼資料的操作有: 建立資料庫, 建立表, 建立物化視圖, 修改schema等等. 這樣的操作需要:

  • 持久化到永久存儲的裝置上;
  • 保證高可用, 複制FE多執行個體上, 避免單點故障;
  • 有的操作需要在BE上生效, 比如建立表時, 需要在BE上建立tablet副本.

中繼資料的更新操作流程如下:

① 使用者使用MySQL client執行SQL的DDL指令, 向FE的master節點發起請求; 比如: 建立表.

② FE檢查請求合法性, 然後向BE發起同步指令, 使操作在BE上生效; 比如: FE确定表的列類型是否合法, 計算tablet的副本的放置位置, 向BE發起請求, 建立tablet副本.

③ BE執行成功, 則修改記憶體的Catalog. 比如: 将table, partition, index, tablet的副本資訊儲存在Catalog中.

④ FE追加本次操作到EditLog并且持久化.

⑤ FE通過複制協定将EditLog的新增操作項同步到FE的follower節點.

⑥ FE的follower節點收到新追加的操作項後, 在自己的Catalog上按順序播放, 使得自己狀态追上FE master節點.

上述執行環節出現失敗, 則本次中繼資料修改失敗.

【硬剛大資料】從零到大資料專家之Apache Doris篇

表設計詳解

資料存儲基本原理

查找次元列的字首的查找過程為: 先查找shortkey index, 獲得邏輯塊的起始行号, 查找次元列的行号索引, 獲得目标列的資料塊, 讀取資料塊, 然後解壓解碼, 從資料塊中找到次元列字首對應的資料項.

加速資料處理

  • 列式存儲
【硬剛大資料】從零到大資料專家之Apache Doris篇

DorisDB的表和關系型資料相同, 由行和列構成. 每行資料對應使用者一條記錄, 每列資料有相同資料類型. 所有資料行的列數相同, 可以動态增删列. DorisDB中, 一張表的列可以分為次元列(也成為key列)和名額列(value列), 次元列用于分組和排序, 名額列可通過聚合函數SUM, COUNT, MIN, MAX, REPLACE, HLL_UNION, BITMAP_UNION等累加起來. 是以, DorisDB的表也可以認為是多元的key到多元名額的映射.

在DorisDB中, 表中資料按列存儲, 實體上, 一列資料會經過分塊編碼壓縮等操作, 然後持久化于非易失裝置, 但在邏輯上, 一列資料可以看成由相同類型的元素構成的數組. 一行資料的所有列在各自的列數組中保持對齊, 即擁有相同的數組下标, 該下标稱之為序号或者行号. 該序号是隐式, 不需要存儲的, 表中的所有行按照次元列, 做多重排序, 排序後的位置就是該行的行号.

查詢時, 如果指定了次元列的等值條件或者範圍條件, 并且這些條件中次元列可構成表次元列的字首, 則可以利用資料的有序性, 使用range-scan快速鎖定目标行.

  • 稀疏索引

當範圍查找時, 如何快速地找到起始的目标行呢? 答案是shortkey index. 如下圖所示: shortkey索引為稀疏索引,

【硬剛大資料】從零到大資料專家之Apache Doris篇

表模型介紹

為了描述友善, 我們借鑒關系模式中的主鍵概念, 稱DorisDB表的次元列的取值構成資料表的排序鍵, DorisDB的排序鍵對比傳統的主鍵具有:

  • 資料表所有次元列構成排序鍵, 是以後文中提及的排序列, key列本質上都是次元列.
  • 排序鍵可重複, 不必滿足唯一性限制.
  • 資料表的每一列, 以排序鍵的順序, 聚簇存儲.
  • 排序鍵使用稀疏索引.

對于攝入(ingest)的主鍵重複的多行資料, 填充于(populate)資料表中時, 按照三種處理方式劃分:

  • 明細模型: 表中存在主鍵重複的資料行, 和攝入資料行一一對應, 使用者可以召回所攝入的全部曆史資料.
  • 聚合模型: 表中不存在主鍵重複的資料行, 攝入的主鍵重複的資料行合并為一行, 這些資料行的名額列通過聚合函數合并, 使用者可以召回所攝入的全部曆史資料的累積結果, 但無法召回全部曆史資料.
  • 更新模型: 聚合模型的特殊情形, 主鍵滿足唯一性限制, 最近攝入的資料行, 替換掉其他主鍵重複的資料行. 相當于在聚合模型中, 為資料表的名額列指定的聚合函數為REPLACE, REPLACE函數傳回一組資料中的最新資料.

需要注意:

  • 建表語句, 排序列的定義必須出現在名額列定義之前.
  • 排序列在建表語句中的出現次序為資料行的多重排序的次序.
  • 排序鍵的稀疏索引(shortkey index)會選擇排序鍵的若幹字首列.

明細模型

DorisDB建表的預設模型是明細模型。

一般用明細模型來處理的場景有如下特點:

  • 需要保留原始的資料(例如原始日志,原始操作記錄等)來進行分析;
  • 查詢方式靈活, 不局限于預先定義的分析方式, 傳統的預聚合方式難以命中;
  • 資料更新不頻繁。導入資料的來源一般為日志資料或者是時序資料, 以追加寫為主要特點, 資料産生後就不會發生太多變化。

聚合模型

在資料分析領域,有很多需要對資料進行統計和彙總操作的場景。比如:

  • 分析網站或APP通路流量,統計使用者的通路總時長、通路總次數;
  • 廣告廠商為廣告主提供的廣告點選總量、展示總量、消費統計等;
  • 分析電商的全年的交易資料, 獲得某指定季度或者月份的, 各人口分類(geographic)的爆款商品.

适合采用聚合模型來分析的場景具有如下特點:

  • 業務方進行的查詢為彙總類查詢,比如sum、count、 max等類型的查詢;
  • 不需要召回原始的明細資料;
  • 老資料不會被頻繁更新,隻會追加新資料。

更新模型

有些分析場景之下,資料會更新, DorisDB采用更新模型來滿足這種需求。比如在電商場景中,定單的狀态經常會發生變化,每天的訂單更新量可突破上億。在這種量級的更新場景下進行實時資料分析,如果在明細模型下通過delete+insert的方式,是無法滿足頻繁更新需求的; 是以, 使用者需要使用更新模型來滿足資料分析需求。

以下是一些适合更新模型的場景特點:

  • 已經寫入的資料有大量的更新需求;
  • 需要進行實時資料分析。

資料分布

資料分布方式

  • 資料分布:資料分布是将資料劃分為子集, 按一定規則, 均衡地分布在不同節點上,以期最大限度地利用叢集的并發性能。
  • 短查詢:short-scan query,指掃描資料量不大,單機就能完成掃描的查詢。
  • 長查詢:long-scan query,指掃描資料量大,多機并行掃描能顯著提升性能的查詢。

常見的四種資料分布方式有:(a) Round-Robin、(b) Range、(c) List和(d) Hash (DeWitt and Gray, 1992)。如下圖所示:

【硬剛大資料】從零到大資料專家之Apache Doris篇
  • Round-Robin : 以輪轉的方式把資料逐個放置在相鄰節點上。
  • Range : 按區間進行資料分布,圖中區間[1-3],[4-6]分别對應不同Range。
  • List : 直接基于離散的各個取值做資料分布,性别、省份等資料就滿足這種離散的特性。每個離散值會映射到一個節點上,不同的多個取值可能也會映射到相同節點上。
  • Hash : 按哈希函數把資料映射到不同節點上。

如何選擇排序鍵

排序鍵基本原理

  • 資料傾斜:業務方如果确定資料有很大程度的傾斜,那麼建議采用多列組合的方式進行資料分桶,而不是隻單獨采用傾斜度大的列做分桶。
  • 高并發:分區和分桶應該盡量覆寫查詢語句所帶的條件,這樣可以有效減少掃描資料,提高并發。
  • 高吞吐:盡量把資料打散,讓叢集以更高的并發掃描資料,完成相應計算。

dynamic_partition.enable : 是否開啟動态分區特性,可指定為 TRUE 或 FALSE。如果不填寫,預設為 TRUE。

dynamic_partition.time_unit : 動态分區排程的粒度,可指定為 DAY/WEEK/MONTH。

  • 指定為 DAY 時,分區名字尾需為yyyyMMdd,例如20200325。圖1 就是一個按天分區的例子,分區名的字尾滿足yyyyMMdd。 PARTITION p20200321 VALUES LESS THAN ("2020-03-22"), PARTITION p20200322 VALUES LESS THAN ("2020-03-23"), PARTITION p20200323 VALUES LESS THAN ("2020-03-24"), PARTITION p20200324 VALUES LESS THAN ("2020-03-25")
  • 指定為 WEEK 時,分區名字尾需為yyyy_ww,例如2020_13代表2020年第13周。
  • 指定為 MONTH 時,動态建立的分區名字尾格式為 yyyyMM,例如 202003。

dynamic_partition.start: 動态分區的開始時間。以當天為基準,超過該時間範圍的分區将會被删除。如果不填寫,則預設為Integer.MIN_VALUE 即 -2147483648。

dynamic_partition.end: 動态分區的結束時間。 以當天為基準,會提前建立N個機關的分區範圍。

dynamic_partition.prefix : 動态建立的分區名字首。

dynamic_partition.buckets : 動态建立的分區所對應的分桶數量。

  • 指定為 DAY 時,分區名字尾需為yyyyMMdd,例如20200325。
  • 指定為 WEEK 時,分區名字尾需為yyyy_ww,例如 2020_13, 代表2020年第13周。
  • 指定為 MONTH 時,動态建立的分區名字尾格式為 yyyyMM,例如 202003。

DorisDB中為加速查詢,在内部組織并存儲資料時,會把表中資料按照指定的列進行排序,這部分用于排序的列(可以是一個或多個列),可以稱之為Sort Key。明細模型中Sort Key就是指定的用于排序的列(即 DUPLICATE KEY 指定的列),聚合模型中Sort Key列就是用于聚合的列(即 AGGREGATE KEY 指定的列),更新模型中Sort Key就是指定的滿足唯一性限制的列(即 UNIQUE KEY 指定的列)。

核心功能

存儲結構設計解析

Doris是基于MPP架構的互動式SQL資料倉庫,主要用于解決近實時的報表和多元分析。Doris高效的導入、查詢離不開其存儲結構精巧的設計。

設計目标

  • 批量導入,少量更新
  • 絕大多數的讀請求
  • 寬表場景,讀取大量行,少量列
  • 非事務場景
  • 良好的擴充性

儲存檔案格式

1、存儲目錄結構

存儲層對存儲資料的管理通過storage_root_path路徑進行配置,路徑可以是多個。存儲目錄下一層按照分桶進行組織,分桶目錄下存放具體的tablet,按照tablet_id命名子目錄。

Segment檔案存放在tablet_id目錄下按SchemaHash管理。Segment檔案可以有多個,一般按照大小進行分割,預設為256MB。其中,Segment v2檔案命名規則為:${rowset_id}_${segment_id}.dat。

2、Segment v2檔案結構

Segment整體的檔案格式分為資料區域,索引區域和footer三個部分

  • Data Region: 用于存儲各個列的資料資訊,這裡的資料是按需分page加載的
  • Index Region: Doris中将各個列的index資料統一存儲在Index Region,這裡的資料會按照列粒度進行加載,是以跟列的資料資訊分開存儲
  • Footer資訊

SegmentFooterPB: 定義檔案的中繼資料資訊

4個位元組的FooterPB内容的checksum

4個位元組的FileFooterPB消息長度,用于讀取FileFooterPB

8個位元組的MAGIC CODE,之是以在末位存儲,是友善不同的場景進行檔案類型的識别

Footer資訊

Footer資訊段在檔案的尾部,存儲了檔案的整體結構,包括資料域的位置,索引域的位置等資訊,其中有SegmentFooterPB,CheckSum,Length,MAGIC CODE 4個部分。

SegmentFooterPB資料結構如下:

【硬剛大資料】從零到大資料專家之Apache Doris篇

SegmentFooterPB采用了PB格式進行存儲,主要包含了列的meta資訊、索引的meta資訊,Segment的short key索引資訊、總行數。

1、列的meta資訊

  • ColumnId:目前列在schema中的序号
  • UniqueId:全局唯一的id
  • Type:列的類型資訊
  • Length:列的長度資訊
  • Encoding:編碼格式
  • Compression:壓縮格式
  • Dict PagePointer:字典資訊

2、列索引的meta資訊

  • OrdinalIndex:存放列的稀疏索引meta資訊。
  • ZoneMapIndex:存放ZoneMap索引的meta資訊,内容包括了最大值、最小值、是否有空值、是否沒有非空值。SegmentZoneMap存放了全局的ZoneMap資訊,PageZoneMaps則存放了每個頁面的統計資訊。
  • BitMapIndex:存放BitMap索引的meta資訊,内容包括了BitMap類型,字典資料BitMap資料。
  • BloomFilterIndex:存放了BloomFilter索引資訊。

Ordinal Index (一級索引)

Ordinal Index索引提供了通過行号來查找Column Data Page資料頁的實體位址。Ordinal Index能夠将按列存儲資料按行對齊,可以了解為一級索引。其他索引查找資料時,都要通過Ordinal Index查找資料Page的位置。是以,這裡先介紹Ordinal Index索引。

在一個segment中,資料始終按照key(AGGREGATE KEY、UNIQ KEY 和 DUPLICATE KEY)排序順序進行存儲,即key的排序決定了資料存儲的實體結構。确定了列資料的實體結構順序,在寫入資料時,Column Data Page是由Ordinal index進行管理,Ordinal index記錄了每個Column Data Page的位置offset、大小size和第一個資料項行号資訊,即Ordinal。這樣每個列具有按行資訊進行快速掃描的能力。

列資料存儲

Column的data資料按照Page為機關分塊存儲,每個Page大小一般為64*1024個位元組。

Page在存儲的位置和大小由ordinal index管理。

1、data page存儲結構

DataPage主要為Data部分、Page Footer兩個部分。

Data部分存放了目前Page的列的資料。當允許存在Null值時,對空值單獨存放了Null值的Bitmap,由RLE格式編碼通過bool類型記錄Null值的行号。

Page Footer包含了Page類型Type、UncompressedSize未壓縮時的資料大小、FirstOrdinal目前Page第一行的RowId、NumValues為目前Page的行數、NullMapSize對應了NullBitmap的大小。

2、資料壓縮

針對不同的字段類型采用了不同的編碼。預設情況下,針對不同類型采用的對應關系如下:

【硬剛大資料】從零到大資料專家之Apache Doris篇

預設采用LZ4F格式對資料進行壓縮。

存儲結構

1、存儲結構

Short Key Index字首索引,是在key(AGGREGATE KEY、UNIQ KEY 和 DUPLICATE KEY)排序的基礎上,實作的一種根據給定字首列,快速查詢資料的索引方式。這裡Short Key Index索引也采用了稀疏索引結構,在資料寫入過程中,每隔一定行數,會生成一個索引項。這個行數為索引粒度預設為1024行,可配置。

【硬剛大資料】從零到大資料專家之Apache Doris篇

其中,KeyBytes中存放了索引項資料,OffsetBytes存放了索引項在KeyBytes中的偏移。

2、索引生成規則

Short Key Index采用了前36 個位元組,作為這行資料的字首索引。當遇到 VARCHAR 類型時,字首索引會直接截斷。

ZoneMap Index索引

ZoneMap索引存儲了Segment和每個列對應每個Page的統計資訊。這些統計資訊可以幫助在查詢時提速,減少掃描資料量,統計資訊包括了Min最大值、Max最小值、HashNull空值、HasNotNull不全為空的資訊。

BloomFilter

當一些字段不能利用Short Key Index并且字段存在區分度比較大時,Doris提供了BloomFilter索引。

Bitmap Index索引

Doris還提供了BitmapIndex用來加速資料的查詢。

寫入流程、删除流程分析

Doris 針對不同場景支援了多種形式的資料寫入方式,其中包括了從其他存儲源導入 Broker Load、http 同步資料導入 Stream Load、例行的 Routine Load 導入和 Insert Into 寫入等。同時導入流程會涉及 FE 子產品(主要負責導入規劃生成和導入任務的排程工作)、BE 子產品(主要負責資料的 ETL 和存儲)、Broker 子產品(提供 Doris 讀取遠端存儲系統中檔案的能力)。其中 Broker 子產品僅在 Broker Load 類型的導入中應用。

下面以 Stream Load 寫入為例子,描述了 Doris 的整體的資料寫入流程如下圖所示:

【硬剛大資料】從零到大資料專家之Apache Doris篇

流程描述如下:

1、FE 接收使用者的寫入請求,并随機選出 BE 作為 Coordinator BE。将使用者的請求重定向到這個 BE 上。

2、Coordinator BE 負責接收使用者的資料寫入請求,同時請求 FE 生成執行計劃并對排程、管理導入任務 LoadJob 和導入事務。

3、Coordinator BE 排程執行導入計劃,執行對資料校驗、清理之後。

4、資料寫入到 BE 的存儲層中。在這個過程中會先寫入到記憶體中,寫滿一定資料後按照存儲層的資料格式寫入到實體磁盤上。

資料分發流程

資料在經過清洗過濾後,會通過 Open/AddBatch 請求分批量的将資料發送給存儲層的 BE 節點上。在一個 BE 上支援多個 LoadJob 任務同時并發寫入執行。LoadChannelMgr 負責管理了這些任務,并對資料進行分發。

資料分發和寫入過程如下圖所示:

【硬剛大資料】從零到大資料專家之Apache Doris篇
  • 每次導入任務 LoadJob 會建立一個 LoadChannel 來執行,LoadChannel 維護了一次導入的通道,LoadChannel 可以将資料分批量寫入操作直到導入完成。
  • LoadChannel 會建立一個 TabletsChannel 執行具體的導入操作。一個 TabletsChannel 對應多個 Tablet。一次資料批量寫入操作中,TabletsChannel 将資料分發給對應 Tablet,由 DeltaWriter 将資料寫入到 Tablet,便開始了真正的寫入操作。

DeltaWriter 與 Memtable

DeltaWriter 主要負責不斷接收新寫入的批量資料,完成單個 Tablet 的資料寫入。由于新增的資料可以是增量 Delta 部分,是以叫做 DeltaWriter。

DeltaWriter 資料寫入采用了類 LSM 樹的結構,将資料先寫到 Memtable 中,當 Memtable 資料寫滿後,會異步 flush 生成一個 Segment 進行持久化,同時生成一個新的 Memtable 繼續接收新增資料導入,這個 flush 操作由 MemtableFlushExecutor 執行器完成。

Memtable 中采用了跳表的結構對資料進行排序,排序規則使用了按照 schema 的 key 的順序依次對字段進行比較。這樣保證了寫入的每一個寫入 Segment 中的資料是有序的。如果目前模型為非 DUP 模型(AGG 模型和 UNIQUE 模型)時,還會對相同 key 的資料進行聚合。

實體寫入

1、RowsetWriter 各個子產品設計

在實體存儲層面的寫入,由 RowsetWriter 完成。RowsetWriter 中又分為 SegmentWriter、ColumnWriter、PageBuilder、IndexBuilder 等子子產品。

  • 其中 RowsetWriter 從整體上完成一次導入 LoadJob 任務的寫入,一次導入 LoadJob 任務會生成一個 Rowset,一個 Rowset 表示一次導入成功生效的資料版本。實作上由 RowsetWriter 負責完成 Rowset 的寫入。
  • SegmentWriter 負責實作 Segment 的寫入。一個 Rowset 可以由多個 Segment 檔案組成。
  • ColumnWriter 被包含在 SegmentWriter 中,Segment 的檔案是完全的列存儲結構,Segment 中包含了各個列和相關的索引資料,每個列的寫入由 ColumnWriter 負責寫入。
  • 在檔案存儲格式中,資料和索引都是按 Page 進行組織,ColumnWriter 中又包含了生成資料 Page 的 PageBuilder 和生成索引 Page 的 IndexBuilder 來完成 Page 的寫入。
  • 最後,FileWritableBlock 來負責具體的檔案的讀寫。

2、RowsetWriter 寫入流程

【硬剛大資料】從零到大資料專家之Apache Doris篇

實體寫入流程的較長的描述:

1.當一個 Memtable 寫滿時(預設為 100M),将 Memtable 的資料會 flush 到磁盤上,這時 Memtable 内的資料是按 key 有序的。然後逐行寫入到 RowsetWriter 中。

2.RowsetWriter 将資料同樣逐行寫入到 SegmentWriter 中,RowsetWriter 會維護目前正在寫入的 SegmentWriter 以及要寫入的檔案塊清單。每完成寫入一個 Segment 會增加一個檔案塊對應。

3.SegmentWriter 将資料按行寫入到各個 ColumnWriter 的中,同時寫入 ShortKeyIndexBuilder。ShortKeyIndexBuilder 主要負責生成 ShortKeyIndex 的索引 Page 頁。具體的 ShortKeyIndex 索引格式可以參見《Doris 存儲層設計介紹 1——存儲結構設計解析》文檔。

4.ColumnWriter 将資料分别寫入 PageBuilder 和各個 IndexBuilder,PageBuilder 用來生成 ColumnData 資料的 PageBuilder,各個 IndexBuilder 包括了(OrdinalIndexBuilder 生成 OrdinalIndex 行号稀疏索引的 Page 格式、ZoneMapIndexBuilder 生成 ZoneMapIndex 索引的 Page 格式、BitMapIndexBuilder 生成 BitMapIndex 索引的 Page 格式、BloomFilterIndexBuilder 生成 BloomFilterIndex 索引的 Page 格式)。具體參考 Doris 存儲檔案格式解析。

5.添加完資料後,RowsetWriter 執行 flush 操作。

6.SegmentWriter 的 flush 操作,将資料和索引寫入到磁盤。其中對磁盤的讀寫由 FileWritableBlock 完成。

7.ColumnWriter 将各自資料、索引生成的 Page 順序寫入到檔案中。

8.SegmentWriter 生成 SegmentFooter 資訊,SegmentFooter 記錄了 Segment 檔案的原資料資訊。完成寫入操作後,RowsetWriter 會再開啟新的 SegmentWriter,将下一個 Memtable 寫入新的 Segment,直到導入完成。

Rowset 釋出

整個釋出過程如下:

1.DeltaWriter 統計目前 RowsetMeta 中繼資料資訊,包括行數、位元組數、時間、Segment 數量。

2.儲存到 RowsetMeta 中,向 FE 送出導入事務。目前導入事務由 FE 開啟,用來保證一次導入在各個 BE 節點的資料的同時生效。

3.在 FE 協調好之後,由 FE 統一下發 Publish 任務使導入的 Rowset 版本生效。任務中指定了釋出的生效 version 版本資訊。之後 BE 存儲層才會将這個版本的 Rowset 設定為可見。

4.Rowset 加入到 BE 存儲層的 Tablet 進行管理。

删除流程

目前 Delete 有兩種實作,一種普通的删除類型為 DELETE,一種為 LOAD_DELETE。

DELETE 執行流程

DELETE 的支援一般的删除操作,實作較為簡單,DELETE 模式下沒有對資料進行實際删除操作,而是對資料删除條件進行了記錄。存儲在 Meta 資訊中。當執行 Base Compaction 時删除條件會一起被合入到 Base 版本中。Base 版本為 Tablet 從[0-x]的第一個 Rowset 資料版本。具體流程如下:

1.删除時由 FE 直接下發删除指令和删除條件。

2.BE 在本地啟動一個 EngineBatchLoadTask 任務,生成新版本的 Rowset,并記錄删除條件資訊。這個删除記錄的 Rowset 與寫入過程的略有不同,該 Rowset 僅記錄了删除條件資訊,沒有實際的資料。

3.FE 同樣釋出生效版本。其中會将 Rowset 加入到 Tablet 中,儲存 TabletMeta 資訊。

LOAD_DELETE 執行流程

LOAD_DELETE 支援了在 UNIQUE KEY 模型下,實作了通過批量導入要删除的 key 對資料進行删除,能夠支援大量資料删除能力。整體思路是在資料記錄中加入删除狀态辨別,在 Compaction 流程中會對删除的 key 進行壓縮。

資料模型和物化視圖

聚合模型

聚合模型的特點就是将表中的列分為了 Key 和 Value 兩種。 Key 就是資料的次元列,比如時間,地區等等。Value 則是資料的名額列,比如點選量,花費等。每個名額列還會有自己的聚合函數,包括 sum、min、max 和 bitmap_union 等。資料會根據次元列進行分組,并對名額列進行聚合。

【硬剛大資料】從零到大資料專家之Apache Doris篇

首先是導入資料 ,原始資料在導入過程中,會根據表結構中的 Key 進行分組,相同 Key 的 Value 會根據表中定義的 Aggregation Function 進行聚合。

【硬剛大資料】從零到大資料專家之Apache Doris篇

由于 Doris 采用的是 MVCC 機制進行的并發控制,是以每一次新的導入都是一個新的版本。我們把這種版本稱為 Singleton 。

不斷的導入新的資料後,盡管同一批次的資料在導入過程中已經發生了聚合,但不同版本之間的資料依舊存在次元列相同但是名額列并沒有被聚合的情況。這時候就需要通過 Compaction 機制進行二次聚合。

【硬剛大資料】從零到大資料專家之Apache Doris篇

Compaction 的意思其實就是将不同版本的資料進行合并。它 分為兩個階段,第一個階段是: 當 Singleton 的資料版本個數到達 Doris 設定的門檻值時,就會觸發 Cumulative 級别的 Compaction。 這個級别的 Compaction 會将一個區間段内的版本資料根據定義好的聚合函數進行再聚合。

【硬剛大資料】從零到大資料專家之Apache Doris篇

說完聚合模型,再介紹一種聚合模型上的 提升查詢效率 的方式—— 建構 Rollup

【硬剛大資料】從零到大資料專家之Apache Doris篇

Rollup 也就是上卷,是一種在多元分析中比較常用的操作——也就是從細粒度的資料向高層的聚合。

在 Doris 中,我們提供了在聚合模型上的建構 Rollup 功能,将資料根據更少的次元進行預聚合。将本身在使用者查詢時才會進行聚合計算的資料預先計算好,并存儲在 Doris 中,進而達到提升使用者粗粒度上的查詢效率。

Rollup 還有一點好處在于,由于 Doris 具有在原始資料上實時計算的能力,是以不需要對所有次元的每個組合都建立 Rollup。尤其是在次元很多的情況下,可以取得一個存儲空間和查詢效率之間的平衡。

【硬剛大資料】從零到大資料專家之Apache Doris篇

在建立 Rollup 的時候首先你需要有一個聚合模型的 Base 表,然後就可以取部分次元建立一個 Rollup 表。

聚合模型的優點就在于:劃分維護和名額列後,資料本身已經進行過預聚合,對于分析型查詢效率提升明顯。

  • 但是聚合模型在某些使用者場景下并不适用:
  • 很多業務并沒有聚合的需求,就是要存儲原始的使用者行為日志。
  • 一些業務在初期還不能确認哪些是次元列,哪些是名額列
  • 聚合模型本身更難了解,對新使用者體驗不好,比如一些查詢結果和使用者預期的不一緻。

基于以上問題,我們增加了對明細資料模型的支援。

明細模型

【硬剛大資料】從零到大資料專家之Apache Doris篇

明細資料模型剛好和聚合模型相反,不區分維護和名額列,并不對導入的資料做任何聚合,每條原始資料都會保留在表中。

明細模型就像 Mysql 中的表一樣,優勢就在于你可以詳細追溯每個使用者行為或訂單詳情。但劣勢也很明顯,分析型的查詢效率不高。

Doris 的物化視圖

物化視圖的出現主要是為了滿足使用者,既能對原始明細資料的任意次元分析,也能快速的對固定次元進行分析查詢的需求。

首先,什麼是物化視圖?

【硬剛大資料】從零到大資料專家之Apache Doris篇

從定義上來說,就是包含了查詢結果的資料庫對象,可能是對遠端資料的本地 Copy;也可能是一個表或多表 Join 後結果的行或列的子集;也可能是聚合後的結果。說白了,就是預先存儲查詢結果的一種資料庫對象。

在 Doris 中的物化視圖,就是查詢結果預先存儲起來的特殊的表。

它的優勢在于:

1.對于那些經常重複的使用相同的子查詢結果的查詢性能大幅提升

2.Doris 自動更新物化視圖的資料,保證 Base 表和物化視圖表的資料一緻性。無需額外的維護成本

3.查詢的時候也可以自動比對最優的物化視圖

物化視圖

目前支援的聚合函數包括常用的 sum、min、max、count 以及 pv、uv, 留存率等計算時常用的去重算法 hll_union,和用于精确去重計算 count(distinct) 的算法 bitmap_union。

【硬剛大資料】從零到大資料專家之Apache Doris篇

使用物化視圖功能後,由于物化視圖實際上是損失了部分次元資料的。是以對表的 DML 類型操作會有一些限制。

使用物化視圖功能後,由于物化視圖實際上是損失了部分次元資料的。是以對表的 DML 類型操作會有一些限制。

對于物化視圖和 Rollup 來說,他們的共同點都是 通過預聚合 的方式來提升查詢效率。 實際上物化視圖是 Rollup 的一個超集,在覆寫 Rollup 的工作同時,還支援更靈活的聚合方式。

是以,如果對資料的分析需求既 覆寫了明細查詢也存在分析類查詢 ,則可以先建立一個明細模型的表,并建構物化視圖。

Doris SQL 原了解析

SQL解析在下文中指的是将一條sql語句經過一系列的解析最後生成一個完整的實體執行計劃的過程。

這個過程包括以下四個步驟:詞法分析,文法分析,生成邏輯計劃,生成實體計劃。

【硬剛大資料】從零到大資料專家之Apache Doris篇

設計目标

Doris SQL解析架構的設計有以下目标:

  • 最大化計算的并行性
  • 最小化資料的網絡傳輸
  • 最大化減少需要掃描的資料

總體架構

Doris SQL解析具體包括了五個步驟:詞法分析,文法分析,生成單機邏輯計劃,生成分布式邏輯計劃,生成實體執行計劃。

具體代碼實作上包含以下五個步驟:Parse, Analyze, SinglePlan, DistributedPlan, Schedule。

【硬剛大資料】從零到大資料專家之Apache Doris篇

下文側重介紹查詢SQL的解析。

下圖展示了一個簡單的查詢SQL在Doris的解析實作。

【硬剛大資料】從零到大資料專家之Apache Doris篇

Parse階段

詞法分析采用jflex技術,文法分析采用java cup parser技術,最後生成抽象文法樹(Abstract Syntax Tree)AST,這些都是現有的、成熟的技術,在這裡不進行詳細介紹。

AST是一種樹狀結構,代表着一條SQL。不同類型的查詢select, insert, show, set, alter table, create table等經過Parse階段後生成不同的資料結構(SelectStmt, InsertStmt, ShowStmt, SetStmt, AlterStmt, AlterTableStmt, CreateTableStmt等),但他們都繼承自Statement,并根據自己的文法規則進行一些特定的處理。例如:對于select類型的sql, Parse之後生成了SelectStmt結構。

SelectStmt結構包含了SelectList,FromClause,WhereClause,GroupByClause,SortInfo等結構。這些結構又包含了更基礎的一些資料結構,如WhereClause包含了BetweenPredicate(between表達式), BinaryPredicate(二進制表達式), CompoundPredicate(and or組合表達式), InPredicate(in表達式)等。

Analyze階段

抽象文法樹是由StatementBase這個抽象類表示。這個抽象類包含一個最重要的成員函數analyze(),用來執行Analyze階段要做的事。

不同類型的查詢select, insert, show, set, alter table, create table等經過Parse階段後生成不同的資料結構(SelectStmt, InsertStmt, ShowStmt, SetStmt, AlterStmt, AlterTableStmt, CreateTableStmt等),這些資料結構繼承自StatementBase,并實作analyze()函數,對特定類型的SQL進行特定的Analyze。

例如:select類型的查詢,會轉成對select sql的子語句SelectList, FromClause, GroupByClause, HavingClause, WhereClause, SortInfo等的analyze()。然後這些子語句再各自對自己的子結構進行進一步的analyze(),通過層層疊代,把各種類型的sql的各種情景都分析完畢。例如:WhereClause進一步分析其包含的BetweenPredicate(between表達式), BinaryPredicate(二進制表達式), CompoundPredicate(and or組合表達式), InPredicate(in表達式)等。

生成單機邏輯Plan階段

這部分工作主要是根據AST抽象文法樹生成代數關系,也就是俗稱的算子數。樹上的每個節點都是一個算子,代表着一種操作。

ScanNode代表着對一個表的掃描操作,将一個表的資料讀出來。HashJoinNode代表着join操作,小表在記憶體中建構哈希表,周遊大表找到連接配接鍵相同的值。Project表示投影操作,代表着最後需要輸出的列,圖檔表示隻用輸出citycode這一列。

【硬剛大資料】從零到大資料專家之Apache Doris篇

生成分布式Plan階段

有了單機的PlanNode樹之後,就需要進一步根據分布式環境,拆成分布式PlanFragment樹(PlanFragment用來表示獨立的執行單元),畢竟一個表的資料分散地存儲在多台主機上,完全可以讓一些計算并行起來。

這個步驟的主要目标是最大化并行度和資料本地化。主要方法是将能夠并行執行的節點拆分出去單獨建立一個PlanFragment,用ExchangeNode代替被拆分出去的節點,用來接收資料。拆分出去的節點增加一個DataSinkNode,用來将計算之後的資料傳送到ExchangeNode中,做進一步的處理。

這一步采用遞歸的方法,自底向上,周遊整個PlanNode樹,然後給樹上的每個葉子節點建立一個PlanFragment,如果碰到父節點,則考慮将其中能夠并行執行的子節點拆分出去,父節點和保留下來的子節點組成一個parent PlanFragment。拆分出去的子節點增加一個父節點DataSinkNode組成一個child PlanFragment,child PlanFragment指向parent PlanFragment。這樣就确定了資料的流動方向。

Schedule階段

這一步是根據分布式邏輯計劃,建立分布式實體計劃。主要解決以下問題:

  • 哪個 BE 執行哪個 PlanFragment
  • 每個 Tablet 選擇哪個副本去查詢
  • 如何進行多執行個體并發

實踐

Apache Doris 基于 Bitmap 的精确去重和使用者行為分析

How Doris Count Distinct without Bitmap

Doris 除了支援 HLL 近似去重,也是支援 Runtime 現場精确去重的。實作方法和 Spark,MR 類似。

【硬剛大資料】從零到大資料專家之Apache Doris篇

對于上圖計算 PV 的 SQL,Doris 在計算時,會按照下圖進行計算,先根據 page 列和 user_id 列 group by,最後再 count。

【硬剛大資料】從零到大資料專家之Apache Doris篇

顯然,上面的計算方式,當資料量越來越大,到幾十億,幾百億時,使用的 IO 資源,CPU 資源,記憶體資源,網絡資源就會越來越多,查詢也會越來越慢。

那麼,下面一個自然而然的問題就是,應該如何讓 Doris 的精确去重查詢性能更快呢?

How To Make Count Distinct More Faster

  1. 堆機器
  2. Cache
  3. 優化 CPU 執行引擎 (向量化,SIMD,查詢編譯等)
  4. 支援 GPU 執行引擎
  5. 預計算

How Doris Count Distinct With Bitmap

要在 Doris 中預計算,自然要用到 Doris 的聚合模型,下面簡單看下 Doris 中的聚合模型:

【硬剛大資料】從零到大資料專家之Apache Doris篇

Doris 的聚合模型分為 Key 列和 Value 列,Key 列就是次元列,Value 列是名額列,Key 列全局有序,每個 Value 列會有對應的聚合函數,相同 Key 列的 Value 會根據對應的聚合函數進行聚合。上圖中,Year,City 是 Key 列,Cost 是 Value 列,Cost 對應的聚合函數式 Sum。Doris 支援根據不同次元組合建立不同的 Rollup 表,并能在查詢時自動路由。

是以要在 Doris 中實作 Count Distinct 的預計算,就是實作一種 Count Distinct 的聚合名額。那麼可以像 Sum,Min,Max 聚合名額一樣直接實作一種 Count Distinct 聚合名額嗎?

【硬剛大資料】從零到大資料專家之Apache Doris篇

Doris 中聚合名額必須支援上卷。 但如果隻保留每個 City 的 User 的去重值,就沒辦法上卷聚合出隻有 Year 為次元的時候 User 的去重值,因為去重值不能直接相加,已經把明細丢失了,不知道在 2016 或 2017 年,北京和上海不重合的 User 有多少。

是以去重名額要支援上卷聚合,就必須保留明細,不能隻保留一個最終的去重值。而計算機保留資訊的最小機關是一個 bit,是以很自然的想到用 Bitmap 來保留去重名額的明細資料。

【硬剛大資料】從零到大資料專家之Apache Doris篇

Roaring Bitmap 的核心思路很簡單,就是根據資料的不同特征采用不同的存儲或壓縮方式。 為了實作這一點,Roaring Bitmap 首先進行了分桶,将整個 int 域拆成了 2 的 16 次方 65536 個桶,每個桶最多包含 65536 個元素。

是以一個 int 的高 16 位決定了,它位于哪個桶,桶裡隻存儲低 16 位。以圖中的例子來說,62 的前 1000 個倍數,高 16 位都是 0,是以都在第一個桶裡。

【硬剛大資料】從零到大資料專家之Apache Doris篇

然後在桶粒度針對不同的資料特點,采用不同的存儲或壓縮方式:

【硬剛大資料】從零到大資料專家之Apache Doris篇

預設會采用 16 位的 Short 數組來存儲低 16 位資料,當元素個數超過 4096 時,會采用 Bitmap 來存儲資料。

第 3 類 Run Container 是優化連續的資料, Run 指的是 Run Length Encoding(RLE)

在做字典映射時,使用比較廣泛的資料結構是 Trie 樹。

Trie 樹的問題是字典對應的編碼值是基于節點位置決定的,是以 Trie 樹是不可變的。這樣沒辦法用來實作全局字典,因為要做全局字典必然要支援追加。

【硬剛大資料】從零到大資料專家之Apache Doris篇

如何讓同一個 String 永遠映射到同一個 ID。一個簡單的思路是把 String 對應的 ID 直接序列化下來,因為全局字典隻需要支援 String 到 ID 的單向查找,不需要支援 ID 到 String 的反向查找。

當全局字典越來越大的時候,就會面臨記憶體不足的問題。一個自然的想法就是 Split。當全局字典拆成多個子樹之後,必然會涉及到每個子樹的按需加載和删除,這個功能是使用 Guava 的 LoadingCache 實作的。

為了解決讀寫沖突的問題,實作了 MVCC,使得讀寫可以同時進行。全局字典目前是存儲在 HDFS 上的,一個全局字典目錄下會有多個 Version,讀的時候永遠讀取最新 Version 的資料,寫的時候會先寫到臨時目錄,完成後再拷貝到最新的 Version 目錄。同時為了保證全局字典的串行更新,引入分布式鎖。

目前基于 Trie 樹的全局字典存在的一個問題是,全局字典的編碼過程是串行的,沒有分布式化,是以當全局字典的基數到幾十億規模時,編碼過程就會很慢。一個可行的思路是,類似 Roaring Bitmap,可以将整個 Int 域進行分桶,每個桶對應固定範圍的 ID 編碼值,每個 String 通過 Hash 決定它會落到哪個桶内,這樣全局字典的編碼過程就可以并發。

【硬剛大資料】從零到大資料專家之Apache Doris篇

正是由于目前基于 Trie 樹的全局字典 無法分布式建構,滴滴的同學引入了基于 Hive 表的全局字典。

【硬剛大資料】從零到大資料專家之Apache Doris篇

這種方案中全局字典本身是一張 Hive 表,Hive 表有兩個列,一個是原始值,一個是編碼的 Int 值,然後通過上面的 4 步就可以通過 Spark 或者 MR 實作全局字典的更新,和對事實表中 Value 列的替換。

基于 Hive 表的全局字典相比基于 Trie 樹的全局字典的優點除了可以分布式化,還可以實作全局字典的複用。但是缺點也是顯而易見,相比基于 Trie 樹的全局字典,會使用多幾倍的資源,因為原始事實表會被讀取多次,而且還有兩次 Join。

How to Use Doris Bitmap

Create Table (為了有更好的加速效果,最好建下 ROLLUP)

CREATE TABLE `pv_bitmap` (

`dt` int,

`page` varchar(10),

`user_id` bitmap bitmap_union

)

AGGREGATE KEY(`dt`, page)

DISTRIBUTED BY HASH(`dt`) BUCKETS 2;

ALTER TABLE pv_bitmap ADD ROLLUP pv (page, user_id);
           

Load Data

cat data | curl --location-trusted -u user:passwd -T -

-H "columns: dt,page,user_id, user_id=$BITMAP_LOAD_FUNCTION(user_id)"

http://host:8410/api/test/pv_bitmap/_stream_load

TO_BITMAP(expr) : 将 0 ~ 4294967295 的 unsigned int 轉為 bitmap

BITMAP_HASH(expr): 将任意類型的列通過 Hash 的方式轉為 bitmap

BITMAP_EMPTY(): 生成空 bitmap,用于 insert 或導入的時填充預設值
           

Query

select bitmap_count(bitmap_union(user_id)) from pv_bitmap;                                select bitmap_union_count(user_id) from pv_bitmap;                                select bitmap_union_int(id) from pv_bitmap;

BITMAP_UNION(expr) : 計算兩個 Bitmap 的并集,傳回值是序列化後的 Bitmap 值

BITMAP_COUNT(expr) : 計算 Bitmap 的基數值

BITMAP_UNION_COUNT(expr): 和 BITMAP_COUNT(BITMAP_UNION(expr)) 等價

BITMAP_UNION_INT(expr) : 和 COUNT(DISTINCT expr) 等價(僅支援TINYINT,SMALLINT 和 INT)
           

Insert Into ( 可以加速無需上卷的精确去重查詢場景 )

insert into bitmaptable1 (id, id2) VALUES (1001, tobitmap(1000)), (1001, to_bitmap(2000));

insert into bitmaptable1 select id, bitmapunion(id2) from bitmap_table2 group by id;

insert into bitmaptable1 select id, bitmaphash(id_string) from table;
           

基于 Bitmap 的使用者行為分析

使用者行為分析從字面意思上講,就是分析使用者的行為。分析使用者的哪些行為呢?可以簡單用 5W2H 來總結。即 Who(誰)、What(做了什麼行為)、When(什麼時間)、Where(在哪裡)、Why(目的是什麼)、How(通過什麼方式),How much (用了多長時間、花了多少錢)。

其終極目的就是為了不斷優化産品,提升使用者體驗,讓使用者花更多的時間,花更多的錢在自己的産品上。

目前使用者行為分析的解法大概有這麼幾種:

第一種就資料庫的 Join 解法,一般效率是比較低的。我們在 Doris 中是可以用這種思路實作的。

第二種是基于明細資料的,UDAF 實作。Doris 也是支援的。

第三種是基于 Bitmap 的 UDAF 實作的,也就是今天要分享的。

第四種是用專用的系統來做使用者行為分析,專用系統的好處是可以針對特定場景,做更多的優化。

Doris Intersect_count

現在已經在 Doris 的聚合模型中支援了 Bitmap,是以可以基于 Bitmap 實作各類 UDF, UDAF,來實作大多數使用者行為分析。

Intersect_count 計算留存

select intersect_count(user_id, dt, '20191111') as first_day,

intersect_count(user_id, dt, '20191112') as second_day,

intersect_count(user_id, dt, '20191111', '20191112') as retention,

from table

where dt in ('20191111', '20191112')
           

假如有 user_id 和 page 的資訊,我們希望知道在通路美團頁面之後,又有多少使用者通路了外賣頁面,也可以用 intersect_count 來進行計算。

Intersect_count 篩選特定使用者

select

intersect_count(user_id, tag_value, '男', '90後', '10-20萬')

from user_profile

where (tag_type='性别' and tag_value='男')

or (tag_type='年齡' and tag_value='90後')

or (tag_type='收入' and tag_value='10-20萬')
           

最後也可以通過 intersect_count 來進行一些特定使用者的篩選。例如原始表裡有 user_id,tag_value,tag_type 這些資訊,我們想計算年收入 10-20 萬的 90 後男性有多少,就可以用這個簡單的 SQL 來實作。

Doris Bitmap ToDo

  • 全局字典進行開源,支援任意類型的精确去重
  • 支援 Int64,支援 Int64 後一方面支援更高基數的 bitmap 精确去重,另一方面如果原始資料中有 bigint 類型的資料便不需要全局字典進行編碼。
  • 支援 Array 類型。很多使用者行為分析的場景下的 UDAF 或 UDF,用 Array 表達更加友善和規範。
  • 更友善更智能的批量建立 Rollup。當使用者基數到達十多億時,Bitmap 本身會比較大,而且對幾十萬個 Bitmap 求交的開銷也會很大,是以還是需要建立 Rollup 來進行加速查詢。更進一步,我們期望可以做到根據使用者的查詢特點去自動建立 Rollup。
  • 希望支援更多、更複雜的使用者行為分析。

Summary

如果應用基數在百萬、千萬量級,并擁有幾十台機器,那麼直接選擇 count distinct 即可;

如果希望進行使用者行為分析,可以考慮 IntersectCount 或者自定義 UDAF。

Reference

Apache Doris 在美團外賣數倉中的應用實踐

外賣營運業務特點

外賣業務為大家提供送餐服務,連接配接商家與使用者,這是一個勞動密集型的業務,外賣業務有上萬人的營運團隊來服務全國幾百萬的商家,并以“商圈”為單元,服務于“商圈”内的商家。“商圈”及其上層組織機構是一個變化次元,當“商圈”邊界發生變化時,就導緻在往常日增量的業務生産方式中,曆史資料的回溯失去了參考意義。在所有展現組織機構資料的業務場景中,組織機構的變化是一個繞不開的技術問題。此外,商家品類、類型等其它次元也存在變化維的問題。如下圖所示:

【硬剛大資料】從零到大資料專家之Apache Doris篇

資料生産面臨的挑戰

資料爆炸,每日使用最新次元對曆史資料進行回溯計算。在 Kylin 的 MOLAP 模式下存在如下問題:

  • 曆史資料每日重新整理,失去了增量的意義。
  • 每日回溯曆史資料量大,10 億+的曆史資料回溯。
  • 資料計算耗時 3 小時+,存儲 1TB+,消耗大量計算存儲資源,同時嚴重影響 SLA 的穩定性。
  • 預計算的大量曆史資料實際使用率低下,實際工作中對曆史的回溯 80%集中在近 1 個月左右,但為了應對所有需求場景,業務要求計算近半年以上的曆史。
  • 不支援明細資料的查詢。

解決方案:引入 MPP 引擎,資料現用現算

既然變化維的曆史資料預計算成本巨大,最好的辦法就是現用現算,但現用現算需要強大的并行計算能力。OLAP 的實作有 MOLAP、ROLAP、HOLAP 三種形式。長期以來,由于傳統關系型 DBMS 的資料處理能力有限,是以 ROLAP 模式受到很大的局限性。随着分布式、并行化技術成熟應用,MPP 引擎逐漸表現出強大的高吞吐、低延遲時間計算能力,号稱“億級秒開”的引擎不在少數,ROLAP 模式可以得到更好的延伸。例如:日資料量的 ROLAP 現場計算,周、月趨勢的計算,以及明細資料的浏覽都可以較好的應對。

下圖是 MOLAP 模式與 ROLAP 模式下應用方案的比較:

【硬剛大資料】從零到大資料專家之Apache Doris篇

MOLAP 模式的劣勢

  • 應用層模型複雜,根據業務需要以及 Kylin 生産需要,還要做較多模型預處理。這樣在不同的業務場景中,模型的使用率也比較低。
  • Kylin 配置過程繁瑣,需要配置模型設計,并配合适當的“剪枝”政策,以實作計算成本與查詢效率的平衡。
  • 由于 MOLAP 不支援明細資料的查詢,在“彙總+明細”的應用場景中,明細資料需要同步到 DBMS 引擎來響應互動,增加了生産的運維成本。
  • 較多的預處理伴随着較高的生産成本。

ROLAP 模式的優勢

  • 應用層模型設計簡化,将資料固定在一個穩定的資料粒度即可。比如商家粒度的星形模型,同時複用率也比較高。
  • App 層的業務表達可以通過視圖進行封裝,減少了資料備援,同時提高了應用的靈活性,降低了運維成本。
  • 同時支援“彙總+明細”。
  • 模型輕量标準化,極大的降低了生産成本。

綜上所述,在變化維、非預設維、細粒度統計的應用場景下,使用 MPP 引擎驅動的 ROLAP 模式,可以簡化模型設計,減少預計算的代價,并通過強大的實時計算能力,可以支撐良好的實時互動體驗。

Doris 引擎在美團的重要改進

Join 謂詞下推的傳遞性優化

【硬剛大資料】從零到大資料專家之Apache Doris篇

如上圖所示,對于下面的 SQL:

select * from t1 join t2 on t1.id = t2.id where t1.id = 1
           

Doris 開源版本預設會對 t2 表進行全表 Scan,這樣會導緻上面的查詢逾時,進而導緻外賣業務在 Doris 上的第一批應用無法上線。

于是在 Doris 中實作了第一個優化:Join 謂詞下推的傳遞性優化(MySQL 和 TiDB 中稱之為 Constant Propagation)。Join 謂詞下推的傳遞性優化是指:基于謂詞 t1.id = t2.id 和 t1.id = 1, 可以推斷出新的謂詞 t2.id = 1,并将謂詞 t2.id = 1 下推到 t2 的 Scan 節點。這樣假如 t2 表有數百個分區的話,查詢性能就會有數十倍甚至上百倍的提升,因為 t2 表參與 Scan 和 Join 的資料量會顯著減少。

查詢執行多執行個體并發優化

【硬剛大資料】從零到大資料專家之Apache Doris篇

如上圖所示,Doris 預設在每個節點上為每個算子隻會生成 1 個執行執行個體。這樣的話,如果資料量很大,每個執行執行個體的算子就需要處理大量的資料,而且無法充分利用叢集的 CPU、IO、記憶體等資源。

一個比較容易想到的優化手段是,可以在每個節點上為每個算子生成多個執行執行個體。這樣每個算子隻需要處理少量資料,而且多個執行執行個體可以并行執行。

下圖是并發度設定為 5 的優化效果,可以看到對于多種類型的查詢,會有 3 到 5 倍的查詢性能提升:

【硬剛大資料】從零到大資料專家之Apache Doris篇

Colocate Join

Colocate Join(Local Join)是和 Shuffle Join、Broadcast Join 相對的概念,即将兩表的資料提前按照 Join Key Shard,這樣在 Join 執行時就沒有資料網絡傳輸的開銷,兩表可以直接在本地進行 Join。

整個 Colocate Join 在 Doris 中實作的關鍵點如下:

  • 資料導入時保證資料本地性。
  • 查詢排程時保證資料本地性。
  • 資料 Balance 後保證資料本地性。
  • 查詢 Plan 的修改。
  • Colocate Table 中繼資料的持久化和一緻性。
  • Hash Join 的粒度從 Server 粒度變為 Bucket 粒度。
  • Colocate Join 的條件判定。

對于下面的 SQL,Doris Colocate Join 和 Shuffle Join 在不同資料量下的性能對比如下:

select count(*) FROM A t1 INNER JOIN [shuffle] B t5    ON ((t1.dt = t5.dt) AND (t1.id = t5.id)) INNER JOIN [shuffle] C t6    ON ((t1.dt = t6.dt) AND (t1.id = t6.id)) where t1.dt in (xxx days);
           
【硬剛大資料】從零到大資料專家之Apache Doris篇

Bitmap 精确去重

Doris 之前實作精确去重的方式是現場計算的,實作方法和 Spark、MapReduce 類似:

【硬剛大資料】從零到大資料專家之Apache Doris篇

對于上圖計算 PV 的 SQL,Doris 在計算時,會按照下圖的方式進行計算,先根據 page 列和 user_id 列 group by,最後再 Count:

【硬剛大資料】從零到大資料專家之Apache Doris篇

顯然,上面的計算方式,當資料量越來越大,到幾十億幾百億時,使用的 IO 資源、CPU 資源、記憶體資源、網絡資源會變得越來越多,查詢也會變得越來越慢。

于是在 Doris 中新增了一種 Bitmap 聚合名額,資料導入時,相同次元列的資料會使用 Bitmap 聚合。有了 Bitmap 後,Doris 中計算精确去重的方式如下:

【硬剛大資料】從零到大資料專家之Apache Doris篇

可以看到,當使用 Bitmap 之後,之前的 PV 計算過程會大幅簡化,現場查詢時的 IO、CPU、記憶體,網絡資源也會顯著減少,并且不再會随着資料規模而線性增加。

總結與思考

實踐證明,以 Doris 引擎為驅動的 ROLAP 模式可以較好地處理彙總與明細、變化維的曆史回溯、非預設維的靈活應用、準實時的批處理等場景。而以 Kylin 為基礎的 MOLAP 模式在處理增量業務分析,固化次元場景,通過預計算以空間換時間方面依然重要。

業務方面,通過外賣數倉 Doris 的成功實踐以及跨事業群的交流,美團已經有更多的團隊了解并嘗試使用了 Doris 方案。而且在平台同學的共同努力下,引擎性能還有較大提升空間,相信以 Doris 引擎為驅動的 ROLAP 模式會為美團的業務團隊帶來更大的收益。從目前實踐效果看,其完全有替代 Kylin、Druid、ES 等引擎的趨勢。

目前,資料庫技術進步飛速,近期柏睿資料釋出全記憶體分布式資料庫 RapidsDB v4.0 支援 TB 級毫秒響應(處理千億資料可實作毫秒級響應)。可以預見,資料庫技術的進步将大大改善數倉的分層管理與應用支撐效率,業務将變得“定義即可見”,也将極大地提升資料的價值。

Apache Doris 在京東廣告的應用實踐

原有系統存在的問題

主要表現以下幾個方面:

  1. 原有系統已經逐漸無法滿足我們日常業務的性能需求。
  2. 日常業務所需的 Schema Change,Rollup 等操作,在原有系統上有極高的人力成本。
  3. 原有系統的資料無法遷移,擴容需要重刷全部曆史資料,運維成本極高。
  4. 在“618”和“雙 11”的時候,原有系統會成為我們對外服務的一個隐患。

是以需要一個合适的資料查詢引擎來替代原有系統,考慮到團隊的人力和研發能力,選擇使用開源的 OLAP 引擎來替換原有系統。

技術選型

  • 查詢

為廣告主提供線上報表資料查詢服務,是以該 OLAP 查詢引擎必須滿足:可以支援高并發查詢,可以毫秒級傳回資料,且可以随着業務的發展水準擴充。此外也承接了越來越多營運和采銷同僚的多元資料分析的需求,是以希望該 OLAP 引擎也可以支援高吞吐的 Ad-hoc 查詢。

【硬剛大資料】從零到大資料專家之Apache Doris篇
  • 資料導入

需要同時支援離線(T+1)大規模資料和實時(分鐘級間隔)資料的導入,資料導入後即可查詢,保證資料導入的實時性和原子性。離線資料(幾十 G)的導入任務需要在 1 小時内完成,實時資料(百 M 到幾 G)的導入任務需要在 10 分鐘内完成。

  • 擴容

在“618”這類大促前通常會進行擴容,是以需要新系統擴容友善,無需重刷曆史資料來重新分布資料,且擴容後原有機器的資料最好可以很友善地遷移到新機器上,避免造成資料傾斜。

根據日常業務的需要,經常會進行 Schema Change 操作。由于原有系統對這方面的支援很差,希望新系統可以進行 Online Schema Change,且對線上查詢無影響。

  • 資料修複

由于業務的日常變更會對一些表進行資料修複,是以新系統需要支援錯誤資料的删除,進而無需重刷全部曆史資料,避免人力和計算資源的浪費。

目前開源的 OLAP 引擎很多,但由于面臨大促的壓力,需要盡快完成選型并進行資料遷移,是以隻考察比較出名的幾個 OLAP 系統:ClickHouse,Druid 和 Doris。

最終選擇 Doris 來替換原有系統,主要基于以下幾方面的考慮:

  1. Doris 的查詢速度是亞秒級的,并且相對 ClickHouse 來說,Doris 對高并發的支援要優秀得多。
  2. Doris 擴容友善,資料可以自動進行負載均衡,解決了原有系統的痛點。ClickHouse 在擴容時需要進行資料重分布,工作量比較大。
  3. Doris 支援 Rollup 和 Online Schema Change,這對日常業務需求十分友好。而且由于支援 MySQL 協定,Doris 可以很好地和之前已有的系統進行融合。而 Druid 對标準 SQL 的支援有限,并且不支援 MySQL 協定,改造成本很高。

廣告場景應用

經過對系統的改造,目前使用 Doris 作為系統中的一個資料存儲層,彙總了離線和實時資料,也為上層查詢系統提供統一的效果資料查詢接口。如下圖所示:

【硬剛大資料】從零到大資料專家之Apache Doris篇
  • 資料導入

日常實時資料主要包含展現/點選跟單資料,DMP 人群包的效果資料以及十幾條産品線的點選,展現和消耗資料,導入時間間隔從 1 分鐘到 1 小時不等,資料量在百 M 左右的可以秒級導入,資料量在 1G 左右的可以在 1 分鐘内完成。離線資料主要包含搜尋詞的效果資料和各種營銷工具的基礎資料,大多數都是 T+1 資料,每日新增資料量在 20G-30G,導入耗時在 10-20 分鐘。

  • 預計算

大多數效果資料報表,廣告主的查詢次元相對固定且可控,但要求能在毫秒級傳回資料,是以必須保證這些查詢場景下的性能。Doris 支援的聚合模型可以進行資料的預聚合,将點選,展現,消耗等資料彙總到建表時指定的次元。

此外,Doris 支援建立 Rollup 表(即物化視圖)也可以在不同次元上進行預聚合,這種自定義的方式相比 Kylin 的自動建構 cube,有效避免了資料的膨脹,在滿足查詢時延的要求下,降低了磁盤占用。Doris 還可以通過 Rollup 表對次元列的順序進行調整,避免了 Kylin 中因過濾次元列在 HBase RowKey 後部而造成的查詢性能低下。

  • 現場計算

對于一些為廣告主提供的營銷工具,次元和名額通常會有 30~60 列之多,而且大部分查詢要求按照所有次元列進行聚合,由于次元列較多,這種查詢隻能依賴于現場計算能力。目前對于這種類型的查詢請求,會将其資料盡量均勻分布到多台 BE 上,利用 Doris MPP 架構的特性,并行計算,并通過控制查詢時間範圍(一個月),可以使 TP99 達到 3s 左右。

  • 業務舉例

正是由于 Doris 具有自定義的預計算能力和不俗的現場計算能力,簡化了日常工作。以為廣告主提供的營銷工具“行業大盤”為例,如圖所示,這種業務場景下,不僅要計算廣告主自身的名額資料,還需計算廣告主所在類目的名額資料,進而進行對比。

原有系統資料分片隻能按照指定列進行散列,沒有分布式查詢計劃,就不能彙總類目次元資料。原先為了解決這種業務場景,雖然底層是同一資料源,但需要建兩個表,一個是廣告主次元表,一個是類目次元表,維護了兩個資料流,增大了工作負擔。

使用了 Doris 之後,廣告主次元表可以 Rollup 出類目次元表。查詢廣告主次元資料時可以根據分區分桶(按照時間分區,按照廣告主 ID 分桶)确定一個 Tablet,縮小資料查詢範圍,查詢效率很高。查詢類目次元時,資料已經按照廣告主 ID 進行分片 ,可以充分利用 Doris 現場計算的能力,多個 BE 并行計算,實時計算類目次元資料,在我們的線上環境也能實作秒級查詢。這種方案下資料查詢更加靈活,無需為了查詢性能而維護多個預計算資料,也可以避免多張表之間出現資料不一緻的問題。

實際使用效果

  • 日常需求

Doris 支援聚合模型,可以提前聚合好資料,對計算廣告效果資料點選,展現和消耗十分适合。對一些資料量較大的高基數表,可以對查詢進行分析,建立不同次元或者順序的的 Rollup 表來滿足查詢性能的需求。

Doris 支援 Online Schema Change,相比原有系統 Schema Change 需要多個子產品關聯,耗費多個人力數天才能進行的操作,Doris 隻需一條 SQL 且在較短時間内就可以完成。對于日常需求來說,最常見的 Schema Change 操作就是加列,Doris 對于加列操作使用的是 Linked Schema Change 方式,該方式可以無需轉換資料,直接完成,新導入的資料按照新的 Schema 進行導入,舊資料可以按照新加列的預設值進行查詢,無需重刷曆史資料。

Doris 通過 HLL 列和 BITMAP 列支援了近似/精确去重的功能,解決了之前無法計算 UV 的問題。

日常資料修複,相較于以前有了更多的方式可以選擇。對于一些不是很敏感的資料,我們可以删除錯誤資料并進行重新導入;對于一些比較重要的線上資料,我們可以使用 Spark on Doris 計算正确資料和錯誤資料之間的內插補點,并導入增量進行修複。這樣的好處是,不會暴露一個中間狀态給廣告主。還有一些業務會對一個或多個月的資料進行重刷。目前在測試使用 Doris 0.12 版本提供的 Temp Partition 功能,該功能可以先将正确資料導入到 Temp Partition 中,完成後可以将要删除的 Partition 和 Temp Partition 進行交換,交換操作是原子性的,對于上層使用者無感覺。

  • 大促備戰

Doris 添加新的 BE 節點後可以自動遷移 Tablet 到新節點上,達到資料負載均衡。通過添加 FE 節點,則可以支撐更高的查詢峰值,滿足大促高并發的要求。

大促期間資料導入量會暴增,而且在備戰期間,也會有憋單演練,在短時間内會産生大量資料導入任務。通過導入子產品限制 Load 的并發,可以避免大量資料的同時導入,保證了 Doris 的導入性能。

Doris 在團隊已經經曆了數次大促,在所有大促期間無事故發生,查詢峰值 4500+qps,日查詢總量 8 千萬+,TP99 毫秒級,資料日增量近 300 億行,且實時導入資料秒級延遲。

  • 使用實踐

Doris 支援低延時的高并發查詢和高吞吐的 Ad-hoc 查詢,但是這兩類查詢會互相影響,遷移到 Doris 的初期日常線上的主要問題就是高吞吐的查詢占用資源過多,導緻大量低延時的查詢逾時。後來使用兩個叢集來對兩類查詢進行實體隔離,解決了該問題。

Doris 在 0.11 版本時 FE 的 MySQL 服務 IO 線程模型較為簡單,使用一個 Acceptor+ThreadPool 來完成 MySQL 協定的通信過程,單個 FE 節點在并發較高(2000+qps 左右)的時候會出現連接配接不上的問題,但此時 CPU 占用并不高。在 0.12 版本的時候,Doris 支援了 NIO,解決了這個問題,可以支撐更高的并發。也可以使用長連接配接解決這個問題,但需要注意 Doris 預設對連接配接數有限制,連接配接占滿了就無法建立新的連接配接了。

基于 Doris 的小程式使用者增長實踐

首先為什麼要做思域精細化營運呢,這起源于兩個痛點:

  • 私域使用者的價值不突出
  • 比如:有 100 萬個使用者,想給高收入人群去推薦奢侈品的包包,但是不知道在這 100 萬人裡面有多少人是這種高收入人群
  • 缺乏主動觸達的能力

然後針對這兩個問題,産品上面提出了一個解決方案 -- 就是分層營運,它主要分為兩部分:一個是營運觸達,還有一個是精細化的人群。

這套解決方案的收益和價值:

對于開發者來說:

  • 合理地利用私域流量提升價值
  • 促進使用者活躍和轉化

對于整個生态來講:

  • 提高了私欲使用率和活躍度
  • 激活了開發者主動經營的意願
  • 促進了生态的良性循環

使用者分層技術難點

首先給大家簡單介紹遇到的四個難點:

1.TB 級資料。資料量特别大,前面講到是基于畫像和行為去做的一個使用者分層,資料量是特别大的,每天的資料量規模是 1T +

2.查詢的頻響要求極高,毫秒級到秒級的一個要求。前面介紹 B 端視角功能時大家有看到,有一個預估人數的功能,使用者隻要點選 ”預估人數“ 按鈕,需要從 TB 級的資料量級裡面計算出篩選出的人群人數是多少,這種要在秒級時間計算 TB 級的數量的一個結果的難度其實可想而知

3.計算複雜,需要動靜組合。怎麼了解?就是現在很多元度是沒辦法去做預聚合的,必須去存明細資料,然後去實時的計算,這個後面也會細講

4.産出使用者包的時效性要求高。這個比較好了解,如果産出特别慢的話,肯定會影響使用者體驗

針對上面的四個難點,解法是:

針對第一個難點 --> 壓縮存儲,降低查詢的數量級。

具體選型就是使用 Bitmap 存儲,這解法其實很好了解,不管現在主流的 OLAP 引 擎有多麼厲害,資料量越大,查詢肯定會越慢,不可能說資料量越大,我查詢還是一直不變的,這種其實不存在的,是以我們就需要降低存儲。

針對第二和第三個難點 --> 選擇合适計算引擎

在調研了目前開源的包括 ClickHouse, Doris, Druid 等多種引擎後,最終選擇了基于 MPP 架構的 OLAP 引擎 Doris。

這裡可以簡單跟大家介紹一下選擇 Doris 的原因,從性能來說其實都差不多,但是都 Doris 有幾個優點:

第一:它是相容 Mysql 協定,也就是說你的學習成本非常低,基本上大家隻要了解 mysql, 就會用 Doris, 不需要很大的學習成本。

第二:Doris 運維成本很低,基本上就是自動化運維。

針對第四個難點 --> 選擇合适的引擎

通過對比 Spark 和 Doris,我們選擇了 Doris ,後面會詳細講為什麼會用 Doris。

使用者分層的架構和解決方案

分層營運架構:

【硬剛大資料】從零到大資料專家之Apache Doris篇

架構的話分為兩部分,就是線上部分跟離線部分。

線上部分:

分為了四層:服務層、解析層、計算層跟存儲層,然後還有排程平台和監控平台。

服務層,主要功能包含:

  • 權限控制:主要是戶權限、接口權限的控制
  • 分層管理:主要是是對使用者篩選的增删改查
  • 中繼資料管理:主要是對頁面元素、ID-Mapping 這類資料的管理
  • 任務管理:主要是支援排程平台任務的增删改查

解析層,是對 DSL 的一個解析、優化、路由以及 Sql 模闆:

比如要查線上預估人數,首先會在解析層做一個 DSL 的解析,之後根據不同情景做 DSL 的優化,比如選擇了近七天活躍且近七天不活躍的使用者,這種要七天活躍和七天不活躍的交集顯然就是零了,對不對?像這樣情況在優化層直接将結果 0 傳回給使用者就不會再往下走計算引擎,類似還有很多其他優化場景。然後優化完之後會使用 DSL 路由功能,根據不同查詢路由到不同的 Sql 模闆進行模闆的拼接。

計算層,計算引擎使用 Spark 和 Doris:

  • Spark:離線任務
  • Doris:實時任務

存儲層:

  • Mysql:主要用來存使用者分層的一些使用者資訊
  • Redis:主要用作緩存
  • Doris:主要存儲畫像資料和行為資料
  • AFS:主要是存儲産出的使用者包的一些資訊

排程平台:

  • 主要是離線任務的排程
  • 監控平台:
  • 整個服務穩定性的監控

離線部分:

離線部分的話主要是對需要的資料源(比如說畫像、關注、行為等資料源)做 ETL 清洗,清洗完之後會做一個全局字典并寫入 Doris。任務最終會産出使用者包,并會分發給小程式 B 端跟百度統計:

  • 小程式 B 端:推送給手機端使用者
  • 百度統計:拿這些使用者包做一次群體分析

以上就是一個整體的架構。

圖中大家可以看到有幾個标紅的地方,同時也用數字 1、2、3 做了标記,這幾個标紅是重點子產品,就是針對于上面提到的四個難點做的重點子產品改造,接下來會針對這三個重點子產品一一展開進行講解。

1、全局字典

首先講解全局字典這個子產品,全局字典的目的主要是為了解決難點一:資料量大,需要壓縮存儲同時壓縮存儲之後還要保證查詢性能。

為啥要用全局字典:

這裡大家可能會有一個疑問,就是說用 BitMap 存儲為啥還要做全局字典?這個主要是因為 Doris 的 BitMap 功能是基于 RoaringBitmap 實作的,是以假如說使用者 ID 過于離散的時候,RoaringBitmap 底層存儲結構用的是 Array Container 而不是 BitMap Container,Array Container 性能遠遠差于 BitMap Container。是以我們要使用全局字典将使用者 ID 映射成連續遞增的 ID,這就是使用全局字典的目的。

全局字典的更新邏輯概況:

這裡是使用 Spark 程式來實作的,首先加載經過 ETL 清洗之後各個資料源(畫像、關注、行為這些資料源)和全局字典曆史表(用來維護維護使用者 ID 跟自增 ID 映射關系),加載完之後會判斷 ETL 裡面的使用者 ID 是否已經存在字典表裡面,如果有的話,就直接把 ETL 的資料寫回 Doris 就行了,如果沒有就說明這是一個新使用者,然後會用 row_number 方法生成一個自增 ID ,跟這個新使用者做一次映射,映射完之後更新到全局字典并寫入 Doris。

2、Doris

接下來介紹第二個重點子產品 Doris。

2.1 Doris 分桶政策

【硬剛大資料】從零到大資料專家之Apache Doris篇

分桶政策的目的是為了解決難點二:查詢頻響要求高。

為啥要做分桶政策:

之前使用了全局字典保證使用者的連續遞增,但是發現用了全局字典之後,BitMap 的查詢性能其實并沒有達到預期。 Doris 其實是分布式的一個叢集,它會按照某些 Key 進行分桶,也就是分桶之後使用者 ID 在桶内就不連續,又變成零散的了。

【硬剛大資料】從零到大資料專家之Apache Doris篇

方案其實就是在表裡面增加了一個 hid 的字段,然後讓 Doris 按照 hid 字段進行分桶,這裡 hid 生成算法是:

hid = V/(M/N) 然後取整

其中:

  • V:全局字典的使用者 ID 對應的整數
  • M:預估的使用者總數
  • N:分層數

大家可以看一下:userid 是六個即 0~5,是以 M= 6;分為三個桶,N = 3;是以 M 除以 N 就等于二。這樣的話我就要用 userid 去除以二,然後取整作為 hid。可以看一下,比如說 userid 是零,0÷2 取整為 0 ,userid 是一的話,hid 還是這樣,因為 1÷2 的整數部分是零;同理 2÷2 、3÷2 是一,4÷2、5÷2 是二,這樣的話就把 userid 跟 hid 做對應,然後再根據 hid 做分層。大家可以看到分層結果,hid = 0 時 userid 是 0、1,hid = 1 時 userid 是 2、3,hid = 2 時 userid 是 4、5,這樣就保證了桶内連續。

2.2 doris 之使用者畫像标簽優化

【硬剛大資料】從零到大資料專家之Apache Doris篇

畫像标簽優化解決的難點也是難點二:查詢頻響要求高。

方案一:

tag_type, tag_value 。tag_type 是用來記錄标簽的類型,tag_value 是用來記錄标簽的内容。

如圖所示:比如說 tag_type 是性别,tag_value 可能是男或女,bitmap 這裡就是存儲所有性别是男的使用者 id 清單。

同樣對于 tag_type 是地域、tag_value 是北京,bitmap 存儲的是所有地域在北京的使用者 id 清單。

方案二:

大寬表,使用大寬表在一行記錄了所有的标簽,然後使用 bitmap 記錄這個标簽的使用者 id 清單。

最終選擇方案二,為什麼沒有選方案一呢 ?因為方案一它是一個标簽對應一個使用者 bitmap,當想查一個聯合的結果就比較耗時,比如想查詢性别是男且區域是北京的所有使用者,這樣的話需要取出 “男” 的使用者和 “北京“ 的使用者,兩者之間做一個交集。肯定會有計算量會有更多的時間消耗,但是如果用大寬表去做存儲的話,就可以根據使用者常用的查詢去建構一個物化視圖,當使用者的查詢(比如在北京的男性)命中了物化視圖,就可以直接去取結果,而不用再去做計算,進而降低耗時。

這裡還有一個知識點跟大家分享一下:在使用 Doris 的時候,一定要盡量去命中它的字首索引跟物化視圖,這樣會大大的提升查詢效率。

2.3 doris 之動靜組合查詢

【硬剛大資料】從零到大資料專家之Apache Doris篇

動靜組合查詢,對應的難點是難點三:計算複雜。

首先介紹一下什麼叫動靜組合查詢:

  • 靜态查詢:定義為使用者次元是固定的,就是可以進行預聚合的查詢為靜态查詢。比如說男性使用者,男性使用者個就是一個固定的群體,不管怎麼查使用者肯定不會變,就可以提前進行預聚合的。
  • 動态查詢:主要偏向于一些行為,就是那種查詢跟着使用者的不同而不同。比如說查近 30 天收藏超過三次的使用者,或者還有可能是近 30 天收藏超過四次的使用者,這種的話就很随意,使用者可能會查詢的次元會特别的多,而且也沒法沒辦法進行一個預聚合,是以稱之為動态的一個查詢。

然後小程式使用者分層,相比于同類型的使用者分層功能增加了使用者行為篩選,這也是小程式産品的特點之一。比如說可以查近 30 天使用者支付訂單超過 30 元的男性, 這種 ”近 30 天使用者支付訂單超過 30 元“ 的查詢是沒辦法用 bitmap 做記錄的,也沒辦法說提前計算好,隻能線上去算。這種就是一個難點,就是說怎麼用非 bitmap 表和 bitmap 做交并補集的運算,為了解決這個問題,結合上面的例子把查詢拆分為四步:查近 30 天使用者支付訂單超過 30 元的男性,且年齡在 20 ~30 歲的使用者(具體查詢語句參考 PPT 圖檔)

第一步先查 20~30 歲的男性使用者。因為是比較固定,這裡可以直接查 bitmap 表。

第二步要查近 30 天使用者支付訂單超過 30 元的使用者。這種的話就沒辦法去查 bitmap 表了,因為 bitmap 沒有辦法做這種聚合,隻能去查行為表。

第三步就是要做使用者 ID 跟在 線 bitmap 的一個轉化。Doris 其實已經提供了這樣的功能函數:to_bitmap,可以線上将使用者 id 轉換成 bitmap。

第四步是求交集。就是第一步和第四步的結果求交集。

然後,整篇的核心其實是在第三步:Doris 提供了 to_bitmap 的功能,它幫我們解決了非 bitmap 表和 bitmap 聯合查詢的問題。

以上是基于 Doris 使用者分層方案的一個講解,基于上述方案整體的性能收益是:

  • 95 分位耗時小于一秒
  • 存儲耗降低了 9.67 倍
  • 行數優化了八倍

如何快速産出使用者包

【硬剛大資料】從零到大資料專家之Apache Doris篇

現在講一下第三部分:使用者包。這部分主要是用來解決難點四:産出使用者包要求時效性高。這個其實也有兩個方案:

方案一:排程平台 + spark。

這個其實比較容易了解,因為要跑離線任務很容易就想到了 spark。在這個排程平台裡面用了 DAG 圖,分三步:先産出使用者的 cuid,然後再産出使用者的 uid,最後是回調一下做一次更新。

方案二:排程平台 + solo。

  • 執行的 DAG 圖的話就是:solo 去産出 cuid,uid,還有回調。
  • solo:是百度雲提供的 Pingo 單機執行引擎,可以了解為是一個類似于虛拟機的産品。

最終的方案選型選用了Doris。

方案一使用的是 Spark ,它存在幾個問題:比如 Yarn 排程比較耗時,有時候也會因為隊列的資源緊張而會有延遲,是以有時候會出現一個很極端的情況就是:産出零個使用者,也要 30 分鐘才能跑完,這種對使用者的體驗度非常不好。

方案二的話就是利用了 Doris 的 SELECT INTO OUTFILE 産出結果導出功能,就是查出的結果可以直接導出到 AFS,這樣的效果就是最快不到三分鐘就可以産出百萬級使用者,是以 Doris 性能在某些場景下比 Spark 要好很多。