1、背景
A任務在淩晨1點到3點,平均耗時1.5h,且是核心公共任務,急需優化。
整體代碼邏輯示意:
// 從tableA讀取一次資料,放到臨時表t1
DROP TABLE IF EXISTS temp.tmp_xx_$date_1;
CREATE TABLE IF NOT EXISTS temp.tmp_xxx_$date_1
as
select
xxx
from tableA
where xxxx;
// 從臨時表t1讀取和轉換資料,得臨時表t2
DROP TABLE IF EXISTS temp.tmp_xx_$date_2;
CREATE TABLE IF NOT EXISTS temp.tmp_xxx_$date_2
as
select
xxx
from temp.tmp_xx_$date_1
where xxxx;
// 從臨時表t1讀取和轉換資料,得臨時表t3
DROP TABLE IF EXISTS temp.tmp_xx_$date_3;
CREATE TABLE IF NOT EXISTS temp.tmp_xxx_$date_3
as
select
xxx
from temp.tmp_xx_$date_1
where xxxx;
//合并t2,t3結果寫入最終結果表
INSERT OVERWRITE TABLE biads.xxxx
PARTITION (pt_d='$date')
select
xxx
from temp.tmp_xx_$date_2
union all
select
xxx
from temp.tmp_xx_$date_3
2、排查思路
2.1 stage耗時分布
問題1: 讀取tableA耗時20min , 讀取時間較長
問題2: 寫入臨時表t1耗時20min,寫入臨時表時間較長
問題3:建立和寫入臨時表t2,t3 耗時近20min,臨時表備援,
2.2 executor資源負載
問題4: executor中task分布不均,存在部分exectuor運作了20-30個task,而其餘隻運作了1個task
3、解決方法
問題1,2,4—參數優化
// 增大讀取task數量
spark.hadoop.mapreduce.input.fileinputformat.split.maxsize 67108864
spark.hadoop.mapreduce.input.fileinputformat.split.minsize 1
// 減小合并小檔案的大小,注意自研spark的合并小檔案大小參數
spark.sql.mergefile.maxSize 134217728
// 增大driver資源,減輕gc
spark.driver.memory 8G
spark.driver.cores 4
// 避免executor中task傾斜
spark.locality.wait.process 200
spark.locality.wait.node 200
spark.locality.wait.rack 200
問題3-- 邏輯優化
// 從tableA讀取一次資料,放到臨時表t1
DROP TABLE IF EXISTS temp.tmp_xx_$date_1;
CREATE TABLE IF NOT EXISTS temp.tmp_xxx_$date_1
as
select
xxx
from tableA
where xxxx;
//消除中間臨時表,直接讀取t1, 寫入最終結果表
INSERT OVERWRITE TABLE biads.xxxx
PARTITION (pt_d='$date')
select
xxx
from temp.tmp_xx_$date_1
where xxxx;
union all
select
xxx
from temp.tmp_xx_$date_1
where xxxx;
4、優化後效果
問題1,2耗時分布降低至10min左右;
問題3耗時直接消除。
問題4 task傾斜緩解。
總體耗時從100min減少為50min