天天看點

JAVA開發搞了一年多大資料的總結

作者:Java架構嘻嘻嘻

2021年7月份加入了目前項目組,以一個原汁原味的Java開發工程師的身份進來的,來了沒多久,項目組唯一一名大資料開發工程師要離職了,一時間一大堆的資料需求急需人來接手,此刻又招不來新的資料開發。沒轍,我和同組的另一位Java開發同僚算是臨危受命,接下了大資料方面的工作,開啟了Java工程師從0到1搞大資料的漫長旅途,開始的磕磕碰碰叫苦不堪到如今的還算得心應手,已經整整16個月了,16個月期間雙向支援着資料分析和後端開發的工作,兩者時而穿插時而并行處理,大資料工作占得比重之多,有時讓我懷疑我還是不是一名純粹的Java開發工作者,當我看見假期值班表中我的角色填寫一項變成“B端後端/資料”時,我就知道我已經不純粹了。

1.Sql -- 大資料分析的靈魂

搞大資料究竟每天在做些什麼?坦白講,情況和我想象的不太一樣,因為做大資料開發時最最主要的工作居然寫Sql,曾經我還以為它是有一套刁鑽困難冷門的牛逼技術,将海量資料玩弄于股掌之中。現在看來,我是每天和各種各樣的大資料表打交道,在大資料平台用sql提取出業務方想要的資訊,有時會出各式各樣的資料報表,有時是為C端項目服務,提供底層海量資料計算的支援,有時是為各種資料看闆服務,提供他們想要的銷量排行了、人群覆寫情況了諸類。工作前兩年純粹寫java時也是對sql有所研究的,畢竟資料持久層的互動離不開sql,然後搞了大資料才明白,之前寫的sql都是小兒科,現在一條sql寫上百行那都是常有的事,而且最開始解讀大sql時總是慢半拍,好久才能搞明白前輩留下的交接文檔表達的是什麼,現在不一樣了,看見那些sql都親切很多,很多需求提出來總能迅速想到sql解決的方案,下面呢,我就開始分享一些我在寫大sql時經常會使用的一些文法,這些文法可能針對于隻做Java的人并不會經常性的熟練使用。

1.1with.. as..

with temp1 as (
select * from ... where ..
),
temp2 as (
select * from ... where...
),
...
tempn as (
select * from ... inner join ... where
)
select a.*,b.*,c.* from temp1 a inner join temp2 b on a.id = b.id left join ..tempn c on a.iid = c.iid where ...
           

模闆中的temp1,temp2,tempn都可以看做這個sql執行過程中的臨時表,存在周期僅限于執行這條sql期間,sql執行完畢臨時表也銷毀,并且和其他的sql是互相隔離的,下面的sql都可以使用之前的産生的臨時表(temp2就可以使用temp1的結果),使用with時最後一定跟的select語句,當然,跟的是insert into table ...... select * from也是可以的。

使用with..as..文法大大提高了長Sql的解讀性。

之前一直以為這個HiveSql特有的文法,後來才發現在mysql中也可以使用,隻不過是mysql8.0以後的版本可以使用,之前的版本是沒有這個文法的。

1.2開窗函數:row_number() over(partition by file order by file2 desc/asc)

select  row_number() over(partition by userid order by pay_time desc) as rn,userid,name,order_cd,goods_name,pay_time
from  db_dw.table _order 
having rn = 1
           

這個sql的作用就是找出每個使用者的最新付款的那筆訂單的訂單資訊。

實作思路就是利用開窗函數按照使用者id分組,再按照付款時間倒叙排序,給每組的資料加上一個rn的編号,每組的第一條rn 都等于 1 ,第二條rn = 2,以此類推,再通過having函數将結果中rn = 1的資料全取出來,這樣就能通過單條sql完成取每一個使用者最新一條訂單的資料需求。

1.3開窗函數lag(field, num, defaultvalue) over(partition by ..order by ..) 與 lead() over()

select  lag(pay_time,1,NULL) over(partition by userid order by pay_time asc) as last_pay_time,userid,name,order_cd,goods_name,pay_time
from  db_dw.table _order 

select  lead(pay_time,1,NULL) over(partition by userid order by pay_time asc) as next_pay_time,userid,name,order_cd,goods_name,pay_time
from  db_dw.table _order 
           
  • lag(field, num, defaultvalue),其中fied是要查詢的字段,num是向前取幾行,defaultvalue是取不到值時的預設值。向上面案例中那樣,假設按照userid分組後又按照pay_time排序了,第一個查出來的使用者剛好也有多條不同pay_time的資料,那麼查詢結果應該是第一行資料為last_pay_time為NULL,pay_time為該使用者的最小的時間,第二行資料的last_pay_time等于第一行資料的pay_time值,而pay_time為第二小的時間。
  • lead(field, num, defaultvalue),其中fied是要查詢的字段,num是向後取幾行,defaultvalue是取不到值時的預設值。向上面案例中那樣,假設按照userid分組後又按照pay_time排序了,第一個查出來的使用者剛好也有多條不同pay_time的資料,那麼查詢結果應該是第一行資料為next_pay_time為第二行的pay_time,pay_time為該使用者的最小的時間,第二行資料的next_pay_time等于第三行資料的pay_time值,而pay_time為第二小的時間,而該使用者的最後一行的next_pay_time則為NULL。

1.4case when <條件1> then <結果1> when <條件2> then <結果2> else <剩餘資料的結果> end as 字段名

-- 将使用者年齡按照18歲及以下,18歲至65歲,65歲以上分類
select  case when age<=18 then '未成年' when age>18 and age <=65 then '青中年' else '老年' end as age_group,name,age,sex
from user
           

case when 文法其實就是java語言的if...else if ...else if...else,當滿足條件時就進入該分支,不滿足的話就一直進入下面的分支,最後所有條件都不滿足則進入else分支,通常在Sql中我們使用case when then進行一些歸納分類,譬如我們的電商涉及到的商品種類衆多,可能需要按照某些規則進行分類,就免不了使用該文法。

1.5union,union all

select name  from A
union
select name from B

select name from A
union all
select name from B
           
  • 想去重使用union,不去重完全放一起使用union all
  • 假設A表中某列有重複資料,然後A表和B表進行union,A表中的那列資料自動的去重,不僅僅是把B表中的那列和A表重複的資料去重。像案例中的union後的結果一樣,所得的name不會有一條重複資料,相當于整體的distinct了一下。
  • union 和 union all查詢資料結果隻以第一句sql的字段名稱為準,後續的sql隻按照順序比對,不會識别字段名稱

1.6partition分區使用

-- 建立hive分區表
create table db_demo.tb_demo (
filed1 string comment '字段1',
 filed2 int comment '字段2'
)PARTITIONED BY(l_date string) ;

-- 删除表分區
alter  table db_demo.tb_demo drop if exists partition(l_date = '${v_date}')

--将資料寫入表分區
insert into table db_demo.tb_demo partition(l_date = '${v_date}')
select * from db_demo.tb_demo_v0 where ......

--覆寫指定分區表資料
insert overwrite table db_demo.tb_demo partition(l_date = '${v_date}')
select * from db_demo.tb_demo_v0 where ......
           
  • 分區表指的是在建立表時指定的partition的分區空間。
  • 一個表可以擁有一個或者多個分區,每個分區以檔案夾的形式單獨存在表檔案夾的目錄下。
  • 分區字段會作為表的最後一個字段出現。

1.7JSON處理

-- 取出JSON串中指定key的value值
-- 文法
get_json_object('{key1:value1,key2:value2}','$.key')
--比如取出JSON串中的name資訊
select get_json_object('{"age":1089,"name":"tom"}','$.name')
           

1.8日期函數

-- to_date:日期時間轉日期
select to_date(create_time) from demo_db.demo_table;

-- current_date :目前日期
select current_date

-- date_sub : 傳回日期前n天的日期
select  date_sub(pay_time,9) from demo_db.demo_table

-- date_add : 傳回日期後n天的日期,即使放入時間參數,得到的也是日期,上一個同理,隻比較日期位。
select  date_add(pay_time,9) from demo_db.demo_table

-- unix_timestamp:擷取目前unix時間戳
select unix_timestamp('2022-10-10 10:22:11')

-- datediff:傳回開始日期減去結束日期的天數,隻比較日期位
select datediff('2022-10-10 23:22:11','2022-10-09 00:22:11')

-- 擷取目前月
select substr(current_date,1,7);

--擷取上個月最後一天
select DATE_SUB(FROM_UNIXTIME(UNIX_TIMESTAMP()),DAY(FROM_UNIXTIME(UNIX_TIMESTAMP())))
           

1.9炸裂函數

Hive版本:
select 
id,
type_id_new 
from table_one
lateral view explode(split(type_id,",")) table_one_temp as type_id_new
;

Mysql版本:
SELECT
   a.id, substring_index(substring_index(a.type_id,',',b.help_topic_id + 1    ),    ',' ,- 1    ) AS type_id
FROM
    (select id, type_id from table_one) a  
JOIN mysql.help_topic b ON b.help_topic_id <
(length(a.type_id) - length( replace(a.type_id, ',', '')  ) + 1)

           

簡而言之,炸裂函數從命名上就可以看出,這是一個由1到多的過程,由一個裂變成多個。具體場景大概是某條資料的某個字段裡面存放的是被相同符号分割的字元串,我們暫時用逗号分割來講述,拿我們案例來講,假設一條資料的id = 1,type_id = 1,2,3 ,通過以上的炸裂函數處理之後,該條查詢結果将變成3條,分别為id=1、type_id_new =1,id=1、type_id_new =2,id=1、type_id_new =3,也就是被炸裂的字段資料分割,剩餘字段全部保持不變。

1.10 COALESCE ( expression,value1,value2……,valuen)

select  coalesce(demo_id1,demo_id2,demo_id3) from demo_db.demo_table  ;

select  coalesce(case when demo_name like '%傑倫%' then '傑粉' when demo_name like '%許嵩% ' then '嵩鼠' else '小泷包' end,
                demo_name2);

           

coalesce函數其實就是找到第一個不為NULL的表達式,将其結果傳回,假設全部為NULL,最後隻能傳回NULL,從我以上案例可以看出來,每個參數不僅僅可以寫字段,也可以嵌入其他的表達式,像第二行嵌入了一串case when then,那也僅僅是一個參數而已。

注意點,coalesce函數隻是判斷是否為NULL,它不會判斷空串,假設第一個不為NULL的參數為空串‘’,那麼它也會将這個空串當做有值查出來的。

1.11 group by field1,field2 having

select  year,sex,goods_name,sum(goods_number) as num  
from demo_db.demo_table
group by year,sex,goods_name
having  num>1000

           

group by 用法确實比較常見,寫在這裡也是因為平時做資料統計基本每次都會用得到,想着寫上吧顯得沒什麼技術含量,不寫又對不起這個好用的聚合文法,以上的demo呢便是統計每一年男女各對每種商品的購買量是多少,并把銷量在1000以上的資料找出來,這個寫法便是先對年份分組,再對性别分組,然後又對商品名稱分組,分好組後便使用sum函數對商品銷量進行求和。

1.12随機抽樣:distribute by rand() sort by rand()

select * from ods_user_bucket_log distribute by rand() sort by rand() limit 10;

           

rand函數前的distribute和sort關鍵字可以保證資料在mapper和reducer階段是随機分布的,像用例中寫的那樣就是随機抽樣取10條資料。

1.13 A left join B on a.field1= b.filed1 where B.field1 is null

select a.*
from demo_db.demo_tableA a
left join demo_db.demo_tableB b
on a.demo_id = b.demo_id where b.demo_id is null

           

left join 這個寫法一讓我拿出來介紹屬實讓人笑掉大牙,左連接配接麼,誰不會,左表全查白。當然,我列出來這個當然不是通俗的告訴大家一下我會左連接配接哎,我好棒棒啊,其實就是因為平時工作時總是需要把某表A和另一表B中做比較,将A中存在的屬于B的部分給排除掉,此刻,我上述寫的萬能公式便能用到了,别笑,文法簡單,但是遇到這種情況時,不經常寫Sql的人可能都想不太到。

再補充一個知識點,在使用連接配接時,主表越小,那麼查詢效率越高,是以如果遇到一些inner join場景,主表次表換一下位置對查詢結果沒影響的話,可以記着将資料量字段量小的表放在主表位置上。

1.14 建立函數調用Jave-Jar中的方法

--文法
create temporary function 方法名 as 'java類的全限定名' using jar 'jar包在hdfs上的位置';

--案例
create temporary function decryption as 'com.zae.aes.Decrypt' using jar 'hdfs://namenodeha/user/zae/secret/decryption_demo.jar';

           

1.15 擺出一條大SQL看看

JAVA開發搞了一年多大資料的總結

這個是最近寫的一條中等規模大小的SQL吧,隻有50行左右而已,其他太長的也不好截屏,裡面就用到了一些前面講述的SQL文法,當然,這個SQL的業務場景需求我就不再贅述了,因為原本的SQL已經被我大批量的用新随便定義的表名和字段給替換了,目前已經面目全非了,畢竟不能暴露公司的一些業務的東西吧,之是以粘出來還是想實際的介紹下我前面那14條是怎樣的結合着嵌入到一個SQL中的,随便看看就好,不需要深究其意思。

2.Presto/Spark/Mapreduce 計算引擎對比

​ 平時一直使用大資料平台進行一些資料的處理,在執行查詢語句時,是可以選擇使用Presto,Spark,MapReduce不同的計算引擎進行工作,坦白講,初次接觸時也沒搞明白它們的差別是什麼,隻知道有些SQL放在Presto引擎上執行準報錯,但是放在Spark上就不會報錯,它們的文法還是有差異的,presto沒有spark内嵌的函數多。據資料前輩給交接介紹時,大緻就說,如果是單表查詢,查詢一些單表的資料量,聚合分組諸類,就使用Presto,相對來講是比較快的;但是要同時使用到多個大資料表查詢,那就使用Spark和MR是比較快些的;另外,Spark和MR相比,Spark運算速度應該有一些優勢,但是遇到了特别特别大的計算量級時,資源再不夠用,那麼可能就會發生一些job abort,time out等比較讓人牙疼的報錯,畢竟這些報錯不是由于SQL本身的編寫出現的問題,而是和資源不夠用相關,而且往往出現這個問題都是發生在SQL運作很久之後,記着我剛接手大資料沒一個月時,曾經寫了一條SQL執行了三個小時,最後給我了報了個time out,氣得我沒把鍵盤摔了!反過來看,使用MR的話好像很少會發生以上陳述問題,它可能會慢些,但它最後一定會不辱使命幫你執行完畢。

2.1 Presto

​ Presto我講不了特别深,畢竟我平時對于它的使用也僅僅是選擇了這個引擎,然後執行了我寫下的單表執行的SQL,不過有一點可以确定,他對你填寫的類型的要求是苛刻的,比如假設你定義了一個字段叫做user_id,給定的它的類型為string(不是誤寫,在hive中就是定義為string,你可以了解為mysql中的varchar類型),于是你寫了一條SQL:select * from demo_db.demo_user where user_id = 1001,那麼它對于presto引擎執行那将會報錯的,因為他檢查文法時會發現你輸入的1001是個整型數值,和string類型不比對,但是對于Spark引擎執行時就不會出這個問題,我覺得Spark底層應該是對1001做了轉化,将這個整型數值1001轉化為了字元串‘1001’,故可以去做正常的查詢。下面我将整理幾條關于Presto的介紹吧放在這裡,也是我從各類網站中了解來的,可能對實際開發用處不大,但起碼我們知道自己用了個什麼計算引擎吧。

  • Presto是一個facebook開源的分布式SQL查詢引擎,适用于互動式分析查詢,資料量支援GB到PB位元組
  • Presto是一款記憶體計算型的引擎,是以對于記憶體管理必須做到精細,才能保證query有序、順利的執行,部分發生餓死、死鎖等情況。也正是因為它是基于記憶體計算的,它的速度也是很快的。
  • Presto采用典型的master-slave模型,master主要負責對從節點的一些管理以及query的解析和排程,而slave則是負責一些計算和讀寫。

2.2 Spark

​ Spark是一個并行計算架構,适用于大規模的資料處理。Spark也是一個基于記憶體的計算引擎,專門解決大資料的分布式問題,它是hadoop的一個補充,是以可以在Haddop檔案系統中并行運作。

​ 平時使用最多的還是SparkSql,用于一些查詢,或者是搭建一些定時Job,使用SparkSql元件去完成對資料的抽取、轉換、加載,但是查閱了相關介紹,Spark的應用遠遠不止這些,它還有一些流計算、機器學習、圖計算的場景應用,下面有一段關于Spark應用場景介紹,是從<https://help.aliyun.com/document_detail/441938.html>中看來了,我就不用我笨拙的語言來編排了,放在下面供大家探讨一下:

  • 離線ETL
  • 離線ETL主要應用于資料倉庫,對大規模的資料進行抽取(Extract)、轉換(Transform)和加載(Load),其特點是資料量大,耗時較長,通常設定為定時任務執行。
  • 線上資料分析(OLAP)
  • 線上資料分析主要應用于BI(Business Intelligence)。分析人員互動式地送出查詢作業,Spark可以快速地傳回結果。除了Spark,常見的OLAP引擎包括Presto和Impala等。Spark 3.0的主要特性在EMR中的Spark 2.4版本已支援,更多特性詳情請參見Spark SQL Guide。
  • 流計算
  • 流計算主要應用于實時大屏、實時風控、實時推薦和實時報警監控等。流計算主要包括Spark Streaming和Flink引擎,Spark Streaming提供DStream和Structured Streaming兩種接口,Structured Streaming和Dataframe用法類似,門檻較低。Flink适合低延遲場景,而Spark Streaming更适合高吞吐的場景,詳情請參見Structured Streaming Programming Guide。
  • 機器學習
  • Spark的MLlib提供了較豐富的機器學習庫,包括分類、回歸、協同過濾、聚合,同時提供了模型選擇、自動調參和交叉驗證等工具來提高生産力。MLlib主要支援非深度學習的算法子產品,詳情請參見Machine Learning Library (MLlib) Guide。
  • 圖計算
  • Spark的GraphX支援圖計算的庫,支援豐富的圖計算的算子,包括屬性算子、結構算子、Join算子和鄰居聚合等。詳情請參見GraphX Programming Guide。

2.3 MapReduce

​ Hadoop MapReduce:一個分布式的離線并行計算架構,它是Hadoop全家桶的一部分,它的思想是分而治之,也就是說将一個大的複雜的問題切割成一個個小的問題加以解決,最後再彙總,這從MapReduce的字面就可以看出來。MapReduce處理任務過程是分為兩個階段的:

  • Map階段:Map階段的主要作用是“分”,即把複雜的任務分解為若幹個“簡單的任務”來并行處理。Map階段的這些任務可以并行計算,彼此間沒有依賴關系。
  • Reduce階段:Reduce階段的主要作用是“合”,即對map階段的結果進行全局彙總。

下面從一張圖例來看下MapReduce計算的處理過程:

JAVA開發搞了一年多大資料的總結

3.由資料同步想到的

​ 所謂大資料,必然和形形色色的資料表打交道,但是要清楚一點,對于一個規模還算可以的企業來講,那下面的項目組肯定是一片一片的,他們之間的資料沒有辦法做到百分之百的共享,有時你想要的去做一些資料的分析,可能就需要其他項目組甚至第三方企業的支援,從别的管道去拿到資料進行使用,是以,資料同步接入變成了大資料開發中必不可少的工作。

​ 其實資料同步接入的方式有很多,如果有資料庫的權限,可以直接使用大資料平台自帶的同步元件,編寫一定的規則,配置好接入的頻率,将資料接入過來;如果是第三方外部企業的資料,為了安全起見,我們通常也會選擇接口的方式進行資料的接入,再同步至大資料平台;當然,使用消息中間件也是很不錯的方式,比如Kafka,但是這東西總歸有些嚴格意義上的限制,很多企業為了安全是不會對外暴露自身的Kafka服務位址的;還有一些資料量過大的情況,可以考慮sftp伺服器的方式,直接将資料上傳到指定的伺服器的檔案夾裡,不過這個總歸有些依賴于手工支援的弊端在裡面,不過據說好像也可以編寫腳本完成自動化的上傳和拉取,對于我這個搞Java的來講,這方面的解決政策還是不太懂的;其他方法也可以使用DolphinScheduler(DS)裡面的一些小元件,去執行一些腳本來完成同步,當然腳本的編寫就類似于hadoop distcp -update hdfs://主機名/源資料路徑 hdfs://主機名/目标資料路徑,這是将資料表從hdfs的一個檔案目錄下複制到指定位置,同樣,flinkX也支援類似的功能。

​ 總之,方案不少,具體場景具體分析,資料同步的問題也有很多,比如上遊資料源斷了,導緻目标日期的資料沒有過來;使用的同步的伺服器當機了,那時候就需要詳細的排查了,盡快将資料同步修複。

4.任重道遠,仍需砥砺前行

​ 我清楚,要是徹頭徹尾的搞明白大資料,除了會寫寫複雜SQL是遠遠不夠的,我記着有些歸類中将ES和Kafka也作為大資料開發的範疇,當然,這兩塊的知識點我也是有所涉獵的,隻不過是Java後端代碼中使用的,也許這兩塊還有其他用法可以用于大資料,比如結合着Scala語言使用。Scala語言是函數式程式設計,因為在Java方向已經沉浸多年,是以看了幾天scala語言的文法也沒有那麼抗拒,都大緻了解了下,但是了解文法和實際使用這門語言進行工作上的開發又是另外一回事,由于各種原因沒有深入的去研究下去略表遺憾。 總的來講,目前我還是比較喜歡java的,但是因為最近這一年裡也做了不少大資料相關工作,是以總覺得不為它寫一篇部落格總歸對不起這一年的收獲,是以還是找個地方記錄下來吧,将來有一天如果我在java方向鑽研透了,想再探索大資料的廣袤無垠時,我想,我會認認真真系系統統的去學一遍,像scala,spark,flink,hadoop他們深層次技術,我一定要每一個都好好品嘗下。

原文連結:https://www.cnblogs.com/zaevn00001/p/16863255.html