天天看点

hbase源码系列(二)HTable 探秘

hbase的源码终于搞一个段落了,在接下来的一个月,着重于把看过的源码提炼一下,对一些有意思的主题进行分享一下。继上一篇讲了负载均衡之后,这一篇我们从client开始讲吧,从client到master再到region server,按照这个顺序来开展,网友也可以对自己感兴趣的部分给我留言或者直接联系我的qq。

现在我们讲一下htable吧,为什么讲htable,因为这是我们最常见的一个类,这是我们对hbase中数据的操作的入口。

下面是一个很简单往hbase插入一条记录的例子。

我们平常就是采用这种方式提交的数据,为了提高重用性采用htablepool,最新的api推荐使用hconnection.gettable("test")来获得htable,旧的htablepool已经被抛弃了。好,我们下面开始看看htable内部是如何实现的吧,首先我们看看它内部有什么属性。

主要是靠上面的这些家伙来干活的,这里面的connection、ap、rpccallerfactory是用来和后台通信的,htable只是做一个操作,数据进来之后,添加到writeasyncbuffer,满足条件就flush。

下面看看table.put是怎么执行的:

执行put操作,如果是autofush,就提交,先看doput的过程,如果之前的ap异步提交到有问题,就先进行后台提交,不过这次是同步的,如果没有错误,就把put添加到队列当中,然后检查一下当前的 buffer的大小,超过我们设置的内容的时候,就flush掉。

写下来,让我们看看backgroundflushcommits这个方法吧,它的核心就这么一句ap.submit(writeasyncbuffer, true) ,如果出错了的话,就报错了。所以网上所有关于客户端调优的方法里面无非就这么几种:

1)关闭autoflush

2)关闭wal日志

3)把writebuffersize设大一点,一般说是设置成5mb

经过实践,就第二条关闭日志的效果比较明显,其它的效果都不明显,因为提交的过程是异步的,所以提交的时候占用的时间并不多,提交到server端后,server还有一个写入的队列,(⊙o⊙)… 让人想起小米手机那恶心的排队了。。。所以大规模写入数据,别指望着用put来解决。。。mapreduce生成hfile,然后用bulk load的方式比较好。

不废话了,我们继续追踪ap.submit方法吧,f3进去。

循环遍历r,为每个r找到它的位置loc,loc是hregionlocation,里面记录着这行记录所在的目标region所在的位置,loc怎么获得呢,走进finddestlocation方法里面,看到了这么一句。

定位到它的位置之后,它把loc添加到了actionsbyserver,一个region server对应一组操作。(插句题外话为什么这里叫action呢,其实我们熟知的put、delete,以及不常用的append、increment都是继承自row的,在接口传递时候,其实都是视为一种操作,到了后台之后,才做区分)。

接下来,就是多线程的rpc提交了。

再深挖一点,把它们的实现都扒出来吧。

ok,看到了,先构造一个multiservercallable,然后再通过rpccallerfactory做最后的call操作。

好了,到这里再总结一下put操作吧,前面写得有点儿凌乱了。

1)把put操作添加到writeasyncbuffer队列里面,符合条件(自动flush或者超过了阀值writebuffersize)就通过asyncprocess异步批量提交。

2)在提交之前,我们要根据每个rowkey找到它们归属的region server,这个定位的过程是通过hconnection的locateregion方法获得的,然后再把这些rowkey按照hregionlocation分组。

3)通过多线程,一个hregionlocation构造multiservercallable<row>,然后通过rpccallerfactory.<multiresponse> newcaller()执行调用,忽略掉失败重新提交和错误处理,客户端的提交操作到此结束。

对于delete,我们也可以通过以下代码执行一个delete操作。

这个操作比较干脆,new一个regionservercallable<boolean>,直接走rpc了,爽快啊。

这里面注意一下这行mutateresponse response = getstub().mutate(null, request);

getstub()返回的是一个clientservice.blockinginterface接口,实现这个接口的类是hregionserver,这样子我们就知道它在服务端执行了hregionserver里面的mutate方法。

get操作也和delete一样简单。

get操作也没几行代码,还是直接走的rpc。

注意里面的protobufutil.get操作,它其实是构建了一个getrequest,需要的参数是regionname和get,然后走hregionserver的get方法,返回一个getresponse。

针对put、delete、get都有相应的操作的方式:

1.put(list)操作,很多童鞋以为这个可以提高写入速度,其实无效。。。为啥?因为你构造了一个list进去,它再遍历一下list,执行doput操作。。。。反而还慢点。

2.delete和get的批量操作走的都是connection.processbatchcallback(actions, tablename, pool, results, callback),具体的实现在hconnectionmanager的静态类hconnectionimplementation里面,结果我们惊人的发现:

它走的还是put一样的操作,既然是一样的,何苦代码写得那么绕呢?

现在讲一下scan吧,这个操作相对复杂点。还是老规矩,先上一下代码吧。

scan查询的时候,设置startrow和stoprow可是重头戏,假设我这里要查我01月01日到04月29日总共发了多少业务,中间是业务类型,但是我可能是所有的都查,或者只查一部分,在所有都查的情况下,我就不能设置了,那但是startrow和stoprow我不能空着啊,所以这里可以填00000-zzzzz,只要保证它在这个区间就可以了,然后我们加了一个rowfilter,然后引入了正则表达式,之前好多人一直在问啊问的,不过我这个例子,其实不要也可以,因为是查所有业务的,在startrow和stoprow之间的都可以要。

好的,我们接着看,f3进入getscanner方法。

这个scan还分大小, 没关系,我们进入clientscanner看一下吧, 在clientscanner的构造方法里面发现它会去调用nextscanner去初始化一个scannercallable。好的,我们接着来到scannercallable里面,这里需要注意的是它的两个方法,prepare和call方法。在prepare里面它主要干了两个事情,获得region的hregionlocation和clientservice.blockinginterface接口的实例,之前说过这个继承这个接口的只有region server的实现类。

ok,我们下面看看call方法吧。

在call方法里面,我们可以看得出来,实例化scanrequest,然后调用scan方法的时候把payloadcarryingrpccontroller传过去,这里跟踪了一下,如果设置了codec的就从payloadcarryingrpccontroller里面返回结果,否则从response里面返回。

好的,下面看next方法吧。

从next方法里面可以看出来,它是一次取caching条数据,然后下一次获取的时候,先把上次获取的最后一个给排除掉,再获取下来保存在cache当中,只要缓存不空,就一直在缓存里面取。

好了,至此scan到此结束。

继续阅读