開發者學堂課程【大資料實時計算架構 Spark 快速入門:Spark 算子操作剖析3】學習筆記,與課程緊密聯系,讓使用者快速學習知識。
課程位址:
https://developer.aliyun.com/learning/course/100/detail/1689Spark 算子操作剖析 3
一個 RDD 裡面有某個 partition,groupByKey 裡面傳參數可變為相應 partition。不傳參數,spark.default.parallelism 改變即可。
算子傳參優先級最高,其次是 spark.default.parallelism 的設定,若以上兩個都為空,則下一個 RDD 的并行度與上一個相同。
reduceoperator 算子:
reduce 是 action 操作,sum 在 driver 端,算子裡的邏輯在重節點。
numbers 指針在driver端,資料不一定在 driver 端,reduce方法本身在driver端執行,但 reduce 裡的匿名函數在 excutor 端執行。
調動 foreach 本身是 driver 端調動,println 是在叢節點裡并行來執行的,并行的存入資料庫更好。
Reducebykey:
shuffle 操作都有 reduce 和 map 操作兩個階段。
public class ReduceOperator {
public static void main(String[] args){
SparkConf conf = new SparkConf().setAppName("ReduceOperator"〉
.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
//有一個集合,裡面有1到10, 10個數字,現在我們通過 reduce 來進行累加
List numberList = Arrays.aslist(1,2,3,4,5);
JavaRDD numbers = sc.parallelize(numberList);
// reduce 操作的原理:首先将第一個和第二個元素,傳入 call 方法
//計算一個結果,接着把結果再和後面的元素依次累加
//以此類推
Int sum= numbers.reduce(new Function2(){
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1+v2;
}
});
System.out.println(sum);
sc.close();
Reduces the elements of this RDD using the specified commutative and
associative binary operator.
def reduce(f:(T, T)=> T):T = withscope {
val cleanF = sc.clean(f)
val reducePartition: Iterator[T]=> Option[T]= iter =>{
if (iter.hasNext){
Some(iter.reduceleft(cleanF))
} else {
None
}
}
var jobResult: Option[T]= None
val mergeResult =(index: Int, taskResult: Option[T])=>{
if (taskResult.isDefined){
jobResult = jobResult match {
case Some(value)=> Some(f(value, taskResult.get))
case None => taskResult
}
}
}
sc.runJob(this, reducepartition, mergeResult)
//Get the final result out of our Option, or throw an exception if the RDD was empty
jobResult.getOrElse(throw new UnsupportedoperationException("empty collection"))
vimport java.util.Arrays;
FeduceBykey= groupBykey + reduce
// shuffle 洗牌= map 端+ reduce 端
//spark 裡面這個 reduceByKey在 map 端自帶 Combiner
public class ReduceByKeyOperaton {
public static void main(string[] args){
SparkConf conf = new SparkConf().setAppName("LineCount").setMaster(
"local");
JavaSparkContext sc = new JavaSparkContext(conf);
List> scoreList = Arrays.aslist(
new Tuple2("xuruyun", 150),
new Tuple2("liangyongqi", 100),
new Tuple2("wangfei”, 100),
new Tuple2("wangfei", 80));