
Monitoring Spark Streaming job duration

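When writing a Spark Streaming job, we sometimes need to monitor it, for example to track the processing time and delay of each batch. Spark's web UI (port 4040 by default) shows these details, but it is not convenient for ongoing monitoring. Instead, we can emit all of this information to our own metrics system and aggregate it there. The code looks like this: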

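JavaStreamingContext javaStreamingContext =
        new JavaStreamingContext(sparkConfig, batchInterval);

// ... build the DStream pipeline here ...

// Register the listener before starting the context.
javaStreamingContext.addStreamingListener(new JobListener());

javaStreamingContext.start();
try {
    javaStreamingContext.awaitTermination();
} catch (InterruptedException e) {
    // Pass the exception itself so the logger prints the full stack trace.
    logger.error("Streaming context interrupted", e);
}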

As shown above, all that is needed is to register a JobListener. JobListener is implemented as follows:

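// Required imports:
// import java.util.Map;
// import org.apache.spark.streaming.scheduler.*;
// import scala.collection.JavaConverters;

private static class JobListener implements StreamingListener {

        @Override
        public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {
            try {
                BatchInfo info = batchCompleted.batchInfo();
                // The delays are Scala Options holding milliseconds; get() throws if a
                // value is missing, which the catch block below logs.
                String totalDelay = info.totalDelay().get().toString();
                String processingDelay = info.processingDelay().get().toString();
                String schedulingDelay = info.schedulingDelay().get().toString();
                long numRecords = info.numRecords();
                // Replace this log line with a write to your own metrics system.
                logger.info("totalDelay=" + totalDelay + " processingDelay=" + processingDelay
                        + " schedulingDelay=" + schedulingDelay + " numRecords=" + numRecords);

                // Per-output-operation durations (milliseconds) within this batch.
                Map<Object, OutputOperationInfo> map =
                        JavaConverters.mapAsJavaMapConverter(info.outputOperationInfos()).asJava();
                for (OutputOperationInfo outputOperationInfo : map.values()) {
                    double durationMs = Double.valueOf(outputOperationInfo.duration().get().toString());
                    System.out.println(outputOperationInfo.name() + ": " + durationMs + " ms");
                }

            } catch (Exception e) {
                logger.error("JobListener onBatchCompleted", e);
            }
        }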

        @Override
        public void onReceiverStarted(StreamingListenerReceiverStarted receiverStarted) {
        }

        @Override
        public void onReceiverError(StreamingListenerReceiverError receiverError) {
        }

        @Override
        public void onReceiverStopped(StreamingListenerReceiverStopped receiverStopped) {
        }

        @Override
        public void onBatchSubmitted(StreamingListenerBatchSubmitted batchSubmitted) {
        }

        @Override
        public void onBatchStarted(StreamingListenerBatchStarted batchStarted) {
        }

        @Override
        public void onOutputOperationCompleted(StreamingListenerOutputOperationCompleted arg0) {
        }

        @Override
        public void onOutputOperationStarted(StreamingListenerOutputOperationStarted arg0) {
        }

    }
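
To make the three delay values read in onBatchCompleted easier to interpret: Spark derives them from a batch's submission, processing-start, and processing-end timestamps, so the total delay is the scheduling delay plus the processing delay. The sketch below reproduces that arithmetic in plain Java. `BatchTimes` is a hypothetical stand-in used only for illustration; it is not a Spark class.

```java
/** Hypothetical stand-in for the timestamps Spark keeps per batch (all in milliseconds). */
final class BatchTimes {
    final long submissionTime;
    final long processingStartTime;
    final long processingEndTime;

    BatchTimes(long submissionTime, long processingStartTime, long processingEndTime) {
        this.submissionTime = submissionTime;
        this.processingStartTime = processingStartTime;
        this.processingEndTime = processingEndTime;
    }

    /** Time the batch waited in the queue before processing began. */
    long schedulingDelay() {
        return processingStartTime - submissionTime;
    }

    /** Time spent actually processing the batch. */
    long processingDelay() {
        return processingEndTime - processingStartTime;
    }

    /** End-to-end latency: waiting plus processing. */
    long totalDelay() {
        return schedulingDelay() + processingDelay();
    }
}
```

A scheduling delay that keeps growing from batch to batch usually means the processing time exceeds the batch interval, which is why it is worth tracking separately from the total.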
           

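The JobListener above collects its metrics in onBatchCompleted, which monitors at the batch level. A finer-grained, and arguably better, option is onOutputOperationCompleted, which fires for each output operation (job) within a batch: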

            try {
                // MetricsWriter is a custom metrics client (not shown); substitute your own.
                MetricsWriter metricsWriter = new MetricsWriter();
                // duration() is in milliseconds; divide by 1000 to report seconds.
                metricsWriter.addRequest(arg0.outputOperationInfo().name(),
                        Double.valueOf(arg0.outputOperationInfo().duration().get().toString()) / 1000);
                metricsWriter.send();
            } catch (Exception e) {
                logger.error("JobListener onOutputOperationCompleted", e);
            }
           

Note that the code above is only a sketch of the idea and needs further changes: you have to plug in your own metrics implementation.
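
Since the metrics sink is left to the reader, here is a minimal sketch of what one might look like: an in-memory aggregator that keeps the count, total, and maximum duration per output operation name. `DurationStats`, `Summary`, and their methods are hypothetical names chosen for illustration; they are not part of Spark and are not the `MetricsWriter` used above. A real deployment would more likely forward the values to an existing metrics system.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory sink: aggregates durations (in seconds) per operation name. */
final class DurationStats {

    /** Immutable running aggregate for one operation name. */
    static final class Summary {
        final long count;
        final double totalSeconds;
        final double maxSeconds;

        Summary(long count, double totalSeconds, double maxSeconds) {
            this.count = count;
            this.totalSeconds = totalSeconds;
            this.maxSeconds = maxSeconds;
        }

        double averageSeconds() {
            return count == 0 ? 0.0 : totalSeconds / count;
        }
    }

    private final Map<String, Summary> summaries = new ConcurrentHashMap<>();

    /** Records one duration given in milliseconds and stores it in seconds. */
    void record(String name, long durationMs) {
        double seconds = durationMs / 1000.0;
        // merge() is atomic per key on ConcurrentHashMap, so concurrent callers are safe.
        summaries.merge(name, new Summary(1, seconds, seconds),
                (a, b) -> new Summary(a.count + b.count,
                        a.totalSeconds + b.totalSeconds,
                        Math.max(a.maxSeconds, b.maxSeconds)));
    }

    /** Returns the aggregate for a name, or null if nothing was recorded for it. */
    Summary get(String name) {
        return summaries.get(name);
    }
}
```

Inside onOutputOperationCompleted, one could then call `record` with the operation name and its duration parsed to a long, and periodically flush the summaries to wherever the statistics are needed.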
