Spark支援兩種RDD操作:transformation和action。transformation操作會針對已有的RDD建立一個新的RDD;
而action則主要是對RDD進行最後的操作,比如周遊、reduce、儲存到檔案等,并可以傳回結果給Driver程式。
例如,map就是一種transformation操作,它用于将已有RDD的每個元素傳入一個自定義的函數,并擷取一個新的元素,然後将所有的新元素組成一個新的RDD。
而reduce就是一種action操作,它用于對RDD中的所有元素進行聚合操作,并擷取一個最終的結果,然後傳回給Driver程式。
transformation的特點就是lazy特性。lazy特性指的是,如果一個spark應用中隻定義了transformation操作,那麼即使你執行該應用,
這些操作也不會執行。也就是說,transformation是不會觸發spark程式的執行的,它們隻是記錄了對RDD所做的操作,但是不會自發的執行。
隻有當transformation之後,接着執行了一個action操作,那麼所有的transformation才會執行。Spark通過這種lazy特性,來進行底層的spark應用執行的優化,避免産生過多中間結果。
action操作執行,會觸發一個spark job的運作,進而觸發這個action之前所有的transformation的執行。這是action的特性。
常用transformation介紹
常用action介紹
transformation操作開發實戰
1、map:将集合中每個元素乘以2
2、filter:過濾出集合中的偶數
3、flatMap:将行拆分為單詞
4、groupByKey:将每個班級的成績進行分組
5、reduceByKey:統計每個班級的總分
6、sortByKey:将學生分數進行排序
7、join:列印每個學生的成績
8、cogroup:列印每個學生的成績
map:将集合中每個元素乘以2
建立TransforDemo類
package com.it19gong.sparkproject;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
public class TransforDemo {
public static void main(String[] args) {
map();
}
private static void map() {
//建立SparkConf
SparkConf conf = new SparkConf().setAppName("map").setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
//建構集合
List numbers = Arrays.asList(1,2,3,4,5);
//并行化集合,建立初始RDD
JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
// 使用map算子,将集合中的每個元素都乘以2
// map算子,是對任何類型的RDD,都可以調用的
// 在java中,map算子接收的參數是Function對象
// 建立的Function對象,一定會讓你設定第二個泛型參數,這個泛型類型,就是傳回的新元素的類型
// 同時call()方法的傳回類型,也必須與第二個泛型類型同步
// 在call()方法内部,就可以對原始RDD中的每一個元素進行各種處理和計算,并傳回一個新的元素
// 所有新的元素就會組成一個新的RDD
// 所有新的元素就會組成一個新的RDD
JavaRDD<Integer> multipleNumberRDD = numberRDD.map(new Function<Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer v1) throws Exception {
return v1 * 2;
}
});
multipleNumberRDD.foreach(new VoidFunction<Integer>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Integer t) throws Exception {
System.out.println(t);
}
});
}
}
運作代碼
filter:過濾出集合中的偶數
添加filter()方法
package com.it19gong.sparkproject;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
public class TransforDemo {
public static void main(String[] args) {
//map();
filter();
}
private static void filter() {
// 建立SparkConf
SparkConf conf = new SparkConf().setAppName("filter").setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 模拟集合
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
// 并行化集合,建立初始RDD
JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
// 對初始RDD執行filter算子,過濾出其中的偶數
// filter算子,傳入的也是Function,其他的使用注意點,實際上和map是一樣的
// 但是,唯一的不同,就是call()方法的傳回類型是Boolean
// 每一個初始RDD中的元素,都會傳入call()方法,此時你可以執行各種自定義的計算邏輯
// 來判斷這個元素是否是你想要的
// 如果你想在新的RDD中保留這個元素,那麼就傳回true;否則,不想保留這個元素,傳回false
JavaRDD<Integer> evenNumberRDD = numberRDD.filter(
new Function<Integer, Boolean>() {
private static final long serialVersionUID = 1L;
// 在這裡,1到10,都會傳入進來
// 但是根據我們的邏輯,隻有2,4,6,8,10這幾個偶數,會傳回true
// 是以,隻有偶數會保留下來,放在新的RDD中
@Override
public Boolean call(Integer v1) throws Exception {
return v1 % 2 == 0;
}
});
// 列印新的RDD
evenNumberRDD.foreach(new VoidFunction<Integer>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Integer t) throws Exception {
System.out.println(t);
}
});
// 關閉JavaSparkContext
sc.close();
}
private static void map() {
//建立SparkConf
SparkConf conf = new SparkConf().setAppName("map").setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
//建構集合
List numbers = Arrays.asList(1,2,3,4,5);
//并行化集合,建立初始RDD
JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
// 使用map算子,将集合中的每個元素都乘以2
// map算子,是對任何類型的RDD,都可以調用的
// 在java中,map算子接收的參數是Function對象
// 建立的Function對象,一定會讓你設定第二個泛型參數,這個泛型類型,就是傳回的新元素的類型
// 同時call()方法的傳回類型,也必須與第二個泛型類型同步
// 在call()方法内部,就可以對原始RDD中的每一個元素進行各種處理和計算,并傳回一個新的元素
// 所有新的元素就會組成一個新的RDD
// 所有新的元素就會組成一個新的RDD
JavaRDD<Integer> multipleNumberRDD = numberRDD.map(new Function<Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer v1) throws Exception {
return v1 * 2;
}
});
multipleNumberRDD.foreach(new VoidFunction<Integer>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Integer t) throws Exception {
System.out.println(t);
}
});
}
}
運作filter()
flatMap案例
flatMap案例:将文本行拆分為多個單詞
package com.it19gong.sparkproject;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
public class TransforDemo {
public static void main(String[] args) {
//map();
//filter();
flatMap() ;
}
private static void flatMap() {
// 建立SparkConf
SparkConf conf = new SparkConf()
.setAppName("flatMap")
.setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 構造集合
List<String> lineList = Arrays.asList("hello you", "hello me", "hello world");
// 并行化集合,建立RDD
JavaRDD<String> lines = sc.parallelize(lineList);
// 對RDD執行flatMap算子,将每一行文本,拆分為多個單詞
// flatMap算子,在java中,接收的參數是FlatMapFunction
// 我們需要自己定義FlatMapFunction的第二個泛型類型,即,代表了傳回的新元素的類型
// call()方法,傳回的類型,不是U,而是Iterable<U>,這裡的U也與第二個泛型類型相同
// flatMap其實就是,接收原始RDD中的每個元素,并進行各種邏輯的計算和處理,可以傳回多個元素
// 多個元素,即封裝在Iterable集合中,可以使用ArrayList等集合
// 新的RDD中,即封裝了所有的新元素;也就是說,新的RDD的大小一定是 >= 原始RDD的大小
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
private static final long serialVersionUID = 1L;
// 在這裡會,比如,傳入第一行,hello you
// 傳回的是一個Iterable<String>(hello, you)
@Override
public Iterable<String> call(String t) throws Exception {
return Arrays.asList(t.split(" "));
}
});
// 列印新的RDD
words.foreach(new VoidFunction<String>() {
private static final long serialVersionUID = 1L;
@Override
public void call(String t) throws Exception {
System.out.println(t);
}
});
// 關閉JavaSparkContext
sc.close();
}
private static void filter() {
// 建立SparkConf
SparkConf conf = new SparkConf().setAppName("filter").setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 模拟集合
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
// 并行化集合,建立初始RDD
JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
// 對初始RDD執行filter算子,過濾出其中的偶數
// filter算子,傳入的也是Function,其他的使用注意點,實際上和map是一樣的
// 但是,唯一的不同,就是call()方法的傳回類型是Boolean
// 每一個初始RDD中的元素,都會傳入call()方法,此時你可以執行各種自定義的計算邏輯
// 來判斷這個元素是否是你想要的
// 如果你想在新的RDD中保留這個元素,那麼就傳回true;否則,不想保留這個元素,傳回false
JavaRDD<Integer> evenNumberRDD = numberRDD.filter(
new Function<Integer, Boolean>() {
private static final long serialVersionUID = 1L;
// 在這裡,1到10,都會傳入進來
// 但是根據我們的邏輯,隻有2,4,6,8,10這幾個偶數,會傳回true
// 是以,隻有偶數會保留下來,放在新的RDD中
@Override
public Boolean call(Integer v1) throws Exception {
return v1 % 2 == 0;
}
});
// 列印新的RDD
evenNumberRDD.foreach(new VoidFunction<Integer>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Integer t) throws Exception {
System.out.println(t);
}
});
// 關閉JavaSparkContext
sc.close();
}
private static void map() {
//建立SparkConf
SparkConf conf = new SparkConf().setAppName("map").setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
//建構集合
List numbers = Arrays.asList(1,2,3,4,5);
//并行化集合,建立初始RDD
JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
// 使用map算子,将集合中的每個元素都乘以2
// map算子,是對任何類型的RDD,都可以調用的
// 在java中,map算子接收的參數是Function對象
// 建立的Function對象,一定會讓你設定第二個泛型參數,這個泛型類型,就是傳回的新元素的類型
// 同時call()方法的傳回類型,也必須與第二個泛型類型同步
// 在call()方法内部,就可以對原始RDD中的每一個元素進行各種處理和計算,并傳回一個新的元素
// 所有新的元素就會組成一個新的RDD
// 所有新的元素就會組成一個新的RDD
JavaRDD<Integer> multipleNumberRDD = numberRDD.map(new Function<Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer v1) throws Exception {
return v1 * 2;
}
});
multipleNumberRDD.foreach(new VoidFunction<Integer>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Integer t) throws Exception {
System.out.println(t);
}
});
}
}
運作代碼
GroupBYkey案例
将每個班級的成績進行分組
package com.it19gong.sparkproject;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
public class TransforDemo {
public static void main(String[] args) {
//map();
//filter();
//flatMap() ;
groupByKey();
}
private static void groupByKey() {
// 建立SparkConf
SparkConf conf = new SparkConf()
.setAppName("groupByKey")
.setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 模拟集合
List<Tuple2<String, Integer>> scoreList = Arrays.asList(
new Tuple2<String, Integer>("class1", 80),
new Tuple2<String, Integer>("class2", 75),
new Tuple2<String, Integer>("class1", 90),
new Tuple2<String, Integer>("class2", 65));
// 并行化集合,建立JavaPairRDD
JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoreList);
// 針對scores RDD,執行groupByKey算子,對每個班級的成績進行分組
// groupByKey算子,傳回的還是JavaPairRDD
// 但是,JavaPairRDD的第一個泛型類型不變,第二個泛型類型變成Iterable這種集合類型
// 也就是說,按照了key進行分組,那麼每個key可能都會有多個value,此時多個value聚合成了Iterable
// 那麼接下來,我們是不是就可以通過groupedScores這種JavaPairRDD,很友善地處理某個分組内的資料
JavaPairRDD<String, Iterable<Integer>> groupedScores = scores.groupByKey();
// 列印groupedScores RDD
groupedScores.foreach(new VoidFunction<Tuple2<String,Iterable<Integer>>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Tuple2<String, Iterable<Integer>> t)
throws Exception {
System.out.println("class: " + t._1);
Iterator<Integer> ite = t._2.iterator();
while(ite.hasNext()) {
System.out.println(ite.next());
}
System.out.println("==============================");
}
});
// 關閉JavaSparkContext
sc.close();
}
private static void flatMap() {
// 建立SparkConf
SparkConf conf = new SparkConf()
.setAppName("flatMap")
.setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 構造集合
List<String> lineList = Arrays.asList("hello you", "hello me", "hello world");
// 并行化集合,建立RDD
JavaRDD<String> lines = sc.parallelize(lineList);
// 對RDD執行flatMap算子,将每一行文本,拆分為多個單詞
// flatMap算子,在java中,接收的參數是FlatMapFunction
// 我們需要自己定義FlatMapFunction的第二個泛型類型,即,代表了傳回的新元素的類型
// call()方法,傳回的類型,不是U,而是Iterable<U>,這裡的U也與第二個泛型類型相同
// flatMap其實就是,接收原始RDD中的每個元素,并進行各種邏輯的計算和處理,可以傳回多個元素
// 多個元素,即封裝在Iterable集合中,可以使用ArrayList等集合
// 新的RDD中,即封裝了所有的新元素;也就是說,新的RDD的大小一定是 >= 原始RDD的大小
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
private static final long serialVersionUID = 1L;
// 在這裡會,比如,傳入第一行,hello you
// 傳回的是一個Iterable<String>(hello, you)
@Override
public Iterable<String> call(String t) throws Exception {
return Arrays.asList(t.split(" "));
}
});
// 列印新的RDD
words.foreach(new VoidFunction<String>() {
private static final long serialVersionUID = 1L;
@Override
public void call(String t) throws Exception {
System.out.println(t);
}
});
// 關閉JavaSparkContext
sc.close();
}
private static void filter() {
// 建立SparkConf
SparkConf conf = new SparkConf().setAppName("filter").setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 模拟集合
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
// 并行化集合,建立初始RDD
JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
// 對初始RDD執行filter算子,過濾出其中的偶數
// filter算子,傳入的也是Function,其他的使用注意點,實際上和map是一樣的
// 但是,唯一的不同,就是call()方法的傳回類型是Boolean
// 每一個初始RDD中的元素,都會傳入call()方法,此時你可以執行各種自定義的計算邏輯
// 來判斷這個元素是否是你想要的
// 如果你想在新的RDD中保留這個元素,那麼就傳回true;否則,不想保留這個元素,傳回false
JavaRDD<Integer> evenNumberRDD = numberRDD.filter(
new Function<Integer, Boolean>() {
private static final long serialVersionUID = 1L;
// 在這裡,1到10,都會傳入進來
// 但是根據我們的邏輯,隻有2,4,6,8,10這幾個偶數,會傳回true
// 是以,隻有偶數會保留下來,放在新的RDD中
@Override
public Boolean call(Integer v1) throws Exception {
return v1 % 2 == 0;
}
});
// 列印新的RDD
evenNumberRDD.foreach(new VoidFunction<Integer>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Integer t) throws Exception {
System.out.println(t);
}
});
// 關閉JavaSparkContext
sc.close();
}
private static void map() {
//建立SparkConf
SparkConf conf = new SparkConf().setAppName("map").setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
//建構集合
List numbers = Arrays.asList(1,2,3,4,5);
//并行化集合,建立初始RDD
JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
// 使用map算子,将集合中的每個元素都乘以2
// map算子,是對任何類型的RDD,都可以調用的
// 在java中,map算子接收的參數是Function對象
// 建立的Function對象,一定會讓你設定第二個泛型參數,這個泛型類型,就是傳回的新元素的類型
// 同時call()方法的傳回類型,也必須與第二個泛型類型同步
// 在call()方法内部,就可以對原始RDD中的每一個元素進行各種處理和計算,并傳回一個新的元素
// 所有新的元素就會組成一個新的RDD
// 所有新的元素就會組成一個新的RDD
JavaRDD<Integer> multipleNumberRDD = numberRDD.map(new Function<Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer v1) throws Exception {
return v1 * 2;
}
});
multipleNumberRDD.foreach(new VoidFunction<Integer>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Integer t) throws Exception {
System.out.println(t);
}
});
}
}
運作代碼
SortByKey 案例
将學生分數進行排序
package com.it19gong.sparkproject;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
public class TransforDemo {
public static void main(String[] args) {
//map();
//filter();
//flatMap() ;
// groupByKey();
sortByKey();
}
private static void sortByKey() {
// 建立SparkConf
SparkConf conf = new SparkConf()
.setAppName("sortByKey")
.setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 模拟集合
List<Tuple2<Integer, String>> scoreList = Arrays.asList(
new Tuple2<Integer, String>(65, "leo"),
new Tuple2<Integer, String>(50, "tom"),
new Tuple2<Integer, String>(100, "marry"),
new Tuple2<Integer, String>(80, "jack"));
// 并行化集合,建立RDD
JavaPairRDD<Integer, String> scores = sc.parallelizePairs(scoreList);
// 對scores RDD執行sortByKey算子
// sortByKey其實就是根據key進行排序,可以手動指定升序,或者降序
// 傳回的,還是JavaPairRDD,其中的元素内容,都是和原始的RDD一模一樣的
// 但是就是RDD中的元素的順序,不同了
JavaPairRDD<Integer, String> sortedScores = scores.sortByKey(false);
// 列印sortedScored RDD
sortedScores.foreach(new VoidFunction<Tuple2<Integer,String>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Tuple2<Integer, String> t) throws Exception {
System.out.println(t._1 + ": " + t._2);
}
});
// 關閉JavaSparkContext
sc.close();
}
private static void groupByKey() {
// 建立SparkConf
SparkConf conf = new SparkConf()
.setAppName("groupByKey")
.setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 模拟集合
List<Tuple2<String, Integer>> scoreList = Arrays.asList(
new Tuple2<String, Integer>("class1", 80),
new Tuple2<String, Integer>("class2", 75),
new Tuple2<String, Integer>("class1", 90),
new Tuple2<String, Integer>("class2", 65));
// 并行化集合,建立JavaPairRDD
JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoreList);
// 針對scores RDD,執行groupByKey算子,對每個班級的成績進行分組
// groupByKey算子,傳回的還是JavaPairRDD
// 但是,JavaPairRDD的第一個泛型類型不變,第二個泛型類型變成Iterable這種集合類型
// 也就是說,按照了key進行分組,那麼每個key可能都會有多個value,此時多個value聚合成了Iterable
// 那麼接下來,我們是不是就可以通過groupedScores這種JavaPairRDD,很友善地處理某個分組内的資料
JavaPairRDD<String, Iterable<Integer>> groupedScores = scores.groupByKey();
// 列印groupedScores RDD
groupedScores.foreach(new VoidFunction<Tuple2<String,Iterable<Integer>>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Tuple2<String, Iterable<Integer>> t)
throws Exception {
System.out.println("class: " + t._1);
Iterator<Integer> ite = t._2.iterator();
while(ite.hasNext()) {
System.out.println(ite.next());
}
System.out.println("==============================");
}
});
// 關閉JavaSparkContext
sc.close();
}
private static void flatMap() {
// 建立SparkConf
SparkConf conf = new SparkConf()
.setAppName("flatMap")
.setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 構造集合
List<String> lineList = Arrays.asList("hello you", "hello me", "hello world");
// 并行化集合,建立RDD
JavaRDD<String> lines = sc.parallelize(lineList);
// 對RDD執行flatMap算子,将每一行文本,拆分為多個單詞
// flatMap算子,在java中,接收的參數是FlatMapFunction
// 我們需要自己定義FlatMapFunction的第二個泛型類型,即,代表了傳回的新元素的類型
// call()方法,傳回的類型,不是U,而是Iterable<U>,這裡的U也與第二個泛型類型相同
// flatMap其實就是,接收原始RDD中的每個元素,并進行各種邏輯的計算和處理,可以傳回多個元素
// 多個元素,即封裝在Iterable集合中,可以使用ArrayList等集合
// 新的RDD中,即封裝了所有的新元素;也就是說,新的RDD的大小一定是 >= 原始RDD的大小
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
private static final long serialVersionUID = 1L;
// 在這裡會,比如,傳入第一行,hello you
// 傳回的是一個Iterable<String>(hello, you)
@Override
public Iterable<String> call(String t) throws Exception {
return Arrays.asList(t.split(" "));
}
});
// 列印新的RDD
words.foreach(new VoidFunction<String>() {
private static final long serialVersionUID = 1L;
@Override
public void call(String t) throws Exception {
System.out.println(t);
}
});
// 關閉JavaSparkContext
sc.close();
}
private static void filter() {
// 建立SparkConf
SparkConf conf = new SparkConf().setAppName("filter").setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 模拟集合
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
// 并行化集合,建立初始RDD
JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
// 對初始RDD執行filter算子,過濾出其中的偶數
// filter算子,傳入的也是Function,其他的使用注意點,實際上和map是一樣的
// 但是,唯一的不同,就是call()方法的傳回類型是Boolean
// 每一個初始RDD中的元素,都會傳入call()方法,此時你可以執行各種自定義的計算邏輯
// 來判斷這個元素是否是你想要的
// 如果你想在新的RDD中保留這個元素,那麼就傳回true;否則,不想保留這個元素,傳回false
JavaRDD<Integer> evenNumberRDD = numberRDD.filter(
new Function<Integer, Boolean>() {
private static final long serialVersionUID = 1L;
// 在這裡,1到10,都會傳入進來
// 但是根據我們的邏輯,隻有2,4,6,8,10這幾個偶數,會傳回true
// 是以,隻有偶數會保留下來,放在新的RDD中
@Override
public Boolean call(Integer v1) throws Exception {
return v1 % 2 == 0;
}
});
// 列印新的RDD
evenNumberRDD.foreach(new VoidFunction<Integer>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Integer t) throws Exception {
System.out.println(t);
}
});
// 關閉JavaSparkContext
sc.close();
}
private static void map() {
//建立SparkConf
SparkConf conf = new SparkConf().setAppName("map").setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
//建構集合
List numbers = Arrays.asList(1,2,3,4,5);
//并行化集合,建立初始RDD
JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
// 使用map算子,将集合中的每個元素都乘以2
// map算子,是對任何類型的RDD,都可以調用的
// 在java中,map算子接收的參數是Function對象
// 建立的Function對象,一定會讓你設定第二個泛型參數,這個泛型類型,就是傳回的新元素的類型
// 同時call()方法的傳回類型,也必須與第二個泛型類型同步
// 在call()方法内部,就可以對原始RDD中的每一個元素進行各種處理和計算,并傳回一個新的元素
// 所有新的元素就會組成一個新的RDD
// 所有新的元素就會組成一個新的RDD
JavaRDD<Integer> multipleNumberRDD = numberRDD.map(new Function<Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer v1) throws Exception {
return v1 * 2;
}
});
multipleNumberRDD.foreach(new VoidFunction<Integer>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Integer t) throws Exception {
System.out.println(t);
}
});
}
}
運作代碼
join案例
列印學生成績
package com.it19gong.sparkproject;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
public class TransforDemo {
public static void main(String[] args) {
//map();
//filter();
//flatMap() ;
// groupByKey();
//sortByKey();
join();
}
private static void join() {
// 建立SparkConf
SparkConf conf = new SparkConf()
.setAppName("join")
.setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 模拟集合
List<Tuple2<Integer, String>> studentList = Arrays.asList(
new Tuple2<Integer, String>(1, "leo"),
new Tuple2<Integer, String>(2, "jack"),
new Tuple2<Integer, String>(3, "tom"));
List<Tuple2<Integer, Integer>> scoreList = Arrays.asList(
new Tuple2<Integer, Integer>(1, 100),
new Tuple2<Integer, Integer>(2, 90),
new Tuple2<Integer, Integer>(3, 60));
// 并行化兩個RDD
JavaPairRDD<Integer, String> students = sc.parallelizePairs(studentList);
JavaPairRDD<Integer, Integer> scores = sc.parallelizePairs(scoreList);
// 使用join算子關聯兩個RDD
// join以後,還是會根據key進行join,并傳回JavaPairRDD
// 但是JavaPairRDD的第一個泛型類型是之前兩個JavaPairRDD的key的類型,因為是通過key進行join的
// 第二個泛型類型,是Tuple2<v1, v2>的類型,Tuple2的兩個泛型分别為原始RDD的value的類型
// join,就傳回的RDD的每一個元素,就是通過key join上的一個pair
// 什麼意思呢?比如有(1, 1) (1, 2) (1, 3)的一個RDD
// 還有一個(1, 4) (2, 1) (2, 2)的一個RDD
// join以後,實際上會得到(1 (1, 4)) (1, (2, 4)) (1, (3, 4))
JavaPairRDD<Integer, Tuple2<String, Integer>> studentScores = students.join(scores);
// 列印studnetScores RDD
studentScores.foreach(
new VoidFunction<Tuple2<Integer,Tuple2<String,Integer>>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Tuple2<Integer, Tuple2<String, Integer>> t)
throws Exception {
System.out.println("student id: " + t._1);
System.out.println("student name: " + t._2._1);
System.out.println("student score: " + t._2._2);
System.out.println("===============================");
}
});
// 關閉JavaSparkContext
sc.close();
}
private static void sortByKey() {
// 建立SparkConf
SparkConf conf = new SparkConf()
.setAppName("sortByKey")
.setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 模拟集合
List<Tuple2<Integer, String>> scoreList = Arrays.asList(
new Tuple2<Integer, String>(65, "leo"),
new Tuple2<Integer, String>(50, "tom"),
new Tuple2<Integer, String>(100, "marry"),
new Tuple2<Integer, String>(80, "jack"));
// 并行化集合,建立RDD
JavaPairRDD<Integer, String> scores = sc.parallelizePairs(scoreList);
// 對scores RDD執行sortByKey算子
// sortByKey其實就是根據key進行排序,可以手動指定升序,或者降序
// 傳回的,還是JavaPairRDD,其中的元素内容,都是和原始的RDD一模一樣的
// 但是就是RDD中的元素的順序,不同了
JavaPairRDD<Integer, String> sortedScores = scores.sortByKey(false);
// 列印sortedScored RDD
sortedScores.foreach(new VoidFunction<Tuple2<Integer,String>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Tuple2<Integer, String> t) throws Exception {
System.out.println(t._1 + ": " + t._2);
}
});
// 關閉JavaSparkContext
sc.close();
}
private static void groupByKey() {
// 建立SparkConf
SparkConf conf = new SparkConf()
.setAppName("groupByKey")
.setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 模拟集合
List<Tuple2<String, Integer>> scoreList = Arrays.asList(
new Tuple2<String, Integer>("class1", 80),
new Tuple2<String, Integer>("class2", 75),
new Tuple2<String, Integer>("class1", 90),
new Tuple2<String, Integer>("class2", 65));
// 并行化集合,建立JavaPairRDD
JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoreList);
// 針對scores RDD,執行groupByKey算子,對每個班級的成績進行分組
// groupByKey算子,傳回的還是JavaPairRDD
// 但是,JavaPairRDD的第一個泛型類型不變,第二個泛型類型變成Iterable這種集合類型
// 也就是說,按照了key進行分組,那麼每個key可能都會有多個value,此時多個value聚合成了Iterable
// 那麼接下來,我們是不是就可以通過groupedScores這種JavaPairRDD,很友善地處理某個分組内的資料
JavaPairRDD<String, Iterable<Integer>> groupedScores = scores.groupByKey();
// 列印groupedScores RDD
groupedScores.foreach(new VoidFunction<Tuple2<String,Iterable<Integer>>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Tuple2<String, Iterable<Integer>> t)
throws Exception {
System.out.println("class: " + t._1);
Iterator<Integer> ite = t._2.iterator();
while(ite.hasNext()) {
System.out.println(ite.next());
}
System.out.println("==============================");
}
});
// 關閉JavaSparkContext
sc.close();
}
private static void flatMap() {
// 建立SparkConf
SparkConf conf = new SparkConf()
.setAppName("flatMap")
.setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 構造集合
List<String> lineList = Arrays.asList("hello you", "hello me", "hello world");
// 并行化集合,建立RDD
JavaRDD<String> lines = sc.parallelize(lineList);
// 對RDD執行flatMap算子,将每一行文本,拆分為多個單詞
// flatMap算子,在java中,接收的參數是FlatMapFunction
// 我們需要自己定義FlatMapFunction的第二個泛型類型,即,代表了傳回的新元素的類型
// call()方法,傳回的類型,不是U,而是Iterable<U>,這裡的U也與第二個泛型類型相同
// flatMap其實就是,接收原始RDD中的每個元素,并進行各種邏輯的計算和處理,可以傳回多個元素
// 多個元素,即封裝在Iterable集合中,可以使用ArrayList等集合
// 新的RDD中,即封裝了所有的新元素;也就是說,新的RDD的大小一定是 >= 原始RDD的大小
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
private static final long serialVersionUID = 1L;
// 在這裡會,比如,傳入第一行,hello you
// 傳回的是一個Iterable<String>(hello, you)
@Override
public Iterable<String> call(String t) throws Exception {
return Arrays.asList(t.split(" "));
}
});
// 列印新的RDD
words.foreach(new VoidFunction<String>() {
private static final long serialVersionUID = 1L;
@Override
public void call(String t) throws Exception {
System.out.println(t);
}
});
// 關閉JavaSparkContext
sc.close();
}
private static void filter() {
// 建立SparkConf
SparkConf conf = new SparkConf().setAppName("filter").setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 模拟集合
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
// 并行化集合,建立初始RDD
JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
// 對初始RDD執行filter算子,過濾出其中的偶數
// filter算子,傳入的也是Function,其他的使用注意點,實際上和map是一樣的
// 但是,唯一的不同,就是call()方法的傳回類型是Boolean
// 每一個初始RDD中的元素,都會傳入call()方法,此時你可以執行各種自定義的計算邏輯
// 來判斷這個元素是否是你想要的
// 如果你想在新的RDD中保留這個元素,那麼就傳回true;否則,不想保留這個元素,傳回false
JavaRDD<Integer> evenNumberRDD = numberRDD.filter(
new Function<Integer, Boolean>() {
private static final long serialVersionUID = 1L;
// 在這裡,1到10,都會傳入進來
// 但是根據我們的邏輯,隻有2,4,6,8,10這幾個偶數,會傳回true
// 是以,隻有偶數會保留下來,放在新的RDD中
@Override
public Boolean call(Integer v1) throws Exception {
return v1 % 2 == 0;
}
});
// 列印新的RDD
evenNumberRDD.foreach(new VoidFunction<Integer>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Integer t) throws Exception {
System.out.println(t);
}
});
// 關閉JavaSparkContext
sc.close();
}
private static void map() {
//建立SparkConf
SparkConf conf = new SparkConf().setAppName("map").setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
//建構集合
List numbers = Arrays.asList(1,2,3,4,5);
//并行化集合,建立初始RDD
JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
// 使用map算子,将集合中的每個元素都乘以2
// map算子,是對任何類型的RDD,都可以調用的
// 在java中,map算子接收的參數是Function對象
// 建立的Function對象,一定會讓你設定第二個泛型參數,這個泛型類型,就是傳回的新元素的類型
// 同時call()方法的傳回類型,也必須與第二個泛型類型同步
// 在call()方法内部,就可以對原始RDD中的每一個元素進行各種處理和計算,并傳回一個新的元素
// 所有新的元素就會組成一個新的RDD
// 所有新的元素就會組成一個新的RDD
JavaRDD<Integer> multipleNumberRDD = numberRDD.map(new Function<Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer v1) throws Exception {
return v1 * 2;
}
});
multipleNumberRDD.foreach(new VoidFunction<Integer>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Integer t) throws Exception {
System.out.println(t);
}
});
}
}
運作代碼
cogroup案例:列印學生成績
package com.it19gong.sparkproject;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
public class TransforDemo {
public static void main(String[] args) {
//map();
//filter();
//flatMap() ;
// groupByKey();
//sortByKey();
//join();
cogroup();
}
private static void cogroup() {
// 建立SparkConf
SparkConf conf = new SparkConf()
.setAppName("cogroup")
.setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 模拟集合
List<Tuple2<Integer, String>> studentList = Arrays.asList(
new Tuple2<Integer, String>(1, "leo"),
new Tuple2<Integer, String>(2, "jack"),
new Tuple2<Integer, String>(3, "tom"));
List<Tuple2<Integer, Integer>> scoreList = Arrays.asList(
new Tuple2<Integer, Integer>(1, 100),
new Tuple2<Integer, Integer>(2, 90),
new Tuple2<Integer, Integer>(3, 60),
new Tuple2<Integer, Integer>(1, 70),
new Tuple2<Integer, Integer>(2, 80),
new Tuple2<Integer, Integer>(3, 50));
// 并行化兩個RDD
JavaPairRDD<Integer, String> students = sc.parallelizePairs(studentList);
JavaPairRDD<Integer, Integer> scores = sc.parallelizePairs(scoreList);
// cogroup與join不同
// 相當于是,一個key join上的所有value,都給放到一個Iterable裡面去了
// cogroup,不太好講解,希望大家通過動手編寫我們的案例,仔細體會其中的奧妙
JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> studentScores =
students.cogroup(scores);
// 列印studnetScores RDD
studentScores.foreach(
new VoidFunction<Tuple2<Integer,Tuple2<Iterable<String>,Iterable<Integer>>>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(
Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> t)
throws Exception {
System.out.println("student id: " + t._1);
System.out.println("student name: " + t._2._1);
System.out.println("student score: " + t._2._2);
System.out.println("===============================");
}
});
// 關閉JavaSparkContext
sc.close();
}
private static void join() {
// 建立SparkConf
SparkConf conf = new SparkConf()
.setAppName("join")
.setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 模拟集合
List<Tuple2<Integer, String>> studentList = Arrays.asList(
new Tuple2<Integer, String>(1, "leo"),
new Tuple2<Integer, String>(2, "jack"),
new Tuple2<Integer, String>(3, "tom"));
List<Tuple2<Integer, Integer>> scoreList = Arrays.asList(
new Tuple2<Integer, Integer>(1, 100),
new Tuple2<Integer, Integer>(2, 90),
new Tuple2<Integer, Integer>(3, 60));
// 并行化兩個RDD
JavaPairRDD<Integer, String> students = sc.parallelizePairs(studentList);
JavaPairRDD<Integer, Integer> scores = sc.parallelizePairs(scoreList);
// 使用join算子關聯兩個RDD
// join以後,還是會根據key進行join,并傳回JavaPairRDD
// 但是JavaPairRDD的第一個泛型類型是之前兩個JavaPairRDD的key的類型,因為是通過key進行join的
// 第二個泛型類型,是Tuple2<v1, v2>的類型,Tuple2的兩個泛型分别為原始RDD的value的類型
// join,就傳回的RDD的每一個元素,就是通過key join上的一個pair
// 什麼意思呢?比如有(1, 1) (1, 2) (1, 3)的一個RDD
// 還有一個(1, 4) (2, 1) (2, 2)的一個RDD
// join以後,實際上會得到(1 (1, 4)) (1, (2, 4)) (1, (3, 4))
JavaPairRDD<Integer, Tuple2<String, Integer>> studentScores = students.join(scores);
// 列印studnetScores RDD
studentScores.foreach(
new VoidFunction<Tuple2<Integer,Tuple2<String,Integer>>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Tuple2<Integer, Tuple2<String, Integer>> t)
throws Exception {
System.out.println("student id: " + t._1);
System.out.println("student name: " + t._2._1);
System.out.println("student score: " + t._2._2);
System.out.println("===============================");
}
});
// 關閉JavaSparkContext
sc.close();
}
private static void sortByKey() {
// 建立SparkConf
SparkConf conf = new SparkConf()
.setAppName("sortByKey")
.setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 模拟集合
List<Tuple2<Integer, String>> scoreList = Arrays.asList(
new Tuple2<Integer, String>(65, "leo"),
new Tuple2<Integer, String>(50, "tom"),
new Tuple2<Integer, String>(100, "marry"),
new Tuple2<Integer, String>(80, "jack"));
// 并行化集合,建立RDD
JavaPairRDD<Integer, String> scores = sc.parallelizePairs(scoreList);
// 對scores RDD執行sortByKey算子
// sortByKey其實就是根據key進行排序,可以手動指定升序,或者降序
// 傳回的,還是JavaPairRDD,其中的元素内容,都是和原始的RDD一模一樣的
// 但是就是RDD中的元素的順序,不同了
JavaPairRDD<Integer, String> sortedScores = scores.sortByKey(false);
// 列印sortedScored RDD
sortedScores.foreach(new VoidFunction<Tuple2<Integer,String>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Tuple2<Integer, String> t) throws Exception {
System.out.println(t._1 + ": " + t._2);
}
});
// 關閉JavaSparkContext
sc.close();
}
private static void groupByKey() {
// 建立SparkConf
SparkConf conf = new SparkConf()
.setAppName("groupByKey")
.setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 模拟集合
List<Tuple2<String, Integer>> scoreList = Arrays.asList(
new Tuple2<String, Integer>("class1", 80),
new Tuple2<String, Integer>("class2", 75),
new Tuple2<String, Integer>("class1", 90),
new Tuple2<String, Integer>("class2", 65));
// 并行化集合,建立JavaPairRDD
JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoreList);
// 針對scores RDD,執行groupByKey算子,對每個班級的成績進行分組
// groupByKey算子,傳回的還是JavaPairRDD
// 但是,JavaPairRDD的第一個泛型類型不變,第二個泛型類型變成Iterable這種集合類型
// 也就是說,按照了key進行分組,那麼每個key可能都會有多個value,此時多個value聚合成了Iterable
// 那麼接下來,我們是不是就可以通過groupedScores這種JavaPairRDD,很友善地處理某個分組内的資料
JavaPairRDD<String, Iterable<Integer>> groupedScores = scores.groupByKey();
// 列印groupedScores RDD
groupedScores.foreach(new VoidFunction<Tuple2<String,Iterable<Integer>>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Tuple2<String, Iterable<Integer>> t)
throws Exception {
System.out.println("class: " + t._1);
Iterator<Integer> ite = t._2.iterator();
while(ite.hasNext()) {
System.out.println(ite.next());
}
System.out.println("==============================");
}
});
// 關閉JavaSparkContext
sc.close();
}
private static void flatMap() {
// 建立SparkConf
SparkConf conf = new SparkConf()
.setAppName("flatMap")
.setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 構造集合
List<String> lineList = Arrays.asList("hello you", "hello me", "hello world");
// 并行化集合,建立RDD
JavaRDD<String> lines = sc.parallelize(lineList);
// 對RDD執行flatMap算子,将每一行文本,拆分為多個單詞
// flatMap算子,在java中,接收的參數是FlatMapFunction
// 我們需要自己定義FlatMapFunction的第二個泛型類型,即,代表了傳回的新元素的類型
// call()方法,傳回的類型,不是U,而是Iterable<U>,這裡的U也與第二個泛型類型相同
// flatMap其實就是,接收原始RDD中的每個元素,并進行各種邏輯的計算和處理,可以傳回多個元素
// 多個元素,即封裝在Iterable集合中,可以使用ArrayList等集合
// 新的RDD中,即封裝了所有的新元素;也就是說,新的RDD的大小一定是 >= 原始RDD的大小
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
private static final long serialVersionUID = 1L;
// 在這裡會,比如,傳入第一行,hello you
// 傳回的是一個Iterable<String>(hello, you)
@Override
public Iterable<String> call(String t) throws Exception {
return Arrays.asList(t.split(" "));
}
});
// 列印新的RDD
words.foreach(new VoidFunction<String>() {
private static final long serialVersionUID = 1L;
@Override
public void call(String t) throws Exception {
System.out.println(t);
}
});
// 關閉JavaSparkContext
sc.close();
}
private static void filter() {
// 建立SparkConf
SparkConf conf = new SparkConf().setAppName("filter").setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 模拟集合
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
// 并行化集合,建立初始RDD
JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
// 對初始RDD執行filter算子,過濾出其中的偶數
// filter算子,傳入的也是Function,其他的使用注意點,實際上和map是一樣的
// 但是,唯一的不同,就是call()方法的傳回類型是Boolean
// 每一個初始RDD中的元素,都會傳入call()方法,此時你可以執行各種自定義的計算邏輯
// 來判斷這個元素是否是你想要的
// 如果你想在新的RDD中保留這個元素,那麼就傳回true;否則,不想保留這個元素,傳回false
JavaRDD<Integer> evenNumberRDD = numberRDD.filter(
new Function<Integer, Boolean>() {
private static final long serialVersionUID = 1L;
// 在這裡,1到10,都會傳入進來
// 但是根據我們的邏輯,隻有2,4,6,8,10這幾個偶數,會傳回true
// 是以,隻有偶數會保留下來,放在新的RDD中
@Override
public Boolean call(Integer v1) throws Exception {
return v1 % 2 == 0;
}
});
// 列印新的RDD
evenNumberRDD.foreach(new VoidFunction<Integer>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Integer t) throws Exception {
System.out.println(t);
}
});
// 關閉JavaSparkContext
sc.close();
}
private static void map() {
//建立SparkConf
SparkConf conf = new SparkConf().setAppName("map").setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
//建構集合
List numbers = Arrays.asList(1,2,3,4,5);
//并行化集合,建立初始RDD
JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
// 使用map算子,将集合中的每個元素都乘以2
// map算子,是對任何類型的RDD,都可以調用的
// 在java中,map算子接收的參數是Function對象
// 建立的Function對象,一定會讓你設定第二個泛型參數,這個泛型類型,就是傳回的新元素的類型
// 同時call()方法的傳回類型,也必須與第二個泛型類型同步
// 在call()方法内部,就可以對原始RDD中的每一個元素進行各種處理和計算,并傳回一個新的元素
// 所有新的元素就會組成一個新的RDD
// 所有新的元素就會組成一個新的RDD
JavaRDD<Integer> multipleNumberRDD = numberRDD.map(new Function<Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer v1) throws Exception {
return v1 * 2;
}
});
multipleNumberRDD.foreach(new VoidFunction<Integer>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Integer t) throws Exception {
System.out.println(t);
}
});
}
}
運作代碼