天天看點

ODPS SQL優化總結

ODPS(Open Data Processing Service)是一個海量資料處理平台,基于阿裡巴巴自主研發的分布式作業系統(飛天)開發,是公司雲計算整體解決方案中最核心的主力産品之一。本文結合作者多年的數倉開發經驗,結合ODPS平台分享資料倉庫中的SQL優化經驗。

背景

資料倉庫,是一個面向主題、內建的、随時間變化的、資訊本身相對穩定的資料集合。資料倉庫從Oracle(單機、RAC),到MPP(Green plum),到Hadoop(Hive、Tez、Sprak),再到批流一體Flink/Blink、資料湖等,SQL都是其主流的資料處理工具。海量資料下的高效資料流轉,是資料同學必須直面的一個挑戰。本文結合阿裡自研的ODPS平台,從自身工作出發,總結SQL的一些優化技巧。

▐ SQL的一些使用技巧

  • null

我們在進行=/<>/in/not in等判斷時,null會不包含在這些判斷條件中,是以在對null的處理時可以使用nvl或者coalesce函數對null進行預設轉換。

  • select *

在資料開發或者線上任務時,盡可能提前對列進行剪裁,即使是全表字段都需要,也盡可能的把字段都寫出來(如果實在覺得麻煩,可以使用資料地圖的生成select功能),一是減少了資料運算中不必要的資料讀取,二是避免後期因為原表或者目标表字段增加,導緻的任務報錯。

  • multi insert

讀取同一張表,但是因為粒度不同,需要插入多張表時,可以考慮使用  from () tab insert overwrite A insert overwrite B 的方式,減少資源的浪費。當然,有些團隊的數倉開發規範中會規定一個任務不能有兩個目标表,具體情況可以視情況盡可能複用公共資料,如通過臨時表的方式臨時存儲這部分邏輯。

  • 分區限定

ODPS表大部分都是分區表,分區表又會根據業務規則分為增量表、全量表、快照表等。是以在做簡單查詢,或者資料探查時,一定要養成習慣先限定分區ds。經常會在jobhistory中看到很多好資源的任務都是因為分區限定不合理或者沒有限定分區導緻的。

  • limit的使用

臨時查詢或者資料探查時,養成習慣加上limit,會快速的查詢出你想要的資料,且消耗更少的資源。

  • UDF函數的使用

盡可能把UDF的使用下沉到第一層子查詢中,效率會有很大的提升。

  • 行轉列、列轉行

collect_set 、lateral view函數可以實作行轉列或者列轉行的功能,好多大佬也都寫過類似的ATA,可以參考。

  • 視窗函數的使用

可以通過 row_number()/rank() over(partition by  order by )的方式實作資料按照某個字段分組的排序,也可以通過  max(struct())的方式實作。

  • 關聯

左關聯、内關聯、右關聯、left anti join 、left semi join等,可以實作不同情況下的多表關聯。關聯字段要確定字段類型的一緻。

  • 笛卡爾積的應用

有時會存在把一行資料翻N倍的訴求,這時候可以考慮自己建立一個維表,通過笛卡爾積操作;同時也可以通過LATERAL VIEW POSEXPLODE(split(REGEXP_REPLACE(space(end_num -start_num+1),' ','1,'),',')) t AS pos ,val 的方式。

▐ 資料傾斜問題

  • 大表關聯小表

大表關聯小表出現傾斜時,可以使用mapjoin的hint(/+mapjoin(b)/)。

同時可适當調整mapjoin中小表的記憶體大小:

**set odps.sql.mapjoin.memory.max=512; **預設512,機關M,[128,2048]之間調整。

  • 大表關聯大表

一種情況,大表中存在熱點key:可以考慮對大表進行拆分,根據join的key,把熱點的資料拆出來走mapjoin,其餘的考慮普通join即可。當然也有skewjoin的hint可以參考使用。

另一種情況,大表中不存在熱點key:可以考慮在分區的基礎上加上桶,對關聯字段進行分桶,減少shuffle的資料量。

  • count distinct

常見的資料傾斜還有一種情況是因為使用了count distinct,這種情況可以考慮使用group by先進行資料去重,再count。

  • odps新特性

可以關注MaxCompute(ODPS2.0)重裝上陣系列文章,很多心得特性對于我們的性能優化有很大的幫助。

▐ 常用的參數設定

常用的調整無外乎調整map、join、reduce的個數,map、join、reduce的記憶體大小。本文以ODPS的參數設定為例,參數可能因版本不同而略有差異。

  • Map設定

set odps.sql.mapper.cpu=100

作用:設定處理Map Task每個Instance的CPU數目,預設為100,在[50,800]之間調整。

場景:某些任務如果特别耗計算資源的話,可以适當調整Cpu數目。對于大多數Sql任務來說,一般不需要調整Cpu個數的。

set odps.sql.mapper.memory=1024

作用:設定Map Task每個Instance的Memory大小,機關M,預設1024M,在[256,12288]之間調整。

場景:當Map階段的Instance有Writer Dumps時,可以适當的增加記憶體大小,減少Dumps所花的時間。

set odps.sql.mapper.merge.limit.size=64

作用:設定控制檔案被合并的最大門檻值,機關M,預設64M,在[0,Integer.MAX_VALUE]之間調整。

場景:當Map端每個Instance讀入的資料量不均勻時,可以通過設定這個變量值進行小檔案的合并,使得每個Instance的讀入檔案均勻。一般會和odps.sql.mapper.split.size這個參數結合使用。

set odps.sql.mapper.split.size=256

作用:設定一個Map的最大資料輸入量,可以通過設定這個變量達到對Map端輸入的控制,機關M,預設256M,在[1,Integer.MAX_VALUE]之間調整。

場景:當每個Map Instance處理的資料量比較大,時間比較長,并且沒有發生長尾時,可以适當調小這個參數。如果有發生長尾,則結合odps.sql.mapper.merge.limit.size這個參數設定每個Map的輸入數量。

  • Join設定

set odps.sql.joiner.instances=-1

作用: 設定Join Task的Instance數量,預設為-1,在[0,2000]之間調整。不走HBO優化時,ODPS能夠自動設定的最大值為1111,手動設定的最大值為2000,走HBO時可以超過2000。

場景:每個Join Instance處理的資料量比較大,耗時較長,沒有發生長尾,可以考慮增大使用這個參數。

set odps.sql.joiner.cpu=100

作用: 設定Join Task每個Instance的CPU數目,預設為100,在[50,800]之間調整。

場景:某些任務如果特别耗計算資源的話,可以适當調整CPU數目。對于大多數SQL任務來說,一般不需要調整CPU。

set odps.sql.joiner.memory=1024

作用:設定Join Task每個Instance的Memory大小,機關為M,預設為1024M,在[256,12288]之間調整。

場景:當Join階段的Instance有Writer Dumps時,可以适當的增加記憶體大小,減少Dumps所花的時間。

作業跑完後,可以在 summary 中搜尋 writer dumps 字樣來判斷是否産生 Writer Dumps。

  • Reduce設定

set odps.sql.reducer.instances=-1

作用: 設定Reduce Task的Instance數量,手動設定區間在[1,99999]之間調整。不走HBO優化時,ODPS能夠自動設定的最大值為1111,手動設定的最大值為99999,走HBO優化時可以超過99999。

場景:每個Join Instance處理的資料量比較大,耗時較長,沒有發生長尾,可以考慮增大使用這個參數。

set odps.sql.reducer.cpu=100

作用:設定處理Reduce Task每個Instance的Cpu數目,預設為100,在[50,800]之間調整。

場景:某些任務如果特别耗計算資源的話,可以适當調整Cpu數目。對于大多數Sql任務來說,一般不需要調整Cpu。

set odps.sql.reducer.memory=1024

作用:設定Reduce Task每個Instance的Memory大小,機關M,預設1024M,在[256,12288]之間調整。

場景:當Reduce階段的Instance有Writer Dumps時,可以适當的增加記憶體的大小,減少Dumps所花的時間。

上面這些參數雖然好用,但是也過于簡單暴力,可能會對叢集産生一定的壓力。特别是在叢集整體資源緊張的情況下,增加資源的方法可能得不到應有的效果,随着資源的增大,等待資源的時間變長的風險也随之增加,導緻效果不好!是以請合理的使用資源參數!

  • 小檔案合并參數

set odps.merge.cross.paths=true|false

作用:設定是否跨路徑合并,對于表下面有多個分區的情況,合并過程會将多個分區生成獨立的Merge Action進行合并,是以對于odps.merge.cross.paths設定為true,并不會改變路徑個數,隻是分别去合并每個路徑下的小檔案。

set odps.merge.smallfile.filesize.threshold = 64

作用:設定合并檔案的小檔案大小閥值,檔案大小超過該閥值,則不進行合并,機關為M,可以不設,不設時,則使用全局變量odps_g_merge_filesize_threshold,該值預設為32M,設定時必須大于32M。

set odps.merge.maxmerged.filesize.threshold = 256

作用:設定合并輸出檔案量的大小,輸出檔案大于該閥值,則建立新的輸出檔案,機關為M,可以不設,不設時,則使用全局變odps_g_max_merged_filesize_threshold,該值預設為256M,設定時必須大于256M。

set odps.merge.max.filenumber.per.instance = 10000

作用:設定合并Fuxi Job的單個Instance允許合并的小檔案個數,控制合并并行的Fuxi Instance數,可以不設,不設時,則使用全局變量odps_g_merge_files_per_instance,該值預設為100,在一個Merge任務中,需要的Fuxi Instance個數至少為該目錄下面的總檔案個數除以該限制。

set odps.merge.max.filenumber.per.job = 10000

作用:設定合并最大的小檔案個數,小檔案數量超過該限制,則超過限制部分的檔案忽略,不進行合并,可以不設,不設時,則使用全局變量odps_g_max_merge_files,該值預設為10000。

  • UDF相關參數

set odps.sql.udf.jvm.memory=1024

作用: 設定UDF JVM Heap使用的最大記憶體,機關M,預設1024M,在[256,12288]之間調整。

場景:某些UDF在記憶體計算、排序的資料量比較大時,會報記憶體溢出錯誤,這時候可以調大該參數,不過這個方法隻能暫時緩解,還是需要從業務上去優化。

set odps.sql.udf.timeout=1800

作用:設定UDF逾時時間,預設為1800秒,機關秒。[0,3600]之間調整。

set odps.sql.udf.python.memory=256

作用:設定UDF python 使用的最大記憶體,機關M,預設256M。[64,3072]之間調整。

set odps.sql.udf.optimize.reuse=true/false

作用:開啟後,相同的U DF函數表達式,隻計算一次,可以提高性能,預設為True。

set odps.sql.udf.strict.mode=false/true

作用:True為金融模式,False為淘寶模式,控制有些函數在遇到髒資料時是傳回NULL還是抛異常,True是抛出異常,False是傳回null。

  • Mapjoin設定

set odps.sql.mapjoin.memory.max=512

作用:設定Mapjoin時小表的最大記憶體,預設512,機關M,[128,2048]之間調整。

  • 動态分區設定

set odps.sql.reshuffle.dynamicpt=true/false

作用:預設true,用于避免拆分動态分區時産生過多小檔案。如果生成的動态分區個數隻會是很少幾個,設為false避免資料傾斜。

  • 資料傾斜設定

set odps.sql.groupby.skewindata=true/false

作用:開啟Group By優化。

set odps.sql.skewjoin=true/false

作用:開啟Join優化,必須設定odps.sql.skewinfo 才有效。

SQL優化案例一:關聯與資料傾斜

▐ 背景

正常的一段SQL邏輯,近90天淘寶天貓訂單表作為主表,左關聯商品屬性表,左關聯SKU屬性表。

第一階段:業務訴求裡隻需要取40個葉子類目的訂單資料,正常開發上線運作兩個月,暫時沒有發現任何運作緩慢的問題。

第二階段:業務訴求葉子類目擴充到所有實物類目,開發上線後發現JOIN節點出現了運作緩慢的問題,運作時長到達了4個小時。

▐ 解決步驟
  • skewjoin

看到JOIN節點運作緩慢,第一反應是資料傾斜,通過對淘寶天貓訂單表按照商品次元彙總統計也可以印證存在熱銷商品的情況。于是毫不猶豫使用了ODPS的skewjoin hint。然而經過幾次測試,JOIN節點運作緩慢的問題有所緩解,但是運作時長還是2個多小時,明顯沒有達到優化的預期。

  • 傳統的熱點資料分離

skewjoin時效有所提升,但是還不是很理想,想嘗試下傳統的熱點資料拆分:淘寶天貓訂單表中熱賣TOP50W商品寫入臨時表,TOP50W商品訂單明細與對應的商品屬性表、SKU屬性表MAPJOIN,非TOP50W商品訂單明細與對應的商品屬性表、SKU屬性表普通JOIN。但是運作時效還是不太理想,也要2個多小時。

  • 執行計劃詳細分析
隐式轉換

實在是不知道哪裡出現了問題,嘗試通過執行計劃,看下具體的執行細節,在這裡猛然發現了一個很大的問題:關聯的時候,item_id和SKU_ID都先轉換成了DOUBLE再進行關聯。

ODPS SQL優化總結

通過一個簡單SQL測試也印證了這個問題,bm_dw.dim_itm_prop_dtl_di表中item_id存儲的是string,查詢時item_id輸入為bigint,但是執行結果明顯錯誤,原因就是預設把int的資料轉換成了double再去比對。

但是也嘗試用比較正常長度的item_id查詢,貌似資料又是正确的,猜想大概是超過15-16位後精度就不準确導緻。

資料字段類型檢查

檢查字段發現訂單表中item_id是bigint,但是sku屬性和商品屬性中的item_id存儲成了string。

最終嘗試關聯的時候都強制轉換成string再觀察,發現在資源充足的情況40分鐘即可完成任務的計算。

ODPS SQL優化總結
▐ 優化總結
  1. skewjoin或者傳統拆分冷熱資料可以解決正常的資料傾斜。
  2. 關聯時要確定左右資料類型一緻,如不一緻建議強制轉換成string再進行關聯。
  3. 商品id竟然存在18位的情況,後續使用過程中建議還是統一存儲成string,查詢時最好也使用string類型,避免各種查詢、分析帶來的麻煩。

SQL優化案例二:分桶解決大表與大表的關聯

▐ 背景

DWS層存儲了淘寶天貓使用者天增量粒度的使用者與商品互動行為輕度彙總資料(浏覽、收藏、加購、下單、交易等等),基于明細資料需要彙總使用者N天内的行為彙總資料,分析資料發現無明顯的資料分布不均勻情況,但執行效率明顯不高。

SELECT              cate_id
                    ,shop_type
                    ,user_id
                    ,SUM(itm_sty_tme) AS itm_sty_tme
                    ,SUM(itm_vst_cnt) AS itm_vst_cnt
                    ,SUM(liv_sty_tme) AS liv_sty_tme
                    ,SUM(liv_vst_cnt) AS liv_vst_cnt
                    ,SUM(vdo_sty_tme) AS vdo_sty_tme
                    ,SUM(vdo_vst_cnt) AS vdo_vst_cnt
                    ,SUM(img_txt_sty_tme) AS img_txt_sty_tme
                    ,SUM(img_txt_vst_cnt) AS img_txt_vst_cnt
                    ,SUM(col_cnt_ufm) AS col_cnt_ufm
                    ,SUM(crt_cnt_ufm) AS crt_cnt_ufm
                    ,SUM(sch_cnt_ufm) AS sch_cnt_ufm
                    ,SUM(mkt_iat_cnt) AS mkt_iat_cnt
                    ,SUM(fan_flw_cnt) AS fan_flw_cnt
                    ,SUM(fst_itm_sty_tme) AS fst_itm_sty_tme
                    ,SUM(fst_itm_vst_cnt) AS fst_itm_vst_cnt
                    ,SUM(col_cnt_fm) AS col_cnt_fm
                    ,SUM(crt_cnt_fm) AS crt_cnt_fm
                    ,SUM(sch_cnt_fm) AS sch_cnt_fm
                    ,SUM(shr_cnt) AS shr_cnt
                    ,SUM(cmt_cnt) AS cmt_cnt
                    ,SUM(pvt_iat_cnt) AS pvt_iat_cnt
            FROM    dws_tm_brd_pwr_deep_usr_cat_1d
            WHERE   ds = TO_CHAR(DATEADD(TO_DATE('${bizdate}', 'yyyymmdd'), -89, 'dd'), 'yyyymmdd')
            AND    cate_flag = '1'
            GROUP BY cate_id
                     ,shop_type
                     ,user_id      
▐ 解決步驟

參數調優: 增加map、reduce個數,執行效率沒有明顯的提升。

分桶: 測試使用hash clustering解決group/join緩慢的問題。

1、建立測試表
create table tmp_zhangtao_test_hash_range like dws_tm_brd_pwr_deep_brd_usr_cat_1d LIFECYCLE 2;
2、檢視測試表結構
desc mkt.tmp_zhangtao_test_hash_range;
3、修改測試表支援桶;測試時發現user_id傾斜情況不太嚴重
ALTER TABLE tmp_zhangtao_test_hash_range CLUSTERED BY (user_id) 
SORTED by ( user_id) INTO 1024 BUCKETS;
4、插入資料,這裡發現多了一個1024個任務的reduce。
insert OVERWRITE table mkt.tmp_zhangtao_test_hash_range partition(ds,cate_flag)
SELECT
  brand_id,
  cate_id,
  user_id,
  shop_type,
  deep_score,
  brd_ord_amt,
  discovery_score,
  engagement_score,
  enthusiasm_score,
  itm_sty_tme,
  itm_vst_cnt,
  liv_sty_tme,
  liv_vst_cnt,
  vdo_sty_tme,
  vdo_vst_cnt,
  img_txt_sty_tme,
  img_txt_vst_cnt,
  col_cnt_ufm,
  crt_cnt_ufm,
  sch_cnt_ufm,
  mkt_iat_cnt,
  fan_flw_cnt,
  fst_itm_sty_tme,
  fst_itm_vst_cnt,
  col_cnt_fm,
  crt_cnt_fm,
  sch_cnt_fm,
  shr_cnt,
  cmt_cnt,
  pvt_iat_cnt,
  ds,
  cate_flag
 FROM dws_tm_brd_pwr_deep_brd_usr_cat_1d
 WHERE ds = TO_CHAR(DATEADD(TO_DATE('${bizdate}', 'yyyymmdd'), -89, 'dd'), 'yyyymmdd');
 5、查詢資料性能比對
SELECT              cate_id
                    ,shop_type
                    ,user_id
                    ,SUM(deep_score) AS deep_score
                    ,SUM(brd_ord_amt) AS brd_ord_amt
                    ,SUM(discovery_score) AS discovery_score
                    ,SUM(engagement_score) AS engagement_score
                    ,SUM(enthusiasm_score) AS enthusiasm_score
                    ,SUM(itm_sty_tme) AS itm_sty_tme
                    ,SUM(itm_vst_cnt) AS itm_vst_cnt
                    ,SUM(liv_sty_tme) AS liv_sty_tme
                    ,SUM(liv_vst_cnt) AS liv_vst_cnt
                    ,SUM(vdo_sty_tme) AS vdo_sty_tme
                    ,SUM(vdo_vst_cnt) AS vdo_vst_cnt
                    ,SUM(img_txt_sty_tme) AS img_txt_sty_tme
                    ,SUM(img_txt_vst_cnt) AS img_txt_vst_cnt
                    ,SUM(col_cnt_ufm) AS col_cnt_ufm
                    ,SUM(crt_cnt_ufm) AS crt_cnt_ufm
                    ,SUM(sch_cnt_ufm) AS sch_cnt_ufm
                    ,SUM(mkt_iat_cnt) AS mkt_iat_cnt
                    ,SUM(fan_flw_cnt) AS fan_flw_cnt
                    ,SUM(fst_itm_sty_tme) AS fst_itm_sty_tme
                    ,SUM(fst_itm_vst_cnt) AS fst_itm_vst_cnt
                    ,SUM(col_cnt_fm) AS col_cnt_fm
                    ,SUM(crt_cnt_fm) AS crt_cnt_fm
                    ,SUM(sch_cnt_fm) AS sch_cnt_fm
                    ,SUM(shr_cnt) AS shr_cnt
                    ,SUM(cmt_cnt) AS cmt_cnt
                    ,SUM(pvt_iat_cnt) AS pvt_iat_cnt
            FROM    dws_tm_brd_pwr_deep_usr_cat_1d/tmp_zhangtao_test_hash_range
            WHERE   ds = TO_CHAR(DATEADD(TO_DATE('${bizdate}', 'yyyymmdd'), -89, 'dd'), 'yyyymmdd')
            AND    cate_flag = '1'
            GROUP BY cate_id
                     ,shop_type
                     ,user_id      

查詢結果:

使用hash clustering ,map數和桶個數相同。

Summary: resource cost: cpu 0.34 Core * Min, memory 0.61 GB * Min

不使用hash clustering:

resource cost: cpu 175.85 Core * Min, memory 324.24 GB * Min

▐ 優化總結

通過CREATE TABLE或者ALTER TABLE語句,指定一個或者多個Cluster列,通過哈希方法,把資料存儲分散到若幹個桶裡面,類似于這樣:

CREATE TABLE T (C1 string, C2 string, C3 int) CLUSTERED BY (C3) SORTED by (C3) INTO 1024 BUCKETS;

這樣做有幾個好處:

對于C3列的等值條件查詢,可以利用Hash算法,直接定位到對應的哈希桶,如果桶内資料排序存儲,還可以進一步利用索引定位,進而大大減少資料掃描量,提高查詢效率。

如果有表T2希望和T1在C3上做Join,那麼對于T1表因為C3已經Hash分布,可以省掉Shuffle的步驟,進而大大節省計算資源。

Hash Clustering也有一些局限性:

  1. 使用Hash算法分桶,有可能産生Data Skew的問題。和Join Skew一樣,這是Hash算法本身固有的局限性,輸入資料存在某些特定的資料分布時,可能造成傾斜,進而導緻各個哈希桶之間資料量差異較大。因為Hash Clustering之後,我們的并發處理機關往往是一個桶,如果哈希桶資料量不一緻,線上上往往容易造成長尾現象。
  2. Bucket Pruning隻支援等值查詢。因為使用哈希分桶方法,對于區間查詢,比如上例中使用C3 > 0這樣的條件,我們無法在哈希桶級别定位,隻能把查詢下發到所有桶内進行。
  3. 對于多個CLUSTER KEY的組合查詢,隻有所有CLUSTER KEY都出現并且都為等值條件,才能達到優化效果

SQL優化案例三:結合業務具體場景給出合理的SQL優化方案

▐ 背景

還是上面案例二的例子,DWS層存儲了淘寶天貓使用者天增量粒度的使用者與商品互動行為輕度彙總資料(浏覽、收藏、加購、下單、交易等等),基于明細資料需要彙總使用者30天内的行為彙總資料。

▐ 解決步驟
  • 基于月+日的計算方式

使用bigint類型的行為作為判斷依據,>0的儲存。采用double的判斷>0存在資料精度問題導緻的資料偏差。

ODPS SQL優化總結

優化後:

可以發現map階段讀取原始資料map減少,計算時間縮短40分鐘。

  • 一次讀取多次插入

後續需求中衍生出需要使用者+一級類目的行為彙總資料,采用from insert1 insert2的方式,實作一次讀取多次寫入,減少資源消耗。

▐ 優化總結