Order in Which a Flink Sink Receives Data (Order in Which a Window Emits Data)

Overview

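  • InternalTimerServiceImpl.processingTimeTimersQueue holds one timer for every key in the same window. The timer at the head is taken, its key is handed to WindowOperator.onProcessingTime, and the result for that key is emitted to the Sink.
  • The timers in processingTimeTimersQueue are processed in this order: the first element is handled first; after each removal, the last element is moved into the first position and handled next.
  • For example, keys stored in the order 1 2 3 5 4 are processed in the order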
    1
     4
     5
     3
     2           
  • Source code: https://github.com/opensourceteams/fink-maven-scala-2

Input data

1 2 1 3 2 5 4           

Source code analysis

RecordWriter.emit

  • After the WordCount data has passed through the operators (Source, FlatMap, Map), it is emitted through RecordWriter.emit().
  • At this point the records are sent in the following form:
    WordWithCount(1,1)
    WordWithCount(2,1)
    WordWithCount(1,1)
    WordWithCount(3,1)
    WordWithCount(2,1)
    WordWithCount(5,1)
    WordWithCount(4,1)   
               
  • WindowOperator.processElement receives and processes these records.
private void emit(T record, int[] targetChannels) throws IOException, InterruptedException {
        serializer.serializeRecord(record);

        boolean pruneAfterCopying = false;
        for (int channel : targetChannels) {
            if (copyFromSerializerToTargetChannel(channel)) {
                pruneAfterCopying = true;
            }
        }

        // Make sure we don't hold onto the large intermediate serialization buffer for too long
        if (pruneAfterCopying) {
            serializer.prune();
        }
    }
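
As a rough illustration of the record stream described above, the sketch below turns the input line into one WordWithCount(word,1) string per token. It is plain Java, not Flink code: the class WordSplitSketch and the method toRecords are hypothetical names, and the records are rendered as strings only to mirror the listing.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink or the linked repository.
final class WordSplitSketch {

    /** Splits a line on whitespace and renders each token as WordWithCount(token,1). */
    static List<String> toRecords(String line) {
        List<String> records = new ArrayList<>();
        for (String token : line.trim().split("\\s+")) {
            if (!token.isEmpty()) {
                records.add("WordWithCount(" + token + ",1)");
            }
        }
        return records;
    }

    public static void main(String[] args) {
        // Prints one record per token, in input order.
        for (String record : toRecords("1 2 1 3 2 5 4")) {
            System.out.println(record);
        }
    }
}
```

Nothing is aggregated at this stage: repeated words such as 1 and 2 appear as separate records, and summing is left to the window operator.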

           

WindowOperator.processElement(StreamRecord element)

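  • WindowOperator.processElement assigns a window to every element such as WordWithCount(1,1), i.e. it determines which window the element belongs to, because elements with the same key in the same window have to be aggregated.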
    final Collection<W> elementWindows = windowAssigner.assignWindows(
                element.getValue(), element.getTimestamp(), windowAssignerContext);           
  • The current element is added to the state. add() aggregates (reduces) elements with the same key, so the per-key sum within one window is computed in this call.
    windowState.add(element.getValue());           
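  • triggerContext.onElement(element) registers the trigger for the current element, i.e. the point in time at which the element's window fires (the end of the window).

    A timer for the current element's key is added to InternalTimerServiceImpl.processingTimeTimersQueue. Every record attempts one add and duplicates are dropped, so for identical keys the queue behaves like a Set.

    The data later sent to the Sink is produced by walking through this processingTimeTimersQueue. Each step emits the first element, after which the last element is moved into the first position.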

TriggerResult triggerResult = triggerContext.onElement(element);           
public void processElement(StreamRecord<IN> element) throws Exception {
        final Collection<W> elementWindows = windowAssigner.assignWindows(
            element.getValue(), element.getTimestamp(), windowAssignerContext);

        //if element is handled by none of assigned elementWindows
        boolean isSkippedElement = true;

        final K key = this.<K>getKeyedStateBackend().getCurrentKey();

        if (windowAssigner instanceof MergingWindowAssigner) {
            MergingWindowSet<W> mergingWindows = getMergingWindowSet();

            for (W window: elementWindows) {

                // adding the new window might result in a merge, in that case the actualWindow
                // is the merged window and we work with that. If we don't merge then
                // actualWindow == window
                W actualWindow = mergingWindows.addWindow(window, new MergingWindowSet.MergeFunction<W>() {
                    @Override
                    public void merge(W mergeResult,
                            Collection<W> mergedWindows, W stateWindowResult,
                            Collection<W> mergedStateWindows) throws Exception {

                        if ((windowAssigner.isEventTime() && mergeResult.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark())) {
                            throw new UnsupportedOperationException("The end timestamp of an " +
                                    "event-time window cannot become earlier than the current watermark " +
                                    "by merging. Current watermark: " + internalTimerService.currentWatermark() +
                                    " window: " + mergeResult);
                        } else if (!windowAssigner.isEventTime() && mergeResult.maxTimestamp() <= internalTimerService.currentProcessingTime()) {
                            throw new UnsupportedOperationException("The end timestamp of a " +
                                    "processing-time window cannot become earlier than the current processing time " +
                                    "by merging. Current processing time: " + internalTimerService.currentProcessingTime() +
                                    " window: " + mergeResult);
                        }

                        triggerContext.key = key;
                        triggerContext.window = mergeResult;

                        triggerContext.onMerge(mergedWindows);

                        for (W m: mergedWindows) {
                            triggerContext.window = m;
                            triggerContext.clear();
                            deleteCleanupTimer(m);
                        }

                        // merge the merged state windows into the newly resulting state window
                        windowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows);
                    }
                });

                // drop if the window is already late
                if (isWindowLate(actualWindow)) {
                    mergingWindows.retireWindow(actualWindow);
                    continue;
                }
                isSkippedElement = false;

                W stateWindow = mergingWindows.getStateWindow(actualWindow);
                if (stateWindow == null) {
                    throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
                }

                windowState.setCurrentNamespace(stateWindow);
                windowState.add(element.getValue());

                triggerContext.key = key;
                triggerContext.window = actualWindow;

                TriggerResult triggerResult = triggerContext.onElement(element);

                if (triggerResult.isFire()) {
                    ACC contents = windowState.get();
                    if (contents == null) {
                        continue;
                    }
                    emitWindowContents(actualWindow, contents);
                }

                if (triggerResult.isPurge()) {
                    windowState.clear();
                }
                registerCleanupTimer(actualWindow);
            }

            // need to make sure to update the merging state in state
            mergingWindows.persist();
        } else {
            for (W window: elementWindows) {

                // drop if the window is already late
                if (isWindowLate(window)) {
                    continue;
                }
                isSkippedElement = false;

                windowState.setCurrentNamespace(window);
                windowState.add(element.getValue());

                triggerContext.key = key;
                triggerContext.window = window;

                TriggerResult triggerResult = triggerContext.onElement(element);

                if (triggerResult.isFire()) {
                    ACC contents = windowState.get();
                    if (contents == null) {
                        continue;
                    }
                    emitWindowContents(window, contents);
                }

                if (triggerResult.isPurge()) {
                    windowState.clear();
                }
                registerCleanupTimer(window);
            }
        }

        // side output input event if
        // element not handled by any window
        // late arriving tag has been set
        // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
        if (isSkippedElement && isElementLate(element)) {
            if (lateDataOutputTag != null){
                sideOutput(element);
            } else {
                this.numLateRecordsDropped.inc();
            }
        }
    }
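
The three steps that processElement performs for a non-merging assigner (assign a window, reduce into state, register a timer) can be modelled in a few lines of plain Java. This is a sketch under stated assumptions, not Flink's implementation: the class and field names are hypothetical, the 60-second window size is inferred from the TimeWindow bounds in the debugger dump, the arrival time is made up, and the window-start formula is the simplified form for offset 0 and non-negative timestamps.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the three steps; none of these names are Flink APIs.
final class ProcessElementSketch {
    // Assumption: 60 s tumbling windows, inferred from the TimeWindow bounds in the dump.
    static final long WINDOW_SIZE_MS = 60_000L;

    // window start -> (key -> running sum); stands in for windowState.
    final Map<Long, Map<String, Long>> state = new LinkedHashMap<>();
    // "key@fireTime" entries in first-registration order; stands in for the timer queue.
    final Set<String> timers = new LinkedHashSet<>();

    /** Start of the tumbling window containing timestampMs (offset 0, non-negative time). */
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_SIZE_MS);
    }

    void processElement(String key, long count, long nowMs) {
        // Step 1: assign the element to its window.
        long start = windowStart(nowMs);
        // Step 2: reduce the element into the per-window, per-key state.
        state.computeIfAbsent(start, s -> new LinkedHashMap<>()).merge(key, count, Long::sum);
        // Step 3: register a timer at the window's last millisecond; the Set drops repeats.
        long fireTime = start + WINDOW_SIZE_MS - 1;
        timers.add(key + "@" + fireTime);
    }

    public static void main(String[] args) {
        ProcessElementSketch op = new ProcessElementSketch();
        long now = 1551505400000L; // hypothetical arrival time inside the dumped window
        for (String key : "1 2 1 3 2 5 4".split(" ")) {
            op.processElement(key, 1L, now);
        }
        System.out.println(op.state);
        System.out.println(op.timers);
    }
}
```

With the sample input, seven records yield five timers, registered in the order 1 2 3 5 4, and each timer fires at the window end minus one millisecond.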

           

InternalTimerServiceImpl.onProcessingTime

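  • processingTimeTimersQueue (a HeapPriorityQueueSet) stores the timers for all keys. Duplicates have already been removed, and because every timer of one window carries the same timestamp, the timers sit in the underlying array in insertion order.
  • processingTimeTimersQueue.peek() returns the first entry for processing.
  • processingTimeTimersQueue.poll() removes the first entry and moves the last entry into the first position. Since all timestamps are equal, the heap does not reorder the moved entry, so the overall processing order is: the first element, then the last element, which has just been moved to the front,

    while the last slot is set to null. The loop then continues over the remaining entries. In effect, once the first element has been handled, processing resumes from the last element and walks backwards until the queue is empty. For example: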

1 2 1 3 2 5 4
is stored as 1 2 3 5 4
and the processing order becomes
 1
 4
 5
 3
 2           
  • keyContext.setCurrentKey(timer.getKey()); // sets the current key, i.e. the key about to be processed
  • triggerTarget.onProcessingTime(timer); // calls WindowOperator.onProcessingTime(timer)
queue = {HeapPriorityQueueElement[129]@8184} 
 1 = {TimerHeapInternalTimer@12441} "Timer{timestamp=1551505439999, key=(1), namespace=TimeWindow{start=1551505380000, end=1551505440000}}"
 2 = {TimerHeapInternalTimer@12442} "Timer{timestamp=1551505439999, key=(2), namespace=TimeWindow{start=1551505380000, end=1551505440000}}"
 3 = {TimerHeapInternalTimer@12443} "Timer{timestamp=1551505439999, key=(3), namespace=TimeWindow{start=1551505380000, end=1551505440000}}"
 5 = {TimerHeapInternalTimer@12443} "Timer{timestamp=1551505439999, key=(5), namespace=TimeWindow{start=1551505380000, end=1551505440000}}"
 4 = {TimerHeapInternalTimer@12443} "Timer{timestamp=1551505439999, key=(4), namespace=TimeWindow{start=1551505380000, end=1551505440000}}"
           
  • WindowOperator.onProcessingTime(timer) is then called to process the current key:
public void onProcessingTime(long time) throws Exception {
        // null out the timer in case the Triggerable calls registerProcessingTimeTimer()
        // inside the callback.
        nextTimer = null;

        InternalTimer<K, N> timer;

        while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
            processingTimeTimersQueue.poll();
            keyContext.setCurrentKey(timer.getKey());
            triggerTarget.onProcessingTime(timer);
        }

        if (timer != null && nextTimer == null) {
            nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this);
        }
    }           
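
The reordering described in this section can be reproduced with a small array-backed min-heap. The sketch below is a simplified stand-in, not Flink's HeapPriorityQueueSet: the names TimerHeapSketch, Timer and drainOrder are hypothetical, the heap is 0-based, and it assumes timers are compared by timestamp only with strict less-than, so that equal timestamps are never swapped.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified stand-in for a heap-backed timer set; not Flink's actual class.
final class TimerHeapSketch {
    static final class Timer {
        final long timestamp;
        final String key;
        Timer(long timestamp, String key) { this.timestamp = timestamp; this.key = key; }
    }

    private final List<Timer> heap = new ArrayList<>();  // binary min-heap ordered by timestamp only
    private final Set<String> present = new HashSet<>(); // rejects duplicate (timestamp, key) pairs

    void add(long timestamp, String key) {
        if (!present.add(timestamp + "/" + key)) {
            return; // already registered
        }
        heap.add(new Timer(timestamp, key));
        // Sift up; the strict comparison leaves equal timestamps in insertion order.
        int i = heap.size() - 1;
        while (i > 0 && heap.get(i).timestamp < heap.get((i - 1) / 2).timestamp) {
            swap(i, (i - 1) / 2);
            i = (i - 1) / 2;
        }
    }

    Timer poll() {
        if (heap.isEmpty()) {
            return null;
        }
        Timer head = heap.get(0);
        Timer last = heap.remove(heap.size() - 1);
        present.remove(head.timestamp + "/" + head.key);
        if (!heap.isEmpty()) {
            heap.set(0, last); // the last element takes the head slot
            siftDown(0);       // changes nothing when all timestamps are equal
        }
        return head;
    }

    private void siftDown(int i) {
        while (true) {
            int left = 2 * i + 1, right = left + 1, min = i;
            if (left < heap.size() && heap.get(left).timestamp < heap.get(min).timestamp) min = left;
            if (right < heap.size() && heap.get(right).timestamp < heap.get(min).timestamp) min = right;
            if (min == i) return;
            swap(i, min);
            i = min;
        }
    }

    private void swap(int a, int b) {
        Timer t = heap.get(a);
        heap.set(a, heap.get(b));
        heap.set(b, t);
    }

    /** Registers one timer per token, then drains the heap and returns the keys in poll order. */
    static List<String> drainOrder(String input, long timestamp) {
        TimerHeapSketch queue = new TimerHeapSketch();
        for (String key : input.trim().split("\\s+")) {
            queue.add(timestamp, key);
        }
        List<String> order = new ArrayList<>();
        for (Timer t = queue.poll(); t != null; t = queue.poll()) {
            order.add(t.key);
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(drainOrder("1 2 1 3 2 5 4", 1551505439999L)); // [1, 4, 5, 3, 2]
    }
}
```

The order depends entirely on the timestamps being equal. If the timers had different timestamps, siftDown would restore heap order and the earliest timer would always be polled first.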

WindowOperator.onProcessingTime

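  • triggerResult.isFire() // the window of the current element is ready to fire, i.e. its end time has passed
  • windowState.get() // fetches the (key, value) for the current key; at this point the value is already the aggregate over all elements with that key
  • emitWindowContents(triggerContext.window, contents); // emits the result downstream, where the Sink processes it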
public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {
        triggerContext.key = timer.getKey();
        triggerContext.window = timer.getNamespace();

        MergingWindowSet<W> mergingWindows;

        if (windowAssigner instanceof MergingWindowAssigner) {
            mergingWindows = getMergingWindowSet();
            W stateWindow = mergingWindows.getStateWindow(triggerContext.window);
            if (stateWindow == null) {
                // Timer firing for non-existent window, this can only happen if a
                // trigger did not clean up timers. We have already cleared the merging
                // window and therefore the Trigger state, however, so nothing to do.
                return;
            } else {
                windowState.setCurrentNamespace(stateWindow);
            }
        } else {
            windowState.setCurrentNamespace(triggerContext.window);
            mergingWindows = null;
        }

        TriggerResult triggerResult = triggerContext.onProcessingTime(timer.getTimestamp());

        if (triggerResult.isFire()) {
            ACC contents = windowState.get();
            if (contents != null) {
                emitWindowContents(triggerContext.window, contents);
            }
        }

        if (triggerResult.isPurge()) {
            windowState.clear();
        }

        if (!windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
            clearAllState(triggerContext.window, windowState, mergingWindows);
        }

        if (mergingWindows != null) {
            // need to make sure to update the merging state in state
            mergingWindows.persist();
        }
    }           
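
To make the fire, purge and cleanup decisions above concrete, here is a minimal model of what happens per timer and what the Sink ends up receiving. It is a sketch, not Flink code: FireOnTimerSketch, Result and onTimer are hypothetical names (Result only mirrors the four values of Flink's TriggerResult), state is keyed by word alone because a single window is assumed, and the aggregated counts and the timer order 1 4 5 3 2 are taken from the example in this article.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the fire / purge / cleanup decisions; not Flink's API.
final class FireOnTimerSketch {
    /** Mirrors the four outcomes a trigger can report. */
    enum Result {
        CONTINUE(false, false), FIRE(true, false), PURGE(false, true), FIRE_AND_PURGE(true, true);

        final boolean fire;
        final boolean purge;

        Result(boolean fire, boolean purge) {
            this.fire = fire;
            this.purge = purge;
        }
    }

    final Map<String, Long> windowState = new LinkedHashMap<>(); // key -> aggregated count
    final List<String> sink = new ArrayList<>();                 // what the Sink would receive

    void onTimer(String key, Result result, boolean cleanupTime) {
        if (result.fire) {
            Long contents = windowState.get(key);
            if (contents != null) {
                sink.add("WordWithCount(" + key + "," + contents + ")");
            }
        }
        // State is dropped either on an explicit purge or when the window's cleanup time is reached.
        if (result.purge || cleanupTime) {
            windowState.remove(key);
        }
    }

    public static void main(String[] args) {
        FireOnTimerSketch op = new FireOnTimerSketch();
        // Aggregated state for the input 1 2 1 3 2 5 4.
        op.windowState.put("1", 2L);
        op.windowState.put("2", 2L);
        op.windowState.put("3", 1L);
        op.windowState.put("5", 1L);
        op.windowState.put("4", 1L);
        // Timers arrive in the order produced by the timer queue.
        for (String key : new String[] {"1", "4", "5", "3", "2"}) {
            op.onTimer(key, Result.FIRE, true);
        }
        op.sink.forEach(System.out::println);
    }
}
```

The Sink sees the aggregated records in timer order, not in input order, which is the behaviour this article set out to explain.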
