今天學習完單value的算子和雙value算子的開始
(1)distinct
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator15")
val sc = new SparkContext(sparkConf)
//distinct,去重
val rdd=sc.makeRDD(List(1,2,3,4,1,2,3,4))
val rdd1: RDD[Int] = rdd.distinct()
rdd1.collect().foreach(println)
sc.stop()
}
(2)coalesce
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator15")
val sc = new SparkContext(sparkConf)
//coalesce,壓縮分區,将多個分區,分成指定分區,節約資源
val rdd=sc.makeRDD(List(1,2,3,4),4)
val newrdd: RDD[Int] = rdd.coalesce(2,true)//true是否打亂資料
newrdd.saveAsTextFile("outout")
sc.stop()
}
(3)repartition
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator15")
val sc = new SparkContext(sparkConf)
//repartition,擴大分區
val rdd=sc.makeRDD(List(1,2,3,4),4)
val newrdd: RDD[Int] = rdd.repartition(3)
newrdd.saveAsTextFile("outout")
sc.stop()
}
(4)sortBy
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator15")
val sc = new SparkContext(sparkConf)
//sortBy,根據規則排序
val rdd=sc.makeRDD(List(6,2,8,1,5,3),2)
val newrdd: RDD[Int] = rdd.sortBy(num => num)
newrdd.collect().foreach(println)
sc.stop()
}
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator15")
val sc = new SparkContext(sparkConf)
//sortBy,根據規則排序
val rdd=sc.makeRDD(List(("1",1),("11",2),("2",3)),2)
//t._1是先比較第一個字元大小,加上toInt後是直接比較字元串代表的數字大小
val newrdd: RDD[(String, Int)] = rdd.sortBy(t => t._1.toInt)
newrdd.collect().foreach(println)
sc.stop()
}
(5)雙value
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator15")
val sc = new SparkContext(sparkConf)
//雙value
val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val rdd2: RDD[Int] = sc.makeRDD(List(3, 4, 7, 8))
//交集
val rdd3: RDD[Int] = rdd1.intersection(rdd2)
println(rdd3.collect().mkString(","))
//并集
val rdd4: RDD[Int] = rdd1.union(rdd2)
println(rdd4.collect().mkString(","))
//差集
val rdd5: RDD[Int] = rdd1.subtract(rdd2)
println(rdd5.collect().mkString(","))
//拉鍊
val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2)
println(rdd6.collect().mkString(","))
sc.stop()
}