![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLicWZwpmL0QTMldTMlBDOlRDO3ETZ5EGOiBjMjN2YlJmZwYzY2YjM1UWZ4MTYk9CXt92Yu4GZjlGbh5SZslmZxl3Lc9CX6MHc0RHaiojIsJye.jpeg)
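Overview
- InternalTimerServiceImpl.processingTimeTimersQueue holds all the keys of one window. The first key is taken from the queue and passed to WindowOperator.onProcessingTime, which processes it and sends the result to the Sink
- The keys in InternalTimerServiceImpl.processingTimeTimersQueue are processed in this order: the first element is processed, then the last element is repeatedly moved into the first position and processed
- For example, keys stored in the order 1 2 3 5 4 are processed in the order
1 4 5 3 2
- Source code: https://github.com/opensourceteams/fink-maven-scala-2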
Input data
1 2 1 3 2 5 4
Source code analysis
RecordWriter.emit
- After the WordCount data has passed through the operators (Source, FlatMap, Map), it is emitted through RecordWriter.emit()
- At this point the records are sent in the following format
WordWithCount(1,1) WordWithCount(2,1) WordWithCount(1,1) WordWithCount(3,1) WordWithCount(2,1) WordWithCount(5,1) WordWithCount(4,1)
- WindowOperator.processElement receives and processes them
    private void emit(T record, int[] targetChannels) throws IOException, InterruptedException {
        serializer.serializeRecord(record);
        boolean pruneAfterCopying = false;
        for (int channel : targetChannels) {
            if (copyFromSerializerToTargetChannel(channel)) {
                pruneAfterCopying = true;
            }
        }
        // Make sure we don't hold onto the large intermediate serialization buffer for too long
        if (pruneAfterCopying) {
            serializer.prune();
        }
    }
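To make the record format above concrete, here is a minimal sketch of how the input line could be turned into (word, 1) records before they reach the window operator. The `EmitSketch` class, its `WordWithCount` stand-in, and the `toRecords` helper are hypothetical names used only for illustration; they are not part of Flink or of the article's project.

```java
import java.util.ArrayList;
import java.util.List;

class EmitSketch {
    /** Hypothetical stand-in for the WordWithCount type shown in the article. */
    static final class WordWithCount {
        final String word;
        final long count;

        WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WordWithCount(" + word + "," + count + ")";
        }
    }

    /** Splits a line on whitespace and pairs every token with a count of 1. */
    static List<WordWithCount> toRecords(String line) {
        List<WordWithCount> out = new ArrayList<>();
        for (String token : line.trim().split("\\s+")) {
            out.add(new WordWithCount(token, 1L));
        }
        return out;
    }

    public static void main(String[] args) {
        // Prints the seven records in arrival order, one per input token.
        for (WordWithCount record : toRecords("1 2 1 3 2 5 4")) {
            System.out.print(record + " ");
        }
        System.out.println();
    }
}
```

Duplicates such as the two records for key 1 are still separate at this stage; they are only combined later, inside the window state.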
WindowOperator.processElement(StreamRecord element)
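- WindowOperator.processElement assigns a window to every element such as WordWithCount(1,1), that is, it determines which window each element belongs to, because identical keys within the same window have to be aggregated
final Collection<W> elementWindows = windowAssigner.assignWindows( element.getValue(), element.getTimestamp(), windowAssignerContext);
- The current element is added to the state. The add function aggregates (reduces) identical keys; summing identical keys within the same window happens in this call
windowState.add(element.getValue());
- triggerContext.onElement(element) sets up the trigger for the current element, i.e. the point in time at which the element's window fires (its end time). It also adds the current element's key to InternalTimerServiceImpl.processingTimeTimersQueue. This happens once per record, and the queue deduplicates the entries, so for identical keys it behaves like a Set. The data later sent to the Sink is produced by traversing this processingTimeTimersQueue: each time the first element is sent, after which the last element is moved into the first position
TriggerResult triggerResult = triggerContext.onElement(element);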
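The window assignment step can be illustrated with a small sketch of how a tumbling window might be derived from a timestamp. It assumes a 60-second window with no offset (the window in the debugger dump later in this article spans exactly 60000 ms) and non-negative timestamps. The class `WindowAssignSketch`, its method names, and the arrival time in `main` are hypothetical; Flink's own assigner is more general.

```java
class WindowAssignSketch {
    /** Start of the tumbling window that contains the timestamp (no offset). */
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    /** Exclusive end of that window. */
    static long windowEnd(long timestamp, long size) {
        return windowStart(timestamp, size) + size;
    }

    /** Last millisecond that still belongs to the window; the timer fires at this time. */
    static long maxTimestamp(long timestamp, long size) {
        return windowEnd(timestamp, size) - 1;
    }

    public static void main(String[] args) {
        long size = 60_000L;               // assumed window size in milliseconds
        long arrival = 1_551_505_400_000L; // hypothetical processing time of one element
        System.out.println("start=" + windowStart(arrival, size)
                + " end=" + windowEnd(arrival, size)
                + " timer=" + maxTimestamp(arrival, size));
    }
}
```

With these assumptions, every element arriving within the same minute maps to the same window, which is why all the timers for that window share one timestamp.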
    public void processElement(StreamRecord<IN> element) throws Exception {
        final Collection<W> elementWindows = windowAssigner.assignWindows(
            element.getValue(), element.getTimestamp(), windowAssignerContext);
        //if element is handled by none of assigned elementWindows
        boolean isSkippedElement = true;
        final K key = this.<K>getKeyedStateBackend().getCurrentKey();
        if (windowAssigner instanceof MergingWindowAssigner) {
            MergingWindowSet<W> mergingWindows = getMergingWindowSet();
            for (W window: elementWindows) {
                // adding the new window might result in a merge, in that case the actualWindow
                // is the merged window and we work with that. If we don't merge then
                // actualWindow == window
                W actualWindow = mergingWindows.addWindow(window, new MergingWindowSet.MergeFunction<W>() {
                    @Override
                    public void merge(W mergeResult,
                            Collection<W> mergedWindows, W stateWindowResult,
                            Collection<W> mergedStateWindows) throws Exception {
                        if ((windowAssigner.isEventTime() && mergeResult.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark())) {
                            throw new UnsupportedOperationException("The end timestamp of an " +
                                "event-time window cannot become earlier than the current watermark " +
                                "by merging. Current watermark: " + internalTimerService.currentWatermark() +
                                " window: " + mergeResult);
                        } else if (!windowAssigner.isEventTime() && mergeResult.maxTimestamp() <= internalTimerService.currentProcessingTime()) {
                            throw new UnsupportedOperationException("The end timestamp of a " +
                                "processing-time window cannot become earlier than the current processing time " +
                                "by merging. Current processing time: " + internalTimerService.currentProcessingTime() +
                                " window: " + mergeResult);
                        }
                        triggerContext.key = key;
                        triggerContext.window = mergeResult;
                        triggerContext.onMerge(mergedWindows);
                        for (W m: mergedWindows) {
                            triggerContext.window = m;
                            triggerContext.clear();
                            deleteCleanupTimer(m);
                        }
                        // merge the merged state windows into the newly resulting state window
                        windowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows);
                    }
                });
                // drop if the window is already late
                if (isWindowLate(actualWindow)) {
                    mergingWindows.retireWindow(actualWindow);
                    continue;
                }
                isSkippedElement = false;
                W stateWindow = mergingWindows.getStateWindow(actualWindow);
                if (stateWindow == null) {
                    throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
                }
                windowState.setCurrentNamespace(stateWindow);
                windowState.add(element.getValue());
                triggerContext.key = key;
                triggerContext.window = actualWindow;
                TriggerResult triggerResult = triggerContext.onElement(element);
                if (triggerResult.isFire()) {
                    ACC contents = windowState.get();
                    if (contents == null) {
                        continue;
                    }
                    emitWindowContents(actualWindow, contents);
                }
                if (triggerResult.isPurge()) {
                    windowState.clear();
                }
                registerCleanupTimer(actualWindow);
            }
            // need to make sure to update the merging state in state
            mergingWindows.persist();
        } else {
            for (W window: elementWindows) {
                // drop if the window is already late
                if (isWindowLate(window)) {
                    continue;
                }
                isSkippedElement = false;
                windowState.setCurrentNamespace(window);
                windowState.add(element.getValue());
                triggerContext.key = key;
                triggerContext.window = window;
                TriggerResult triggerResult = triggerContext.onElement(element);
                if (triggerResult.isFire()) {
                    ACC contents = windowState.get();
                    if (contents == null) {
                        continue;
                    }
                    emitWindowContents(window, contents);
                }
                if (triggerResult.isPurge()) {
                    windowState.clear();
                }
                registerCleanupTimer(window);
            }
        }
        // side output input event if
        // element not handled by any window
        // late arriving tag has been set
        // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
        if (isSkippedElement && isElementLate(element)) {
            if (lateDataOutputTag != null){
                sideOutput(element);
            } else {
                this.numLateRecordsDropped.inc();
            }
        }
    }
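The two effects of the non-merging branch that matter for this article, the per-key reduce and the deduplicated timer registration, can be sketched as follows. This is a simplified model under stated assumptions: a single window, a plain map in place of the keyed window state, and an insertion-ordered set in place of the timer queue. `ProcessElementSketch` and its fields are hypothetical names, not Flink classes.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

class ProcessElementSketch {
    /** Stand-in for the window state: key -> running sum within one window. */
    final Map<String, Long> windowState = new LinkedHashMap<>();
    /** Stand-in for the timer queue: one entry per key, in first-seen order. */
    final Set<String> timers = new LinkedHashSet<>();

    /** Models windowState.add(...) followed by the trigger registering a timer. */
    void processElement(String key, long count) {
        windowState.merge(key, count, Long::sum); // reduce identical keys
        timers.add(key);                          // adding a key twice has no effect
    }

    public static void main(String[] args) {
        ProcessElementSketch op = new ProcessElementSketch();
        for (String key : "1 2 1 3 2 5 4".split(" ")) {
            op.processElement(key, 1L);
        }
        System.out.println("state  = " + op.windowState);
        System.out.println("timers = " + op.timers);
    }
}
```

In this model seven input records leave five aggregated entries and five timers, in the stored order 1 2 3 5 4 that the article uses as its starting point.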
InternalTimerServiceImpl.onProcessingTime
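- processingTimeTimersQueue (a HeapPriorityQueueSet) stores a timer for every key; the keys are deduplicated and kept in processing order
- processingTimeTimersQueue.peek() fetches the first entry for processing
- processingTimeTimersQueue.poll() removes the first entry and moves the last entry into the first position; the last slot is then set to null. Every timer in the dump below has the same timestamp, so the heap has nothing to reorder after this move. As a result, the first element is processed first, and processing then continues from the last element backwards until the queue is empty. For example, the input
1 2 1 3 2 5 4
is stored as 1 2 3 5 4
and the processing order becomes
1
4
5
3
2
- keyContext.setCurrentKey(timer.getKey()); // set the current key, i.e. the key to be processed now
- triggerTarget.onProcessingTime(timer); // calls WindowOperator.onProcessingTime(timer)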
queue = {HeapPriorityQueueElement[129]@8184}
1 = {TimerHeapInternalTimer@12441} "Timer{timestamp=1551505439999, key=(1), namespace=TimeWindow{start=1551505380000, end=1551505440000}}"
2 = {TimerHeapInternalTimer@12442} "Timer{timestamp=1551505439999, key=(2), namespace=TimeWindow{start=1551505380000, end=1551505440000}}"
3 = {TimerHeapInternalTimer@12443} "Timer{timestamp=1551505439999, key=(3), namespace=TimeWindow{start=1551505380000, end=1551505440000}}"
5 = {TimerHeapInternalTimer@12443} "Timer{timestamp=1551505439999, key=(3), namespace=TimeWindow{start=1551505380000, end=1551505440000}}"
4 = {TimerHeapInternalTimer@12443} "Timer{timestamp=1551505439999, key=(3), namespace=TimeWindow{start=1551505380000, end=1551505440000}}"
- Call WindowOperator.onProcessingTime(timer) to process the current key
    public void onProcessingTime(long time) throws Exception {
        // null out the timer in case the Triggerable calls registerProcessingTimeTimer()
        // inside the callback.
        nextTimer = null;
        InternalTimer<K, N> timer;
        while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
            processingTimeTimersQueue.poll();
            keyContext.setCurrentKey(timer.getKey());
            triggerTarget.onProcessingTime(timer);
        }
        if (timer != null && nextTimer == null) {
            nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this);
        }
    }
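The 1 4 5 3 2 order described in this section can be reproduced with a small array-backed binary min-heap. This is a simplified model, not Flink's queue implementation: it assumes timers are compared by timestamp only, that an element moves only when it is strictly smaller than the one it is compared with, and that the keys were already deduplicated. `TimerHeapSketch` and its members are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

class TimerHeapSketch {
    /** Minimal timer: ordering uses the timestamp only, the key is just payload. */
    static final class Timer {
        final long timestamp;
        final String key;

        Timer(long timestamp, String key) {
            this.timestamp = timestamp;
            this.key = key;
        }
    }

    private final List<Timer> heap = new ArrayList<>();

    void add(Timer timer) {
        heap.add(timer);
        int i = heap.size() - 1;
        // Sift up only while strictly smaller than the parent.
        while (i > 0 && heap.get(i).timestamp < heap.get((i - 1) / 2).timestamp) {
            swap(i, (i - 1) / 2);
            i = (i - 1) / 2;
        }
    }

    /** Removes the head, moves the last element to the head, then sifts it down. */
    Timer poll() {
        if (heap.isEmpty()) {
            return null;
        }
        Timer first = heap.get(0);
        Timer last = heap.remove(heap.size() - 1);
        if (!heap.isEmpty()) {
            heap.set(0, last);
            siftDown(0);
        }
        return first;
    }

    private void siftDown(int i) {
        while (true) {
            int left = 2 * i + 1;
            int right = left + 1;
            int smallest = i;
            if (left < heap.size() && heap.get(left).timestamp < heap.get(smallest).timestamp) {
                smallest = left;
            }
            if (right < heap.size() && heap.get(right).timestamp < heap.get(smallest).timestamp) {
                smallest = right;
            }
            if (smallest == i) {
                return; // equal timestamps never trigger a swap
            }
            swap(i, smallest);
            i = smallest;
        }
    }

    private void swap(int a, int b) {
        Timer tmp = heap.get(a);
        heap.set(a, heap.get(b));
        heap.set(b, tmp);
    }

    /** Inserts the keys with one shared timestamp and returns the order in which poll() yields them. */
    static List<String> pollOrder(String... keys) {
        TimerHeapSketch queue = new TimerHeapSketch();
        for (String key : keys) {
            queue.add(new Timer(1_551_505_439_999L, key));
        }
        List<String> order = new ArrayList<>();
        for (Timer t = queue.poll(); t != null; t = queue.poll()) {
            order.add(t.key);
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(pollOrder("1", "2", "3", "5", "4")); // prints [1, 4, 5, 3, 2]
    }
}
```

Because all timestamps are equal, the element moved to the head never sinks, so each poll returns whatever was last in the array. With distinct timestamps the same heap would return timers in timestamp order instead.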
WindowOperator.onProcessingTime
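- triggerResult.isFire() // the window of the current element is ready to fire, i.e. its end time has passed
- windowState.get() // fetches the (key, value) pair for the current key; at this point the value is already the aggregate over identical keys
- emitWindowContents(triggerContext.window, contents); // sends the result to the Sink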
    public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {
        triggerContext.key = timer.getKey();
        triggerContext.window = timer.getNamespace();
        MergingWindowSet<W> mergingWindows;
        if (windowAssigner instanceof MergingWindowAssigner) {
            mergingWindows = getMergingWindowSet();
            W stateWindow = mergingWindows.getStateWindow(triggerContext.window);
            if (stateWindow == null) {
                // Timer firing for non-existent window, this can only happen if a
                // trigger did not clean up timers. We have already cleared the merging
                // window and therefore the Trigger state, however, so nothing to do.
                return;
            } else {
                windowState.setCurrentNamespace(stateWindow);
            }
        } else {
            windowState.setCurrentNamespace(triggerContext.window);
            mergingWindows = null;
        }
        TriggerResult triggerResult = triggerContext.onProcessingTime(timer.getTimestamp());
        if (triggerResult.isFire()) {
            ACC contents = windowState.get();
            if (contents != null) {
                emitWindowContents(triggerContext.window, contents);
            }
        }
        if (triggerResult.isPurge()) {
            windowState.clear();
        }
        if (!windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
            clearAllState(triggerContext.window, windowState, mergingWindows);
        }
        if (mergingWindows != null) {
            // need to make sure to update the merging state in state
            mergingWindows.persist();
        }
    }
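Putting the pieces together, the following sketch models what reaches the Sink when the timers fire in the order described earlier. It is a rough model of the non-merging, processing-time path only: a map stands in for the window state, the trigger is reduced to a timestamp comparison, and the state for a key is dropped once its timer has fired. `FireSketch`, `fire`, and the clock value in `main` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class FireSketch {
    /**
     * Emits "(key,sum)" for each timer, in the given order, if the timer is due.
     * The state for a key is removed after its timer has been handled.
     */
    static List<String> fire(List<String> timerOrder, Map<String, Long> windowState,
                             long timerTimestamp, long now) {
        List<String> sink = new ArrayList<>();
        if (timerTimestamp > now) {
            return sink; // the window's timers are not due yet
        }
        for (String key : timerOrder) {
            Long contents = windowState.get(key);
            if (contents != null) {
                sink.add("(" + key + "," + contents + ")");
            }
            windowState.remove(key); // models clearing state at cleanup time
        }
        return sink;
    }

    public static void main(String[] args) {
        // Aggregated state for the input 1 2 1 3 2 5 4.
        Map<String, Long> state = new HashMap<>();
        state.put("1", 2L);
        state.put("2", 2L);
        state.put("3", 1L);
        state.put("5", 1L);
        state.put("4", 1L);
        // Timer order taken from the article; the clock value is an assumption.
        List<String> order = Arrays.asList("1", "4", "5", "3", "2");
        System.out.println(fire(order, state, 1_551_505_439_999L, 1_551_505_440_000L));
    }
}
```

Under these assumptions the Sink would receive the aggregates in timer order rather than in arrival order, which matches the behaviour the overview describes.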