天天看点

StreamingListener技术点

以下是对StreamingListener的研究。由于比较简单,故只贴代码,不做过多解释。

/**
  * Created by gabry.wu on 2016/5/27.
  *
  * A [[StreamingListener]] that logs every Spark Streaming lifecycle callback
  * (receiver, batch and output-operation events) at WARN level, so that the
  * state of the streaming job can be monitored from the logs.
  *
  * Each callback emits two WARN lines: the callback name, then a comma-separated
  * list of `key=value` pairs taken from the event's info object.
  *
  * @param ssc the StreamingContext being monitored.
  *            NOTE(review): no callback currently uses it. It appears intended for
  *            reacting to failures (e.g. shutting the job down). If that is added,
  *            confirm against the Spark version in use whether the context may be
  *            stopped from inside a listener callback.
  */
class StreamingMonitor(ssc: StreamingContext) extends StreamingListener {
  private val log = LoggerFactory.getLogger("SparkStreamingMonitor")

  /** Renders the given pairs as `key=value` entries joined by commas, e.g. `a=1,b=Some(2)`. */
  private def kv(fields: (String, Any)*): String =
    fields.map { case (key, value) => s"$key=$value" }.mkString(",")

  // A receiver has started.
  override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = {
    log.warn("onReceiverStarted")
    val info = receiverStarted.receiverInfo
    log.warn(kv(
      "active" -> info.active,
      "executorId" -> info.executorId,
      "lastError" -> info.lastError,
      "lastErrorMessage" -> info.lastErrorMessage,
      "location" -> info.location,
      "name" -> info.name,
      "streamId" -> info.streamId))
  }

  // A receiver reported an error; receiver failures can be handled here.
  override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {
    log.warn("onReceiverError")
    val info = receiverError.receiverInfo
    log.warn(kv(
      "active" -> info.active,
      "executorId" -> info.executorId,
      "lastError" -> info.lastError,
      "lastErrorMessage" -> info.lastErrorMessage,
      "location" -> info.location,
      "name" -> info.name,
      "streamId" -> info.streamId))
  }

  // A receiver has stopped.
  override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped): Unit = {
    log.warn("onReceiverStopped")
    val info = receiverStopped.receiverInfo
    log.warn(kv(
      "active" -> info.active,
      "executorId" -> info.executorId,
      "lastError" -> info.lastError,
      "lastErrorMessage" -> info.lastErrorMessage,
      "location" -> info.location,
      "name" -> info.name,
      "streamId" -> info.streamId))
  }

  // A batch's jobs have been submitted. The record count is already known at
  // this point: numRecords is the amount of data in this batch.
  override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {
    log.warn("onBatchSubmitted")
    val info = batchSubmitted.batchInfo
    log.warn(kv(
      "batchTime" -> info.batchTime,
      "numRecords" -> info.numRecords,
      "processingDelay" -> info.processingDelay,
      "processingEndTime" -> info.processingEndTime,
      "processingStartTime" -> info.processingStartTime,
      "schedulingDelay" -> info.schedulingDelay,
      "submissionTime" -> info.submissionTime,
      "totalDelay" -> info.totalDelay))
  }

  // A batch has started. schedulingDelay is the time from submission until the
  // batch actually started running.
  override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {
    log.warn("onBatchStarted")
    val info = batchStarted.batchInfo
    log.warn(kv(
      "batchTime" -> info.batchTime,
      "numRecords" -> info.numRecords,
      "processingDelay" -> info.processingDelay,
      "processingEndTime" -> info.processingEndTime,
      "processingStartTime" -> info.processingStartTime,
      "schedulingDelay" -> info.schedulingDelay,
      "submissionTime" -> info.submissionTime,
      "totalDelay" -> info.totalDelay))
  }

  // A batch has completed. processingDelay is the time spent processing the batch;
  // totalDelay is the total time from submission to completion.
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    log.warn("onBatchCompleted")
    val info = batchCompleted.batchInfo
    log.warn(kv(
      "batchTime" -> info.batchTime,
      "numRecords" -> info.numRecords,
      "processingDelay" -> info.processingDelay,
      "processingEndTime" -> info.processingEndTime,
      "processingStartTime" -> info.processingStartTime,
      "schedulingDelay" -> info.schedulingDelay,
      "submissionTime" -> info.submissionTime,
      "totalDelay" -> info.totalDelay))
  }

  // An output operation has started. description is essentially part of the
  // call-site stack, useful to locate the output action; name is the action's
  // function name.
  override def onOutputOperationStarted(outputOperationStarted: StreamingListenerOutputOperationStarted): Unit = {
    log.warn("onOutputOperationStarted")
    val info = outputOperationStarted.outputOperationInfo
    log.warn(kv(
      "batchTime" -> info.batchTime,
      "description" -> info.description,
      "duration" -> info.duration,
      "endTime" -> info.endTime,
      "failureReason" -> info.failureReason,
      "id" -> info.id,
      "name" -> info.name,
      "startTime" -> info.startTime))
  }

  // An output operation has completed. duration is the time the action took;
  // failureReason explains why it failed, so batch failures can be handled here.
  override def onOutputOperationCompleted(outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit = {
    log.warn("onOutputOperationCompleted")
    val info = outputOperationCompleted.outputOperationInfo
    log.warn(kv(
      "batchTime" -> info.batchTime,
      "description" -> info.description,
      "duration" -> info.duration,
      "endTime" -> info.endTime,
      "failureReason" -> info.failureReason,
      "id" -> info.id,
      "name" -> info.name,
      "startTime" -> info.startTime))
  }
}
      

下面是注册StreamingListener的代码:

// Build the StreamingContext, then register a StreamingMonitor on it so every
// receiver/batch/output-operation event gets logged.
val ssc = new StreamingContext(sparkConf, new Duration(batchDuration))
val monitor = new StreamingMonitor(ssc)
ssc.addStreamingListener(monitor)
      

  

各个函数的调用顺序

onReceiverStarted->[接收到数据]->onBatchSubmitted->onBatchStarted->onOutputOperationStarted->onOutputOperationCompleted->onBatchCompleted->[接收到数据]->onBatchSubmitted->onBatchStarted->onOutputOperationStarted->onOutputOperationCompleted->onBatchCompleted->.......->onReceiverStopped

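其中[接收到数据]是可选项:并不是每个批次都会接收到数据,但每个批次仍会依次触发onBatchSubmitted等回调。另外,若一个批次包含多个输出操作,onOutputOperationStarted/onOutputOperationCompleted会针对每个输出操作各触发一次。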

继续阅读