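1. Watermark Overview
Watermark is a mechanism that Flink introduced to support event-time window computation. In essence, a watermark is itself a timestamp.
2. What Watermarks Are For
Watermarks are used to handle out-of-order events. Handling such events correctly usually means combining the watermark mechanism with windows.
In stream processing, an event is produced, flows through the source, and then reaches an operator, and each of these steps takes time. In most cases the data arriving at an operator is ordered by the time the events were produced, but network delays, backpressure, and similar causes can still reorder it (out-of-order, or in other words late elements).
We cannot wait for late elements indefinitely, so there has to be a mechanism which guarantees that, after a certain point in time, the window is triggered and computed. That mechanism is the watermark.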
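To make the idea concrete, here is a minimal plain-Java simulation with no Flink dependency. It assumes tumbling windows, non-negative timestamps, and a watermark that simply trails the largest timestamp seen so far by a fixed delay; the class `WatermarkSketch`, the method `run`, and the parameters are hypothetical names chosen for illustration, not Flink APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation (no Flink dependency); all names here are hypothetical.
class WatermarkSketch {

    /**
     * Feeds event timestamps (ms, non-negative) in arrival order through tumbling
     * windows of windowSize ms. The watermark trails the largest timestamp seen
     * so far by maxDelay ms. Returns one line per fired window or dropped event.
     */
    static List<String> run(long[] timestamps, long windowSize, long maxDelay) {
        List<String> log = new ArrayList<>();
        // window start -> timestamps buffered for that window
        TreeMap<Long, List<Long>> windows = new TreeMap<>();
        long maxSeen = Long.MIN_VALUE;
        long watermark = Long.MIN_VALUE;

        for (long ts : timestamps) {
            long start = ts - (ts % windowSize);
            long lastTs = start + windowSize - 1; // last timestamp belonging to the window
            if (lastTs <= watermark) {
                // The window has already fired, so this element is too late.
                log.add("drop " + ts);
            } else {
                windows.computeIfAbsent(start, k -> new ArrayList<>()).add(ts);
            }

            maxSeen = Math.max(maxSeen, ts);
            watermark = maxSeen - maxDelay;

            // Fire every window whose last timestamp is covered by the watermark.
            while (!windows.isEmpty() && windows.firstKey() + windowSize - 1 <= watermark) {
                Map.Entry<Long, List<Long>> e = windows.pollFirstEntry();
                log.add("fire [" + e.getKey() + "," + (e.getKey() + windowSize) + ") " + e.getValue());
            }
        }
        return log;
    }

    public static void main(String[] args) {
        long[] arrivals = {1000, 3000, 2000, 6000, 4000, 8500, 1500};
        for (String line : run(arrivals, 5000, 2000)) {
            System.out.println(line);
        }
    }
}
```

With a window size of 5000 ms and a delay of 2000 ms, the out-of-order elements 2000 and 4000 still make it into the window [0,5000), which fires once the watermark reaches 6500. The element 1500 arrives after that and is dropped.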
3. How to Use Watermarks
3.1 Setting the time characteristic on StreamExecutionEnvironment
// --------------------------------------------------------------------------------------------
//  Time characteristic
// --------------------------------------------------------------------------------------------
/**
 * Sets the time characteristic for all streams created from this environment, e.g., processing
 * time, event time, or ingestion time.
 *
 * <p>If you set the characteristic to IngestionTime or EventTime this will set a default
 * watermark update interval of 200 ms. If this is not applicable for your application
 * you should change it using {@link ExecutionConfig#setAutoWatermarkInterval(long)}.
 *
 * @param characteristic The time characteristic.
 */
@PublicEvolving
public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
    this.timeCharacteristic = Preconditions.checkNotNull(characteristic);
    if (characteristic == TimeCharacteristic.ProcessingTime) {
        getConfig().setAutoWatermarkInterval(0);
    } else {
        getConfig().setAutoWatermarkInterval(200);
    }
}
/**
 * Gets the time characteristic.
 *
 * @see #setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)
 *
 * @return The time characteristic.
 */
@PublicEvolving
public TimeCharacteristic getStreamTimeCharacteristic() {
    return timeCharacteristic;
}
Here, TimeCharacteristic is an enum that defines which notion of time the system uses as its reference.
/**
 * The time characteristic defines how the system determines time for time-dependent
 * order and operations that depend on time (such as time windows).
 */
@PublicEvolving
public enum TimeCharacteristic {
    /**
     * Processing time for operators means that the operator uses the system clock of the machine
     * to determine the current time of the data stream. Processing-time windows trigger based
     * on wall-clock time and include whatever elements happen to have arrived at the operator at
     * that point in time.
     *
     * <p>Using processing time for window operations results in general in quite non-deterministic
     * results, because the contents of the windows depends on the speed in which elements arrive.
     * It is, however, the cheapest method of forming windows and the method that introduces the
     * least latency.
     */
    ProcessingTime,
    /**
     * Ingestion time means that the time of each individual element in the stream is determined
     * when the element enters the Flink streaming data flow. Operations like windows group the
     * elements based on that time, meaning that processing speed within the streaming dataflow
     * does not affect windowing, but only the speed at which sources receive elements.
     *
     * <p>Ingestion time is often a good compromise between processing time and event time.
     * It does not need any special manual form of watermark generation, and events are typically
     * not too much out-of-order when they arrive at operators; in fact, out-of-orderness can
     * only be introduced by streaming shuffles or split/join/union operations. The fact that
     * elements are not very much out-of-order means that the latency increase is moderate,
     * compared to event time.
     */
    IngestionTime,
    /**
     * Event time means that the time of each individual element in the stream (also called event)
     * is determined by the event's individual custom timestamp. These timestamps either exist in
     * the elements from before they entered the Flink streaming dataflow, or are user-assigned at
     * the sources. The big implication of this is that it allows for elements to arrive in the
     * sources and in all operators out of order, meaning that elements with earlier timestamps may
     * arrive after elements with later timestamps.
     *
     * <p>Operators that window or order data with respect to event time must buffer data until they
     * can be sure that all timestamps for a certain time interval have been received. This is
     * handled by the so called "time watermarks".
     *
     * <p>Operations based on event time are very predictable - the result of windowing operations
     * is typically identical no matter when the window is executed and how fast the streams
     * operate. At the same time, the buffering and tracking of event time is also costlier than
     * operating with processing time, and typically also introduces more latency. The amount of
     * extra cost depends mostly on how much out of order the elements arrive, i.e., how long the
     * time span between the arrival of early and late elements is. With respect to the
     * "time watermarks", this means that the cost typically depends on how early or late the
     * watermarks can be generated for their timestamp.
     *
     * <p>In relation to {@link #IngestionTime}, the event time is similar, but refers to the
     * event's original time, rather than the time assigned at the data source. Practically, that
     * means that event time has generally more meaning, but also that it takes longer to determine
     * that all elements for a certain time have arrived.
     */
    EventTime
}
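The three values stand for: EventTime, the time at which the event was generated; IngestionTime, the time at which the event entered Flink; and ProcessingTime, the time at which an operator processes the event.
3.2 How Watermarks Are Generated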
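The difference between the three notions is easiest to see on a single record. The following plain-Java sketch picks the timestamp that windowing would be based on under each notion and shows that the same record can land in three different tumbling windows. `TimeKind`, `Event`, `timestampFor`, and `windowStart` are hypothetical names used only for this illustration; they are not part of Flink, and timestamps are assumed to be non-negative.

```java
// Plain-Java illustration; TimeKind, Event and the method names are hypothetical.
class TimeCharacteristicSketch {

    enum TimeKind { PROCESSING_TIME, INGESTION_TIME, EVENT_TIME }

    /** One record: the timestamp embedded in it and the time the source received it. */
    static final class Event {
        final long eventTime;
        final long ingestionTime;

        Event(long eventTime, long ingestionTime) {
            this.eventTime = eventTime;
            this.ingestionTime = ingestionTime;
        }
    }

    /** Picks the timestamp that windowing would be based on for the given time notion. */
    static long timestampFor(TimeKind kind, Event e, long operatorClock) {
        switch (kind) {
            case EVENT_TIME:
                return e.eventTime;
            case INGESTION_TIME:
                return e.ingestionTime;
            default:
                // Processing time: whatever the operator's wall clock says right now.
                return operatorClock;
        }
    }

    /** Start of the tumbling window of the given size that contains the timestamp. */
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    public static void main(String[] args) {
        Event e = new Event(9_500L, 10_200L); // produced at 9.5 s, ingested at 10.2 s
        long operatorClock = 15_100L;         // reaches the window operator at 15.1 s
        for (TimeKind kind : TimeKind.values()) {
            long ts = timestampFor(kind, e, operatorClock);
            System.out.println(kind + " -> window starting at " + windowStart(ts, 5_000L));
        }
    }
}
```

Only the event-time assignment is independent of how quickly the record travels through the pipeline, which is why event time gives reproducible window contents.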
/**
 * A Watermark tells operators that no elements with a timestamp older or equal
 * to the watermark timestamp should arrive at the operator. Watermarks are emitted at the
 * sources and propagate through the operators of the topology. Operators must themselves emit
 * watermarks to downstream operators using
 * {@link org.apache.flink.streaming.api.operators.Output#emitWatermark(Watermark)}. Operators that
 * do not internally buffer elements can always forward the watermark that they receive. Operators
 * that buffer elements, such as window operators, must forward a watermark after emission of
 * elements that is triggered by the arriving watermark.
 *
 * <p>In some cases a watermark is only a heuristic and operators should be able to deal with
 * late elements. They can either discard those or update the result and emit updates/retractions
 * to downstream operations.
 *
 * <p>When a source closes it will emit a final watermark with timestamp {@code Long.MAX_VALUE}.
 * When an operator receives this it will know that no more input will be arriving in the future.
 */
Output, referenced above, is documented as follows:
/**
 * A {@link org.apache.flink.streaming.api.operators.StreamOperator} is supplied with an object
 * of this interface that can be used to emit elements and other messages, such as barriers
 * and watermarks, from an operator.
 *
 * @param <T> The type of the elements that can be emitted.
 */
The method that emits a watermark:
/**
 * Emits a {@link Watermark} from an operator. This watermark is broadcast to all downstream
 * operators.
 *
 * <p>A watermark specifies that no element with a timestamp lower or equal to the watermark
 * timestamp will be emitted in the future.
 */
void emitWatermark(Watermark mark);
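The contract described in these comments can be sketched without Flink. The example below assumes that a watermark should only be forwarded when it advances (a simplification we add here; the quoted Javadoc does not spell it out) and ends the stream with `Long.MAX_VALUE`, as the Watermark documentation quoted earlier describes for a closing source. `WatermarkSink` and `MonotonicEmitter` are hypothetical stand-ins, not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch; WatermarkSink and MonotonicEmitter are hypothetical names,
// not Flink classes.
class EmitSketch {

    /** Stand-in for the part of Output that receives watermarks. */
    interface WatermarkSink {
        void emitWatermark(long timestamp);
    }

    /** Forwards a watermark only when it advances, and ends with Long.MAX_VALUE. */
    static final class MonotonicEmitter {
        private final WatermarkSink sink;
        private long lastEmitted = Long.MIN_VALUE;

        MonotonicEmitter(WatermarkSink sink) {
            this.sink = sink;
        }

        /** Emits the candidate if it is strictly greater than the last emitted watermark. */
        void offer(long candidate) {
            if (candidate > lastEmitted) {
                lastEmitted = candidate;
                sink.emitWatermark(candidate);
            }
        }

        /** Signals that no more input will arrive. */
        void close() {
            offer(Long.MAX_VALUE);
        }
    }

    public static void main(String[] args) {
        List<Long> seen = new ArrayList<>();
        MonotonicEmitter emitter = new MonotonicEmitter(ts -> seen.add(ts));
        emitter.offer(100);
        emitter.offer(90);   // does not advance the watermark, so it is not forwarded
        emitter.offer(250);
        emitter.close();
        System.out.println(seen);
    }
}
```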
3.3 How Operators Handle Watermarks
OneInputStreamOperator#processElement
TwoInputStreamOperator#processElement1
TwoInputStreamOperator#processElement2
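An operator with two inputs also has to reconcile two watermark streams. A common rule, and to our understanding the one Flink's two-input operators follow, is to track the watermark of each input and forward the minimum of the two whenever that minimum advances. The sketch below is a plain-Java model of that rule under this assumption; the class `TwoInputWatermarkSketch` and its methods are hypothetical and only loosely named after the Flink callbacks.

```java
// Plain-Java sketch; the class and method names are hypothetical.
class TwoInputWatermarkSketch {
    private long input1Watermark = Long.MIN_VALUE;
    private long input2Watermark = Long.MIN_VALUE;
    private long combinedWatermark = Long.MIN_VALUE;

    /** Records a watermark from input 1; returns true if the combined watermark advanced. */
    boolean processWatermark1(long timestamp) {
        input1Watermark = Math.max(input1Watermark, timestamp);
        return advance();
    }

    /** Records a watermark from input 2; returns true if the combined watermark advanced. */
    boolean processWatermark2(long timestamp) {
        input2Watermark = Math.max(input2Watermark, timestamp);
        return advance();
    }

    /** The combined watermark is the minimum over both inputs and never moves backwards. */
    private boolean advance() {
        long min = Math.min(input1Watermark, input2Watermark);
        if (min > combinedWatermark) {
            combinedWatermark = min;
            return true;
        }
        return false;
    }

    long currentWatermark() {
        return combinedWatermark;
    }

    public static void main(String[] args) {
        TwoInputWatermarkSketch op = new TwoInputWatermarkSketch();
        System.out.println(op.processWatermark1(1000)); // input 2 has not reported yet
        System.out.println(op.processWatermark2(400));  // combined watermark becomes 400
        System.out.println(op.currentWatermark());
    }
}
```

Taking the minimum means the slower input holds back event time for the whole operator, which is the price of never declaring data complete too early.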
[Figure: class diagram of the operator hierarchy]
Taking WindowOperator as an example:
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
    final Collection<W> elementWindows = windowAssigner.assignWindows(
        element.getValue(), element.getTimestamp(), windowAssignerContext);
    //if element is handled by none of assigned elementWindows
    boolean isSkippedElement = true;
    final K key = this.<K>getKeyedStateBackend().getCurrentKey();
    if (windowAssigner instanceof MergingWindowAssigner) {
        MergingWindowSet<W> mergingWindows = getMergingWindowSet();
        for (W window: elementWindows) {
            // adding the new window might result in a merge, in that case the actualWindow
            // is the merged window and we work with that. If we don't merge then
            // actualWindow == window
            W actualWindow = mergingWindows.addWindow(window, new MergingWindowSet.MergeFunction<W>() {
                @Override
                public void merge(W mergeResult,
                        Collection<W> mergedWindows, W stateWindowResult,
                        Collection<W> mergedStateWindows) throws Exception {
                    if ((windowAssigner.isEventTime() && mergeResult.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark())) {
                        throw new UnsupportedOperationException("The end timestamp of an " +
                            "event-time window cannot become earlier than the current watermark " +
                            "by merging. Current watermark: " + internalTimerService.currentWatermark() +
                            " window: " + mergeResult);
                    } else if (!windowAssigner.isEventTime() && mergeResult.maxTimestamp() <= internalTimerService.currentProcessingTime()) {
                        throw new UnsupportedOperationException("The end timestamp of a " +
                            "processing-time window cannot become earlier than the current processing time " +
                            "by merging. Current processing time: " + internalTimerService.currentProcessingTime() +
                            " window: " + mergeResult);
                    }
                    triggerContext.key = key;
                    triggerContext.window = mergeResult;
                    triggerContext.onMerge(mergedWindows);
                    for (W m: mergedWindows) {
                        triggerContext.window = m;
                        triggerContext.clear();
                        deleteCleanupTimer(m);
                    }
                    // merge the merged state windows into the newly resulting state window
                    windowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows);
                }
            });
            // drop if the window is already late
            if (isWindowLate(actualWindow)) {
                mergingWindows.retireWindow(actualWindow);
                continue;
            }
            isSkippedElement = false;
            W stateWindow = mergingWindows.getStateWindow(actualWindow);
            if (stateWindow == null) {
                throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
            }
            windowState.setCurrentNamespace(stateWindow);
            windowState.add(element.getValue());
            triggerContext.key = key;
            triggerContext.window = actualWindow;
            TriggerResult triggerResult = triggerContext.onElement(element);
            if (triggerResult.isFire()) {
                ACC contents = windowState.get();
                if (contents == null) {
                    continue;
                }
                emitWindowContents(actualWindow, contents);
            }
            if (triggerResult.isPurge()) {
                windowState.clear();
            }
            registerCleanupTimer(actualWindow);
        }
        // need to make sure to update the merging state in state
        mergingWindows.persist();
    } else {
        for (W window: elementWindows) {
            // drop if the window is already late
            if (isWindowLate(window)) {
                continue;
            }
            isSkippedElement = false;
            windowState.setCurrentNamespace(window);
            windowState.add(element.getValue());
            triggerContext.key = key;
            triggerContext.window = window;
            TriggerResult triggerResult = triggerContext.onElement(element);
            if (triggerResult.isFire()) {
                ACC contents = windowState.get();
                if (contents == null) {
                    continue;
                }
                emitWindowContents(window, contents);
            }
            if (triggerResult.isPurge()) {
                windowState.clear();
            }
            registerCleanupTimer(window);
        }
    }
    // side output input event if
    // element not handled by any window
    // late arriving tag has been set
    // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
    if (isSkippedElement && isElementLate(element)) {
        if (lateDataOutputTag != null){
            sideOutput(element);
        } else {
            this.numLateRecordsDropped.inc();
        }
    }
}
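The method above relies on two lateness checks, isWindowLate and isElementLate, whose bodies are not quoted here. The following plain-Java sketch shows what such checks plausibly look like. It assumes an event-time window assigner and compares against the current watermark in the same way the merge check in the quoted code does (`maxTimestamp() + allowedLateness <= currentWatermark()`). The class `LatenessSketch` and its fields are hypothetical; this is a simplified model and not the actual WindowOperator implementation.

```java
// Simplified plain-Java sketch; it assumes an event-time window assigner and
// uses hypothetical names rather than the real WindowOperator fields.
class LatenessSketch {
    final long allowedLateness;
    long currentWatermark = Long.MIN_VALUE;

    LatenessSketch(long allowedLateness) {
        this.allowedLateness = allowedLateness;
    }

    /** Time after which a window's state can be dropped; guards against long overflow. */
    long cleanupTime(long windowMaxTimestamp) {
        long t = windowMaxTimestamp + allowedLateness;
        return t >= windowMaxTimestamp ? t : Long.MAX_VALUE;
    }

    /** A window is late once the watermark has passed its cleanup time. */
    boolean isWindowLate(long windowMaxTimestamp) {
        return cleanupTime(windowMaxTimestamp) <= currentWatermark;
    }

    /** An element is late once the watermark has passed its timestamp plus the allowed lateness. */
    boolean isElementLate(long elementTimestamp) {
        return elementTimestamp + allowedLateness <= currentWatermark;
    }

    public static void main(String[] args) {
        LatenessSketch op = new LatenessSketch(1000);
        long windowMaxTimestamp = 4999; // window [0, 5000)

        op.currentWatermark = 5500;
        System.out.println(op.isWindowLate(windowMaxTimestamp)); // still within allowed lateness

        op.currentWatermark = 6000;
        System.out.println(op.isWindowLate(windowMaxTimestamp)); // cleanup time 5999 has passed
        System.out.println(op.isElementLate(4800));
    }
}
```

In this model, an element that belongs only to late windows and is itself late ends up in the final branch of processElement: it goes to the side output if a late-data tag is set, and otherwise it is counted as dropped.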
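Summary:
In stream processing, an event is produced, flows through the source, and then reaches an operator, and each of these steps takes time. In most cases the data arriving at an operator is ordered by the time the events were produced, but network delays, backpressure, and similar causes can still reorder it (out-of-order, or in other words late elements). We cannot wait for late elements indefinitely, so there has to be a mechanism which guarantees that, after a certain point in time, the window is triggered and computed. This is where the watermark comes in: once a watermark arrives, the operator treats all data with timestamps up to the watermark as having arrived, even though some delayed elements may still show up afterwards.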
References
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/event_time.html
[2] https://blog.csdn.net/lmalds/article/details/52704170
[3] https://www.jianshu.com/p/7d524ef8143c