天天看點

Spark擷取目前分區的partitionId

版權聲明:本文由董可倫首發于https://dongkelun.com,非商業轉載請注明作者及原創出處。商業轉載請聯系作者本人。 https://blog.csdn.net/dkl12/article/details/80943341

我的原創位址:

https://dongkelun.com/2018/06/28/sparkGetPartitionId/

前言

本文講解Spark如何擷取目前分區的partitionId,這是一位群友提出的問題,其實隻要通過TaskContext.get.partitionId(我是在官網上看到的),下面給出一些示例。

1、代碼

下面的代碼主要測試SparkSession,SparkContext建立的rdd和df是否都支援。

package com.dkl.leanring.partition

import org.apache.spark.sql.SparkSession
import org.apache.spark.TaskContext

/**
 * 擷取目前分區的partitionId
 */
object GetPartitionIdDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("GetPartitionIdDemo").master("local").getOrCreate()
    val sc = spark.sparkContext
    val data = Seq(1, 2, 3, 4)

    // 測試rdd,三個分區
    val rdd = sc.parallelize(data, 3)
    rdd.foreach(i => {
      println("partitionId:" + TaskContext.get.partitionId)
    })

    import spark.implicits._
    // 測試df,三個分區
    val df = rdd.toDF("id")
    df.show
    df.foreach(row => {
      println("partitionId:" + TaskContext.get.partitionId)
    })
    // 測試df,兩個分區
    val data1 = Array((1, 2), (3, 4))
    val df1 = spark.createDataFrame(data1).repartition(2)
    df1.show()
    df1.foreach(row => {
      println("partitionId:" + TaskContext.get.partitionId)
    })

  }
}           

2、結果

繼續閱讀