Trident State 譯文
Trident針對狀态化的資料源的讀寫進行了一流的分裝。State可以包含在拓撲中-例如,儲存在記憶體中,有HDFS提供備份-也可以儲存在一個外部的資料庫中,像Memcached和Cassandra。針對以上的所有情況,Trident的API都是一樣的。
為了保證state在更新的過程中遇到重試或者失敗處理時任然能夠具有幂等性,storm采取了必要的容錯。也就是說,storm能夠做到每一條消息僅且僅被處理一次。
在進行state更新操作的時候,可以選擇不同等級的容錯方式;在看這些容錯方式之前,讓我們來用一個例子說明如何保證僅且僅被處理一次的語意。假設你正在某個流中進行累加的聚合操作,并且準備把聚合的結果儲存在資料庫中。 現在你在資料庫中儲存了一個值來表示累加的結果,每處理一個tuple你就對資料庫中的值進行一次累加操作。
當失敗處理發生的時候,tuples就會被重放。這就給state的更新操作(還有任何會帶來副作用的操作)帶來了問題--你将無法确定你是否已經基于這個被重發的tuple對state成功地進行了更新操作。也許你還從來沒有處理過這個tuple,在這種情況下你就需要對資料庫中的值進行一次累加操作。也許你已經成功處理過這個tuple并且對資料庫中的值進行過了一次累加操作,但是這個tuple在你更新state之後的某個環節出錯了;在這種情況下,你在接收到這個tuple的時候就不應該對資料庫中的值進行累加操作了。也可能這個tuple曾經出現過,但是在對資料庫中的值進行累加的時候出錯了,在這種情況下你需要對資料庫中的值進行累加操作。
僅僅在資料庫中儲存累加的值,你永遠無法确定這個tuple是否已經被處理過了。是以你需要更多的資訊來幫助你做出正确的決定。Trident提供了一下的語義來幫助使用者獲得僅且僅被處理一次的語義。
1.所有的tuple都是一小批一小批的發送的(以batch的方式發送)。
2.每一個批量的tuple都會被賦予一個唯一的"transaction id" (txid);加入該批tuple被重播,那麼該批tuple仍然保持相同的txid。
3.State的更新在各個批次的tuple之間是有序的,也就是說,隻有第2批成功更新以後,第3批才會執行對state的更新操作。
有了這些保障,你自己的state就能夠檢測到某一批tuple是否被處理過,并選擇正确的方式來更新state。你到底要采取什麼樣的方式來更新state依賴于輸入的spout也就是每一個批量的tuple所提供的一緻性語義。Storm提供三種容錯級别的soput:"non-transactional"(非事務型), "transactional"(事務型), 和 "opaque transactional"(透明事務型)。同樣的storm也提供了三種容錯級别的state:"non-transactional"(非事務型), "transactional"(事務型), 和 "opaque transactional"(透明事務型)。讓我們來看看每一種事務類型的spout,以及通過每種spout你所能獲得的容錯方式。
Transactional spouts
記住,Trident總是一小批一小批的處理tuple,并且每一個批次有一個唯一的事務ID。Spout的特性有其鎖提供的保障措施決定;事務型的spout具有一下特性:
1. 一個txid所對應的batch永遠是相同的。同一個txid的重放的batch永遠和之前該txid所對應的batch相同。
2. 不同batch中的tuple之間不會存在交集(一個tuple不是屬于這個batch,就是屬于另一個batch,永遠不能同時屬于兩個以上的batch)。
3. 每一個tuple都一定會在一個batch中被發送(沒有任何一個tuple被遺漏)。
這是一種很容易了解的spout類型,一個流被劃分成固定的批次,并且永遠不會改變。Storm提供了一個針對kafka的事務型spout。
你也許會問:為什麼我們不總是使用transactional spout?這很容易了解。一個原因是并不是所有的地方都需要容錯的。舉例來說,TransactionalTridentKafkaSpout 工作的方式是給定一個txid的batch所包含的一個屬于一個topic的來自于所有Kafka partition的tuple序列。一旦這個batch被發出,在任何時候如果這個batch被重新發出時,它必須包含原來所有的tuple以滿足 transactional spout的語義。現在我們假定一個batch被TransactionalTridentKafkaSpout所發出,這個batch沒有被成功處理,并且同時kafka的一個節點也down掉了。你就無法像之前一樣重播一個完全一樣的batch(因為kakfa的節點down掉,該topic的一部分partition可能會無法使用),整個處理會被中斷。
這也就是"opaque transactional" spouts(不透明事務spout)存在的原因- 他們對于丢失源節點這種情況是容錯的,仍然能夠幫你達到有且隻有一次處理的語義。後面會對這種spout有所介紹。
(當然,在Kafka開啟replication功能時,transactional spout也是可以做到容錯的;現在的kafka已經完全支援了,是以,上文中所說的當一個節點挂掉以後TransactionalTridentKafkaSpout無法正常工作的情況也就不存在了,也正是因為這樣,大部分時間都選擇使用了TransactionalTridentKafkaSpout,個人感覺在使用kafka的時候"opaque transactional" spouts确實沒有存在的意義)
在讨論 "opaque transactional" spouts之前,讓我們先來看看你該如何為transactional spout設計一個具有僅且僅處理一次的state。這個state的類型被稱為"transactional state" ,它利用任何txid都永遠對應與相同一個批次的tuple的特性。
假設你的拓撲是用來統計單詞個數的,并且你将要把統計結果儲存在一個key-value資料庫中。Key肯定就是對應的單詞了,值當然就是統計結果。你已經看到隻是存儲一個數量是不足以知道你是否已經處理過一個batch的。是以,你需要将txid和統計結果一起儲存在值中。那麼,當你需要更新統計結果的時候,你隻需要比較一下資料庫中的txid和目前batch的txid是否相同;如果相同,你就直接跳過更新操作--因為有強順序的保障,你可以肯定資料庫中的值已經包含了目前batch。如果不相同,你就修改統計結果。這個邏輯之是以能說的通是因為batch的txid永遠不會改變,并且batch之間有序地對state進行更新操作。
用一個例子來說明這個邏輯為什麼行得通,假如你發送了一個txid=3的batch,該batch中包含一下的tuple:
[“man”]
[“man”]
[“dog”]
假設現在資料庫中儲存這如下的key-value資料:
man => [count=3,txid=1]
dog => [count=4,txid=3]
apple =>[count=10,txid=2]
和man相關聯的txid是1;由于目前的batch的txid是3,那麼你就可以肯定這批tuple中man 的值還沒有累加到資料庫中。是以你可以給man的count累加2,并且更新txid為3。然而,dog對應的txid在資料庫中和目前batch中 一樣,是以你可以肯定對于dog來說目前batch中的值已經在資料庫中增加過了。那麼就選擇跳過更新。在該batch更新後,資料庫中的資料如下所示:
man => [count=5,txid=3]
dog => [count=4,txid=3]
apple =>[count=10,txid=2]
接下來我們一起再來看看 opaque transactional spout以及怎樣去為這種spout設計相應的state。
Opaque transactional spouts
opaque transactional spout并不能保證每一個txid永遠對應一個相同的batch,opaque transactional spout擁有如下特性:
1. 每一個tuple都隻會在一個batch中執行通過。也就是說,一個tuple在某一個batch處理失敗了,該tuple可能在之後的另一個新的batch中處理成功。
OpaqueTridentKafkaSpout就是一個擁有該特性的spout,該spout允許kafka節點挂掉。每當OpaqueTridentKafkaSpout要發送一個新的batch的時候,它将會從上一個batch所成功發送的tuple的結束點開始發送,這就保證了沒有tuple會被遺漏掉,也保證了一個tuple不會被多個batch成功處理。
在使用opaque transactional spouts的時候,再使用和transactional spout相同的處理方式:判斷資料庫中存放的txid和目前txid去做對比已經不好用了。這是因為在state的更新過程之間,batch可能已經變了。
你能做的就是在資料庫中儲存更多的狀态;除了儲存值和txid以外,你還需要儲存更新前的值(previous value)。讓我們還是用上面的例子來說明這個邏輯。假定你目前batch中的對應count是“2”, 并且我們需要進行一次狀态更新。而目前資料庫中存儲的資訊如下:
{
value = 4,
prevValue = 1,
txid = 2
}
假設目前的txid為3,和資料庫中的txid(2)不同。在這種情況下,你把“preValue”設定為“value”,然後将value增加2,并更新txid為3。操作過後的資料庫内容變成了下面的樣子:
{
value = 6,
prevValue = 4,
txid = 3
}
再假設目前的txid為2,和資料庫中的txid(2)相同。這時你可以确定資料庫中的“value”被之前擁有相同txid的batch更新過,但是之前的batch和現在的batch内容可能不同了。是以你要做的是讓“value”的值等于“preValue”加2,操作過後的資料庫内容變成了下面的樣子:
{
value = 3,
prevValue = 1,
txid = 2
}
--------------------------------------------------------------------------------------------------------------------------
注:這裡了解起來可能有些晦澀,舉個例子吧。
假設一個batch的大小為3,有下面這麼多tuple要進行累加:
[dog] [dog] [man] [man] [man]
假設資料庫中現在的資訊為:
dog =>{value=2,prevValue=1,txid=1}
man =>{value=3,prevValue=1,txid=1}
然後發送一個txid為2的batch {[dog] [dog] [man]}
然後進行儲存操作,
man 成功儲存,但是dog儲存的時候發生了錯誤,是以資料庫中的資訊變成了
dog =>{value=2,prevValue=1,txid=1}
man =>{value=4,prevValue=3,txid=2}
那麼失敗了,就會有batch的重發,恰好這是負責發送第一個 [dog]的kafka節點壞掉了,batch無法獲得第一個[dog]了,那麼就隻能從第二個dog開始發了,是以發送的batch的txid依然為2,内容為{[dog] [man] [man]}
到這裡,dog 的兩個txid不同,更新;但是man txid相同了,是以用prevValue+2來更新value;從這裡應該可以看出,為什麼是這樣做了。
更新後的結果如下:
dog =>{value=3,prevValue=2,txid=2}
man =>{value=5,prevValue=3,txid=2}
---------------------------------------------------------------------------------------------------------------------------------
因為Trident保證了batch之間的強順序性,是以這種方法是有效的。一旦Trident去處理一個新的batch,它就不會重新回到之前的任何一個batch。并且由于opaque transactional spout確定在各個batch之間是沒有共同成員的,每個tuple隻會在一個batch中被成功處理,你可以安全的在之前的值上進心更新。
Non-transactional spouts
Non-transactional spout(非事務spout)不提供任何的保障。是以在tuple處理失敗後不進行重發的情況下,一個tuple可能是最多被處理一次的。同時他也可能會是至少處理一次的,如果tuple在不同的batch中被多次成功處理的時候。無論怎樣,這種spout是不可能實作有且隻有一次被成功處理的語義的。
Summary of spout and state types
這個圖展示了哪些spout和state的組合能夠實作有且隻有一次被成功處理的語義:
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiclRnblN0LclHdpZXYyd2LcBzNvwVZ2x2bzNXak9CX90TQNNkRrFlQKBTSvwFbslmZvwFMwQzLcVmepNHdu9mZvwFVywUNMZTY18CX052bm9CX90zdNd3a61EeZpWT4FEVkZXUYpVd1kmYr50MZV3YyI2cKJDT29GRjBjUIF2LcRHelR3LcJzLctmch1mclRXY39DN0gzMyMDM2EDNxYDM3EDMy8CX0Vmbu4GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.jpg)
Opaque transactional state有着最為強大的容錯性。但是這是以存儲更多的資訊作為代價的。Transactional states 需要存儲較少的狀态資訊,但是僅能和 transactional spouts協同工作. 最後, non-transactional state所需要存儲的資訊最少,但是卻不能實作有且隻有一次被成功處理的語義。
State和Spout類型的選擇其實是一種在容錯性和存儲消耗之間的權衡,你的應用的需要會決定那種組合更适合你。
State APIs
在前面你已經看到了一些用來實作僅且僅執行一次語義的複雜方法,有一個關于Trident的好消息就是,Trident把所有容錯的邏輯都在state内部實作了。那麼作為一個使用者,你就從比較txid,儲存多餘的值到資料庫中,或者任何像它們兩個那樣的苦差事中脫離了出來。你隻需要像下面這樣寫代碼就可以了:
TridentTopology topology = new TridentTopology();
TridentState wordCounts =
topology.newStream("spout1", spout)
.each(new Fields("sentence"), new Split(), new Fields("word"))
.groupBy(new Fields("word"))
.persistentAggregate(MemcachedState.opaque(serverLocations),new Count(),new Fields("count")) //重點就是這句了,這裡其實使用了mapState,用來做批量的聚合結果的保//存
.parallelismHint(6);
所有管理opaque transactional state的必要邏輯都在MemcachedState.opaque方法内部實作了。另外,更新操作是批量進行的,以減少對資料庫的壓力。
基礎的state接口隻有兩個方法:
public interface State {
void beginCommit(Long txid); // can be null for things like partitionPersist occurring off //a DRPC stream(放生在DRPC流中的partitionPersist操作中,txid可能為空)
void commit(Long txid);
}
在這個接口所提供的兩個方法中,你可以知道什麼時候開了對state的更新操作,什麼時候完成了對state的更新操作,在每個方法中你都能夠獲得txid。Trident對你的state是如何工作的沒有做出任何的假設(也就是說,你要自己寫更新和查詢方法)。
加入你自己有一套資料庫,并且希望通過Trident來在其中更新、查詢使用者的位置資訊。那麼你自己實作的state中就要自己去寫更新和查詢的方法了:
public class LocationDB implements State {
public void beginCommit(Long txid) {
}
public void commit(Long txid) {
}
public void setLocation(long userId, String location) {
// code to access database and set location
//自己寫的向資料庫中儲存使用者位置資訊的方法,這個方法會在你自己實作的
//BaseStateUpdater中調用(呵呵,自己實作然後自己調用)
}
public String getLocation(long userId) {
// code to get location from database
//自己寫的從資料庫中查找使用者位置資訊的方法,這個方法會在你自己實作的
//BaseQueryFunction中調用(也是自己實作自己調用)
}}
然後你就要實作一個Trdient定義的StateFactory ,使你能夠在Trient的task中建立你自己的state。下面是為LocationDB 實作的StateFactory:
public class LocationDBFactory implements StateFactory {
public State makeState(Map conf, int partitionIndex, int numPartitions) {
return new LocationDB();
} }
Trident提供了QueryFunction 用來對state進行查詢,提供了StateUpdater 用來對state進行更新操作。讓我們來寫一個QueryLocation的操作,該操作從LocationDB中查詢使用者的位置資訊。首先來看看那你該如何在拓撲中使用QueryLocation操作。假設你的拓撲接收一個使用者的id的輸入流。
TridentTopology topology = new TridentTopology();
TridentState locations = topology.newStaticState(new LocationDBFactory());
topology.newStream("myspout", spout)
.stateQuery(locations, new Fields("userid"), new QueryLocation(), new Fields("location"))
//.stateQuey就是查詢了,第一個參數指定了要查詢的state(這個state使用LocationDBFactory來建立的,這就是為什麼要為你的state建立一個stateFactory了,因為你無法在Trident的API中直接new你的state,你隻能new stateFactory,然後Trident會調用其中的makeState方法來建立state);第二個參數就是輸入的流的字段,這裡把userId輸入到操作中;第三個參數就是你自己實作的QueryFunction 用來執行查詢操作;第四個參數是輸出字段。
好了,現在可以來看看如何來實作一個自己的QueryFunction 了。
public class QueryLocation extends BaseQueryFunction<LocationDB, String> {
public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) {
//查詢的方法,下面的代碼都是要自己寫的
List<String> ret = new ArrayList();
for(TridentTuple input: inputs) {
ret.add(state.getLocation(input.getLong(0)));//每次查詢一個,效率不高
}
return ret;//這個ret的類型是你自己定義好的泛型(在類的開始處)
//傳回的ret會循環調用下面的execute方法來發送每一個location
}
public void execute(TridentTuple tuple, String location, TridentCollector collector) {
//發送輸出資料的方法,輸出字段的定義在上面已經完成了,說白了還是一個bolt節點 ps:在新的版本中String location已經變成了一個List了,也就是ret一次都傳進來了,在execute方法中進行周遊
collector.emit(new Values(location));
}
}
QueryFunction的執行分為兩步:第一步,Trident會收集一個batch的輸入資料然後把他們傳遞給batchRetrieve。在這個例子中,batchRetrieve會接收到很多的使用者ID。BatchRetrieve方法需要傳回和接收到的batch中的tuple的數量相同的一個list資料。List中的第一個元素對應第一個tuple查詢的結果,第二個元素對應第二個tuple查詢的結果,以此類推。
也許你會看出上面的代碼中沒有利用Trident所提供的batch的優勢,因為它每次隻從LocationDB 中查詢一條資料。是以可以把LocationDB 向下面這樣優化一下:
public class LocationDB implements State {
public void beginCommit(Long txid) {
}
public void commit(Long txid) {
}
public void setLocationsBulk(List<Long> userIds, List<String> locations) {
// set locations in bulk批量進行更新
}
public List<String> bulkGetLocations(List<Long> userIds) {
// get locations in bulk批量進行查詢
}}
有了上面優化後的LocationDB ,那麼QueryLocation 就也需要修改一下了:
public class QueryLocation extends BaseQueryFunction<LocationDB, String> {
public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) {
List<Long> userIds = new ArrayList<Long>();
for(TridentTuple input: inputs) {
userIds.add(input.getLong(0));
}
return state.bulkGetLocations(userIds);//一次查一批...
}
public void execute(TridentTuple tuple, String location, TridentCollector collector) {
collector.emit(new Values(location));
} }
将QueryLocation 修改為上面的樣子以後,就可以大大減少對資料庫的請求了。
查詢說完了,下面就是如何來更新state了。你要利用StateUpdater 接口來實作自己的目的。下面是例子:
public class LocationUpdater extends BaseStateUpdater<LocationDB> {
public void updateState(LocationDB state, List<TridentTuple> tuples, TridentCollector collector) {//很簡單
List<Long> ids = new ArrayList<Long>();
List<String> locations = new ArrayList<String>();
for(TridentTuple t: tuples) {
ids.add(t.getLong(0));
locations.add(t.getString(1));
}
state.setLocationsBulk(ids, locations);
}}
有了上面的代碼,你就可以像下面這樣在Trident中來更新state了
TridentTopology topology = new TridentTopology();
TridentState locations =
topology.newStream("locations", locationsSpout)
.partitionPersist(new LocationDBFactory(), new Fields("userid", "location"), new LocationUpdater())
第一個參數就是LocationDB對應的stateFactory;第二個參數是輸入的流的字段;第三個就是上面寫的更新操作了。
partitionPersist 操作會更新一個State。其内部是将 State和一批更新的tuple交給StateUpdater,由StateUpdater完成相應的更新操作。
在這段代碼中,隻是簡單的從輸入的tuple中提取處userid和對應的location,并一起更新到State中。
partitionPersist 會傳回一個TridentState對象來表示被這個Trident topoloy更新過的locationDB。 然後你就可以使用這個state在topology的任何地方進行查詢操作了。
同時,你也可以看到我們傳了一個TridentCollector給StateUpdaters。 emit到這個collector的tuple就會去往一個新的stream。在這個例子中,我們并沒有去往一個新的stream的需要,但是如果你在做一些事情,比如說更新資料庫中的某個count,你可以emit更新的count到這個新的stream。然後你可以通過調用TridentState#newValuesStream方法來通路這個新的stream來進行其他的處理。
persistentAggregate
persistentAggregate是另一個用來更新state的方法, 你在之前的word count例子中應該已經見過了,如下:
TridentTopology topology = new TridentTopology();
TridentState wordCounts =
topology.newStream("spout1", spout)
.each(new Fields("sentence"), new Split(), new Fields("word"))
.groupBy(new Fields("word"))
.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
persistentAggregate是在partitionPersist之上的另外一層抽象。它知道怎麼去使用一個Trident 聚合器來更新State。在這個例子當中,因為這是一個group好的stream,Trident會期待你提供的state是實作了MapState接口的。用來進行group的字段會以key的形式存在于State當中,聚合後的結果會以value的形式存儲在State當中。MapState接口看上去如下所示:
public interface MapState<T> extends State {
List<T> multiGet(List<List<Object>> keys);
List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters);
void multiPut(List<List<Object>> keys, List<T> vals);}
當你在一個未經過group的stream上面進行聚合的話,Trident會期待你的state實作Snapshottable接口:
public interface Snapshottable<T> extends State {
T get();
T update(ValueUpdater updater);
void set(T o);
}
MemoryMapState 和 MemcachedState 都實作了上面的2個接口。(自己寫的mapState也會實作上面的兩個接口)
Implementing Map States
在Trident中實作MapState是非常簡單的,它幾乎幫你做了所有的事情。OpaqueMap, TransactionalMap, 和 NonTransactionalMap 類實作了所有相關的邏輯,包括容錯的邏輯。你隻需要将一個IBackingMap 的實作提供給這些類就可以了。IBackingMap接口看上去如下所示:
public interface IBackingMap<T> {
List<T> multiGet(List<List<Object>> keys);
void multiPut(List<List<Object>> keys, List<T> vals);
}
OpaqueMap's會用OpaqueValue的value來調用multiPut方法,TransactionalMap's會提供TransactionalValue中的value,而NonTransactionalMaps隻是簡單的把從Topology擷取的object傳遞給multiPut。
Trident還提供了一種CachedMap類來進行自動的LRU 緩存。
另外,Trident 提供了 SnapshottableMap 類将一個MapState 轉換成一個 Snapshottable 對象.(用來對沒有進行group by 的流進行全局彙總)
大家可以看看 MemcachedState的實作,進而學習一下怎樣将這些工具組合在一起形成一個高性能的MapState實作。MemcachedState是允許大家選擇使用opaque transactional, transactional, 還是 non-transactional 語義的。
Ps:翻譯的内容就這麼多了,其實網上翻譯的很多,但是看了以後并不能給很多新手帶來一些幫助(原文寫的太高深了)。努力翻譯了一下,但是還是覺得有很多沒有說清楚,下面會抽時間把storm官方提供的 hbase相關的trident state的源代碼解讀一下,我覺得隻有解讀一下這個源代碼,才會讓人更加清晰 state當地怎麼用,以及如何寫自己的state。