天天看點

Trident State譯文Trident State 譯文Transactional spoutsOpaque transactional spoutsNon-transactional spoutsSummary of spout and state typesState APIspersistentAggregateImplementing Map States

Trident State 譯文

Trident針對狀态化的資料源的讀寫進行了一流的分裝。State可以包含在拓撲中-例如,儲存在記憶體中,有HDFS提供備份-也可以儲存在一個外部的資料庫中,像Memcached和Cassandra。針對以上的所有情況,Trident的API都是一樣的。

為了保證state在更新的過程中遇到重試或者失敗處理時任然能夠具有幂等性,storm采取了必要的容錯。也就是說,storm能夠做到每一條消息僅且僅被處理一次。

在進行state更新操作的時候,可以選擇不同等級的容錯方式;在看這些容錯方式之前,讓我們來用一個例子說明如何保證僅且僅被處理一次的語意。假設你正在某個流中進行累加的聚合操作,并且準備把聚合的結果儲存在資料庫中。 現在你在資料庫中儲存了一個值來表示累加的結果,每處理一個tuple你就對資料庫中的值進行一次累加操作。

當失敗處理發生的時候,tuples就會被重放。這就給state的更新操作(還有任何會帶來副作用的操作)帶來了問題--你将無法确定你是否已經基于這個被重發的tuple對state成功地進行了更新操作。也許你還從來沒有處理過這個tuple,在這種情況下你就需要對資料庫中的值進行一次累加操作。也許你已經成功處理過這個tuple并且對資料庫中的值進行過了一次累加操作,但是這個tuple在你更新state之後的某個環節出錯了;在這種情況下,你在接收到這個tuple的時候就不應該對資料庫中的值進行累加操作了。也可能這個tuple曾經出現過,但是在對資料庫中的值進行累加的時候出錯了,在這種情況下你需要對資料庫中的值進行累加操作。

僅僅在資料庫中儲存累加的值,你永遠無法确定這個tuple是否已經被處理過了。是以你需要更多的資訊來幫助你做出正确的決定。Trident提供了一下的語義來幫助使用者獲得僅且僅被處理一次的語義。

1.所有的tuple都是一小批一小批的發送的(以batch的方式發送)。

2.每一個批量的tuple都會被賦予一個唯一的"transaction id" (txid);加入該批tuple被重播,那麼該批tuple仍然保持相同的txid。

3.State的更新在各個批次的tuple之間是有序的,也就是說,隻有第2批成功更新以後,第3批才會執行對state的更新操作。

有了這些保障,你自己的state就能夠檢測到某一批tuple是否被處理過,并選擇正确的方式來更新state。你到底要采取什麼樣的方式來更新state依賴于輸入的spout也就是每一個批量的tuple所提供的一緻性語義。Storm提供三種容錯級别的soput:"non-transactional"(非事務型), "transactional"(事務型), 和 "opaque transactional"(透明事務型)。同樣的storm也提供了三種容錯級别的state:"non-transactional"(非事務型), "transactional"(事務型), 和 "opaque transactional"(透明事務型)。讓我們來看看每一種事務類型的spout,以及通過每種spout你所能獲得的容錯方式。

Transactional spouts

記住,Trident總是一小批一小批的處理tuple,并且每一個批次有一個唯一的事務ID。Spout的特性有其鎖提供的保障措施決定;事務型的spout具有一下特性:

1. 一個txid所對應的batch永遠是相同的。同一個txid的重放的batch永遠和之前該txid所對應的batch相同。

2. 不同batch中的tuple之間不會存在交集(一個tuple不是屬于這個batch,就是屬于另一個batch,永遠不能同時屬于兩個以上的batch)。

3. 每一個tuple都一定會在一個batch中被發送(沒有任何一個tuple被遺漏)。

這是一種很容易了解的spout類型,一個流被劃分成固定的批次,并且永遠不會改變。Storm提供了一個針對kafka的事務型spout。

你也許會問:為什麼我們不總是使用transactional spout?這很容易了解。一個原因是并不是所有的地方都需要容錯的。舉例來說,TransactionalTridentKafkaSpout 工作的方式是給定一個txid的batch所包含的一個屬于一個topic的來自于所有Kafka partition的tuple序列。一旦這個batch被發出,在任何時候如果這個batch被重新發出時,它必須包含原來所有的tuple以滿足 transactional spout的語義。現在我們假定一個batch被TransactionalTridentKafkaSpout所發出,這個batch沒有被成功處理,并且同時kafka的一個節點也down掉了。你就無法像之前一樣重播一個完全一樣的batch(因為kakfa的節點down掉,該topic的一部分partition可能會無法使用),整個處理會被中斷。

這也就是"opaque transactional" spouts(不透明事務spout)存在的原因- 他們對于丢失源節點這種情況是容錯的,仍然能夠幫你達到有且隻有一次處理的語義。後面會對這種spout有所介紹。

(當然,在Kafka開啟replication功能時,transactional spout也是可以做到容錯的;現在的kafka已經完全支援了,是以,上文中所說的當一個節點挂掉以後TransactionalTridentKafkaSpout無法正常工作的情況也就不存在了,也正是因為這樣,大部分時間都選擇使用了TransactionalTridentKafkaSpout,個人感覺在使用kafka的時候"opaque transactional" spouts确實沒有存在的意義)

在讨論 "opaque transactional" spouts之前,讓我們先來看看你該如何為transactional spout設計一個具有僅且僅處理一次的state。這個state的類型被稱為"transactional state" ,它利用任何txid都永遠對應與相同一個批次的tuple的特性。

假設你的拓撲是用來統計單詞個數的,并且你将要把統計結果儲存在一個key-value資料庫中。Key肯定就是對應的單詞了,值當然就是統計結果。你已經看到隻是存儲一個數量是不足以知道你是否已經處理過一個batch的。是以,你需要将txid和統計結果一起儲存在值中。那麼,當你需要更新統計結果的時候,你隻需要比較一下資料庫中的txid和目前batch的txid是否相同;如果相同,你就直接跳過更新操作--因為有強順序的保障,你可以肯定資料庫中的值已經包含了目前batch。如果不相同,你就修改統計結果。這個邏輯之是以能說的通是因為batch的txid永遠不會改變,并且batch之間有序地對state進行更新操作。

用一個例子來說明這個邏輯為什麼行得通,假如你發送了一個txid=3的batch,該batch中包含一下的tuple:

[“man”]

[“man”]

[“dog”]

假設現在資料庫中儲存這如下的key-value資料:

man => [count=3,txid=1]

dog => [count=4,txid=3]

apple =>[count=10,txid=2]

和man相關聯的txid是1;由于目前的batch的txid是3,那麼你就可以肯定這批tuple中man 的值還沒有累加到資料庫中。是以你可以給man的count累加2,并且更新txid為3。然而,dog對應的txid在資料庫中和目前batch中 一樣,是以你可以肯定對于dog來說目前batch中的值已經在資料庫中增加過了。那麼就選擇跳過更新。在該batch更新後,資料庫中的資料如下所示:

man => [count=5,txid=3]

dog => [count=4,txid=3]

apple =>[count=10,txid=2]

接下來我們一起再來看看 opaque transactional spout以及怎樣去為這種spout設計相應的state。

Opaque transactional spouts

opaque transactional spout并不能保證每一個txid永遠對應一個相同的batch,opaque transactional spout擁有如下特性:

1. 每一個tuple都隻會在一個batch中執行通過。也就是說,一個tuple在某一個batch處理失敗了,該tuple可能在之後的另一個新的batch中處理成功。

OpaqueTridentKafkaSpout就是一個擁有該特性的spout,該spout允許kafka節點挂掉。每當OpaqueTridentKafkaSpout要發送一個新的batch的時候,它将會從上一個batch所成功發送的tuple的結束點開始發送,這就保證了沒有tuple會被遺漏掉,也保證了一個tuple不會被多個batch成功處理。

在使用opaque transactional spouts的時候,再使用和transactional spout相同的處理方式:判斷資料庫中存放的txid和目前txid去做對比已經不好用了。這是因為在state的更新過程之間,batch可能已經變了。

你能做的就是在資料庫中儲存更多的狀态;除了儲存值和txid以外,你還需要儲存更新前的值(previous value)。讓我們還是用上面的例子來說明這個邏輯。假定你目前batch中的對應count是“2”, 并且我們需要進行一次狀态更新。而目前資料庫中存儲的資訊如下:

{

      value = 4,

  prevValue = 1,

  txid = 2

}

假設目前的txid為3,和資料庫中的txid(2)不同。在這種情況下,你把“preValue”設定為“value”,然後将value增加2,并更新txid為3。操作過後的資料庫内容變成了下面的樣子:

{

  value = 6,

  prevValue = 4,

  txid = 3

}

再假設目前的txid為2,和資料庫中的txid(2)相同。這時你可以确定資料庫中的“value”被之前擁有相同txid的batch更新過,但是之前的batch和現在的batch内容可能不同了。是以你要做的是讓“value”的值等于“preValue”加2,操作過後的資料庫内容變成了下面的樣子:

{

  value = 3,

  prevValue = 1,

  txid = 2

}

--------------------------------------------------------------------------------------------------------------------------

注:這裡了解起來可能有些晦澀,舉個例子吧。

假設一個batch的大小為3,有下面這麼多tuple要進行累加:

[dog] [dog] [man] [man] [man]

假設資料庫中現在的資訊為:

dog =>{value=2,prevValue=1,txid=1}

man =>{value=3,prevValue=1,txid=1}

然後發送一個txid為2的batch {[dog] [dog] [man]}

然後進行儲存操作,

man 成功儲存,但是dog儲存的時候發生了錯誤,是以資料庫中的資訊變成了

dog =>{value=2,prevValue=1,txid=1}

man =>{value=4,prevValue=3,txid=2}

那麼失敗了,就會有batch的重發,恰好這是負責發送第一個 [dog]的kafka節點壞掉了,batch無法獲得第一個[dog]了,那麼就隻能從第二個dog開始發了,是以發送的batch的txid依然為2,内容為{[dog] [man] [man]}

到這裡,dog 的兩個txid不同,更新;但是man txid相同了,是以用prevValue+2來更新value;從這裡應該可以看出,為什麼是這樣做了。

更新後的結果如下:

dog =>{value=3,prevValue=2,txid=2}

man =>{value=5,prevValue=3,txid=2}

---------------------------------------------------------------------------------------------------------------------------------

因為Trident保證了batch之間的強順序性,是以這種方法是有效的。一旦Trident去處理一個新的batch,它就不會重新回到之前的任何一個batch。并且由于opaque transactional spout確定在各個batch之間是沒有共同成員的,每個tuple隻會在一個batch中被成功處理,你可以安全的在之前的值上進心更新。

Non-transactional spouts

Non-transactional spout(非事務spout)不提供任何的保障。是以在tuple處理失敗後不進行重發的情況下,一個tuple可能是最多被處理一次的。同時他也可能會是至少處理一次的,如果tuple在不同的batch中被多次成功處理的時候。無論怎樣,這種spout是不可能實作有且隻有一次被成功處理的語義的。

Summary of spout and state types

這個圖展示了哪些spout和state的組合能夠實作有且隻有一次被成功處理的語義:

Trident State譯文Trident State 譯文Transactional spoutsOpaque transactional spoutsNon-transactional spoutsSummary of spout and state typesState APIspersistentAggregateImplementing Map States

Opaque transactional state有着最為強大的容錯性。但是這是以存儲更多的資訊作為代價的。Transactional states 需要存儲較少的狀态資訊,但是僅能和 transactional spouts協同工作. 最後, non-transactional state所需要存儲的資訊最少,但是卻不能實作有且隻有一次被成功處理的語義。

State和Spout類型的選擇其實是一種在容錯性和存儲消耗之間的權衡,你的應用的需要會決定那種組合更适合你。

State APIs

在前面你已經看到了一些用來實作僅且僅執行一次語義的複雜方法,有一個關于Trident的好消息就是,Trident把所有容錯的邏輯都在state内部實作了。那麼作為一個使用者,你就從比較txid,儲存多餘的值到資料庫中,或者任何像它們兩個那樣的苦差事中脫離了出來。你隻需要像下面這樣寫代碼就可以了:

TridentTopology topology = new TridentTopology();        

TridentState wordCounts =

      topology.newStream("spout1", spout)

        .each(new Fields("sentence"), new Split(), new Fields("word"))

        .groupBy(new Fields("word"))

        .persistentAggregate(MemcachedState.opaque(serverLocations),new Count(),new Fields("count")) //重點就是這句了,這裡其實使用了mapState,用來做批量的聚合結果的保//存               

        .parallelismHint(6);

所有管理opaque transactional state的必要邏輯都在MemcachedState.opaque方法内部實作了。另外,更新操作是批量進行的,以減少對資料庫的壓力。

基礎的state接口隻有兩個方法:

public interface State {

    void beginCommit(Long txid); // can be null for things like partitionPersist occurring off //a DRPC stream(放生在DRPC流中的partitionPersist操作中,txid可能為空)

    void commit(Long txid);

}

在這個接口所提供的兩個方法中,你可以知道什麼時候開了對state的更新操作,什麼時候完成了對state的更新操作,在每個方法中你都能夠獲得txid。Trident對你的state是如何工作的沒有做出任何的假設(也就是說,你要自己寫更新和查詢方法)。

加入你自己有一套資料庫,并且希望通過Trident來在其中更新、查詢使用者的位置資訊。那麼你自己實作的state中就要自己去寫更新和查詢的方法了:

public class LocationDB implements State {

    public void beginCommit(Long txid) {    

    }

    public void commit(Long txid) {    

    }

    public void setLocation(long userId, String location) {

      // code to access database and set location

 //自己寫的向資料庫中儲存使用者位置資訊的方法,這個方法會在你自己實作的

//BaseStateUpdater中調用(呵呵,自己實作然後自己調用)

    }

    public String getLocation(long userId) {

      // code to get location from database

          //自己寫的從資料庫中查找使用者位置資訊的方法,這個方法會在你自己實作的

//BaseQueryFunction中調用(也是自己實作自己調用)

    }}

然後你就要實作一個Trdient定義的StateFactory ,使你能夠在Trient的task中建立你自己的state。下面是為LocationDB 實作的StateFactory:

public class LocationDBFactory implements StateFactory {

   public State makeState(Map conf, int partitionIndex, int numPartitions) {

      return new LocationDB();

   } }

Trident提供了QueryFunction 用來對state進行查詢,提供了StateUpdater 用來對state進行更新操作。讓我們來寫一個QueryLocation的操作,該操作從LocationDB中查詢使用者的位置資訊。首先來看看那你該如何在拓撲中使用QueryLocation操作。假設你的拓撲接收一個使用者的id的輸入流。

TridentTopology topology = new TridentTopology();

TridentState locations = topology.newStaticState(new LocationDBFactory());

topology.newStream("myspout", spout)

.stateQuery(locations, new Fields("userid"), new QueryLocation(), new Fields("location"))

//.stateQuey就是查詢了,第一個參數指定了要查詢的state(這個state使用LocationDBFactory來建立的,這就是為什麼要為你的state建立一個stateFactory了,因為你無法在Trident的API中直接new你的state,你隻能new stateFactory,然後Trident會調用其中的makeState方法來建立state);第二個參數就是輸入的流的字段,這裡把userId輸入到操作中;第三個參數就是你自己實作的QueryFunction 用來執行查詢操作;第四個參數是輸出字段。

好了,現在可以來看看如何來實作一個自己的QueryFunction 了。

public class QueryLocation extends BaseQueryFunction<LocationDB, String> {

   public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) {

//查詢的方法,下面的代碼都是要自己寫的

        List<String> ret = new ArrayList();

        for(TridentTuple input: inputs) {

            ret.add(state.getLocation(input.getLong(0)));//每次查詢一個,效率不高

        }

        return ret;//這個ret的類型是你自己定義好的泛型(在類的開始處)

//傳回的ret會循環調用下面的execute方法來發送每一個location

    }

    public void execute(TridentTuple tuple, String location, TridentCollector collector) {

//發送輸出資料的方法,輸出字段的定義在上面已經完成了,說白了還是一個bolt節點 ps:在新的版本中String location已經變成了一個List了,也就是ret一次都傳進來了,在execute方法中進行周遊

        collector.emit(new Values(location));

    }    

}

QueryFunction的執行分為兩步:第一步,Trident會收集一個batch的輸入資料然後把他們傳遞給batchRetrieve。在這個例子中,batchRetrieve會接收到很多的使用者ID。BatchRetrieve方法需要傳回和接收到的batch中的tuple的數量相同的一個list資料。List中的第一個元素對應第一個tuple查詢的結果,第二個元素對應第二個tuple查詢的結果,以此類推。

也許你會看出上面的代碼中沒有利用Trident所提供的batch的優勢,因為它每次隻從LocationDB 中查詢一條資料。是以可以把LocationDB 向下面這樣優化一下:

public class LocationDB implements State {

    public void beginCommit(Long txid) {    

    }

    public void commit(Long txid) {    

    }

    public void setLocationsBulk(List<Long> userIds, List<String> locations) {

      // set locations in bulk批量進行更新

    }

    public List<String> bulkGetLocations(List<Long> userIds) {

      // get locations in bulk批量進行查詢

}}

有了上面優化後的LocationDB ,那麼QueryLocation 就也需要修改一下了:

public class QueryLocation extends BaseQueryFunction<LocationDB, String> {

    public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) {

        List<Long> userIds = new ArrayList<Long>();

        for(TridentTuple input: inputs) {

            userIds.add(input.getLong(0));

        }

        return state.bulkGetLocations(userIds);//一次查一批...

    }

    public void execute(TridentTuple tuple, String location, TridentCollector collector) {

        collector.emit(new Values(location));

    }    }

将QueryLocation 修改為上面的樣子以後,就可以大大減少對資料庫的請求了。

查詢說完了,下面就是如何來更新state了。你要利用StateUpdater 接口來實作自己的目的。下面是例子:

public class LocationUpdater extends BaseStateUpdater<LocationDB> {

    public void updateState(LocationDB state, List<TridentTuple> tuples, TridentCollector collector) {//很簡單

        List<Long> ids = new ArrayList<Long>();

        List<String> locations = new ArrayList<String>();

        for(TridentTuple t: tuples) {

            ids.add(t.getLong(0));

            locations.add(t.getString(1));

        }

        state.setLocationsBulk(ids, locations);

    }}

有了上面的代碼,你就可以像下面這樣在Trident中來更新state了

TridentTopology topology = new TridentTopology();

TridentState locations =

topology.newStream("locations", locationsSpout)

.partitionPersist(new LocationDBFactory(), new Fields("userid", "location"), new LocationUpdater())

第一個參數就是LocationDB對應的stateFactory;第二個參數是輸入的流的字段;第三個就是上面寫的更新操作了。

partitionPersist 操作會更新一個State。其内部是将 State和一批更新的tuple交給StateUpdater,由StateUpdater完成相應的更新操作。

在這段代碼中,隻是簡單的從輸入的tuple中提取處userid和對應的location,并一起更新到State中。

partitionPersist 會傳回一個TridentState對象來表示被這個Trident topoloy更新過的locationDB。 然後你就可以使用這個state在topology的任何地方進行查詢操作了。

同時,你也可以看到我們傳了一個TridentCollector給StateUpdaters。 emit到這個collector的tuple就會去往一個新的stream。在這個例子中,我們并沒有去往一個新的stream的需要,但是如果你在做一些事情,比如說更新資料庫中的某個count,你可以emit更新的count到這個新的stream。然後你可以通過調用TridentState#newValuesStream方法來通路這個新的stream來進行其他的處理。

persistentAggregate

persistentAggregate是另一個用來更新state的方法, 你在之前的word count例子中應該已經見過了,如下:

TridentTopology topology = new TridentTopology();

TridentState wordCounts =

      topology.newStream("spout1", spout)

        .each(new Fields("sentence"), new Split(), new Fields("word"))

        .groupBy(new Fields("word"))

        .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))

persistentAggregate是在partitionPersist之上的另外一層抽象。它知道怎麼去使用一個Trident 聚合器來更新State。在這個例子當中,因為這是一個group好的stream,Trident會期待你提供的state是實作了MapState接口的。用來進行group的字段會以key的形式存在于State當中,聚合後的結果會以value的形式存儲在State當中。MapState接口看上去如下所示:

public interface  MapState<T> extends State {

    List<T> multiGet(List<List<Object>> keys);

    List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters);

    void multiPut(List<List<Object>> keys, List<T> vals);}

當你在一個未經過group的stream上面進行聚合的話,Trident會期待你的state實作Snapshottable接口:

public interface  Snapshottable<T> extends State {

    T get();

    T update(ValueUpdater updater);

    void set(T o);

}

MemoryMapState 和 MemcachedState 都實作了上面的2個接口。(自己寫的mapState也會實作上面的兩個接口)

Implementing Map States

在Trident中實作MapState是非常簡單的,它幾乎幫你做了所有的事情。OpaqueMap, TransactionalMap, 和 NonTransactionalMap 類實作了所有相關的邏輯,包括容錯的邏輯。你隻需要将一個IBackingMap 的實作提供給這些類就可以了。IBackingMap接口看上去如下所示:

public interface IBackingMap<T> {

    List<T> multiGet(List<List<Object>> keys);

    void multiPut(List<List<Object>> keys, List<T> vals);

}

OpaqueMap's會用OpaqueValue的value來調用multiPut方法,TransactionalMap's會提供TransactionalValue中的value,而NonTransactionalMaps隻是簡單的把從Topology擷取的object傳遞給multiPut。

Trident還提供了一種CachedMap類來進行自動的LRU 緩存。

另外,Trident 提供了 SnapshottableMap 類将一個MapState 轉換成一個 Snapshottable 對象.(用來對沒有進行group by 的流進行全局彙總)

大家可以看看 MemcachedState的實作,進而學習一下怎樣将這些工具組合在一起形成一個高性能的MapState實作。MemcachedState是允許大家選擇使用opaque transactional, transactional, 還是 non-transactional 語義的。

Ps:翻譯的内容就這麼多了,其實網上翻譯的很多,但是看了以後并不能給很多新手帶來一些幫助(原文寫的太高深了)。努力翻譯了一下,但是還是覺得有很多沒有說清楚,下面會抽時間把storm官方提供的 hbase相關的trident state的源代碼解讀一下,我覺得隻有解讀一下這個源代碼,才會讓人更加清晰 state當地怎麼用,以及如何寫自己的state。