在工作中使用hive比較多,也寫了很多HiveQL。這裡從三個方面對 Hive 常用的一些性能優化進行了總結。
表設計層面優化
利用分區表優化
分區表是在某一個或者幾個次元上對資料進行分類存儲,一個分區對應一個目錄。如果篩選條件裡有分區字段,那麼 Hive 隻需要周遊對應分區目錄下的檔案即可,不需要周遊全局資料,使得處理的資料量大大減少,進而提高查詢效率。
當一個 Hive 表的查詢大多數情況下,會根據某一個字段進行篩選時,那麼非常适合建立為分區表。
利用桶表優化
指定桶的個數後,存儲資料時,根據某一個字段進行哈希後,确定存儲在哪個桶裡,這樣做的目的和分區表類似,也是使得篩選時不用全局周遊所有的資料,隻需要周遊所在桶就可以了。
選擇合适的檔案存儲格式
Apache Hive 支援 Apache Hadoop 中使用的幾種熟悉的檔案格式。
TextFile預設格式,如果建表時不指定預設為此格式。
存儲方式:行存儲。
每一行都是一條記錄,每行都以換行符
n
結尾。資料不做壓縮時,磁盤會開銷比較大,資料解析開銷也比較大。
可結合
Gzip、
Bzip2等壓縮方式一起使用(系統會自動檢查,查詢時會自動解壓),但對于某些壓縮算法 hive 不會對資料進行切分,進而無法對資料進行并行操作。
SequenceFile一種Hadoop API 提供的二進制檔案,使用友善、可分割、個壓縮的特點。
支援三種壓縮選擇:NONE、RECORD、BLOCK。RECORD壓縮率低,一般建議使用BLOCK壓縮。
RCFile存儲方式:資料按行分塊,每塊按照列存儲 。
- 首先,将資料按行分塊,保證同一個record在一個塊上,避免讀一個記錄需要讀取多個block。
- 其次,塊資料列式存儲,有利于資料壓縮和快速的列存取。
存儲方式:資料按行分塊,每塊按照列存儲
Hive 提供的新格式,屬于 RCFile 的更新版,性能有大幅度提升,而且資料可以壓縮存儲,壓縮快,快速列存取。
Parquet存儲方式:列式存儲
Parquet 對于大型查詢的類型是高效的。對于掃描特定表格中的特定列查詢,Parquet特别有用。Parquet一般使用 Snappy、Gzip 壓縮。預設 Snappy。
Parquet 支援 Impala 查詢引擎。
表的檔案存儲格式盡量采用 Parquet 或 ORC ,不僅降低存儲量,還優化了查詢,壓縮,表關聯等性能;
選擇合适的壓縮方式
Hive 語句最終是轉化為 MapReduce 程式來執行的,而 MapReduce 的性能瓶頸在與
網絡IO和
磁盤IO,要解決性能瓶頸,最主要的是
減少資料量,對資料進行壓縮是個好方式。壓縮雖然是減少了資料量,但是壓縮過程要消耗CPU,但是在Hadoop中,往往性能瓶頸不在于CPU,CPU壓力并不大,是以壓縮充分利用了比較空閑的CPU。
常用壓縮算法對比 如何選擇壓縮方式- 壓縮比率
- 壓縮解壓速度
- 是否支援split
支援分割的檔案可以并行的有多個 mapper 程式處理大資料檔案,大多數檔案不支援可分割是因為這些檔案隻能從頭開始讀。
文法和參數層面優化
列裁剪
Hive 在讀資料的時候,可以隻讀取查詢中所需要用到的列,而忽略其他的列。這樣做可以節省讀取開銷,中間表存儲開銷和資料整合開銷。
set hive.optimize.cp = true; -- 列裁剪,取數隻取查詢中需要用到的列,預設為真
分區裁剪
在查詢的過程中隻選擇需要的分區,可以減少讀入的分區數目,減少讀入的資料量。
set hive.optimize.pruner=true; // 預設為true
合并小檔案
Map 輸入合并
在執行 MapReduce 程式的時候,一般情況是一個檔案需要一個 mapper 來處理。但是如果資料源是大量的小檔案,這樣豈不是會啟動大量的 mapper 任務,這樣會浪費大量資源。可以将輸入的小檔案進行合并,進而減少mapper任務數量。
詳細分析set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; -- Map端輸入、合并檔案之後按照block的大小分割(預設)
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat; -- Map端輸入,不合并
Map/Reduce輸出合并
大量的小檔案會給 HDFS 帶來壓力,影響處理效率。可以通過合并 Map 和 Reduce 的結果檔案來消除影響。
set hive.merge.mapfiles=true; -- 是否合并Map輸出檔案, 預設值為真
set hive.merge.mapredfiles=true; -- 是否合并Reduce 端輸出檔案,預設值為假
set hive.merge.size.per.task=25610001000; -- 合并檔案的大小,預設值為 256000000
合理控制 map/reduce 任務數量
合理控制 mapper 數量
減少 mapper 數可以通過合并小檔案來實作 增加 mapper 數可以通過控制上一個 reduce
預設的 mapper 個數計算方式
輸入檔案總大小:total_size
hdfs 設定的資料塊大小:dfs_block_size
default_mapper_num = total_size/dfs_block_size
MapReduce 中提供了如下參數來控制 map 任務個數:
set mapred.map.tasks=10;
從字面上看,貌似是可以直接設定 mapper 個數的樣子,但是很遺憾不行,這個參數設定隻有在大于
default_mapper_num
的時候,才會生效。
那如果我們需要減少 mapper 數量,但是檔案大小是固定的,那該怎麼辦呢?
可以通過
mapred.min.split.size
設定每個任務處理的檔案的大小,這個大小隻有在大于
dfs_block_size
的時候才會生效
split_size=max(mapred.min.split.size, dfs_block_size)
split_num=total_size/split_size
compute_map_num = min(split_num, max(default_mapper_num, mapred.map.tasks))
這樣就可以減少mapper數量了。
總結一下控制 mapper 個數的方法:
- 如果想增加 mapper 個數,可以設定
為一個較大的值mapred.map.tasks
- 如果想減少 mapper 個數,可以設定
為一個較大的值maperd.min.split.size
- 如果輸入是大量小檔案,想減少 mapper 個數,可以通過設定
合并小檔案hive.input.format
如果想要調整 mapper 個數,在調整之前,需要确定處理的檔案大概大小以及檔案的存在形式(是大量小檔案,還是單個大檔案),然後再設定合适的參數。
合理控制reducer數量
如果 reducer 數量過多,一個 reducer 會産生一個結數量果檔案,這樣就會生成很多小檔案,那麼如果這些結果檔案會作為下一個 job 的輸入,則會出現小檔案需要進行合并的問題,而且啟動和初始化 reducer 需要耗費和資源。
如果 reducer 數量過少,這樣一個 reducer 就需要處理大量的資料,并且還有可能會出現資料傾斜的問題,使得整個查詢耗時長。 預設情況下,hive 配置設定的 reducer 個數由下列參數決定:
- 參數1:
(預設1G)hive.exec.reducers.bytes.per.reducer
- 參數2:
(預設為999)hive.exec.reducers.max
reducer的計算公式為:
N = min(參數2, 總輸入資料量/參數1)
可以通過改變上述兩個參數的值來控制reducer的數量。 也可以通過
set mapred.map.tasks=10;
直接控制reducer個數,如果設定了該參數,上面兩個參數就會忽略。
Join優化
優先過濾資料
盡量減少每個階段的資料量,對于分區表能用上分區字段的盡量使用,同時隻選擇後面需要使用到的列,最大限度的減少參與 join 的資料量。
小表 join 大表原則
小表 join 大表的時應遵守小表 join 大表原則,原因是 join 操作的 reduce 階段,位于 join 左邊的表内容會被加載進記憶體,将條目少的表放在左邊,可以有效減少發生記憶體溢出的幾率。join 中執行順序是從左到右生成 Job,應該保證連續查詢中的表的大小從左到右是依次增加的。
使用相同的連接配接鍵
在 hive 中,當對 3 個或更多張表進行 join 時,如果 on 條件使用相同字段,那麼它們會合并為一個 MapReduce Job,利用這種特性,可以将相同的 join on 的放入一個 job 來節省執行時間。
啟用 mapjoin
mapjoin 是将 join 雙方比較小的表直接分發到各個 map 程序的記憶體中,在 map 程序中進行 join 操作,這樣就不用進行 reduce 步驟,進而提高了速度。隻有 join 操作才能啟用 mapjoin。
set hive.auto.convert.join = true; -- 是否根據輸入小表的大小,自動将reduce端的common join 轉化為map join,将小表刷入記憶體中。
set hive.mapjoin.smalltable.filesize = 2500000; -- 刷入記憶體表的大小(位元組)
set hive.mapjoin.maxsize=1000000; -- Map Join所處理的最大的行數。超過此行數,Map Join程序會異常退出
盡量原子操作
盡量避免一個SQL包含複雜的邏輯,可以使用中間表來完成複雜的邏輯。
桶表 mapjoin
當兩個分桶表 join 時,如果 join on的是分桶字段,小表的分桶數是大表的倍數時,可以啟用 mapjoin 來提高效率。
set hive.optimize.bucketmapjoin = true; -- 啟用桶表 map join
Group By 優化
預設情況下,Map階段同一個Key的資料會分發到一個Reduce上,當一個Key的資料過大時會産生
資料傾斜。進行
group by
操作時可以從以下兩個方面進行優化:
1. Map端部分聚合事實上并不是所有的聚合操作都需要在 Reduce 部分進行,很多聚合操作都可以先在 Map 端進行部分聚合,然後在 Reduce 端的得出最終結果。
set hive.map.aggr=true; -- 開啟Map端聚合參數設定
set hive.grouby.mapaggr.checkinterval=100000; -- 在Map端進行聚合操作的條目數目
2. 有資料傾斜時進行負載均衡 set hive.groupby.skewindata = true; -- 有資料傾斜的時候進行負載均衡(預設是false)
當選項設定為 true 時,生成的查詢計劃有兩個 MapReduce 任務。在第一個 MapReduce 任務中,map 的輸出結果會随機分布到 reduce 中,每個 reduce 做部分聚合操作,并輸出結果,這樣處理的結果是相同的
group by key
有可能分發到不同的 reduce 中,進而達到負載均衡的目的;第二個 MapReduce 任務再根據預處理的資料結果按照
group by key
分布到各個 reduce 中,最後完成最終的聚合操作。
Order By 優化
order by
隻能是在一個reduce程序中進行,是以如果對一個大資料集進行
order by
,會導緻一個reduce程序中處理的資料相當大,造成查詢執行緩慢。
- 在最終結果上進行
,不要在中間的大資料集上進行排序。如果最終結果較少,可以在一個reduce上進行排序時,那麼就在最後的結果集上進行order by
。order by
- 如果是去排序後的前N條資料,可以使用
和distribute by
在各個reduce上進行排序後前N條,然後再對各個reduce的結果集合合并後在一個reduce中全局排序,再取前N條,因為參與全局排序的sort by
的資料量最多是order by
,是以執行效率很高。reduce個數 * N
COUNT DISTINCT優化
-- 優化前(隻有一個reduce,先去重再count負擔比較大):
select count(distinct id) from tablename;
-- 優化後(啟動兩個job,一個job負責子查詢(可以有多個reduce),另一個job負責count(1)):
select count(1) from (select distinct id from tablename) tmp;
一次讀取多次插入
有些場景是從一張表讀取資料後,要多次利用,這時可以使用
multi insert
文法:
from sale_detail
insert overwrite table sale_detail_multi partition (sale_date='2010', region='china' )
select shop_name, customer_id, total_price where .....
insert overwrite table sale_detail_multi partition (sale_date='2011', region='china' )
select shop_name, customer_id, total_price where .....;
說明:
- 一般情況下,單個SQL中最多可以寫128路輸出,超過128路,則報文法錯誤。
- 在一個multi insert中:
- 對于分區表,同一個目标分區不允許出現多次。
- 對于未分區表,該表不能出現多次。
- 對于同一張分區表的不同分區,不能同時有
和insert overwrite
操作,否則報錯傳回。insert into
啟用壓縮
map 輸出壓縮
set mapreduce.map.output.compress=true;
set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
中間資料壓縮
中間資料壓縮就是對 hive 查詢的多個 job 之間的資料進行壓縮。最好是選擇一個節省CPU耗時的壓縮方式。可以采用
snappy
壓縮算法,該算法的壓縮和解壓效率都非常高。
set hive.exec.compress.intermediate=true;
set hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
set hive.intermediate.compression.type=BLOCK;
結果資料壓縮
最終的結果資料(Reducer輸出資料)也是可以進行壓縮的,可以選擇一個壓縮效果比較好的,可以減少資料的大小和資料的磁盤讀寫時間; 注:常用的gzip,snappy壓縮算法是不支援并行處理的,如果資料源是gzip/snappy壓縮檔案大檔案,這樣隻會有有個mapper來處理這個檔案,會嚴重影響查詢效率。 是以如果結果資料需要作為其他查詢任務的資料源,可以選擇支援splitable的
LZO
算法,這樣既能對結果檔案進行壓縮,還可以并行的處理,這樣就可以大大的提高job執行的速度了。關于如何給Hadoop叢集安裝LZO壓縮庫可以檢視
這篇文章。
set hive.exec.compress.output=true;
set mapreduce.output.fileoutputformat.compress=true;
set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;
set mapreduce.output.fileoutputformat.compress.type=BLOCK;
Hadoop叢集支援一下算法:
- org.apache.hadoop.io.compress.DefaultCodec
- org.apache.hadoop.io.compress.GzipCodec
- org.apache.hadoop.io.compress.BZip2Codec
- org.apache.hadoop.io.compress.DeflateCodec
- org.apache.hadoop.io.compress.SnappyCodec
- org.apache.hadoop.io.compress.Lz4Codec
- com.hadoop.compression.lzo.LzoCodec
- com.hadoop.compression.lzo.LzopCodec
Hive架構層面優化
啟用直接抓取
Hive 從 HDFS 中讀取資料,有兩種方式:啟用 MapReduce 讀取、直接抓取。
直接抓取資料比 MapReduce 方式讀取資料要快的多,但是隻有少數操作可以使用直接抓取方式。
可以通過
hive.fetch.task.conversion
參數來配置在什麼情況下采用直接抓取方式:
- minimal :隻有
、在分區字段上select *
過濾、有where
這三種場景下才啟用直接抓取方式。limit
- more :在
、select
篩選、where
時,都啟用直接抓取方式。limit
set hive.fetch.task.conversion=more; -- 啟用fetch more模式
本地化執行
Hive 在叢集上查詢時,預設是在叢集上多台機器上運作,需要多個機器進行協調運作,這種方式很好的解決了大資料量的查詢問題。但是在Hive查詢處理的資料量比較小的時候,其實沒有必要啟動分布式模式去執行,因為以分布式方式執行設計到跨網絡傳輸、多節點協調等,并且消耗資源。對于小資料集,可以通過本地模式,在單台機器上處理所有任務,執行時間明顯被縮短。
set hive.exec.mode.local.auto=true; -- 打開hive自動判斷是否啟動本地模式的開關
set hive.exec.mode.local.auto.input.files.max=4; -- map任務數最大值
set hive.exec.mode.local.auto.inputbytes.max=134217728; -- map輸入檔案最大大小
JVM重用
Hive 語句最終會轉換為一系列的 MapReduce 任務,每一個MapReduce 任務是由一系列的Map Task 和 Reduce Task 組成的,預設情況下,MapReduce 中一個 Map Task 或者 Reduce Task 就會啟動一個 JVM 程序,一個 Task 執行完畢後,JVM程序就會退出。這樣如果任務花費時間很短,又要多次啟動 JVM 的情況下,JVM的啟動時間會變成一個比較大的消耗,這時,可以通過重用 JVM 來解決。
set mapred.job.reuse.jvm.num.tasks=5;
JVM也是有缺點的,開啟JVM重用會一直占用使用到的 task 的插槽,以便進行重用,直到任務完成後才會釋放。如果某個 不平衡的job
中有幾個 reduce task 執行的時間要比其他的 reduce task 消耗的時間要多得多的話,那麼保留的插槽就會一直空閑卻無法被其他的 job 使用,直到所有的 task 都結束了才會釋放。
并行執行
有的查詢語句,hive會将其轉化為一個或多個階段,包括:MapReduce 階段、抽樣階段、合并階段、limit 階段等。預設情況下,一次隻執行一個階段。但是,如果某些階段不是互相依賴,是可以并行執行的。多階段并行是比較耗系統資源的。
set hive.exec.parallel=true; -- 可以開啟并發執行。
set hive.exec.parallel.thread.number=16; -- 同一個sql允許最大并行度,預設為8。
推測執行
在分布式叢集環境下,因為程式Bug(包括Hadoop本身的bug),負載不均衡或者資源分布不均等原因,會造成同一個作業的多個任務之間運作速度不一緻,有些任務的運作速度可能明顯慢于其他任務(比如一個作業的某個任務進度隻有50%,而其他所有任務已經運作完畢),則這些任務會拖慢作業的整體執行進度。為了避免這種情況發生,Hadoop采用了推測執行(Speculative Execution)機制,它根據一定的法則推測出“拖後腿”的任務,并為這樣的任務啟動一個備份任務,讓該任務與原始任務同時處理同一份資料,并最終選用最先成功運作完成任務的計算結果作為最終結果。
set mapreduce.map.speculative=true;
set mapreduce.reduce.speculative=true;
建議:
如果使用者對于運作時的偏差非常敏感的話,那麼可以将這些功能關閉掉。如果使用者因為輸入資料量很大而需要執行長時間的map或者Reduce task的話,那麼啟動推測執行造成的浪費是非常巨大大。
擴充閱讀
- 一篇文章帶你快速搞懂HBase RowKey設計
- 帶你快速上手HBase | HBase列族優化
- 帶你快速上手HBase | HBase讀寫性能優化
- 福利!入門HBase的正确姿勢
- 從0開始學大資料-Hive基礎篇
- 大資料-Hive配置參數知多少
- 如何給Hadoop叢集安裝snappy和lzo壓縮庫