天天看点

SparkStreming:使用Checkpoint创建StreamingContext修改executor-cores、executor-memory等资源信息不生效。

在使用SparkStreaming时,使用StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)创建StreamingContext。代码示例如下:

// Function to create and setup a new StreamingContext
    def functionToCreateContext(): StreamingContext = {

      val conf = new SparkConf().setAppName("UserBrowse")
      val ssc = new StreamingContext(conf, batchInterval)

      //通过LogHubCursorPosition.BEGIN_CURSOR指定消费Cursor的模式。
      val loghubStream = LoghubUtils.createStream(...)

      loghubStream.checkpoint(batchInterval * 5).foreachRDD { rdd =>

      val spark = SparkSession.builder.config(rdd.sparkContext.getConf)
          .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .getOrCreate()
      }

      ssc.checkpoint(checkpointDirectory) // set checkpoint directory
      ssc
    }
    // Get StreamingContext from checkpoint data or create a new one
    val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)           

此时通过控制台提交Spark任务命令如下:

--class com.test.StreamingText
--jars /spark-it/loghub-spark-0.6.13_2.4.3-1.0.4.jar,/spark-it/loghub-client-lib-0.6.13.jar
--driver-memory 1G 
--driver-cores 1
--executor-cores 3 
--executor-memory 3G
--num-executors 1
--name spark_on_loghub
/spark-it/sparkstreaming-0.0.1-SNAPSHOT.jar
/tmp/checkpoint_location_test            

其中/tmp/checkpoint_location_test 为StreamingContext的checkpoint路径。

运行一段时间后,用户期望修改executor-cores为4,executor-memory 为12G,num-executors为3。那如何修改呢?

由于SparkStreming的运行机制是长久运行,以及checkpoint的设置是为了任务异常能从checkpoint恢复数据。

首次提交任务后,StremingContext会把Spark的配置信息写入到Checkpoint中,包括:executor-cores、num-executors、executor-memory等配置信息。

当任务异常或者重启后,StremingContext会从Checkpoint中读取Spark的配置信息。所以这时如果在控制台修改executor-cores、executor-memory等配置信息,StremingContext不会读取的。

如果需要修改executor-cores、executor-memory等配置信息需要清除Checkpoint路径,或者重新指定一个新的Checkpoint路径。

继续阅读