天天看點

大資料筆試真題——第一章:通用面試題

試題總體概述

面試題包含時下流行的多個大資料工具和概念(spark、sparkStreaming、kafka、hadoop、hive、hbase、redis、flume、sqoop、zk、azkaban、kylin、Elatic Search)的原理介紹、實戰總結、調優方式等,我會不間斷的更新,維護,希望可以對正在找大資料工作的朋友們有所幫助.

第一章目錄

大資料筆試真題——第一章:通用面試題

第一章 通用

1.1 資料傾斜

當某個job長時間運作沒有結束,可能發生了資料傾斜。

1.1.1 hive

設定map端聚合和二次group by保證reduce資料大概平均,然後再設定reduce數量減少每個reduce的資料量

盡量少用distinct,不僅吃不到map端聚合(distinct原理是全局排序去重),而且多個distinct也吃不到二次group的優化。

如果group by 多個字段,或者其它二次group失效的情況,可以走下方spark的解決方案,将hql分多個hql來做。

1.1.2 Spark

比起hive來說,spark對資料傾斜的優化可以更細一些(也可以說更麻煩些)。

  1. 先用sample(false,0.x)采樣key,找出傾斜的key
  2. 把資料集拆成傾斜的部分和不傾斜的部分,不傾斜的部分走正常流程
  3. 傾斜的部分key前面加上一個定長的随機字元串,然後執行重分區
  4. 重分區後進行一個聚合操作,然後去除定長字首再聚合一次。
  5. 如果是大表join大表,其中一個表有資料傾斜,就需要用膨脹法,将傾斜部分的key加上一個0-n的字首,一條資料膨脹成n條,然後将另一個表的這部分key也加上相應的字首,然後單獨對這部分資料進行一次雙重聚合,與不傾斜的資料進行union操作,完成聚合。
  6. 空值看作是特殊的key,空值多了一樣用3的方法去解決。

1.2 海量資料

1.2.1 如何調優

1.抽樣檢測分組字段,看看是否有傾斜
  2.如果沒有傾斜,就正常增加reduce數量,設定中間ORC+SNAPPY壓縮
  3.如果有傾斜,把傾斜的部分過濾出來加字首打散處理,不傾斜的部分正常處理。
  4.如果是大表join小表,大表傾斜,可以使用map端join方法,小表傾斜直接無視。
  5.如果是大表join大表,某表傾斜,可以使用膨脹法處理。
           

1.2.2 實時排序

跳表法快速排序

将值域劃分區間建有序連結清單,原本海量的周遊會被壓縮N倍。每個值域對應一個有序連結清單,是以是雙重連結清單。
  前提:除了跳表外還有一張作品表,以作品id為key,value含分數
  值變更時,先查詢作品表更新原值,通過原值到對應區間的連結清單中删除該值,新值插入對應區間的連結清單中。
  求排名:1.每個區間連結清單除了儲存作品和分數外,還額外維護一個區間總長
          2.每次新值插入時,都将原區間總長-1,新區間總長+1
          3.求排名時周遊第一層連結清單将區間總長累加,然後周遊該值所在的區間求出區間内排名,兩者相加。
          
  求TopN:第二層連結清單一路往下走N個。
  缺點:事實上所有針對資料直接排序的方式都存在鎖問題,每次更新隻能有一個線程做,否則必然會亂。
           

平衡二叉樹排序

假設總分為5                 [0,5]
  1.将總分一分為二            [0,2],[3,5]
  2.将劃分的部分繼續二分      [0,1],[1,2],[3,4],[4,5]
  3.重複2直到劃分到底         [0,0],[1,1],[2,2],[3,3]...
  ​
  采用連結清單的方式從頂點往下一路關聯起來,每個區間key對應一個人數value。
  假設有某個值從2變成3。
  先從頂點開始往下周遊查找[2,2],周遊過程中經過的節點人數全部-1,終點[2,2]的人數也-1;
  然後從頂點開始往下周遊查找[3,3],經過的節點人數全部+1。終點也+1。
  求排名:假設要求分數為3的排名,從起點開始往下查找[3,3],如果是往左走,就把右值累加上,如果往右走則不加。
  優點:毫秒級響應排名,更新不需要鎖表,因為是單純的加減日志。
  缺點:消耗大量記憶體存儲,每次更新都要改變N個值,每次更新值時還需要把排名添加到TopN表裡。
           

1.2.3 求交集

理論:
  如果是k-v形式的資料,或者是可以提取key的資料,可以使用分組法比較。
  假設我有x G記憶體,要求A,B的交集。
  先把A,B分别分組拆分成多個小檔案,每個小檔案的大小為(x-300)/2,超過的部分再水準拆分成多個檔案。
  然後按分組讀入A,B的資料,在記憶體中比較,求出交集部分。
  ​
  如果分組太細,就按分組key的哈希進行分區,一次比較多個分組。
  為了友善小檔案的合并,可以在輸出時将資料排序輸出,建立索引檔案,後續可以采用歸并排序的方式快速合并。
  ​
  如果不能提取key,則按照hash分組。
  ​
  實戰:假設已經把兩張表要求交集的部分字段拼接成"key"
  select * from A join B on A.key = B.key
           

1.2.4 求差集

select * from A full outer join B on A.key = B.key where A.key is null or B.key is null;
           

1.3 布隆過濾器 原理

将字元串用哈希函數轉換為一個或多個整型值,将bit型數組中對應位置上的0改為1。判斷該字元串是否存在時,隻需要判斷這些位置上的值是否都為1,如果不是就說明一定不存在。但是反過來不能說明一定存在。

如:abc 轉換為3和5,就将arr[3]和arr[5]的值設定為1,隻要這兩個值都為1,就說明abc可能存在,如果它們不全為1,可以保證abc一定不存在。

1.4 存儲格式

TextFile

預設格式,按行存儲,可以壓縮但壓縮檔案不支援分片,反序列化開銷是SequenceFile的幾十倍(需要判斷分隔符和換行符)

SequenceFile

hadoop原生支援,将kv以二進制方式按行存儲,壓縮後的檔案支援壓縮。預設以record壓縮,可以改為block性能更好。壓縮率很低,查詢速度一般。

RCFile

按行分塊、按列存儲的存儲方式,反序列化速度較慢,但壓縮率和查詢速度最快。

ORC file

RC的改良版,每個Task輸出單檔案、存儲索引、支付複雜類型、支援塊壓縮、可以直接讀取,ORC比RC高效很多。

Parquet

列式存儲,是spark的預設存儲格式,壓縮和查詢性能比ORC稍差,但是支援的編碼更多,而且對嵌套式結構支援的更好(json)。

是以對結構化數倉來說ORC file格式更好,對靈活的spark作業來說Parquet格式更好。

1.5 壓縮格式

上面雖然提到了壓縮比,但隻不過是相對于純文字,在資料的存儲方式上帶來的資料量減少,并不是真正的壓縮方式。

下方介紹在這些存儲方式之上進一步減少資料量的壓縮方式。

gzip

spark預設壓縮方式,hadoop原生支援,壓縮比率很高、壓縮和解壓速度很快,支援純文字式編輯,使用友善,但是壓縮後不支援分片,是以适用于1個塊内大小的檔案。

lzo

hadoop流行的壓縮方式,需要安裝,壓縮解壓速度較快,壓縮率适中,建立索引後支援分片,壓縮單個大檔案時可以使用。

snappy

高速壓縮和解壓,壓縮率較低,需要安裝,不支援分片,推薦作為臨時資料的壓縮方式。

bzip2

非常高的壓縮率,但解壓速度很慢,支援分片。hadoop本身支援,但本地庫不支援。一般和lzo選其中一個作為資料源的壓縮格式。

1.6 手寫算法

1.6.1 四大排序

速度上:改良的歸并排序算法是n-nlogn,快速排序的時間複雜度是nlogn-n^2,冒泡和選擇都是n^2。

穩定性:快速、選擇都不穩定,冒泡和歸并都穩定

空間上:冒泡和選擇都是1,快速為logn,歸并為n

/**
    * 一個完整的快速排序的方法
    * 可以傳入任意類型的buffer對其排序後傳回
    * 可以傳入一個比較器對自定義類型排序
    *
    * @author 孤星魅影
    * @param buffer 任意類型的buffer
    * @param isASC  是否正序排序
    * @param ev     自定義比較器
    * @return 傳回一個快速排序後的buffer
    */
  def quickSort[T: ClassTag](buffer: mutable.Buffer[T])(implicit isASC: Boolean = true, ev: T => Comparable[T]): mutable.Buffer[T] = {
    //為null、為空、長度為1時都傳回buffer
    if (buffer == null || buffer.length <= 1)
      return buffer
    
    //1.将數組的第一個數head與後面所有的數進行比較,左邊放比head小的數,右邊放比head大的數(從小到大)
    //2.遞歸執行拆分,直到數組隻剩下1個
    //3.按順序将左、中、右三者拼接起來,完成排序
    val (left, right) = buffer.tail.partition(t => {
      if (isASC) t.compareTo(buffer.head) < 0
      else t.compareTo(buffer.head) > 0
    })
    quickSort(left) += buffer.head ++= quickSort(right)
  }
           

1.6.2 懶漢式單例

scala
  object{
    lazy val obj = new T()
  }
  ​
  java
  class T{
    private static T obj = null;
    private T(){}
    
    //先執行一次null檢驗,然後上鎖再執行一次null檢驗,防止同時有兩個線程通過了第一次null檢驗導緻異常。
    public T getObj()={
      if(obj==null){
        sysconized(obj){
          if(obj == null)
            obj = new T();
        }
      }
      return obj;
    }
  }
           

1.6.3 ip轉int

a,b,c,d = a*256*256*256+b*256*256+c*256+123
  ip是256進制,是以a需要*3個256才能保證每個bcd不重複,b,c,d同理,使用位計算效率更高:*256相當于<<8
  ​
  提供一段scala代碼:
  def ip2long(ip:String)=ip.split(".").reverse.zipWithIndex.map(v=>v._1<<(8*v._2)).reduce(_+_)
  ​
  可以提取轉換後的數字的前三位進行分組查詢,效率更高:
  ip.groupby(_.substring(0,3))
           

1.6.4 求質數

/**
      * 求min - max以内的所有質數
      * 隻需要判斷一個數是否能被自身以外的大于1的質數數整除就可以了
      * @return
      */
    def countPrime(min:Int,max:Int):Array[Int]={
      val arr = mutable.Set[Int](1)
      (min to max).foreach(index=> {
        if(!arr.exists(prime=>{
          //某個數如果能被除自己和1以外的質數整除,說明該數不是質數
          if(prime>1 && index != prime && index % prime == 0) true
          else false
        }))
          arr += index
      })
      arr.toArray.sorted
    }
           

1.6.5 判斷平衡二叉樹

原理:最底層的左節點和右節點相差為1(左右子樹相差為1),例子可以看上方海量資料實時排序部分
  方法:從根節點開始,判斷左子樹與右子樹是否差1,然後對左右子樹不斷遞歸這個判斷直到沒有子樹。
           

1.6.6 Java 生産者和消費者

package third;
  ​
  import static java.lang.Thread.sleep;
  ​
  public class SellShaoBing {
      private int shaoBingAmount=0;
      private boolean isStop = false;
      private int maxAmount;
      public SellShaoBing(int max){
          maxAmount = max;
      }
      private Thread createProductThread(){
          return new Thread(()->{
              while (!isStop) {
                  synchronized (this) {
                      //留個緩沖,做滿之前就開始賣
                      if (shaoBingAmount > maxAmount - 5) notifyAll();
                      waitMethod(shaoBingAmount == maxAmount);
                      sleepMethod();
                      System.out.println("做燒餅");
                      shaoBingAmount++;
                      System.out.println("目前燒餅數量"+shaoBingAmount);
                  }
              }
          });
      }
      private Thread createConsumerThread(){
          return new Thread(()->{
              while (!isStop) {
                  synchronized (this) {
                      //留個緩沖,燒餅賣完之間就通知生産
                      if (shaoBingAmount < maxAmount /4) notifyAll();
                      waitMethod(shaoBingAmount == 0);
                      sleepMethod();
                      System.out.println("賣燒餅");
                      shaoBingAmount--;
                      System.out.println("目前燒餅數量"+shaoBingAmount);
                  }
              }
          });
      }
      public void actions(){
         Thread product =  createProductThread();
         Thread consumer =  createConsumerThread();
         consumer.start();
         product.start();
          try {
              sleep(5000);
          } catch (InterruptedException e) {
              e.printStackTrace();
              Thread.currentThread().interrupt();
          }
          isStop = true;
      }
  ​
      private synchronized void waitMethod(boolean iswait){
          if (iswait) {
              try {
                  wait();
              } catch (InterruptedException e) {
                  e.printStackTrace();
                  Thread.currentThread().interrupt();
              }
          }
      }
      private void sleepMethod(){
          try {
              sleep(100);
          } catch (InterruptedException e) {
              e.printStackTrace();
              Thread.currentThread().interrupt();
          }
      }
  }
  ​
           

1.6.7 Scala 生産者和消費者(推薦)

package utils
  ​
  //賣燒餅(Java生産者消費者的Scala實作)
  case class ShaoBing(var amount:Int=0,maxAmount:Int=30,isContinue:Boolean=true)
  object ShaoBing{
      /** 
       * 燒餅線程模闆
       * @param isNotify 傳入一個喚醒所有線程的判斷方法
       * @param isWait   傳入一個使目前線程等待的方法
       * @param doMethod 傳入一個增減燒餅數量的方法
       * @param shaoBing 傳入一個燒餅對象
       */
      def createThread(isNotify:ShaoBing => Boolean , isWait:ShaoBing => Boolean , doMethod:ShaoBing => Unit)(shaoBing:ShaoBing)={
          while (shaoBing.isContinue){
              //notify和wait都需要在同步代碼塊中才能使用,java 的 synchronized(obj){...} = scala 的 obj.synchronized{...}
              shaoBing.synchronized{
              if (isNotify(shaoBing)) shaoBing.notifyAll()
              //傳入等待條件,使線程等待
              if (isWait(shaoBing)) shaoBing.wait()
              //增加或減少燒餅數量時才上鎖
                  doMethod(shaoBing)
              }
              println("目前燒餅數量:"+shaoBing.amount)
              Thread.sleep(1000)
          }
      }
  ​
      //做燒餅
      val createProducerThread = createThread(
          //做滿之前就開始賣
          isNotify = shaoBing=>shaoBing.amount > shaoBing.maxAmount-5,
          //做滿之後就等
          isWait = shaoBing=>shaoBing.amount == shaoBing.maxAmount,
          _.amount += 1) _
  ​
      //賣燒餅
      val createConsumerThread = createThread(
          //賣到剩四分之一時候提前開始生産燒餅
          isNotify = shaoBing=>shaoBing.amount < shaoBing.maxAmount/4,
          //賣光了就停止賣燒餅
          isWait = shaoBing=>shaoBing.amount == 0,
          _.amount -= 1) _
      
      
      //啟動生産者和消費者
      def main(args:Array[String]):Unit={
          val shaoBing = ShaoBing()
          val producer = new Thread(new Runnable {override def run(): Unit = createProducerThread(shaoBing)})
          val consumer = new Thread(new Runnable {override def run(): Unit = createConsumerThread(shaoBing)})
          producer.start()
          consumer.start()
      }
  }
           

1.7 線程模型

IO共有四種模型:同步阻塞、同步非阻塞、異步阻塞、異步非阻塞

同步阻塞:系統核心做好讀寫資料的準備之前,使用者線程一直等待。

同步非阻塞:使用者線程使用一個循環不斷詢問核心是否準備就緒,在準備就緒以前會一直進行該循環。

異步阻塞:使用者線程啟動一個新的線程去同步阻塞,自己則做下一件事,新的線程不斷詢問系統核心并阻塞。

異步非阻塞:當使用者線程收到通知時,資料已經被核心讀取完畢,并放在了使用者線程指定的緩沖區内,核心在IO完成後通知使用者線程直接使用即可