DFSClient Hedged Read是Hadoop-2.4.0引入的一个新特性,如果读取一个数据块的操作比较慢,DFSClient Hedged Read将会开启一个从另一个副本的hedged读操作。我们会选取首先完成的操作,并取消其它操作。这个Hedged读特性将有助于控制异常值,比如由于命中一个坏盘等原因而需要花费较长时间的异常阅读等。
DFSClient Hedged Read特性默认是关闭的。如果要开启,则需配置如下:
1、dfs.client.hedged.read.threadpool.size
并发Hedged 读的线程池大小
2、dfs.client.hedged.read.threshold.millis
开启一个Hedged 读前的等待时间(毫秒)
三、实现分析
1、DFSClient实现
DFSClient中,定义了一个静态线程池:
DFSClient的构造函数中,有如下处理:
根据参数dfs.client.hedged.read.threadpool.size确定是否实例化线程池,而initThreadsNumForHedgedReads()方法如下:
实例化了一个ThreadPoolExecutor,corePoolSize大小是1,maximumPoolSize大小是参数,workQueue为一个没有数据缓冲的阻塞队列,ThreadFactory是Hadoop自己实现的后台线程工厂,并自定义了RejectedExecutionHandler,主要是在有异常时实现HEDGED_READ_METRIC.incHedgedReadOpsInCurThread(),即计数器减1。
最后,DFSClient提供了如下几个get和set方法,方便输入流调用:
2、DFSInputStream实现
在输入流DFSInputStream的read方法中,会通过dfsClient.isHedgedReadsEnabled()判断是否开启了Hedged Read特性,在其开启的情况下,调用hedgedFetchBlockByteRange()方法进行数据读取操作,如下:
hedgedFetchBlockByteRange()方法通过ExecutorCompletionService和Future List实现了Hedged Read特性,具体实现如下:
1、构造一个futures列表:
2、构造一个ExecutorCompletionService:
3、计算数据块和长度;
4、在一个while循环内,分两种情况:
1)第一次读取时:从NameNode选取DataNode,即chooseDataNode,构造Callable并提交至hedgedService,获取Future<ByteBuffer> firstRequest,然后
用非阻塞的poll获取结果future,判断future是否成功,成功即返回,否则在ignored中添加下次需要忽略的本节点,incHedgedReadOps计数并继续;
2)通过getBestNodeDNAddrPair或chooseDataNode选取DataNode,构造Callable并提交至hedgedService,通过getFirstToComplete获取第一个成功的结果后,调用cancelAll取消其它的,并计数,否则也是计数外加忽略本次DataNode。
getFirstToComplete中,是通过阻塞式的hedgedService.take()来实现的。
具体代码如下:
而getFirstToComplete实现如下:
over...