試題總體概述
面試題包含時下流行的多個大資料工具和概念(spark、sparkStreaming、kafka、hadoop、hive、hbase、redis、flume、sqoop、zk、azkaban、kylin、Elatic Search)的原理介紹、實戰總結、調優方式等,我會不間斷的更新,維護,希望可以對正在找大資料工作的朋友們有所幫助.
第一章目錄
第一章 通用
1.1 資料傾斜
當某個job長時間運作沒有結束,可能發生了資料傾斜。
1.1.1 hive
設定map端聚合和二次group by保證reduce資料大概平均,然後再設定reduce數量減少每個reduce的資料量
盡量少用distinct,不僅吃不到map端聚合(distinct原理是全局排序去重),而且多個distinct也吃不到二次group的優化。
如果group by 多個字段,或者其它二次group失效的情況,可以走下方spark的解決方案,将hql分多個hql來做。
1.1.2 Spark
比起hive來說,spark對資料傾斜的優化可以更細一些(也可以說更麻煩些)。
- 先用sample(false,0.x)采樣key,找出傾斜的key
- 把資料集拆成傾斜的部分和不傾斜的部分,不傾斜的部分走正常流程
- 傾斜的部分key前面加上一個定長的随機字元串,然後執行重分區
- 重分區後進行一個聚合操作,然後去除定長字首再聚合一次。
- 如果是大表join大表,其中一個表有資料傾斜,就需要用膨脹法,将傾斜部分的key加上一個0-n的字首,一條資料膨脹成n條,然後将另一個表的這部分key也加上相應的字首,然後單獨對這部分資料進行一次雙重聚合,與不傾斜的資料進行union操作,完成聚合。
- 空值看作是特殊的key,空值多了一樣用3的方法去解決。
1.2 海量資料
1.2.1 如何調優
1.抽樣檢測分組字段,看看是否有傾斜
2.如果沒有傾斜,就正常增加reduce數量,設定中間ORC+SNAPPY壓縮
3.如果有傾斜,把傾斜的部分過濾出來加字首打散處理,不傾斜的部分正常處理。
4.如果是大表join小表,大表傾斜,可以使用map端join方法,小表傾斜直接無視。
5.如果是大表join大表,某表傾斜,可以使用膨脹法處理。
1.2.2 實時排序
跳表法快速排序
将值域劃分區間建有序連結清單,原本海量的周遊會被壓縮N倍。每個值域對應一個有序連結清單,是以是雙重連結清單。
前提:除了跳表外還有一張作品表,以作品id為key,value含分數
值變更時,先查詢作品表更新原值,通過原值到對應區間的連結清單中删除該值,新值插入對應區間的連結清單中。
求排名:1.每個區間連結清單除了儲存作品和分數外,還額外維護一個區間總長
2.每次新值插入時,都将原區間總長-1,新區間總長+1
3.求排名時周遊第一層連結清單将區間總長累加,然後周遊該值所在的區間求出區間内排名,兩者相加。
求TopN:第二層連結清單一路往下走N個。
缺點:事實上所有針對資料直接排序的方式都存在鎖問題,每次更新隻能有一個線程做,否則必然會亂。
平衡二叉樹排序
假設總分為5 [0,5]
1.将總分一分為二 [0,2],[3,5]
2.将劃分的部分繼續二分 [0,1],[1,2],[3,4],[4,5]
3.重複2直到劃分到底 [0,0],[1,1],[2,2],[3,3]...
采用連結清單的方式從頂點往下一路關聯起來,每個區間key對應一個人數value。
假設有某個值從2變成3。
先從頂點開始往下周遊查找[2,2],周遊過程中經過的節點人數全部-1,終點[2,2]的人數也-1;
然後從頂點開始往下周遊查找[3,3],經過的節點人數全部+1。終點也+1。
求排名:假設要求分數為3的排名,從起點開始往下查找[3,3],如果是往左走,就把右值累加上,如果往右走則不加。
優點:毫秒級響應排名,更新不需要鎖表,因為是單純的加減日志。
缺點:消耗大量記憶體存儲,每次更新都要改變N個值,每次更新值時還需要把排名添加到TopN表裡。
1.2.3 求交集
理論:
如果是k-v形式的資料,或者是可以提取key的資料,可以使用分組法比較。
假設我有x G記憶體,要求A,B的交集。
先把A,B分别分組拆分成多個小檔案,每個小檔案的大小為(x-300)/2,超過的部分再水準拆分成多個檔案。
然後按分組讀入A,B的資料,在記憶體中比較,求出交集部分。
如果分組太細,就按分組key的哈希進行分區,一次比較多個分組。
為了友善小檔案的合并,可以在輸出時将資料排序輸出,建立索引檔案,後續可以采用歸并排序的方式快速合并。
如果不能提取key,則按照hash分組。
實戰:假設已經把兩張表要求交集的部分字段拼接成"key"
select * from A join B on A.key = B.key
1.2.4 求差集
select * from A full outer join B on A.key = B.key where A.key is null or B.key is null;
1.3 布隆過濾器 原理
将字元串用哈希函數轉換為一個或多個整型值,将bit型數組中對應位置上的0改為1。判斷該字元串是否存在時,隻需要判斷這些位置上的值是否都為1,如果不是就說明一定不存在。但是反過來不能說明一定存在。
如:abc 轉換為3和5,就将arr[3]和arr[5]的值設定為1,隻要這兩個值都為1,就說明abc可能存在,如果它們不全為1,可以保證abc一定不存在。
1.4 存儲格式
TextFile
預設格式,按行存儲,可以壓縮但壓縮檔案不支援分片,反序列化開銷是SequenceFile的幾十倍(需要判斷分隔符和換行符)
SequenceFile
hadoop原生支援,将kv以二進制方式按行存儲,壓縮後的檔案支援壓縮。預設以record壓縮,可以改為block性能更好。壓縮率很低,查詢速度一般。
RCFile
按行分塊、按列存儲的存儲方式,反序列化速度較慢,但壓縮率和查詢速度最快。
ORC file
RC的改良版,每個Task輸出單檔案、存儲索引、支付複雜類型、支援塊壓縮、可以直接讀取,ORC比RC高效很多。
Parquet
列式存儲,是spark的預設存儲格式,壓縮和查詢性能比ORC稍差,但是支援的編碼更多,而且對嵌套式結構支援的更好(json)。
是以對結構化數倉來說ORC file格式更好,對靈活的spark作業來說Parquet格式更好。
1.5 壓縮格式
上面雖然提到了壓縮比,但隻不過是相對于純文字,在資料的存儲方式上帶來的資料量減少,并不是真正的壓縮方式。
下方介紹在這些存儲方式之上進一步減少資料量的壓縮方式。
gzip
spark預設壓縮方式,hadoop原生支援,壓縮比率很高、壓縮和解壓速度很快,支援純文字式編輯,使用友善,但是壓縮後不支援分片,是以适用于1個塊内大小的檔案。
lzo
hadoop流行的壓縮方式,需要安裝,壓縮解壓速度較快,壓縮率适中,建立索引後支援分片,壓縮單個大檔案時可以使用。
snappy
高速壓縮和解壓,壓縮率較低,需要安裝,不支援分片,推薦作為臨時資料的壓縮方式。
bzip2
非常高的壓縮率,但解壓速度很慢,支援分片。hadoop本身支援,但本地庫不支援。一般和lzo選其中一個作為資料源的壓縮格式。
1.6 手寫算法
1.6.1 四大排序
速度上:改良的歸并排序算法是n-nlogn,快速排序的時間複雜度是nlogn-n^2,冒泡和選擇都是n^2。
穩定性:快速、選擇都不穩定,冒泡和歸并都穩定
空間上:冒泡和選擇都是1,快速為logn,歸并為n
/**
* 一個完整的快速排序的方法
* 可以傳入任意類型的buffer對其排序後傳回
* 可以傳入一個比較器對自定義類型排序
*
* @author 孤星魅影
* @param buffer 任意類型的buffer
* @param isASC 是否正序排序
* @param ev 自定義比較器
* @return 傳回一個快速排序後的buffer
*/
def quickSort[T: ClassTag](buffer: mutable.Buffer[T])(implicit isASC: Boolean = true, ev: T => Comparable[T]): mutable.Buffer[T] = {
//為null、為空、長度為1時都傳回buffer
if (buffer == null || buffer.length <= 1)
return buffer
//1.将數組的第一個數head與後面所有的數進行比較,左邊放比head小的數,右邊放比head大的數(從小到大)
//2.遞歸執行拆分,直到數組隻剩下1個
//3.按順序将左、中、右三者拼接起來,完成排序
val (left, right) = buffer.tail.partition(t => {
if (isASC) t.compareTo(buffer.head) < 0
else t.compareTo(buffer.head) > 0
})
quickSort(left) += buffer.head ++= quickSort(right)
}
1.6.2 懶漢式單例
scala
object{
lazy val obj = new T()
}
java
class T{
private static T obj = null;
private T(){}
//先執行一次null檢驗,然後上鎖再執行一次null檢驗,防止同時有兩個線程通過了第一次null檢驗導緻異常。
public T getObj()={
if(obj==null){
sysconized(obj){
if(obj == null)
obj = new T();
}
}
return obj;
}
}
1.6.3 ip轉int
a,b,c,d = a*256*256*256+b*256*256+c*256+123
ip是256進制,是以a需要*3個256才能保證每個bcd不重複,b,c,d同理,使用位計算效率更高:*256相當于<<8
提供一段scala代碼:
def ip2long(ip:String)=ip.split(".").reverse.zipWithIndex.map(v=>v._1<<(8*v._2)).reduce(_+_)
可以提取轉換後的數字的前三位進行分組查詢,效率更高:
ip.groupby(_.substring(0,3))
1.6.4 求質數
/**
* 求min - max以内的所有質數
* 隻需要判斷一個數是否能被自身以外的大于1的質數數整除就可以了
* @return
*/
def countPrime(min:Int,max:Int):Array[Int]={
val arr = mutable.Set[Int](1)
(min to max).foreach(index=> {
if(!arr.exists(prime=>{
//某個數如果能被除自己和1以外的質數整除,說明該數不是質數
if(prime>1 && index != prime && index % prime == 0) true
else false
}))
arr += index
})
arr.toArray.sorted
}
1.6.5 判斷平衡二叉樹
原理:最底層的左節點和右節點相差為1(左右子樹相差為1),例子可以看上方海量資料實時排序部分
方法:從根節點開始,判斷左子樹與右子樹是否差1,然後對左右子樹不斷遞歸這個判斷直到沒有子樹。
1.6.6 Java 生産者和消費者
package third;
import static java.lang.Thread.sleep;
public class SellShaoBing {
private int shaoBingAmount=0;
private boolean isStop = false;
private int maxAmount;
public SellShaoBing(int max){
maxAmount = max;
}
private Thread createProductThread(){
return new Thread(()->{
while (!isStop) {
synchronized (this) {
//留個緩沖,做滿之前就開始賣
if (shaoBingAmount > maxAmount - 5) notifyAll();
waitMethod(shaoBingAmount == maxAmount);
sleepMethod();
System.out.println("做燒餅");
shaoBingAmount++;
System.out.println("目前燒餅數量"+shaoBingAmount);
}
}
});
}
private Thread createConsumerThread(){
return new Thread(()->{
while (!isStop) {
synchronized (this) {
//留個緩沖,燒餅賣完之間就通知生産
if (shaoBingAmount < maxAmount /4) notifyAll();
waitMethod(shaoBingAmount == 0);
sleepMethod();
System.out.println("賣燒餅");
shaoBingAmount--;
System.out.println("目前燒餅數量"+shaoBingAmount);
}
}
});
}
public void actions(){
Thread product = createProductThread();
Thread consumer = createConsumerThread();
consumer.start();
product.start();
try {
sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
isStop = true;
}
private synchronized void waitMethod(boolean iswait){
if (iswait) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
}
private void sleepMethod(){
try {
sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
}
1.6.7 Scala 生産者和消費者(推薦)
package utils
//賣燒餅(Java生産者消費者的Scala實作)
case class ShaoBing(var amount:Int=0,maxAmount:Int=30,isContinue:Boolean=true)
object ShaoBing{
/**
* 燒餅線程模闆
* @param isNotify 傳入一個喚醒所有線程的判斷方法
* @param isWait 傳入一個使目前線程等待的方法
* @param doMethod 傳入一個增減燒餅數量的方法
* @param shaoBing 傳入一個燒餅對象
*/
def createThread(isNotify:ShaoBing => Boolean , isWait:ShaoBing => Boolean , doMethod:ShaoBing => Unit)(shaoBing:ShaoBing)={
while (shaoBing.isContinue){
//notify和wait都需要在同步代碼塊中才能使用,java 的 synchronized(obj){...} = scala 的 obj.synchronized{...}
shaoBing.synchronized{
if (isNotify(shaoBing)) shaoBing.notifyAll()
//傳入等待條件,使線程等待
if (isWait(shaoBing)) shaoBing.wait()
//增加或減少燒餅數量時才上鎖
doMethod(shaoBing)
}
println("目前燒餅數量:"+shaoBing.amount)
Thread.sleep(1000)
}
}
//做燒餅
val createProducerThread = createThread(
//做滿之前就開始賣
isNotify = shaoBing=>shaoBing.amount > shaoBing.maxAmount-5,
//做滿之後就等
isWait = shaoBing=>shaoBing.amount == shaoBing.maxAmount,
_.amount += 1) _
//賣燒餅
val createConsumerThread = createThread(
//賣到剩四分之一時候提前開始生産燒餅
isNotify = shaoBing=>shaoBing.amount < shaoBing.maxAmount/4,
//賣光了就停止賣燒餅
isWait = shaoBing=>shaoBing.amount == 0,
_.amount -= 1) _
//啟動生産者和消費者
def main(args:Array[String]):Unit={
val shaoBing = ShaoBing()
val producer = new Thread(new Runnable {override def run(): Unit = createProducerThread(shaoBing)})
val consumer = new Thread(new Runnable {override def run(): Unit = createConsumerThread(shaoBing)})
producer.start()
consumer.start()
}
}
1.7 線程模型
IO共有四種模型:同步阻塞、同步非阻塞、異步阻塞、異步非阻塞
同步阻塞:系統核心做好讀寫資料的準備之前,使用者線程一直等待。
同步非阻塞:使用者線程使用一個循環不斷詢問核心是否準備就緒,在準備就緒以前會一直進行該循環。
異步阻塞:使用者線程啟動一個新的線程去同步阻塞,自己則做下一件事,新的線程不斷詢問系統核心并阻塞。
異步非阻塞:當使用者線程收到通知時,資料已經被核心讀取完畢,并放在了使用者線程指定的緩沖區内,核心在IO完成後通知使用者線程直接使用即可