天天看點

27.Spark中transformation的介紹GroupBYkey案例SortByKey 案例join案例

Spark支援兩種RDD操作:transformation和action。transformation操作會針對已有的RDD建立一個新的RDD;

而action則主要是對RDD進行最後的操作,比如周遊、reduce、儲存到檔案等,并可以傳回結果給Driver程式。

例如,map就是一種transformation操作,它用于将已有RDD的每個元素傳入一個自定義的函數,并擷取一個新的元素,然後将所有的新元素組成一個新的RDD。

而reduce就是一種action操作,它用于對RDD中的所有元素進行聚合操作,并擷取一個最終的結果,然後傳回給Driver程式。

transformation的特點就是lazy特性。lazy特性指的是,如果一個spark應用中隻定義了transformation操作,那麼即使你執行該應用,

這些操作也不會執行。也就是說,transformation是不會觸發spark程式的執行的,它們隻是記錄了對RDD所做的操作,但是不會自發的執行。

隻有當transformation之後,接着執行了一個action操作,那麼所有的transformation才會執行。Spark通過這種lazy特性,來進行底層的spark應用執行的優化,避免産生過多中間結果。

action操作執行,會觸發一個spark job的運作,進而觸發這個action之前所有的transformation的執行。這是action的特性。

 常用transformation介紹

常用action介紹

 transformation操作開發實戰

1、map:将集合中每個元素乘以2

2、filter:過濾出集合中的偶數

3、flatMap:将行拆分為單詞

4、groupByKey:将每個班級的成績進行分組

5、reduceByKey:統計每個班級的總分

6、sortByKey:将學生分數進行排序

7、join:列印每個學生的成績

8、cogroup:列印每個學生的成績

map:将集合中每個元素乘以2

 建立TransforDemo類

package com.it19gong.sparkproject;


import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;

public class TransforDemo {

    public static void main(String[] args) {
        map();

    }

    private static void map() {
        //建立SparkConf
        SparkConf conf = new SparkConf().setAppName("map").setMaster("local");       
        // 建立JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);    
        //建構集合
        List numbers = Arrays.asList(1,2,3,4,5);
        //并行化集合,建立初始RDD
        JavaRDD<Integer> numberRDD = sc.parallelize(numbers);

           // 使用map算子,将集合中的每個元素都乘以2
        // map算子,是對任何類型的RDD,都可以調用的
        // 在java中,map算子接收的參數是Function對象
        // 建立的Function對象,一定會讓你設定第二個泛型參數,這個泛型類型,就是傳回的新元素的類型
        // 同時call()方法的傳回類型,也必須與第二個泛型類型同步
        // 在call()方法内部,就可以對原始RDD中的每一個元素進行各種處理和計算,并傳回一個新的元素
        // 所有新的元素就會組成一個新的RDD

        // 所有新的元素就會組成一個新的RDD
        JavaRDD<Integer> multipleNumberRDD = numberRDD.map(new Function<Integer, Integer>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer v1) throws Exception {
                
                return v1 * 2;
                
            }
        });

         multipleNumberRDD.foreach(new VoidFunction<Integer>() {
             
            private static final long serialVersionUID = 1L;

            @Override
            public void call(Integer t) throws Exception {
                 System.out.println(t);
            }
        });

        
    }
}      

運作代碼

filter:過濾出集合中的偶數

添加filter()方法

package com.it19gong.sparkproject;


import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;



public class TransforDemo {

    public static void main(String[] args) {
        //map();
        filter();
    }

    private static void filter() {
                // 建立SparkConf
                SparkConf conf = new SparkConf().setAppName("filter").setMaster("local");
                // 建立JavaSparkContext
                JavaSparkContext sc = new JavaSparkContext(conf);
                
                // 模拟集合
                List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
                
                // 并行化集合,建立初始RDD
                JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
                
                // 對初始RDD執行filter算子,過濾出其中的偶數
                // filter算子,傳入的也是Function,其他的使用注意點,實際上和map是一樣的
                // 但是,唯一的不同,就是call()方法的傳回類型是Boolean
                // 每一個初始RDD中的元素,都會傳入call()方法,此時你可以執行各種自定義的計算邏輯
                // 來判斷這個元素是否是你想要的
                // 如果你想在新的RDD中保留這個元素,那麼就傳回true;否則,不想保留這個元素,傳回false

                JavaRDD<Integer> evenNumberRDD = numberRDD.filter(
                        
                        new Function<Integer, Boolean>() {

                            private static final long serialVersionUID = 1L;
                            
                            // 在這裡,1到10,都會傳入進來
                            // 但是根據我們的邏輯,隻有2,4,6,8,10這幾個偶數,會傳回true
                            // 是以,隻有偶數會保留下來,放在新的RDD中
                            @Override
                            public Boolean call(Integer v1) throws Exception {
                                return v1 % 2 == 0;
                            }
                            
                        });
                
                // 列印新的RDD
                evenNumberRDD.foreach(new VoidFunction<Integer>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public void call(Integer t) throws Exception {
                        System.out.println(t);
                    }
                    
                });
                
                // 關閉JavaSparkContext
                sc.close();
            }


    

    private static void map() {
        //建立SparkConf
        SparkConf conf = new SparkConf().setAppName("map").setMaster("local");       
        // 建立JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);    
        //建構集合
        List numbers = Arrays.asList(1,2,3,4,5);
        //并行化集合,建立初始RDD
        JavaRDD<Integer> numberRDD = sc.parallelize(numbers);

           // 使用map算子,将集合中的每個元素都乘以2
        // map算子,是對任何類型的RDD,都可以調用的
        // 在java中,map算子接收的參數是Function對象
        // 建立的Function對象,一定會讓你設定第二個泛型參數,這個泛型類型,就是傳回的新元素的類型
        // 同時call()方法的傳回類型,也必須與第二個泛型類型同步
        // 在call()方法内部,就可以對原始RDD中的每一個元素進行各種處理和計算,并傳回一個新的元素
        // 所有新的元素就會組成一個新的RDD

        // 所有新的元素就會組成一個新的RDD
        JavaRDD<Integer> multipleNumberRDD = numberRDD.map(new Function<Integer, Integer>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer v1) throws Exception {
                
                return v1 * 2;
                
            }
        });

         multipleNumberRDD.foreach(new VoidFunction<Integer>() {
             
            private static final long serialVersionUID = 1L;

            @Override
            public void call(Integer t) throws Exception {
                 System.out.println(t);
            }
        });

        
    }
}      

運作filter()

flatMap案例

flatMap案例:将文本行拆分為多個單詞

package com.it19gong.sparkproject;


import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;



public class TransforDemo {

    public static void main(String[] args) {
        //map();
        //filter();
        flatMap() ;
    }

    private static void flatMap() {
        // 建立SparkConf
                SparkConf conf = new SparkConf()
                        .setAppName("flatMap")  
                        .setMaster("local");  
                // 建立JavaSparkContext
                JavaSparkContext sc = new JavaSparkContext(conf);
                
                // 構造集合
                List<String> lineList = Arrays.asList("hello you", "hello me", "hello world");  
                
                // 并行化集合,建立RDD
                JavaRDD<String> lines = sc.parallelize(lineList);
                
                // 對RDD執行flatMap算子,将每一行文本,拆分為多個單詞
                // flatMap算子,在java中,接收的參數是FlatMapFunction
                // 我們需要自己定義FlatMapFunction的第二個泛型類型,即,代表了傳回的新元素的類型
                // call()方法,傳回的類型,不是U,而是Iterable<U>,這裡的U也與第二個泛型類型相同
                // flatMap其實就是,接收原始RDD中的每個元素,并進行各種邏輯的計算和處理,可以傳回多個元素
                // 多個元素,即封裝在Iterable集合中,可以使用ArrayList等集合
                // 新的RDD中,即封裝了所有的新元素;也就是說,新的RDD的大小一定是 >= 原始RDD的大小
                JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

                    private static final long serialVersionUID = 1L;
                    
                    // 在這裡會,比如,傳入第一行,hello you
                    // 傳回的是一個Iterable<String>(hello, you)
                    @Override
                    public Iterable<String> call(String t) throws Exception {
                        return Arrays.asList(t.split(" "));
                    }
                    
                });
                
                // 列印新的RDD
                words.foreach(new VoidFunction<String>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public void call(String t) throws Exception {
                        System.out.println(t);
                    }
                });
                
                // 關閉JavaSparkContext
                sc.close();
        
    }

    
    private static void filter() {
                // 建立SparkConf
                SparkConf conf = new SparkConf().setAppName("filter").setMaster("local");
                // 建立JavaSparkContext
                JavaSparkContext sc = new JavaSparkContext(conf);
                
                // 模拟集合
                List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
                
                // 并行化集合,建立初始RDD
                JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
                
                // 對初始RDD執行filter算子,過濾出其中的偶數
                // filter算子,傳入的也是Function,其他的使用注意點,實際上和map是一樣的
                // 但是,唯一的不同,就是call()方法的傳回類型是Boolean
                // 每一個初始RDD中的元素,都會傳入call()方法,此時你可以執行各種自定義的計算邏輯
                // 來判斷這個元素是否是你想要的
                // 如果你想在新的RDD中保留這個元素,那麼就傳回true;否則,不想保留這個元素,傳回false

                JavaRDD<Integer> evenNumberRDD = numberRDD.filter(
                        
                        new Function<Integer, Boolean>() {

                            private static final long serialVersionUID = 1L;
                            
                            // 在這裡,1到10,都會傳入進來
                            // 但是根據我們的邏輯,隻有2,4,6,8,10這幾個偶數,會傳回true
                            // 是以,隻有偶數會保留下來,放在新的RDD中
                            @Override
                            public Boolean call(Integer v1) throws Exception {
                                return v1 % 2 == 0;
                            }
                            
                        });
                
                // 列印新的RDD
                evenNumberRDD.foreach(new VoidFunction<Integer>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public void call(Integer t) throws Exception {
                        System.out.println(t);
                    }
                    
                });
                
                // 關閉JavaSparkContext
                sc.close();
            }


    

    private static void map() {
        //建立SparkConf
        SparkConf conf = new SparkConf().setAppName("map").setMaster("local");       
        // 建立JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);    
        //建構集合
        List numbers = Arrays.asList(1,2,3,4,5);
        //并行化集合,建立初始RDD
        JavaRDD<Integer> numberRDD = sc.parallelize(numbers);

           // 使用map算子,将集合中的每個元素都乘以2
        // map算子,是對任何類型的RDD,都可以調用的
        // 在java中,map算子接收的參數是Function對象
        // 建立的Function對象,一定會讓你設定第二個泛型參數,這個泛型類型,就是傳回的新元素的類型
        // 同時call()方法的傳回類型,也必須與第二個泛型類型同步
        // 在call()方法内部,就可以對原始RDD中的每一個元素進行各種處理和計算,并傳回一個新的元素
        // 所有新的元素就會組成一個新的RDD

        // 所有新的元素就會組成一個新的RDD
        JavaRDD<Integer> multipleNumberRDD = numberRDD.map(new Function<Integer, Integer>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer v1) throws Exception {
                
                return v1 * 2;
                
            }
        });

         multipleNumberRDD.foreach(new VoidFunction<Integer>() {
             
            private static final long serialVersionUID = 1L;

            @Override
            public void call(Integer t) throws Exception {
                 System.out.println(t);
            }
        });

        
    }
}      

運作代碼

GroupBYkey案例

将每個班級的成績進行分組

package com.it19gong.sparkproject;


import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;



public class TransforDemo {

    public static void main(String[] args) {
        //map();
        //filter();
        //flatMap() ;
         groupByKey();
    }

    private static void groupByKey() {
        
        // 建立SparkConf
                SparkConf conf = new SparkConf()
                        .setAppName("groupByKey")  
                        .setMaster("local");
                // 建立JavaSparkContext
                JavaSparkContext sc = new JavaSparkContext(conf);
                
                // 模拟集合
                List<Tuple2<String, Integer>> scoreList = Arrays.asList(
                        new Tuple2<String, Integer>("class1", 80),
                        new Tuple2<String, Integer>("class2", 75),
                        new Tuple2<String, Integer>("class1", 90),
                        new Tuple2<String, Integer>("class2", 65));
                
                // 并行化集合,建立JavaPairRDD
                JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoreList);
                
                // 針對scores RDD,執行groupByKey算子,對每個班級的成績進行分組
                // groupByKey算子,傳回的還是JavaPairRDD
                // 但是,JavaPairRDD的第一個泛型類型不變,第二個泛型類型變成Iterable這種集合類型
                // 也就是說,按照了key進行分組,那麼每個key可能都會有多個value,此時多個value聚合成了Iterable
                // 那麼接下來,我們是不是就可以通過groupedScores這種JavaPairRDD,很友善地處理某個分組内的資料
                JavaPairRDD<String, Iterable<Integer>> groupedScores = scores.groupByKey();
                
                // 列印groupedScores RDD
                groupedScores.foreach(new VoidFunction<Tuple2<String,Iterable<Integer>>>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public void call(Tuple2<String, Iterable<Integer>> t)
                            throws Exception {
                        System.out.println("class: " + t._1);  
                        Iterator<Integer> ite = t._2.iterator();
                        while(ite.hasNext()) {
                            System.out.println(ite.next());  
                        }
                        System.out.println("==============================");   
                    }
                    
                });
                
                // 關閉JavaSparkContext
                sc.close();

    }

    private static void flatMap() {
        // 建立SparkConf
                SparkConf conf = new SparkConf()
                        .setAppName("flatMap")  
                        .setMaster("local");  
                // 建立JavaSparkContext
                JavaSparkContext sc = new JavaSparkContext(conf);
                
                // 構造集合
                List<String> lineList = Arrays.asList("hello you", "hello me", "hello world");  
                
                // 并行化集合,建立RDD
                JavaRDD<String> lines = sc.parallelize(lineList);
                
                // 對RDD執行flatMap算子,将每一行文本,拆分為多個單詞
                // flatMap算子,在java中,接收的參數是FlatMapFunction
                // 我們需要自己定義FlatMapFunction的第二個泛型類型,即,代表了傳回的新元素的類型
                // call()方法,傳回的類型,不是U,而是Iterable<U>,這裡的U也與第二個泛型類型相同
                // flatMap其實就是,接收原始RDD中的每個元素,并進行各種邏輯的計算和處理,可以傳回多個元素
                // 多個元素,即封裝在Iterable集合中,可以使用ArrayList等集合
                // 新的RDD中,即封裝了所有的新元素;也就是說,新的RDD的大小一定是 >= 原始RDD的大小
                JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

                    private static final long serialVersionUID = 1L;
                    
                    // 在這裡會,比如,傳入第一行,hello you
                    // 傳回的是一個Iterable<String>(hello, you)
                    @Override
                    public Iterable<String> call(String t) throws Exception {
                        return Arrays.asList(t.split(" "));
                    }
                    
                });
                
                // 列印新的RDD
                words.foreach(new VoidFunction<String>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public void call(String t) throws Exception {
                        System.out.println(t);
                    }
                });
                
                // 關閉JavaSparkContext
                sc.close();
        
    }

    private static void filter() {
                // 建立SparkConf
                SparkConf conf = new SparkConf().setAppName("filter").setMaster("local");
                // 建立JavaSparkContext
                JavaSparkContext sc = new JavaSparkContext(conf);
                
                // 模拟集合
                List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
                
                // 并行化集合,建立初始RDD
                JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
                
                // 對初始RDD執行filter算子,過濾出其中的偶數
                // filter算子,傳入的也是Function,其他的使用注意點,實際上和map是一樣的
                // 但是,唯一的不同,就是call()方法的傳回類型是Boolean
                // 每一個初始RDD中的元素,都會傳入call()方法,此時你可以執行各種自定義的計算邏輯
                // 來判斷這個元素是否是你想要的
                // 如果你想在新的RDD中保留這個元素,那麼就傳回true;否則,不想保留這個元素,傳回false

                JavaRDD<Integer> evenNumberRDD = numberRDD.filter(
                        
                        new Function<Integer, Boolean>() {

                            private static final long serialVersionUID = 1L;
                            
                            // 在這裡,1到10,都會傳入進來
                            // 但是根據我們的邏輯,隻有2,4,6,8,10這幾個偶數,會傳回true
                            // 是以,隻有偶數會保留下來,放在新的RDD中
                            @Override
                            public Boolean call(Integer v1) throws Exception {
                                return v1 % 2 == 0;
                            }
                            
                        });
                
                // 列印新的RDD
                evenNumberRDD.foreach(new VoidFunction<Integer>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public void call(Integer t) throws Exception {
                        System.out.println(t);
                    }
                    
                });
                
                // 關閉JavaSparkContext
                sc.close();
            }


    private static void map() {
        //建立SparkConf
        SparkConf conf = new SparkConf().setAppName("map").setMaster("local");       
        // 建立JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);    
        //建構集合
        List numbers = Arrays.asList(1,2,3,4,5);
        //并行化集合,建立初始RDD
        JavaRDD<Integer> numberRDD = sc.parallelize(numbers);

           // 使用map算子,将集合中的每個元素都乘以2
        // map算子,是對任何類型的RDD,都可以調用的
        // 在java中,map算子接收的參數是Function對象
        // 建立的Function對象,一定會讓你設定第二個泛型參數,這個泛型類型,就是傳回的新元素的類型
        // 同時call()方法的傳回類型,也必須與第二個泛型類型同步
        // 在call()方法内部,就可以對原始RDD中的每一個元素進行各種處理和計算,并傳回一個新的元素
        // 所有新的元素就會組成一個新的RDD

        // 所有新的元素就會組成一個新的RDD
        JavaRDD<Integer> multipleNumberRDD = numberRDD.map(new Function<Integer, Integer>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer v1) throws Exception {
                
                return v1 * 2;
                
            }
        });

         multipleNumberRDD.foreach(new VoidFunction<Integer>() {
             
            private static final long serialVersionUID = 1L;

            @Override
            public void call(Integer t) throws Exception {
                 System.out.println(t);
            }
        });

        
    }
}      

運作代碼

SortByKey 案例

将學生分數進行排序

package com.it19gong.sparkproject;


import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;



public class TransforDemo {

    public static void main(String[] args) {
        //map();
        //filter();
        //flatMap() ;
        // groupByKey();
        sortByKey();
    }

    private static void sortByKey() {
        // 建立SparkConf
        SparkConf conf = new SparkConf()
                .setAppName("sortByKey")  
                .setMaster("local");
        // 建立JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        
        // 模拟集合
        List<Tuple2<Integer, String>> scoreList = Arrays.asList(
                new Tuple2<Integer, String>(65, "leo"),
                new Tuple2<Integer, String>(50, "tom"),
                new Tuple2<Integer, String>(100, "marry"),
                new Tuple2<Integer, String>(80, "jack"));
        
        // 并行化集合,建立RDD
        JavaPairRDD<Integer, String> scores = sc.parallelizePairs(scoreList);
        
        // 對scores RDD執行sortByKey算子
        // sortByKey其實就是根據key進行排序,可以手動指定升序,或者降序
        // 傳回的,還是JavaPairRDD,其中的元素内容,都是和原始的RDD一模一樣的
        // 但是就是RDD中的元素的順序,不同了
        JavaPairRDD<Integer, String> sortedScores = scores.sortByKey(false);  
        
        // 列印sortedScored RDD
        sortedScores.foreach(new VoidFunction<Tuple2<Integer,String>>() {

            private static final long serialVersionUID = 1L;

            @Override
            public void call(Tuple2<Integer, String> t) throws Exception {
                System.out.println(t._1 + ": " + t._2);  
            }
            
        });
        
        // 關閉JavaSparkContext
        sc.close();

        
    }

    private static void groupByKey() {
        
        // 建立SparkConf
                SparkConf conf = new SparkConf()
                        .setAppName("groupByKey")  
                        .setMaster("local");
                // 建立JavaSparkContext
                JavaSparkContext sc = new JavaSparkContext(conf);
                
                // 模拟集合
                List<Tuple2<String, Integer>> scoreList = Arrays.asList(
                        new Tuple2<String, Integer>("class1", 80),
                        new Tuple2<String, Integer>("class2", 75),
                        new Tuple2<String, Integer>("class1", 90),
                        new Tuple2<String, Integer>("class2", 65));
                
                // 并行化集合,建立JavaPairRDD
                JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoreList);
                
                // 針對scores RDD,執行groupByKey算子,對每個班級的成績進行分組
                // groupByKey算子,傳回的還是JavaPairRDD
                // 但是,JavaPairRDD的第一個泛型類型不變,第二個泛型類型變成Iterable這種集合類型
                // 也就是說,按照了key進行分組,那麼每個key可能都會有多個value,此時多個value聚合成了Iterable
                // 那麼接下來,我們是不是就可以通過groupedScores這種JavaPairRDD,很友善地處理某個分組内的資料
                JavaPairRDD<String, Iterable<Integer>> groupedScores = scores.groupByKey();
                
                // 列印groupedScores RDD
                groupedScores.foreach(new VoidFunction<Tuple2<String,Iterable<Integer>>>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public void call(Tuple2<String, Iterable<Integer>> t)
                            throws Exception {
                        System.out.println("class: " + t._1);  
                        Iterator<Integer> ite = t._2.iterator();
                        while(ite.hasNext()) {
                            System.out.println(ite.next());  
                        }
                        System.out.println("==============================");   
                    }
                    
                });
                
                // 關閉JavaSparkContext
                sc.close();

    }

    private static void flatMap() {
        // 建立SparkConf
                SparkConf conf = new SparkConf()
                        .setAppName("flatMap")  
                        .setMaster("local");  
                // 建立JavaSparkContext
                JavaSparkContext sc = new JavaSparkContext(conf);
                
                // 構造集合
                List<String> lineList = Arrays.asList("hello you", "hello me", "hello world");  
                
                // 并行化集合,建立RDD
                JavaRDD<String> lines = sc.parallelize(lineList);
                
                // 對RDD執行flatMap算子,将每一行文本,拆分為多個單詞
                // flatMap算子,在java中,接收的參數是FlatMapFunction
                // 我們需要自己定義FlatMapFunction的第二個泛型類型,即,代表了傳回的新元素的類型
                // call()方法,傳回的類型,不是U,而是Iterable<U>,這裡的U也與第二個泛型類型相同
                // flatMap其實就是,接收原始RDD中的每個元素,并進行各種邏輯的計算和處理,可以傳回多個元素
                // 多個元素,即封裝在Iterable集合中,可以使用ArrayList等集合
                // 新的RDD中,即封裝了所有的新元素;也就是說,新的RDD的大小一定是 >= 原始RDD的大小
                JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

                    private static final long serialVersionUID = 1L;
                    
                    // 在這裡會,比如,傳入第一行,hello you
                    // 傳回的是一個Iterable<String>(hello, you)
                    @Override
                    public Iterable<String> call(String t) throws Exception {
                        return Arrays.asList(t.split(" "));
                    }
                    
                });
                
                // 列印新的RDD
                words.foreach(new VoidFunction<String>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public void call(String t) throws Exception {
                        System.out.println(t);
                    }
                });
                
                // 關閉JavaSparkContext
                sc.close();
        
    }

    private static void filter() {
                // 建立SparkConf
                SparkConf conf = new SparkConf().setAppName("filter").setMaster("local");
                // 建立JavaSparkContext
                JavaSparkContext sc = new JavaSparkContext(conf);
                
                // 模拟集合
                List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
                
                // 并行化集合,建立初始RDD
                JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
                
                // 對初始RDD執行filter算子,過濾出其中的偶數
                // filter算子,傳入的也是Function,其他的使用注意點,實際上和map是一樣的
                // 但是,唯一的不同,就是call()方法的傳回類型是Boolean
                // 每一個初始RDD中的元素,都會傳入call()方法,此時你可以執行各種自定義的計算邏輯
                // 來判斷這個元素是否是你想要的
                // 如果你想在新的RDD中保留這個元素,那麼就傳回true;否則,不想保留這個元素,傳回false

                JavaRDD<Integer> evenNumberRDD = numberRDD.filter(
                        
                        new Function<Integer, Boolean>() {

                            private static final long serialVersionUID = 1L;
                            
                            // 在這裡,1到10,都會傳入進來
                            // 但是根據我們的邏輯,隻有2,4,6,8,10這幾個偶數,會傳回true
                            // 是以,隻有偶數會保留下來,放在新的RDD中
                            @Override
                            public Boolean call(Integer v1) throws Exception {
                                return v1 % 2 == 0;
                            }
                            
                        });
                
                // 列印新的RDD
                evenNumberRDD.foreach(new VoidFunction<Integer>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public void call(Integer t) throws Exception {
                        System.out.println(t);
                    }
                    
                });
                
                // 關閉JavaSparkContext
                sc.close();
            }


    private static void map() {
        //建立SparkConf
        SparkConf conf = new SparkConf().setAppName("map").setMaster("local");       
        // 建立JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);    
        //建構集合
        List numbers = Arrays.asList(1,2,3,4,5);
        //并行化集合,建立初始RDD
        JavaRDD<Integer> numberRDD = sc.parallelize(numbers);

           // 使用map算子,将集合中的每個元素都乘以2
        // map算子,是對任何類型的RDD,都可以調用的
        // 在java中,map算子接收的參數是Function對象
        // 建立的Function對象,一定會讓你設定第二個泛型參數,這個泛型類型,就是傳回的新元素的類型
        // 同時call()方法的傳回類型,也必須與第二個泛型類型同步
        // 在call()方法内部,就可以對原始RDD中的每一個元素進行各種處理和計算,并傳回一個新的元素
        // 所有新的元素就會組成一個新的RDD

        // 所有新的元素就會組成一個新的RDD
        JavaRDD<Integer> multipleNumberRDD = numberRDD.map(new Function<Integer, Integer>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer v1) throws Exception {
                
                return v1 * 2;
                
            }
        });

         multipleNumberRDD.foreach(new VoidFunction<Integer>() {
             
            private static final long serialVersionUID = 1L;

            @Override
            public void call(Integer t) throws Exception {
                 System.out.println(t);
            }
        });

        
    }
}      

運作代碼

join案例

列印學生成績

package com.it19gong.sparkproject;


import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;



public class TransforDemo {

    public static void main(String[] args) {
        //map();
        //filter();
        //flatMap() ;
        // groupByKey();
        //sortByKey();
        join();
    }

    private static void join() {
        // 建立SparkConf
                SparkConf conf = new SparkConf()
                        .setAppName("join")  
                        .setMaster("local");
                // 建立JavaSparkContext
                JavaSparkContext sc = new JavaSparkContext(conf);
                
                // 模拟集合
                List<Tuple2<Integer, String>> studentList = Arrays.asList(
                        new Tuple2<Integer, String>(1, "leo"),
                        new Tuple2<Integer, String>(2, "jack"),
                        new Tuple2<Integer, String>(3, "tom"));
                
                List<Tuple2<Integer, Integer>> scoreList = Arrays.asList(
                        new Tuple2<Integer, Integer>(1, 100),
                        new Tuple2<Integer, Integer>(2, 90),
                        new Tuple2<Integer, Integer>(3, 60));
                
                // 并行化兩個RDD
                JavaPairRDD<Integer, String> students = sc.parallelizePairs(studentList);
                JavaPairRDD<Integer, Integer> scores = sc.parallelizePairs(scoreList);
                
                // 使用join算子關聯兩個RDD
                // join以後,還是會根據key進行join,并傳回JavaPairRDD
                // 但是JavaPairRDD的第一個泛型類型是之前兩個JavaPairRDD的key的類型,因為是通過key進行join的
                // 第二個泛型類型,是Tuple2<v1, v2>的類型,Tuple2的兩個泛型分别為原始RDD的value的類型
                // join,就傳回的RDD的每一個元素,就是通過key join上的一個pair
                // 什麼意思呢?比如有(1, 1) (1, 2) (1, 3)的一個RDD
                    // 還有一個(1, 4) (2, 1) (2, 2)的一個RDD
                    // join以後,實際上會得到(1 (1, 4)) (1, (2, 4)) (1, (3, 4))
                JavaPairRDD<Integer, Tuple2<String, Integer>> studentScores = students.join(scores);
                
                // 列印studnetScores RDD
                studentScores.foreach(
                        
                        new VoidFunction<Tuple2<Integer,Tuple2<String,Integer>>>() {

                            private static final long serialVersionUID = 1L;
                
                            @Override
                            public void call(Tuple2<Integer, Tuple2<String, Integer>> t)
                                    throws Exception {
                                System.out.println("student id: " + t._1);  
                                System.out.println("student name: " + t._2._1);  
                                System.out.println("student score: " + t._2._2);
                                System.out.println("===============================");   
                            }
                            
                        });
                
                // 關閉JavaSparkContext
                sc.close();

        
    }

    private static void sortByKey() {
        // 建立SparkConf
        SparkConf conf = new SparkConf()
                .setAppName("sortByKey")  
                .setMaster("local");
        // 建立JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        
        // 模拟集合
        List<Tuple2<Integer, String>> scoreList = Arrays.asList(
                new Tuple2<Integer, String>(65, "leo"),
                new Tuple2<Integer, String>(50, "tom"),
                new Tuple2<Integer, String>(100, "marry"),
                new Tuple2<Integer, String>(80, "jack"));
        
        // 并行化集合,建立RDD
        JavaPairRDD<Integer, String> scores = sc.parallelizePairs(scoreList);
        
        // 對scores RDD執行sortByKey算子
        // sortByKey其實就是根據key進行排序,可以手動指定升序,或者降序
        // 傳回的,還是JavaPairRDD,其中的元素内容,都是和原始的RDD一模一樣的
        // 但是就是RDD中的元素的順序,不同了
        JavaPairRDD<Integer, String> sortedScores = scores.sortByKey(false);  
        
        // 列印sortedScored RDD
        sortedScores.foreach(new VoidFunction<Tuple2<Integer,String>>() {

            private static final long serialVersionUID = 1L;

            @Override
            public void call(Tuple2<Integer, String> t) throws Exception {
                System.out.println(t._1 + ": " + t._2);  
            }
            
        });
        
        // 關閉JavaSparkContext
        sc.close();

        
    }

    private static void groupByKey() {
        
        // 建立SparkConf
                SparkConf conf = new SparkConf()
                        .setAppName("groupByKey")  
                        .setMaster("local");
                // 建立JavaSparkContext
                JavaSparkContext sc = new JavaSparkContext(conf);
                
                // 模拟集合
                List<Tuple2<String, Integer>> scoreList = Arrays.asList(
                        new Tuple2<String, Integer>("class1", 80),
                        new Tuple2<String, Integer>("class2", 75),
                        new Tuple2<String, Integer>("class1", 90),
                        new Tuple2<String, Integer>("class2", 65));
                
                // 并行化集合,建立JavaPairRDD
                JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoreList);
                
                // 針對scores RDD,執行groupByKey算子,對每個班級的成績進行分組
                // groupByKey算子,傳回的還是JavaPairRDD
                // 但是,JavaPairRDD的第一個泛型類型不變,第二個泛型類型變成Iterable這種集合類型
                // 也就是說,按照了key進行分組,那麼每個key可能都會有多個value,此時多個value聚合成了Iterable
                // 那麼接下來,我們是不是就可以通過groupedScores這種JavaPairRDD,很友善地處理某個分組内的資料
                JavaPairRDD<String, Iterable<Integer>> groupedScores = scores.groupByKey();
                
                // 列印groupedScores RDD
                groupedScores.foreach(new VoidFunction<Tuple2<String,Iterable<Integer>>>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public void call(Tuple2<String, Iterable<Integer>> t)
                            throws Exception {
                        System.out.println("class: " + t._1);  
                        Iterator<Integer> ite = t._2.iterator();
                        while(ite.hasNext()) {
                            System.out.println(ite.next());  
                        }
                        System.out.println("==============================");   
                    }
                    
                });
                
                // 關閉JavaSparkContext
                sc.close();

    }

    private static void flatMap() {
        // 建立SparkConf
                SparkConf conf = new SparkConf()
                        .setAppName("flatMap")  
                        .setMaster("local");  
                // 建立JavaSparkContext
                JavaSparkContext sc = new JavaSparkContext(conf);
                
                // 構造集合
                List<String> lineList = Arrays.asList("hello you", "hello me", "hello world");  
                
                // 并行化集合,建立RDD
                JavaRDD<String> lines = sc.parallelize(lineList);
                
                // 對RDD執行flatMap算子,将每一行文本,拆分為多個單詞
                // flatMap算子,在java中,接收的參數是FlatMapFunction
                // 我們需要自己定義FlatMapFunction的第二個泛型類型,即,代表了傳回的新元素的類型
                // call()方法,傳回的類型,不是U,而是Iterable<U>,這裡的U也與第二個泛型類型相同
                // flatMap其實就是,接收原始RDD中的每個元素,并進行各種邏輯的計算和處理,可以傳回多個元素
                // 多個元素,即封裝在Iterable集合中,可以使用ArrayList等集合
                // 新的RDD中,即封裝了所有的新元素;也就是說,新的RDD的大小一定是 >= 原始RDD的大小
                JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

                    private static final long serialVersionUID = 1L;
                    
                    // 在這裡會,比如,傳入第一行,hello you
                    // 傳回的是一個Iterable<String>(hello, you)
                    @Override
                    public Iterable<String> call(String t) throws Exception {
                        return Arrays.asList(t.split(" "));
                    }
                    
                });
                
                // 列印新的RDD
                words.foreach(new VoidFunction<String>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public void call(String t) throws Exception {
                        System.out.println(t);
                    }
                });
                
                // 關閉JavaSparkContext
                sc.close();
        
    }

    private static void filter() {
                // 建立SparkConf
                SparkConf conf = new SparkConf().setAppName("filter").setMaster("local");
                // 建立JavaSparkContext
                JavaSparkContext sc = new JavaSparkContext(conf);
                
                // 模拟集合
                List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
                
                // 并行化集合,建立初始RDD
                JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
                
                // 對初始RDD執行filter算子,過濾出其中的偶數
                // filter算子,傳入的也是Function,其他的使用注意點,實際上和map是一樣的
                // 但是,唯一的不同,就是call()方法的傳回類型是Boolean
                // 每一個初始RDD中的元素,都會傳入call()方法,此時你可以執行各種自定義的計算邏輯
                // 來判斷這個元素是否是你想要的
                // 如果你想在新的RDD中保留這個元素,那麼就傳回true;否則,不想保留這個元素,傳回false

                JavaRDD<Integer> evenNumberRDD = numberRDD.filter(
                        
                        new Function<Integer, Boolean>() {

                            private static final long serialVersionUID = 1L;
                            
                            // 在這裡,1到10,都會傳入進來
                            // 但是根據我們的邏輯,隻有2,4,6,8,10這幾個偶數,會傳回true
                            // 是以,隻有偶數會保留下來,放在新的RDD中
                            @Override
                            public Boolean call(Integer v1) throws Exception {
                                return v1 % 2 == 0;
                            }
                            
                        });
                
                // 列印新的RDD
                evenNumberRDD.foreach(new VoidFunction<Integer>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public void call(Integer t) throws Exception {
                        System.out.println(t);
                    }
                    
                });
                
                // 關閉JavaSparkContext
                sc.close();
            }


    private static void map() {
        //建立SparkConf
        SparkConf conf = new SparkConf().setAppName("map").setMaster("local");       
        // 建立JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);    
        //建構集合
        List numbers = Arrays.asList(1,2,3,4,5);
        //并行化集合,建立初始RDD
        JavaRDD<Integer> numberRDD = sc.parallelize(numbers);

           // 使用map算子,将集合中的每個元素都乘以2
        // map算子,是對任何類型的RDD,都可以調用的
        // 在java中,map算子接收的參數是Function對象
        // 建立的Function對象,一定會讓你設定第二個泛型參數,這個泛型類型,就是傳回的新元素的類型
        // 同時call()方法的傳回類型,也必須與第二個泛型類型同步
        // 在call()方法内部,就可以對原始RDD中的每一個元素進行各種處理和計算,并傳回一個新的元素
        // 所有新的元素就會組成一個新的RDD

        // 所有新的元素就會組成一個新的RDD
        JavaRDD<Integer> multipleNumberRDD = numberRDD.map(new Function<Integer, Integer>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer v1) throws Exception {
                
                return v1 * 2;
                
            }
        });

         multipleNumberRDD.foreach(new VoidFunction<Integer>() {
             
            private static final long serialVersionUID = 1L;

            @Override
            public void call(Integer t) throws Exception {
                 System.out.println(t);
            }
        });

        
    }
}      

運作代碼

cogroup案例:列印學生成績

package com.it19gong.sparkproject;


import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;



public class TransforDemo {

    public static void main(String[] args) {
        //map();
        //filter();
        //flatMap() ;
        // groupByKey();
        //sortByKey();
        //join();
        cogroup();
        
    }

    private static void cogroup() {
        // 建立SparkConf
        SparkConf conf = new SparkConf()
                .setAppName("cogroup")  
                .setMaster("local");
        // 建立JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        
        // 模拟集合
        List<Tuple2<Integer, String>> studentList = Arrays.asList(
                new Tuple2<Integer, String>(1, "leo"),
                new Tuple2<Integer, String>(2, "jack"),
                new Tuple2<Integer, String>(3, "tom"));
        
        List<Tuple2<Integer, Integer>> scoreList = Arrays.asList(
                new Tuple2<Integer, Integer>(1, 100),
                new Tuple2<Integer, Integer>(2, 90),
                new Tuple2<Integer, Integer>(3, 60),
                new Tuple2<Integer, Integer>(1, 70),
                new Tuple2<Integer, Integer>(2, 80),
                new Tuple2<Integer, Integer>(3, 50));
        
        // 并行化兩個RDD
        JavaPairRDD<Integer, String> students = sc.parallelizePairs(studentList);
        JavaPairRDD<Integer, Integer> scores = sc.parallelizePairs(scoreList);
        
        // cogroup與join不同
        // 相當于是,一個key join上的所有value,都給放到一個Iterable裡面去了 
        // cogroup,不太好講解,希望大家通過動手編寫我們的案例,仔細體會其中的奧妙
        JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> studentScores = 
                students.cogroup(scores);
        
        // 列印studnetScores RDD
        studentScores.foreach(
                
                new VoidFunction<Tuple2<Integer,Tuple2<Iterable<String>,Iterable<Integer>>>>() {

                    private static final long serialVersionUID = 1L;
        
                    @Override
                    public void call(
                            Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> t)
                            throws Exception {
                        System.out.println("student id: " + t._1);  
                        System.out.println("student name: " + t._2._1);  
                        System.out.println("student score: " + t._2._2);
                        System.out.println("===============================");   
                    }
                    
                });
        
        // 關閉JavaSparkContext
        sc.close();
    }

    
    private static void join() {
        // 建立SparkConf
                SparkConf conf = new SparkConf()
                        .setAppName("join")  
                        .setMaster("local");
                // 建立JavaSparkContext
                JavaSparkContext sc = new JavaSparkContext(conf);
                
                // 模拟集合
                List<Tuple2<Integer, String>> studentList = Arrays.asList(
                        new Tuple2<Integer, String>(1, "leo"),
                        new Tuple2<Integer, String>(2, "jack"),
                        new Tuple2<Integer, String>(3, "tom"));
                
                List<Tuple2<Integer, Integer>> scoreList = Arrays.asList(
                        new Tuple2<Integer, Integer>(1, 100),
                        new Tuple2<Integer, Integer>(2, 90),
                        new Tuple2<Integer, Integer>(3, 60));
                
                // 并行化兩個RDD
                JavaPairRDD<Integer, String> students = sc.parallelizePairs(studentList);
                JavaPairRDD<Integer, Integer> scores = sc.parallelizePairs(scoreList);
                
                // 使用join算子關聯兩個RDD
                // join以後,還是會根據key進行join,并傳回JavaPairRDD
                // 但是JavaPairRDD的第一個泛型類型是之前兩個JavaPairRDD的key的類型,因為是通過key進行join的
                // 第二個泛型類型,是Tuple2<v1, v2>的類型,Tuple2的兩個泛型分别為原始RDD的value的類型
                // join,就傳回的RDD的每一個元素,就是通過key join上的一個pair
                // 什麼意思呢?比如有(1, 1) (1, 2) (1, 3)的一個RDD
                    // 還有一個(1, 4) (2, 1) (2, 2)的一個RDD
                    // join以後,實際上會得到(1 (1, 4)) (1, (2, 4)) (1, (3, 4))
                JavaPairRDD<Integer, Tuple2<String, Integer>> studentScores = students.join(scores);
                
                // 列印studnetScores RDD
                studentScores.foreach(
                        
                        new VoidFunction<Tuple2<Integer,Tuple2<String,Integer>>>() {

                            private static final long serialVersionUID = 1L;
                
                            @Override
                            public void call(Tuple2<Integer, Tuple2<String, Integer>> t)
                                    throws Exception {
                                System.out.println("student id: " + t._1);  
                                System.out.println("student name: " + t._2._1);  
                                System.out.println("student score: " + t._2._2);
                                System.out.println("===============================");   
                            }
                            
                        });
                
                // 關閉JavaSparkContext
                sc.close();

        
    }

    private static void sortByKey() {
        // 建立SparkConf
        SparkConf conf = new SparkConf()
                .setAppName("sortByKey")  
                .setMaster("local");
        // 建立JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        
        // 模拟集合
        List<Tuple2<Integer, String>> scoreList = Arrays.asList(
                new Tuple2<Integer, String>(65, "leo"),
                new Tuple2<Integer, String>(50, "tom"),
                new Tuple2<Integer, String>(100, "marry"),
                new Tuple2<Integer, String>(80, "jack"));
        
        // 并行化集合,建立RDD
        JavaPairRDD<Integer, String> scores = sc.parallelizePairs(scoreList);
        
        // 對scores RDD執行sortByKey算子
        // sortByKey其實就是根據key進行排序,可以手動指定升序,或者降序
        // 傳回的,還是JavaPairRDD,其中的元素内容,都是和原始的RDD一模一樣的
        // 但是就是RDD中的元素的順序,不同了
        JavaPairRDD<Integer, String> sortedScores = scores.sortByKey(false);  
        
        // 列印sortedScored RDD
        sortedScores.foreach(new VoidFunction<Tuple2<Integer,String>>() {

            private static final long serialVersionUID = 1L;

            @Override
            public void call(Tuple2<Integer, String> t) throws Exception {
                System.out.println(t._1 + ": " + t._2);  
            }
            
        });
        
        // 關閉JavaSparkContext
        sc.close();

        
    }

    private static void groupByKey() {
        
        // 建立SparkConf
                SparkConf conf = new SparkConf()
                        .setAppName("groupByKey")  
                        .setMaster("local");
                // 建立JavaSparkContext
                JavaSparkContext sc = new JavaSparkContext(conf);
                
                // 模拟集合
                List<Tuple2<String, Integer>> scoreList = Arrays.asList(
                        new Tuple2<String, Integer>("class1", 80),
                        new Tuple2<String, Integer>("class2", 75),
                        new Tuple2<String, Integer>("class1", 90),
                        new Tuple2<String, Integer>("class2", 65));
                
                // 并行化集合,建立JavaPairRDD
                JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoreList);
                
                // 針對scores RDD,執行groupByKey算子,對每個班級的成績進行分組
                // groupByKey算子,傳回的還是JavaPairRDD
                // 但是,JavaPairRDD的第一個泛型類型不變,第二個泛型類型變成Iterable這種集合類型
                // 也就是說,按照了key進行分組,那麼每個key可能都會有多個value,此時多個value聚合成了Iterable
                // 那麼接下來,我們是不是就可以通過groupedScores這種JavaPairRDD,很友善地處理某個分組内的資料
                JavaPairRDD<String, Iterable<Integer>> groupedScores = scores.groupByKey();
                
                // 列印groupedScores RDD
                groupedScores.foreach(new VoidFunction<Tuple2<String,Iterable<Integer>>>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public void call(Tuple2<String, Iterable<Integer>> t)
                            throws Exception {
                        System.out.println("class: " + t._1);  
                        Iterator<Integer> ite = t._2.iterator();
                        while(ite.hasNext()) {
                            System.out.println(ite.next());  
                        }
                        System.out.println("==============================");   
                    }
                    
                });
                
                // 關閉JavaSparkContext
                sc.close();

    }

    private static void flatMap() {
        // 建立SparkConf
                SparkConf conf = new SparkConf()
                        .setAppName("flatMap")  
                        .setMaster("local");  
                // 建立JavaSparkContext
                JavaSparkContext sc = new JavaSparkContext(conf);
                
                // 構造集合
                List<String> lineList = Arrays.asList("hello you", "hello me", "hello world");  
                
                // 并行化集合,建立RDD
                JavaRDD<String> lines = sc.parallelize(lineList);
                
                // 對RDD執行flatMap算子,将每一行文本,拆分為多個單詞
                // flatMap算子,在java中,接收的參數是FlatMapFunction
                // 我們需要自己定義FlatMapFunction的第二個泛型類型,即,代表了傳回的新元素的類型
                // call()方法,傳回的類型,不是U,而是Iterable<U>,這裡的U也與第二個泛型類型相同
                // flatMap其實就是,接收原始RDD中的每個元素,并進行各種邏輯的計算和處理,可以傳回多個元素
                // 多個元素,即封裝在Iterable集合中,可以使用ArrayList等集合
                // 新的RDD中,即封裝了所有的新元素;也就是說,新的RDD的大小一定是 >= 原始RDD的大小
                JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

                    private static final long serialVersionUID = 1L;
                    
                    // 在這裡會,比如,傳入第一行,hello you
                    // 傳回的是一個Iterable<String>(hello, you)
                    @Override
                    public Iterable<String> call(String t) throws Exception {
                        return Arrays.asList(t.split(" "));
                    }
                    
                });
                
                // 列印新的RDD
                words.foreach(new VoidFunction<String>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public void call(String t) throws Exception {
                        System.out.println(t);
                    }
                });
                
                // 關閉JavaSparkContext
                sc.close();
        
    }

    private static void filter() {
                // 建立SparkConf
                SparkConf conf = new SparkConf().setAppName("filter").setMaster("local");
                // 建立JavaSparkContext
                JavaSparkContext sc = new JavaSparkContext(conf);
                
                // 模拟集合
                List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
                
                // 并行化集合,建立初始RDD
                JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
                
                // 對初始RDD執行filter算子,過濾出其中的偶數
                // filter算子,傳入的也是Function,其他的使用注意點,實際上和map是一樣的
                // 但是,唯一的不同,就是call()方法的傳回類型是Boolean
                // 每一個初始RDD中的元素,都會傳入call()方法,此時你可以執行各種自定義的計算邏輯
                // 來判斷這個元素是否是你想要的
                // 如果你想在新的RDD中保留這個元素,那麼就傳回true;否則,不想保留這個元素,傳回false

                JavaRDD<Integer> evenNumberRDD = numberRDD.filter(
                        
                        new Function<Integer, Boolean>() {

                            private static final long serialVersionUID = 1L;
                            
                            // 在這裡,1到10,都會傳入進來
                            // 但是根據我們的邏輯,隻有2,4,6,8,10這幾個偶數,會傳回true
                            // 是以,隻有偶數會保留下來,放在新的RDD中
                            @Override
                            public Boolean call(Integer v1) throws Exception {
                                return v1 % 2 == 0;
                            }
                            
                        });
                
                // 列印新的RDD
                evenNumberRDD.foreach(new VoidFunction<Integer>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public void call(Integer t) throws Exception {
                        System.out.println(t);
                    }
                    
                });
                
                // 關閉JavaSparkContext
                sc.close();
            }


    private static void map() {
        //建立SparkConf
        SparkConf conf = new SparkConf().setAppName("map").setMaster("local");       
        // 建立JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);    
        //建構集合
        List numbers = Arrays.asList(1,2,3,4,5);
        //并行化集合,建立初始RDD
        JavaRDD<Integer> numberRDD = sc.parallelize(numbers);

           // 使用map算子,将集合中的每個元素都乘以2
        // map算子,是對任何類型的RDD,都可以調用的
        // 在java中,map算子接收的參數是Function對象
        // 建立的Function對象,一定會讓你設定第二個泛型參數,這個泛型類型,就是傳回的新元素的類型
        // 同時call()方法的傳回類型,也必須與第二個泛型類型同步
        // 在call()方法内部,就可以對原始RDD中的每一個元素進行各種處理和計算,并傳回一個新的元素
        // 所有新的元素就會組成一個新的RDD

        // 所有新的元素就會組成一個新的RDD
        JavaRDD<Integer> multipleNumberRDD = numberRDD.map(new Function<Integer, Integer>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer v1) throws Exception {
                
                return v1 * 2;
                
            }
        });

         multipleNumberRDD.foreach(new VoidFunction<Integer>() {
             
            private static final long serialVersionUID = 1L;

            @Override
            public void call(Integer t) throws Exception {
                 System.out.println(t);
            }
        });

        
    }
}      

運作代碼