Spark Action Operators -> foreachPartition

foreachPartition: the supplied function is applied to each partition's data as a whole; it receives an iterator over all elements of a single partition, so per-partition setup work (such as opening a database connection) runs once per partition rather than once per element.

  1. java
package action;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * @Author yqq
 * @Date 2021/12/10 11:17
 * @Version 1.0
 */
public class ForeachPartitionTest {
    public static void main(String[] args) {
        JavaSparkContext context = new JavaSparkContext(
                new SparkConf()
                        .setMaster("local")
                        .setAppName("ForeachPartition")
        );
        context.setLogLevel("Error");
        context.parallelize(Arrays.asList("a","b","c","d"),2)
                .foreachPartition(e->{ // e: an iterator over all elements in this partition
                    List<String> list = new ArrayList<>();
                    System.out.println("create database connection");
                    while (e.hasNext())
                        list.add(e.next());
                    System.out.println("insert into database " + list);
                    System.out.println("close database connection");
                });
    }
}

  2. scala
package action

import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ListBuffer

/**
 * @Author yqq
 * @Date 2021/12/10 11:29
 * @Version 1.0
 */
object ForeachPartitionTest {
  def main(args: Array[String]): Unit = {
    val context = new SparkContext(
      new SparkConf()
        .setMaster("local")
        .setAppName("ForeachPartition")
    )
    context.setLogLevel("Error")
    context.parallelize(Array[String]("a","b","c","d"),2)
      .foreachPartition(e=>{ // e: an iterator over all elements in this partition
        val buffer = new ListBuffer[String]()
        println("create database connection")
        while (e.hasNext)
          buffer.append(e.next())
        println("insert into database " + buffer)
        println("close database connection")
      })
      })
  }
}
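
Running either version with two partitions prints the connect/insert/close sequence once per partition, not once per element. With a local master and parallelize splitting the four elements evenly, the console output of the Java version looks roughly like this (the Scala version prints ListBuffer(...) instead of [...]):

create database connection
insert into database [a, b]
close database connection
create database connection
insert into database [c, d]
close database connection

This is why foreachPartition is preferred over foreach when writing an RDD to an external store: expensive setup such as a database connection is amortized over all elements of a partition instead of being repeated for every element.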