天天看點

Spark 算子操作剖析 3

開發者學堂課程【大資料實時計算架構 Spark 快速入門:Spark 算子操作剖析3】學習筆記,與課程緊密聯系,讓使用者快速學習知識。

課程位址:

https://developer.aliyun.com/learning/course/100/detail/1689

Spark 算子操作剖析 3

一個 RDD 裡面有某個 partition,groupByKey 裡面傳參數可變為相應 partition。不傳參數,spark.default.parallelism 改變即可。

算子傳參優先級最高,其次是 spark.default.parallelism 的設定,若以上兩個都為空,則下一個 RDD 的并行度與上一個相同。

reduceoperator 算子:

reduce 是 action 操作,sum 在 driver 端,算子裡的邏輯在重節點。

numbers 指針在driver端,資料不一定在 driver 端,reduce方法本身在driver端執行,但 reduce 裡的匿名函數在 excutor 端執行。            

調動 foreach 本身是 driver 端調動,println 是在叢節點裡并行來執行的,并行的存入資料庫更好。

Reducebykey:

shuffle 操作都有 reduce 和 map 操作兩個階段。

public class ReduceOperator {

public static void main(String[] args){

SparkConf conf = new SparkConf().setAppName("ReduceOperator"〉

.setMaster("local");

JavaSparkContext sc = new JavaSparkContext(conf);

//有一個集合,裡面有1到10, 10個數字,現在我們通過 reduce 來進行累加

List numberList = Arrays.aslist(1,2,3,4,5);

JavaRDD numbers = sc.parallelize(numberList);

// reduce 操作的原理:首先将第一個和第二個元素,傳入 call 方法

//計算一個結果,接着把結果再和後面的元素依次累加

//以此類推

Int sum= numbers.reduce(new Function2(){

private static final long serialVersionUID = 1L;

@Override

public Integer call(Integer v1, Integer v2) throws Exception {

return v1+v2;

}

});

System.out.println(sum);

sc.close();

Spark 算子操作剖析 3

Reduces the elements of this RDD using the specified commutative and

associative binary operator.

def reduce(f:(T, T)=> T):T = withscope {

val cleanF = sc.clean(f)

val reducePartition: Iterator[T]=> Option[T]= iter =>{

if (iter.hasNext){

Some(iter.reduceleft(cleanF))

} else {

None

}

}

var jobResult: Option[T]= None

val mergeResult =(index: Int, taskResult: Option[T])=>{

if (taskResult.isDefined){

jobResult = jobResult match {

case Some(value)=> Some(f(value, taskResult.get))

case None => taskResult

}

}

}

sc.runJob(this, reducepartition, mergeResult)

//Get the final result out of our Option, or throw an exception if the RDD was empty

jobResult.getOrElse(throw new UnsupportedoperationException("empty collection"))

vimport java.util.Arrays;

FeduceBykey= groupBykey + reduce  

// shuffle 洗牌= map 端+ reduce 端

//spark 裡面這個 reduceByKey在 map 端自帶 Combiner  

public class ReduceByKeyOperaton {

public static void main(string[] args){

SparkConf conf = new SparkConf().setAppName("LineCount").setMaster(

"local");

JavaSparkContext sc = new JavaSparkContext(conf);

List> scoreList = Arrays.aslist(

new Tuple2("xuruyun", 150),

new Tuple2("liangyongqi", 100),

new Tuple2("wangfei”, 100),

new Tuple2("wangfei", 80));