天天看點

hive怎麼處理過濾掉滿足多個多個條件的記錄_Hive調優的一些總結

hive怎麼處理過濾掉滿足多個多個條件的記錄_Hive調優的一些總結

hive是基于Hadoop的一個資料倉庫工具,用來進行資料提取、轉化、加載,這是一種可以存儲、查詢和分析存儲在Hadoop中的大規模資料的機制。hive資料倉庫工具能将結構化的資料檔案映射為一張資料庫表,并提供SQL查詢功能,能将SQL語句轉變成MapReduce任務來執行。Hive的優點是學習成本低,可以通過類似SQL語句實作快速MapReduce統計,使MapReduce變得更加簡單,而不必開發專門的MapReduce應用程式。hive是十分适合資料倉庫的統計分析和Windows系統資料庫檔案;

  hive強大之處不要求資料轉換成特定的格式,而是利用hadoop本身InputFormat API來從不同的資料源讀取資料,同樣地使用OutputFormat API将資料寫成不同的格式。是以對于不同的資料源,或者寫出不同的格式就需要不同的對應的InputFormat和OutputFormat類的實作。以stored as textFile為例,其在底層java API中表現是輸入InputFormat格式:TextInputFormat以及輸出OutputFormat格式:HiveIgnoreKeyTextOutputFormat。這裡InputFormat中定義了如何對資料源文本進行讀取劃分,以及如何将切片分割成記錄存入表中。而OutputFormat定義了如何将這些切片寫回到檔案裡或者直接在控制台輸出。

  Hive擁有統一的中繼資料管理,是以和Spark、Impala等SQL引擎是通用的。通用是指,在擁有了統一的metastore之後,在Hive中建立一張表,在Spark/Impala中是能用的;反之在Spark中建立一張表,在Hive中也是能用的,隻需要共用中繼資料,就可以切換SQL引擎,涉及到了Spark sql和Hive On Spark。

  不僅如此Hive使用SQL文法,提供快速開發的能力,還可以通過使用者定義的函數(UDF),使用者定義的聚合(UDAF)和使用者定義的表函數(UDTF)進行擴充,避免了去寫mapreducce,減少開發人員的學習成本。Hive中不僅可以使用逗号和制表符分隔值(CSV/TSV)文本檔案,還可以使用Sequence File、RC、ORC、Parquet(知道這幾種存儲格式的差別)。當然Hive還可以通過使用者來自定義自己的存儲格式,基本上前面說到幾種格式完全夠了。Hive旨在最大限度地提高可伸縮性(通過向Hadoop叢集動态田間更多機器擴充),性能,可擴充性,容錯性以及與其輸入格式的松散耦合。

  資料離線處理,比如日志分析,海量資料結構化分析。

Hive用的好,才能從資料中挖掘出更多的資訊來。用過hive的朋友,我想或多或少都有類似的經曆:一天下來,沒跑幾次hive,就到下班時間了。Hive在極大資料或者資料不平衡等情況下,表現往往一般,是以也出現了presto、spark-sql等替代品。這裡重點講解hive的優化方式,例如

優化分組:set hive.auto.convert.join=true;

優化表關聯記憶體運作:/*+MAPJOIN(t1,t3,t4)*/ 複制代碼           

一. 表連接配接優化

将大表放後頭

Hive假定查詢中最後的一個表是大表。它會将其它表緩存起來,然後掃描最後那個表。是以通常需要将小表放前面,或者标記哪張表是大表:/streamtable(table_name) /

使用相同的連接配接鍵

當對3個或者更多個表進行join連接配接時,如果每個on子句都使用相同的連接配接鍵的話,那麼隻會産生一個MapReduce job。

盡量盡早地過濾資料

減少每個階段的資料量,對于分區表要加分區,同時隻選擇需要使用到的字段。

盡量原子化操作

盡量避免一個SQL包含複雜邏輯,可以使用中間表來完成複雜的邏輯

二. 用insert into替換union all如果union all的部分個數大于2,或者每個union部分資料量大,應該拆成多個insert into 語句,實際測試過程中,執行時間能提升50%。示例參考如下:

insert overwite table tablename partition (dt= ....)  

select ..... from ( select ... from A
union all  
select ... from B  union all select ... from C ) R  

where ...;           

可以改寫為:

insert into table tablename partition (dt= ....) select .... from A WHERE ...;
insert into table tablename partition (dt= ....) select .... from B  WHERE ...;
insert into table tablename partition (dt= ....) select .... from C WHERE ...;

           

三. order by & sort byorder by : 對查詢結果進行全局排序消耗時間長,需要set hive.mapred.mode=nostrictsort by : 局部排序,并非全局有序,提高效率。

四. transform+python一種嵌入在hive取數流程中的自定義函數,通過transform語句可以把在hive中不友善實作的功能在python中實作,然後寫入hive表中。示例文法如下:

select transform({column names1})

using '**.py'

as {column names2}

from {table name}           

如果除python腳本外還有其它依賴資源,可以使用ADD ARVHIVE。

五. limit 語句快速出結果一般情況下,Limit語句還是需要執行整個查詢語句,然後再傳回部分結果。有一個配置屬性可以開啟,避免這種情況—對資料源進行抽樣

hive.limit.optimize.enable=true --- 開啟對資料源進行采樣的功能
hive.limit.row.max.size --- 設定最小的采樣容量
hive.limit.optimize.limit.file --- 設定最大的采樣樣本數           

缺點:有可能部分資料永遠不會被處理到

六. 本地模式對于小資料集,為查詢觸發執行任務消耗的時間>實際執行job的時間,是以可以通過本地模式,在單台機器上(或某些時候在單個程序上)處理所有的任務。

set oldjobtracker=${hiveconf:mapred.job.tracker};
set mapred.job.tracker=local;  
set marped.tmp.dir=/home/edward/tmp;
set mapred.job.tracker=${oldjobtracker};           

可以通過設定屬性hive.exec.mode.local.auto的值為true,來讓Hive在适當的時候自動啟動這個優化,也可以将這個配置寫在$HOME/.hiverc檔案中。當一個job滿足如下條件才能真正使用本地模式:

job的輸入資料大小必須小于參數:hive.exec.mode.local.auto.inputbytes.max(預設128MB)
job的map數必須小于參數:hive.exec.mode.local.auto.tasks.max(預設4)
job的reduce數必須為0或者1
可用參數hive.mapred.local.mem(預設0)控制child的jvm使用的最大記憶體數。
           

七. 并行執行Hive會将一個查詢轉化為一個或多個階段,包括:MapReduce階段、抽樣階段、合并階段、limit階段等。預設情況下,一次隻執行一個階段。不過,如果某些階段不是互相依賴,是可以并行執行的。

set hive.exec.parallel=true,可以開啟并發執行。
set hive.exec.parallel.thread.number=16; //同一個sql允許最大并行度,預設為8。           

會比較耗系統資源。

八. 調整mapper和reducer的個數

  1. Map階段優化

    map個數的主要的決定因素有:input的檔案總個數,input的檔案大小,叢集設定的檔案塊大小(預設128M,不可自定義)。參考舉例如下:

假設input目錄下有1個檔案a,大小為780M,那麼hadoop會将該檔案a分隔成7個塊(6個128m的塊和1個12m的塊),進而産生7個map數假設input目錄下有3個檔案a,b,c,大小分别為10m,20m,130m,那麼hadoop會分隔成4個塊(10m,20m,128m,2m),進而産生4個map數。即如果檔案大于塊大小(128m),那麼會拆分,如果小于塊大小,則把該檔案當成一個塊。map執行時間:map任務啟動和初始化的時間+邏輯處理的時間。

減少map數若有大量小檔案(小于128M),會産生多個map,處理方法是:

set mapred.max.split.size=100000000;
set mapred.min.split.size.per.node=100000000;
set mapred.min.split.size.per.rack=100000000;           

前面三個參數确定合并檔案塊的大小,大于檔案塊大小128m的,按照128m來分隔,小于128m,大于100m的,按照100m來分隔,把那些小于100m的(包括小檔案和分隔大檔案剩下的)進行合并。

set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; – 執行前進行小檔案合并。

增加map數當input的檔案都很大,任務邏輯複雜,map執行非常慢的時候,可以考慮增加Map數,來使得每個map處理的資料量減少,進而提高任務的執行效率。set mapred.reduce.tasks=?

  1. Reduce階段優化

    調整方式:

set mapred.reduce.tasks=?
set hive.exec.reducers.bytes.per.reducer = ?           

一般根據輸入檔案的總大小,用它的estimation函數來自動計算reduce的個數:reduce個數 = InputFileSize / bytes per reducer

九. 嚴格模式

set hive.marped.mode=strict --防止使用者執行那些可能意想不到的不好的影響的查詢
(1)分區表,必須標明分區範圍
(2)對于使用order by的查詢,要求必須使用limit語句。因為order by為了執行排序過程會将所有的結果資料分發到同一個reducer中進行處理
(3)限制笛卡爾積查詢:兩張表join時必須有on語句

           

十. 資料傾斜表現:任務進度長時間維持在99%(或100%),檢視任務監控頁面,發現隻有少量(1個或幾個)reduce子任務未完成。因為其處理的資料量和其他reduce差異過大。單一reduce的記錄數與平均記錄數差異過大,通常可能達到3倍甚至更多。最長時長遠大于平均時長。

原因:

(1)key分布不均勻
(2)業務資料本身的特性
(3)建表時考慮不周
(4)某些SQL語句本身就有資料傾斜複制代碼           
hive怎麼處理過濾掉滿足多個多個條件的記錄_Hive調優的一些總結

解決方案:參數調節

set hive.map.aggr=true           

更多資料分析内容,期待關注個人公衆号:

97年陳伯伯