天天看點

Spark基本的RDD算子之groupBy,groupByKey,mapValues

1. groupby

def groupBy[K: ClassTag](f: T => K): RDD[(K, Iterable[T])]
def groupBy[K: ClassTag](f: T => K, numPartitions: Int): RDD[(K, Iterable[T])]
def groupBy[K: ClassTag](f: T => K, p: Partitioner): RDD[(K, Iterable[T])]           

groupBy算子接收一個函數,這個函數傳回的值作為key,然後通過這個key來對裡面的元素進行分組。

val a = sc.parallelize(1 to 9, 3)
a.groupBy(x => { if (x % 2 == 0) "even" else "odd" }).collect
//傳回的even或者odd字元串作為key來group RDD裡面的值,
res42: Array[(String, Seq[Int])] = Array((even,ArrayBuffer(2, 4, 6, 8)), (odd,ArrayBuffer(1, 3, 5, 7, 9)))           

2. groupbykey

def groupByKey(): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
           

這個算子和group類似,不過和它不同的是他不接收一個函數,而是直接将鍵值對類型的資料的key作為group的key 值。同樣的,他也可以接收其他參數比如說partitioner

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "spider", "eagle"), 2)
val b = a.keyBy(_.length) //将字元串的長度作為key值。
b.groupByKey.collect //根據相同key值來進行group操作

res11: Array[(Int, Seq[String])] = Array((4,ArrayBuffer(lion)), (6,ArrayBuffer(spider)), (3,ArrayBuffer(dog, cat)), (5,ArrayBuffer(tiger, eagle)))
           

3.mapValues

同基本轉換操作中的map,隻不過mapValues是針對[K,V]中的V值進行map操作。

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", " eagle"), 2)  
val b = a.map(x => (x.length, x))  
b.mapValues("x" + _ + "x").collect  



//結果 
Array( 
(3,xdogx), 
(5,xtigerx), 
(4,xlionx), 
(3,xcatx), 
(7,xpantherx), 
(5,xeaglex) 
)           

繼續閱讀