天天看点

HBase源码分析之HRegionServer上compact流程分析

        前面三篇文章中,我们详细叙述了compact流程是如何在HRegion上进行的,了解了它的很多细节方面的问题。但是,这个compact在HRegionServer上是如何进行的?合并时文件是如何选择的呢?在这篇文章中,你将找到答案!

        首先,在HRegionServer内部,我们发现,它定义了一个CompactSplitThread类型的成员变量compactSplitThread,单看字面意思,这就是一个合并分裂线程,那么它会不会就是HRegionServer上具体执行合并的工作线程呢?我们一步一步来看。

        要了解它是什么,能够做什么,那么就必须要看看它的实现,找到CompactSplitThread类,so,开始我们的分析之旅吧!

        首先,看下CompactSplitThread中都定义可哪些变量,如下:

        其中,关于Region的Spilt、Merge相关的成员变量我们暂时忽略,等到专门讲解split、merge时再单独介绍。这里,先了解下CompactSplitThread中都有哪些关于compact的成员变量,大体可以分为三类:

        1、第一类是配置参数及其默认值相关的,涉及到large、small合并线程数参数和其默认值以及HBase整体配置变量Configuration类型的conf;

        2、第二类是线程池,包括long合并线程池longCompactions和short合并线程池shortCompactions,它们统一使用的Java中的ThreadPoolExecutor;

        3、第三类是CompactSplitThread的载体,或者说工作的环境,HRegionServer实例server。

        既然已经存在合并的线程池,那么很简单,将合并线程扔到线程池中等待调度就是了。那么是由哪些方法来完成的这一步呢?答案就在requestCompaction()及requestSystemCompaction()系列方法,而这一系列的requestCompaction()和requestSystemCompaction()方法参数不同,也仅意味着应用场景不同而已,最终还是要落到requestCompactionInternal()方法上的。同时,需要强调一点,requestCompaction()方法和requestSystemCompaction()方法有一个显著的区别,那就是在最终调用requestCompactionInternal()方法时,前者传入的selectNow为true,而后者传入的selectNow为false,这点需要特别注意下,下面也会讲到。先撇开都哪些地方会调用requestCompaction()系列方法,也就是compact发起的时机、条件等,我们后续会分析,这里我们先来看下requestCompactionInternal(),代码如下:

        直接说下大体流程吧!首先,需要做一些必要的检查,比如比如HRegionServer是否已停止、HRegion对应的表是否允许Compact操作等,然后这里有一个关键的地方,就是上述的selectNow,如果不是system compaction,selectNow为true,也就意味着它需要调用selectCompaction()方法,获取CompactionContext,而这本质上就是要选取待合并文件。我们先看下selectCompaction()方法,代码如下:

        而这个方法最终还是调用HStore的requestCompaction()方法来获取CompactionContext,继续分析:

        这里我们只叙述下主要过程,requestCompaction()方法的处理逻辑大体如下:

        1、如果对应HRegion不可写,直接返回null;

        2、在我们做合并之前,试着摆脱不必要的文件来简化事情;

        3、通过存储引擎storeEngine创建合并上下文CompactionContext类型的compaction;

        4、加读锁;

        5、如果存在协处理器:通过CompactionContext的preSelect()方法,选择StoreFile,返回StoreFilel列表;

        6、如果合并请求为空,即不存在协处理器:调用CompactionContext的select()方法,初始化compaction中的合并请求requst;

        7、如果之前传入的请求baseRequest不为空,则合并之;

        8、获取合并请求request;

        9、从合并请求request中获取待合并文件集合selectedFiles;

        10、将选择的文件集合加入到filesCompacting中,解答了之前文章的疑问;

        11、设置标志位forceMajor:是否为major合并;

        12、request中设置优先级、设置描述信息;

        13、解除读锁;

        14、调用HRegion的reportCompactionRequestStart()方法,汇报一个compact请求开始;

        15、返回合并上下文compaction。

        现在我们着重看下如何通过调用CompactionContext的select()方法初始化compaction中的合并请求requst,其他步骤比较简单,在此不一一叙述了。

        现在我们就看下其默认实现类DefaultCompactionContext中的select()方法,代码如下:

        它是利用合并策略compactionPolicy的selectCompaction()方法,获取合并请求request。那么按照上面讲的,我看下合并策略的一种实现RatioBasedCompactionPolicy的selectCompaction()方法实现,代码如下:

        我们捡重点的说,大体流程如下:

        1、根据传入的参数candidateFiles,创建一个候选的StoreFile列表;

               candidateFiles为通过storeFileManager.getStorefiles()方法获取的Store下的全部存储文件。

        2、确定futureFiles,如果filesCompacting为空则为0,否则为1;

        3、从候选列表candidateSelection中排除正在合并的文件,即filesCompacting中的文件;

        4、验证是否包含所有文件,设置标志位isAllFiles,判断的条件就是此时的候选列表candidateSelection大小是否等于初始的candidateFiles列表大小,而candidateFiles代表了Store下的全部文件;

        5、如果不是强制的Major合并,且不包含所有的文件,则调用skipLargeFiles()方法,跳过较大文件,并再次确定标志位isAllFiles;

        6、确定isTryingMajor,共两种情况:

            (1)强制Major合并,且包含所有问文件,且是一个用户合并;

            (2)强制Major合并,且包含所有问文件,或者本身就是一个Major合并,同时,必须是candidateSelection的数目小于配置的达到合并条件的最大文件数目;

        7、candidates中存在引用的话,则视为是在分裂后的文件,即isAfterSplit为true;

        8、如果不是TryingMajor,且不是在分裂后isAfterSplit,再次筛选文件:

              8.1、通过filterBulk()方法取出不应该位于Minor合并的文件;

              8.2、通过applyCompactionPolicy()方法,使用一定的算法,进行文件的筛选;

              8.3、通过checkMinFilesCriteria()方法,判断是否满足合并时最小文件数的要求;

        9、通过removeExcessFiles()方法在candidateSelection中移除过量的文件;

        10、查看是否为全部文件:再次确定标志位isAllFiles;

        11、利用candidateSelection构造合并请求CompactionRequest对象result;

        12、设置请求中的标志位;

        13、返回合并请求CompactionRequest对象result。

        我们主要分析下其中文件筛选的一些方法。

        首先看跳过大文件的skipLargeFiles()方法,代码如下:

        它会遍历文件列表candidates,最主要的一个判断,列表指定位置的文件大小是否超过阈值comConf.getMaxCompactSize(),这个阈值优先取参数hbase.hstore.compaction.max.size,参数未配置的话取Long.MAX_VALUE。

        其次再看下取出不应该位于Minor合并的文件的filterBulk()方法,代码如下:

        它根据StoreFile的标志位excludeFromMinorCompaction判断,而excludeFromMinorCompaction为true是当HFile信息的元数据中存在EXCLUDE_FROM_MINOR_COMPACTION标志时设置的,说了这么多,其实它就是要排除BulkLoad进入HBase的文件!

        然后,我们再看下比较复杂的applyCompactionPolicy()方法,代码如下:

        这个applyCompactionPolicy()方法是RatioBasedCompactionPolicy合并策略的精髓,我们需要细细分析,它的主要步骤为:

        1、如果文件列表为空,原样返回;

        2、获取文件合并比例:取参数hbase.hstore.compaction.ratio,默认为1.2,如果可以在峰值使用,取参数hbase.hstore.compaction.ratio.offpeak,默认为5.0,也就是说将参数调整大些;

        3、计算待合并文件数目countOfFiles;

        4、定义用于存放文件大小的数组fileSizes;

        5、定义用于存放该文件之后在最大文件数这个范围内所有文件(包含该文件)大小合计的数组sumSize;

        6、倒序遍历candidates文件列表:

              6.1、将文件大小放入数组fileSizes指定位置;

              6.2、tooFar表示后移动最大文件数位置的文件大小,其实也就是从i开始刚刚满足达到最大文件数位置的那个文件,也就是说,从i至tooFar数目为合并时允许的最大文件数,它类似于一个平滑的文件窗口;

              6.3、计算合计:该文件大小fileSizes[i] + (截止到下一个文件大小sumSize[i + 1]) - 后移动最大文件数位置的文件大小,也就是说sumSize[i]对应的被统计文件,永远是满足合并时允许的最大文件数这个阈值的,它相当于一个滑动的区间,区间大小为合并时允许的最大文件数,sumSize[i]对应的值为已该i开始所处文件窗口的所有文件大小合计。

        7、正序循环,如果文件数目满足最小合并时允许的最小文件数,且该位置的文件大小,大于合并时允许的文件最小大小与下一个文件窗口文件总大小乘以一定比例中的较大者,则继续,实际上就是选择出一个文件窗口内能兼顾最小文件数和最小文件大小的一组文件;

        8、保证最小文件数目的要求,必要时进行截取;

        9、截取并返回截取后的文件列表。

        上面的一个中心思想就是选出满足条件的最小的一组文件来合并。

        紧接着,我们看下检测是否满足最小文件数大的checkMinFilesCriteria()方法,代码如下:

        很直接有木有,不满足合并时最小文件数要求,直接clear,太奔放了!

        最后,我们看下如何移除过量的文件,即removeExcessFiles()方法,代码如下:

        它是要求待合并文件数不能超过系统设置的合并时最大文件数。

        至此,合并请求的生成和文件的选择就到此为止了。

        接下来再回到CompactSplitThread的requestCompactionInternal()方法,看下它对线程池是如何处理的。这里,它首先假设大部分合并都是small,所以它将系统引发的合并放进small pool,然后在特定的时机再做决断,如果有必要的话会挪至large pool。也就是说,如果selectNow为false,即系统自身引发的合并,比如MemStore flush、compact检查线程等,统一放入到shortCompactions中,即small pool;而如果是人为触发的,即selectNow为true,比如HBase

shell触发的,则还要看HStore中合并请求大小是否超过阈值,超过则放入longCompactions,即large pool,否则还是small pool。

        那么这个HStore中合并请求大小是否超过阈值是如何计算的呢?我们跟踪下HStore的throttleCompaction()方法,代码如下:

        它实际上是调用的合并策略CompactionPolicy的throttleCompaction()方法。那么,都有哪几种合并策略呢?总结起来,一共有两种:RatioBasedCompactionPolicy和StripeCompactionPolicy。现在我们以RatioBasedCompactionPolicy为例来讲,另一种StripeCompactionPolicy以后再分析。看下它的throttleCompaction()方法:

        它是将传入的compactionSize与comConf.getThrottlePoint()来比较的,传入的compactionSize实际上为上面提到的compaction.getRequest().getSize(),也就是合并请求的大小totalSize,这个totalSize是通过CompactionRequest的recalculateSize()方法计算得到的,代码如下:

        它遍历待合并文件StoreFile,获取其Reader,通过它获得文件长度并累加至totalSize。

        而comConf是其父类CompactionPolicy中关于compact配置的CompactionConfiguration类型成员变量,其getThrottlePoint()方法如下:

        实际上取得是CompactionConfiguration的成员变量throttlePoint,而throttlePoint在其构造方法中定义如下:

        优先取参数hbase.regionserver.thread.compaction.throttle,如果参数未配置,默认为最大合并文件数maxFilesToCompact与MemStore flush大小的两倍,而这个maxFilesToCompact的取值如下:

        也就是取参数hbase.hstore.compaction.max,参数未配置的话默认为10。那么MemStore flush大小是如何获取的呢?它实际上是通过StoreConfigInformation接口的getMemstoreFlushSize()方法获取的,而需要使用的最终实现该方法的类,还是HStore,代码如下:

        各位看官可能有疑问了,既然compact是以Store为单位进行的,为什么这里获取的是region的memstoreFlushSize呢?我们知道,HBase并不是一个纯粹意义上的列式数据库,它的MemStore flush的发起,并不是以Store为单位进行的,而是整个Region,这也是HBase一开始饱受诟病的列簇Column Family不能过多的原因。那么,这里的memstoreFlushSize就可以很容易理解为什么要获取Region的了。

        这个memstoreFlushSize我们之前介绍过,这里再回顾下,memstoreFlushSize为HRegion上设定的一个阈值,当MemStore的大小超过这个阈值时,将会发起flush请求,它的计算首先是由Table决定的,即每个表可以设定自己的memstoreFlushSize,通过关键字MEMSTORE_FLUSHSIZE来设定,如果MEMSTORE_FLUSHSIZE未设定,则取参数hbase.hregion.memstore.flush.size,参数未配置的话,则默认为1024*1024*128L,即128M。

        用俺们山东人的话来说,落落了这么多,到底是什么意思呢?很简单,它就是看合并请求中涉及的数据量大小是否超过一个阈值,超过则放入large pool,未超过则放入small pool。这个阈值可以通过参数直接配置,不配置的话,则是最大可合并文件数与引起MemStore的flush的阈值memstoreFlushSize的两倍,这个memstore flush到文件中,是不是就是文件的总大小呢?文件数乘以文件大小,是不是逻辑上近似于待合并数据的大小呢?大体就是这么个意思。

        好了,“数据”的目的地--线程池选好了,接下来就是该把“数据”放入线程池了。既然是线程池,那么这个“数据”就应该是一个线程,我们继续看。

        这一句体现的再明白不过了,将HStore、HRegion、合并上下文CompactionContext、线程池ThreadPoolExecutor包装成一个CompactionRunner对象,扔入线程池中执行。而CompactionRunner给我们的第一印象就是,它必定是一个可执行的线程。那么我们就看下它的代码吧:

        先看类的定义,类的定义中直接体现了,实现了Runnable接口意味着它是一个线程。而它除了构造函数传入的那四个成员变量外,还有个表示优先级的成员变量queuedPriority,它的初始化是在构造方法中完成的。如果合并上下文compaction为空,则通过HStore的getCompactPriority()方法获取,否则直接从合并请求中获取,而合并请求中的,实际上也是通过调用requestCompactionInternal()方法的priority传入的。我们接下来看下HStore的getCompactPriority()方法:

        它转而从StoreFileManager中获取Compact Priority,继续吧!在StoreFileManager的默认实现DefaultStoreFileManager中,代码如下:

        优先级为上述blockingFileCount减去当前storefiles的数目。而blockingFileCount优先取参数hbase.hstore.blockingStoreFiles,未配置的话再默认为7。还记得isTooManyStoreFiles这个方法吗?MemStore在进行flush时会判断HRegion上每个HStore下的文件数是否太多,太多则意味着MemStore的flush会被推迟进行,优先进行compact,否则文件数则会越来越多,而这里,离blockingFileCount越远,当前文件数越小的话,则意味着MemStore的flush可以优先进行,而compact可以在它flush之后再进行,将资源利用效率最大化。

        接下来,我们在看下CompactionRunner中最重要的run()方法,代码如下:

        run()方法以上来,也是会首选做一些必要的环境判断,比如HRegionServer是否已停止、HRegion对应的表是否允许Compact操作等。

        然后,针对compaction为null的情况,进行compaction的初始化,即待合并文件的选择。在这个过程之前,会先判断下优先级,之前的Compact优先级赋值给oldPriority,获取HStore的Compact优先级,如果当前优先级queuedPriority大于之前的oldPriority的话,即HStore下文件数目减少了,则会推迟compact,可以优先进行flush,将该CompactionRunner再扔回线程池。如果优先级满足条件,则继续,通过selectCompaction()选择待合并文件,并再次判断下是应该在large池中执行还是应该在small池中执行,此次只根据上述的那个阈值来判断。

        接下来,如果换池了,HStore调用cancelRequestedCompaction()方法取消合并请求,复位compaction为null,换池,并再次放入线程池,后续会再初始化compaction,然后就return。

        如果没换池的话,确保compaction不为空,调用HRegion的compact,针对store执行compact,计算执行时间,并获得compact的执行结果,根据合并结果确定下一步操作。

        如果合并成功,如果优先级Priority小于等于0,意味着当前文件已经太多,则需要发起一次SystemCompaction,否则请求分裂,实际上是看Region的大小是否超过阈值,从而引起分裂。

        整个CompactSplitThread的工作流程已描述完毕。那么接下来的问题,就是何时什么情况下会发起compact请求?发起的compact请求又有如何不同呢?是否会有定期检查的工作线程,促使compact在满足一定条件的情况下进行呢?

        且听下回分解。