天天看點

Scala實踐Spark(二) pair(鍵值對操作)

文章目錄

  • ​​轉化​​
  • ​​單個pair​​
  • ​​兩個pair​​
  • ​​聚合​​
  • ​​分組​​
  • ​​連接配接​​
  • ​​排序​​
  • ​​action​​
  • ​​資料分區​​
  • ​​擷取RDD的分區方式​​
  • ​​示例:PageRank​​
  • ​​自定義分區方式​​

轉化

單個pair

  • reduceByKey(func) 合并相同建的value
  • groupByKey() 對相同鍵的value分組
  • combineByKey(createCombiner,mergeValue,mergeCombiners,partitioner) 合并相同key的value并傳回不同的類型
  • aggregate()
  • mapValues(func)
  • flatMapValues(func)
  • keys 沒有括号
  • values 沒有括号
  • sortByKey()

兩個pair

  • subtractByKey
  • join 内連接配接
  • rightOuterJoin 右外連接配接
  • leftOuterJoin 左外連接配接
  • cogroup

聚合

  • reduceByKey()
  • foldByKey()

    下面是幾個聚合的例子

scala> val input = sc.parallelize(Array(("panda",0),("pink",3),("private",3),("panda",1),("pink",4)))
input: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> input.mapValues(x=>(x,1)).reduceByKey((x,y)=>(x._1+y._1,x._2+y._2))
res0: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[2] at reduceByKey at <console>:26

scala> val lines = sc.textFile("file:///home/hadoop/software/spark/spark-2.4.4-bin-hadoop2.7/README.md")
lines: org.apache.spark.rdd.RDD[String] = file:///home/hadoop/software/spark/spark-2.4.4-bin-hadoop2.7/README.md MapPartitionsRDD[4] at textFile at <console>:24

scala> val words = lines.flatMap(line=>line.split(" "))
words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5] at flatMap at <console>:25

scala> val result = words.map(x=>(x,1)).reduceByKey((x,y)=>x+y)
result: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[7] at reduceByKey at <console>:25

scala> result.take(5).foreach(println)
(package,1)                                                                     
(For,3)
(Programs,1)
(processing.,1)
(Because,1)      

combineByKey()有多個參數對應聚合操作的各個階段,可以用來解釋聚合操作各個階段的功能劃分。

scala> val input = sc.parallelize(Array(("panda",0),("pink",3),("private",3),("panda",1),("pink",4)))
input: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> val result = input.combineByKey(
  (v)=>(v,1), //建立 createCombiner V=>C
  (acc:(Int,Int),v)=>(acc._1+v,acc._2+1), //合并值 mergeValue
  (acc1:(Int,Int),acc2:(Int,Int))=>(acc1._1+acc2._1,acc1._1+acc2._2)) // 合并C
.map{case (key,value)=>(key,value._1/value._2.toFloat)}
result: org.apache.spark.rdd.RDD[(String, Float)] = MapPartitionsRDD[2] at map at <console>:25

scala> result.collectAsMap().map(println(_))
(private,3.0)                                                                   
(pink,3.5)
(panda,0.5)
res0: Iterable[Unit] = ArrayBuffer((), (), ())      

​​Spark核心RDD:combineByKey函數詳解​​

并行度調優:在執行聚合或分組操作時,可以要求Spark使用給定的分區數,Spark會嘗試根據叢集的大小推斷出一個有意義的預設值。

scala> val data = Seq(("a",3),("b",4),("a",1))
data: Seq[(String, Int)] = List((a,3), (b,4), (a,1))

scala> sc.parallelize(data).reduceByKey((x,y)=>x+y)
res2: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:27

scala> sc.parallelize(data).reduceByKey((x,y)=>x+y,10)
res3: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[6] at reduceByKey at <console>:27      

關于分區

  • ​rdd.partitions.size​

    ​擷取分區數
  • ​repartition()​

    ​建立新的分區集合

分組

  • groupByKey()
  • cogroup() =>[(K,(Iterable[V],Iterable[W]))]

連接配接

  • join()
  • rightOuterJoin()
  • leftOuterJoin()

排序

  • sortByKey() 支援自定義比較函數
val input:RDD[(Int,Venue)] = ...
implicit val sortIntegerByString = new Ordering[Int]{
  override def compare(a:Int,b:Int) = a.toString.compare(b.toString)
}
rdd.sortByKey()      

action

  • countByKey() 對key計數
  • collectAsMap() 将結果以映射表形式傳回,現在是不是都為​

    ​collect()​

    ​?
  • lookup(key) 傳回key對應的value

資料分區

合理的分組

以下是一個簡單的應用,假設過去存在一個​​

​[UserID,UserInfo]​

​​使用者訂閱表,周期性的會産生一個過去五分鐘的事件,是​

​[UserID,LinkInfo]​

​的表。

val sc = new Sparkcontext(...)
val userData = sc.sequenceFile[UserID,UserInfo]("hdfs://...").persist()

def processNewLogs(logFileName:String){
  val events = sc.sequenceFile[UserID,LinkInfo](logFileName)
  val joined = userData.join(events)
  val offTopicVisits = joined.filter{
    case (userId,(userInfo,linkInfo)) => !userInfo.topics.contains(linkInfo.topic)
  }.count()
  println("Number of visits to non-subscribed topics:" + offTopicVisits)
}      

上述代碼不夠高效,在于join()執行,每次都會計算所有鍵的哈希值傳輸到同一台機器然後對所有鍵相同的記錄連接配接操作。(哈希值計算和跨節點資料混洗)

通過​​

​partitionBy()​

​将表轉化為哈希分區。

val sc = new SparkContext(...)
val userData = sc.sequenceFile[UserID,UserInfo]("hdfs://...").partitionBy(new HashPartitioner(100)).persist()      
  • 由于建構userData時調用了partitionBy(),Spark就知道RDD根據鍵的哈希值來分區的,這樣在調用join()時,Spark隻會對events混洗操作,将events中特定UserID的記錄發送到userdata的對應分區所在的機器上。
  • 事實上,除了join()的很多操作也會利用已有的分區資訊,比如sortByKey()和groupByKey()會分别生成範圍分區的RDD和哈希分區的RDD;另一方面,map()這樣的操作會導緻新的RDD失去父RDD的分區資訊
  • 如果沒有将partitionBy()轉化操作的結果持久化,RDD都會重複地對資料進行分區操作。

擷取RDD的分區方式

通過partitoner屬性擷取分區資訊

scala> import org.apache.spark
import org.apache.spark

scala> val pairs = sc.parallelize(List((1,1),(2,2),(3,3)))
pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[7] at parallelize at <console>:25

scala> pairs.partitioner
res4: Option[org.apache.spark.Partitioner] = None

scala> val partitioned = pairs.partitionBy(new spark.HashPartitioner(2))
partitioned: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[8] at partitionBy at <console>:26

scala> partitioned.partitioner
res5: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@2)      
  • Spark内部知道各操作如何影響分區方式,并将對資料進行分區的操作的結果RDD自動設定為對應的分區器。如join()連接配接兩個RDD,由于鍵相同的元素會被hash到同一台機器上,Spark直到輸出結果也是hash分區的;不過轉化操作不一定按已知方式分區,如map會改變鍵值,此時就不會有固定的分區方式,不過我們可以采用另外兩個操作mapValues()和flatMapValues()作為替代方法,可以保證每個二進制組的鍵保持不變。以下是為生成的RDD設定好分區方式的操作:cogroup()、groupWith()、join()、leftOuterJoin()、rightOuterJoin()、groupByKey()、reduceByKey()、combineByKey()、partitionBy()、sort()、mapValues()、flatMapValues()、filter()

示例:PageRank

算法維護兩個資料集一個(pageID,linkList)包含每個頁面的相鄰頁面表,一個(pageID,rank)。計算每個頁面的PageRank值

  • 1.排序值初始化為1.0
  • 2.每次疊代向其相鄰頁面發送一個值為​

    ​rank(p)/numNeighbors(p)​

  • 3.将每個頁面的排序值設為​

    ​0.15+0.85*contributionReceived​

  • 4.重複循環,算法會逐漸收斂于每個頁面的實際PageRank值
val links = sc.objectFile[(String,Seq[String])]("links").partitionBy(new HashPartitioner(100)).persist()
val ranks = links.mapValues(v=>1.0)
for(i <- 0 until 10){
  val contributions = links.join(ranks).flatMap{
    // dest相鄰的link  rank/links.size傳遞的值
    case (pageId,(links,rank))=>links.map(dest=>(dest,rank/links.size))
  }
  ranks = contributions.reduceByKey((x,y)=>x+y).mapValues(v=>0.15+0.85*v)
}
result.saveAsTextFile("ranks")      

這裡解釋一下,根據pageid合并上述兩個表=>​

​(pageId,(links,rank))​

​​,将links中的每一個項更新為一個​

​pair​

​​:​

​(dest,rank/links/size)​

​​,即每一個項的計算貢獻值,最後合并貢獻值更新rank值。

上述代碼看起來簡單,還是做了不少事情來保持高效的方式:

  • links是一個靜态資料集,是以我們程式一開始進行了分區操作,這樣就不需要把它通過網絡進行資料混洗,相比于普通MapReduce節省了相當可觀的網絡通信開銷。
  • 我們對links調用了持久化,将它儲存在記憶體中以供每次疊代使用
  • 第一次建立ranks,我們通過mapValues()而不是map()來保留父RDD的分區方式,這樣對于它進行的第一次連接配接操作開銷很小
  • 循環體中,我們用reduceByKey()後使用mapValues():因為reduceByKey()的結果已經是哈希分區,下一次循環将映射操作的結果再次與links進行連接配接操作更加高效。
  • 短短的幾行代碼簡直great!!

注意:為了最大化分區優化的潛在作用,在無需改變鍵的情況下金亮使用mapValues()或flatMapValues()

自定義分區方式

  • numPartitions:Int:傳回建立出來的分區數
  • getPartition(key:Any):Int:傳回給定鍵的分區編号(0到numPartition-1)
  • equals():Java判斷相等性的方法,Spark需要用這個方法檢查你的分區器對象是否和其他分區器執行個體相同,這樣Spark才能判斷RDD的分區方式是否相同。

    下面編寫一個基于域名的分區器,這個分區器隻對url中的域名部分hash

class DomainNamePartitioner(numParts:Int) extends Partitioner{
  override def numpartitions: Int = numParts
  override def getPartition(key:Any): Int = {
  val domain = new Java.net.URL(key.toString).getHost()
  val code = (domain.hashCode % numPartitions)
  if(code<0){
    code + numPartitions
    }
  else{
    code
    }
  }
  override def equals(other:Any):Boolean = other match{
    case dnp:DomainNamePartitioner=>dnp.numPartitions == numPartitions
    case _ => false
    }
}      

繼續閱讀