天天看點

hbase源碼系列(二)HTable 探秘

hbase的源碼終于搞一個段落了,在接下來的一個月,着重于把看過的源碼提煉一下,對一些有意思的主題進行分享一下。繼上一篇講了負載均衡之後,這一篇我們從client開始講吧,從client到master再到region server,按照這個順序來開展,網友也可以對自己感興趣的部分給我留言或者直接聯系我的qq。

現在我們講一下htable吧,為什麼講htable,因為這是我們最常見的一個類,這是我們對hbase中資料的操作的入口。

下面是一個很簡單往hbase插入一條記錄的例子。

我們平常就是采用這種方式送出的資料,為了提高重用性采用htablepool,最新的api推薦使用hconnection.gettable("test")來獲得htable,舊的htablepool已經被抛棄了。好,我們下面開始看看htable内部是如何實作的吧,首先我們看看它内部有什麼屬性。

主要是靠上面的這些家夥來幹活的,這裡面的connection、ap、rpccallerfactory是用來和背景通信的,htable隻是做一個操作,資料進來之後,添加到writeasyncbuffer,滿足條件就flush。

下面看看table.put是怎麼執行的:

執行put操作,如果是autofush,就送出,先看doput的過程,如果之前的ap異步送出到有問題,就先進行背景送出,不過這次是同步的,如果沒有錯誤,就把put添加到隊列當中,然後檢查一下目前的 buffer的大小,超過我們設定的内容的時候,就flush掉。

寫下來,讓我們看看backgroundflushcommits這個方法吧,它的核心就這麼一句ap.submit(writeasyncbuffer, true) ,如果出錯了的話,就報錯了。是以網上所有關于用戶端調優的方法裡面無非就這麼幾種:

1)關閉autoflush

2)關閉wal日志

3)把writebuffersize設大一點,一般說是設定成5mb

經過實踐,就第二條關閉日志的效果比較明顯,其它的效果都不明顯,因為送出的過程是異步的,是以送出的時候占用的時間并不多,送出到server端後,server還有一個寫入的隊列,(⊙o⊙)… 讓人想起小米手機那惡心的排隊了。。。是以大規模寫入資料,别指望着用put來解決。。。mapreduce生成hfile,然後用bulk load的方式比較好。

不廢話了,我們繼續追蹤ap.submit方法吧,f3進去。

循環周遊r,為每個r找到它的位置loc,loc是hregionlocation,裡面記錄着這行記錄所在的目标region所在的位置,loc怎麼獲得呢,走進finddestlocation方法裡面,看到了這麼一句。

定位到它的位置之後,它把loc添加到了actionsbyserver,一個region server對應一組操作。(插句題外話為什麼這裡叫action呢,其實我們熟知的put、delete,以及不常用的append、increment都是繼承自row的,在接口傳遞時候,其實都是視為一種操作,到了背景之後,才做區分)。

接下來,就是多線程的rpc送出了。

再深挖一點,把它們的實作都扒出來吧。

ok,看到了,先構造一個multiservercallable,然後再通過rpccallerfactory做最後的call操作。

好了,到這裡再總結一下put操作吧,前面寫得有點兒淩亂了。

1)把put操作添加到writeasyncbuffer隊列裡面,符合條件(自動flush或者超過了閥值writebuffersize)就通過asyncprocess異步批量送出。

2)在送出之前,我們要根據每個rowkey找到它們歸屬的region server,這個定位的過程是通過hconnection的locateregion方法獲得的,然後再把這些rowkey按照hregionlocation分組。

3)通過多線程,一個hregionlocation構造multiservercallable<row>,然後通過rpccallerfactory.<multiresponse> newcaller()執行調用,忽略掉失敗重新送出和錯誤處理,用戶端的送出操作到此結束。

對于delete,我們也可以通過以下代碼執行一個delete操作。

這個操作比較幹脆,new一個regionservercallable<boolean>,直接走rpc了,爽快啊。

這裡面注意一下這行mutateresponse response = getstub().mutate(null, request);

getstub()傳回的是一個clientservice.blockinginterface接口,實作這個接口的類是hregionserver,這樣子我們就知道它在服務端執行了hregionserver裡面的mutate方法。

get操作也和delete一樣簡單。

get操作也沒幾行代碼,還是直接走的rpc。

注意裡面的protobufutil.get操作,它其實是建構了一個getrequest,需要的參數是regionname和get,然後走hregionserver的get方法,傳回一個getresponse。

針對put、delete、get都有相應的操作的方式:

1.put(list)操作,很多童鞋以為這個可以提高寫入速度,其實無效。。。為啥?因為你構造了一個list進去,它再周遊一下list,執行doput操作。。。。反而還慢點。

2.delete和get的批量操作走的都是connection.processbatchcallback(actions, tablename, pool, results, callback),具體的實作在hconnectionmanager的靜态類hconnectionimplementation裡面,結果我們驚人的發現:

它走的還是put一樣的操作,既然是一樣的,何苦代碼寫得那麼繞呢?

現在講一下scan吧,這個操作相對複雜點。還是老規矩,先上一下代碼吧。

scan查詢的時候,設定startrow和stoprow可是重頭戲,假設我這裡要查我01月01日到04月29日總共發了多少業務,中間是業務類型,但是我可能是所有的都查,或者隻查一部分,在所有都查的情況下,我就不能設定了,那但是startrow和stoprow我不能空着啊,是以這裡可以填00000-zzzzz,隻要保證它在這個區間就可以了,然後我們加了一個rowfilter,然後引入了正規表達式,之前好多人一直在問啊問的,不過我這個例子,其實不要也可以,因為是查所有業務的,在startrow和stoprow之間的都可以要。

好的,我們接着看,f3進入getscanner方法。

這個scan還分大小, 沒關系,我們進入clientscanner看一下吧, 在clientscanner的構造方法裡面發現它會去調用nextscanner去初始化一個scannercallable。好的,我們接着來到scannercallable裡面,這裡需要注意的是它的兩個方法,prepare和call方法。在prepare裡面它主要幹了兩個事情,獲得region的hregionlocation和clientservice.blockinginterface接口的執行個體,之前說過這個繼承這個接口的隻有region server的實作類。

ok,我們下面看看call方法吧。

在call方法裡面,我們可以看得出來,執行個體化scanrequest,然後調用scan方法的時候把payloadcarryingrpccontroller傳過去,這裡跟蹤了一下,如果設定了codec的就從payloadcarryingrpccontroller裡面傳回結果,否則從response裡面傳回。

好的,下面看next方法吧。

從next方法裡面可以看出來,它是一次取caching條資料,然後下一次擷取的時候,先把上次擷取的最後一個給排除掉,再擷取下來儲存在cache當中,隻要緩存不空,就一直在緩存裡面取。

好了,至此scan到此結束。

繼續閱讀