天天看點

Flink 1.8.0中的狀态生存時間特性:如何自動清理應用程式的狀态

作者 | Fabian Hueske & Andrey Zagrebin

翻譯 | 唐雲

對于許多狀态流式計算程式來說,一個常見的需求是自動清理應用程式的狀态(state),以便有效地控制狀态大小,或者控制程式通路狀态的有效時間(例如受限于諸如GDPR等法律條規)。Apache Flink自1.6.0版本引入了狀态的生存時間(time-to-live,TTL)功能,使得應用程式的狀态清理和有效的狀态大小管理成為可能。

在本文中,我們将讨論引入狀态生存時間特性的動機并讨論其相關用例。此外,我們還将示範如何使用和配置該特性。同時,我們将會解釋Flink如何借用狀态生存時間特性在内部管理狀态,并對Flink 1.8.0中該功能引入的相關新特性進行一些展示。本文章最後對未來的改進和擴充作了展望。

狀态的暫時性

有兩個主要原因可以解釋為什麼狀态隻應該維持有限的時間。讓我們先設想一個Flink應用程式,它接收使用者登入事件流,并為每個使用者存儲上一次登入時的相關事件資訊和時間戳,以改善高頻通路使用者的體驗。

  • 控制狀态的大小。 狀态生存時間特性的主要使用場景,就是能夠有效地管理不斷增長的狀态大小。通常情況下,資料隻需要暫時儲存,例如使用者處在一次網絡連接配接會話中。當使用者通路事件結束時,我們實際上就沒有必要儲存該使用者的狀态,來減少無謂的狀态存儲空間占用。Flink 1.8.0引入的基于生存時間的背景狀态清理機制,使得我們能夠自動地對無用資料進行清理。此前,應用程式開發人員必須采取額外的操作并顯式地删除無用狀态以釋放存儲空間。這種手動清理過程不僅容易出錯,而且效率低下。以我們上述使用者登入的案例為例,因為這些不活躍使用者的相關資訊會被自動過期清理掉,我們就不再需要額外存儲上次登入的時間戳。
  • 符合(敏感)資料保護的要求。 随着資料隐私法規的發展(例如歐盟頒布的通用資料保護法規GDPR),遵守此類法規的相關要求,或将資料進行敏感處理已經成為許多應用程式的首要任務。此類使用場景的的一個典型案例,就需要僅在特定時間段内儲存資料并防止其後可以再次通路該資料。這對于為客戶提供短期服務的公司來說是一個常見的挑戰。狀态生存時間這一特性,就保證了應用程式僅可以在有限時間内進行通路,有助于遵守資料保護法規。

這兩個需求都可以通過狀态生存時間來解決,這個功能可以周期性地、持續地删除狀态中的鍵值,一旦它變得不必要或不重要,并且不再需要儲存在存儲中時。

對應用狀态的持續清理

Apache Flink的1.6.0版本引入了狀态生存時間特性。它使流處理應用程式的開發人員能夠配置算子的狀态,使其在定義的逾時(生存時間)後過期并被清除。在Flink 1.8.0中,該功能得到了進一步擴充,對RocksDB和堆記憶體狀态後端(

FsStateBackend

MemoryStateBackend

)的舊資料進行連續性的清理。

在Flink的

DataStream

API中,應用程式狀态是由狀态描述符(state descriptor)來定義的。狀态生存時間是通過将

StateTtlConfiguration

對象傳遞給狀态描述符來配置的。下面的Java示例示範了如何建立狀态生存時間的配置,并将其提供給狀态描述符,該狀态描述符将使用者的上次登入時間儲存為

Long

值:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.state.ValueStateDescriptor;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.days(7))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();

ValueStateDescriptor<Long> lastUserLogin =
    new ValueStateDescriptor<>("lastUserLogin", Long.class);

lastUserLogin.enableTimeToLive(ttlConfig);           

Flink提供了多個選項來配置狀态生存時間的行為:

  • 什麼時候重置生存時間? 預設情況下,當狀态被修改時,生存時間就會被更新。我們也可以在讀操作通路狀态時更新相關項的生存時間,但這樣要花費額外的寫操作來更新時間戳。
  • 已經過期的資料是否可以通路? 狀态生存時間機制使用的是惰性政策來清除過期狀态。這可能導緻應用程式會嘗試讀取過期但尚未删除的狀态。使用者可以配置對這樣的讀取請求是否傳回過期狀态。無論哪種情況,過期狀态都會在之後立即被删除。雖然傳回已經過期的狀态有利于資料可用性,但不傳回過期狀态更符合相關資料保護法規的要求。
  • 哪種時間語義被用于定義生存時間? 在Apache Flink 1.8.0中,使用者隻能根據處理時間(Processing Time)定義狀态生存時間。未來的Flink版本中計劃支援事件時間(Event Time)。

關于狀态生存時間的更多資訊,可以參考Flink

官方文檔

在實作上,狀态生存時間特性會額外存儲上一次相關狀态通路的時間戳。雖然這種方法增加了一些存儲開銷,但它允許Flink在通路狀态、建立檢查點、恢複或存儲清理過程時可以檢查過期狀态。

“取走垃圾資料”

在通路狀态對象時,Flink将檢查其時間戳,并在狀态過期時清除狀态(是否傳回過期狀态,則取決于配置的過期資料可見性)。由于這種通路時才删除的特性,除非被垃圾回收,否則那些永遠不被通路過期資料将仍然占用存儲空間。

那麼,在沒有顯示處理過期狀态的情況下,如何删除這些資料呢?通常,我們可以配置不同的政策進行背景删除。

保證完整快照中不包含過期資料

Flink 1.6.0已經支援在建立檢查點(checkpoint)或儲存點(savepoint)的完整快照時不包含過期狀态。需要注意的是,建立增量快照時并不支援剔除過期狀态。完整快照時的過期狀态剔除必須如下例所示進行顯示啟用:

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.days(7))
    .cleanupFullSnapshot()
    .build();           

上述配置并不會影響本地狀态存儲的大小,但是整個作業的完整快照的大小将會減小。隻有當使用者從快照重新加載其狀态到本地時,才會清除使用者的本地狀态。

由于上述這些限制,在Flink 1.6.0中程式仍需要過期後主動删除狀态。為了改善使用者體驗,Flink1.8.0引入了兩種自主清理政策,分别針對兩種狀态後端類型:

堆記憶體狀态後端的增量清理

此方法隻适用于堆記憶體狀态後端(

FsStateBackend

MemoryStateBackend

)。其基本思路是在存儲後端的所有狀态條目上維護一個全局的惰性疊代器。某些事件(例如狀态通路)會觸發增量清理,而每次觸發增量清理時,疊代器都會向前周遊删除已周遊的過期資料。以下代碼示例展示了如何啟用增量清理:

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.days(7))
    // check 10 keys for every state access
    .cleanupIncrementally(10, false)
    .build();           

如果啟用該功能,則每次狀态通路都會觸發清除。而每次清理時,都會檢查一定數量的狀态條目是否過期。其中有兩個調整參數。第一個定義了每次清理時要檢查的狀态條目數。第二個參數是一個标志位,用于表示是否在每條記錄處理(record processed)之後(而不僅僅是通路狀态,state accessed),都還額外觸發清除邏輯。

關于這種方法有兩個重要的注意事項:首先是增量清理所花費的時間會增加記錄處理的延遲。其次,如果沒有狀态被通路(state accessed)或者沒有記錄被處理(record processed),過期的狀态也将不會被删除。

RocksDB狀态後端利用背景壓縮來清理過期狀态

如果使用RocksDB狀态後端,則可以啟用另一種清理政策,該政策基于Flink定制的RocksDB壓縮過濾器(compaction filter)。RocksDB會定期運作異步的壓縮流程以合并資料并減少相關存儲的資料量,該定制的壓縮過濾器使用生存時間檢查狀态條目的過期時間戳,并丢棄所有過期值。

使用此功能的第一步,需要設定以下配置選項:

state.backend.rocksdb.ttl.compaction.filter.enabled

。一旦配置使用RocksDB狀态後端後,如以下代碼示例将會啟用壓縮清理政策:

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.days(7))
    .cleanupInRocksdbCompactFilter()
    .build();           

需要注意的是啟用Flink的生存時間壓縮過濾機制後,會放緩RocksDB的壓縮速度。

使用定時器進行狀态清理

另一種手動清除狀态的方法是基于Flink的計時器,這也是社群評估的一個想法。使用這種方法,将為每個狀态通路注冊一個清除計時器。這種方法的清理更加精準,因為狀态一旦過期就會被立刻删除。但是由于計時器會與原始狀态一起存儲會消耗空間,開銷也更大一些。

未來展望

除了上面提到的基于計時器的清理政策之外,Flink社群還計劃進一步改進狀态生存時間特性。可能的改進包括為事件時間(event time)添加生存時間的支援(目前隻支援處理時間)和為可查詢狀态(queryable state)啟用狀态生存時間機制。

總結

狀态可通路時間的限制和應用程式狀态大小的控制,是狀态流處理領域的常見挑戰,Flink的1.8.0版本通過添加對過期狀态對象連續性背景清理的支援,顯著改進了狀态生存時間特性。新的清理機制可以不再需要手動實作狀态清理的工作,而且由于惰性清理的機制,執行效率也更高。總得來說,狀态生存時間友善使用者控制應用程式狀态的大小,使得使用者可以将精力集中在應用程式的核心邏輯開發上。

原文連結:

https://flink.apache.org/2019/05/19/state-ttl.html
Flink 1.8.0中的狀态生存時間特性:如何自動清理應用程式的狀态