Hive的MAP數或者說MAPREDUCE的MAP數是由誰來決定的呢?inputsplit size,那麼對于每一個inputsplit size是如何計算出來的,這是做MAP數調整的關鍵.
HADOOP給出了Inputformat接口用于描述輸入資料的格式,其中一個關鍵的方法就是getSplits,對輸入的資料進行分片.
Hive對InputFormat進行了封裝:
<a href="http://blog.51cto.com/attachment/201310/213505502.png" target="_blank"></a>
而具體采用的實作是由參數hive.input.format來決定的,主要使用2中類型HiveInputFormat和CombineHiveInputFormat.
對于HiveInputFormat來說:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<code>public</code> <code>InputSplit[] getSplits(JobConf job, </code><code>int</code> <code>numSplits) </code><code>throws</code> <code>IOException {</code>
<code> </code><code>//掃描每一個分區</code>
<code> </code><code>for</code> <code>(Path dir : dirs) {</code>
<code> </code><code>PartitionDesc part = getPartitionDescFromPath(pathToPartitionInfo, dir);</code>
<code> </code><code>//擷取分區的輸入格式</code>
<code> </code><code>Class inputFormatClass = part.getInputFileFormatClass();</code>
<code> </code><code>InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);</code>
<code> </code><code>//按照相應格式的分片算法擷取分片</code>
<code> </code><code>//注意:這裡的Inputformat隻是old version API:org.apache.hadoop.mapred而不是org.apache.hadoop.mapreduce,是以不能采用新的API,否則在查詢時會報異常:Input format must implement InputFormat.差別就是新的API的計算inputsplit size(Math.max(minSize, Math.min(maxSize, blockSize))和老的(Math.max(minSize, Math.min(goalSize, blockSize)))不一樣;</code>
<code> </code><code>InputSplit[] iss = inputFormat.getSplits(newjob, numSplits / dirs.length);</code>
<code> </code><code>for</code> <code>(InputSplit is : iss) {</code>
<code> </code><code>//封裝結果,傳回</code>
<code> </code><code>result.add(</code><code>new</code> <code>HiveInputSplit(is, inputFormatClass.getName()));</code>
<code> </code><code>}</code>
<code> </code><code>}</code>
<code> </code><code>return</code> <code>result.toArray(</code><code>new</code> <code>HiveInputSplit[result.size()]);</code>
<code>}</code>
對于CombineHiveInputFormat來說的計算就比較複雜了:
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
<code> </code><code>//加載CombineFileInputFormatShim,這個類繼承了org.apache.hadoop.mapred.lib.CombineFileInputFormat</code>
<code> </code><code>CombineFileInputFormatShim combine = ShimLoader.getHadoopShims()</code>
<code> </code><code>.getCombineFileInputFormat();</code>
<code>if</code> <code>(combine == </code><code>null</code><code>) {</code>
<code>//若為空則采用HiveInputFormat的方式,下同</code>
<code> </code><code>return</code> <code>super</code><code>.getSplits(job, numSplits);</code>
<code> </code><code>Path[] paths = combine.getInputPathsShim(job);</code>
<code>for</code> <code>(Path path : paths) {</code>
<code>//若是外部表,則按照HiveInputFormat方式分片</code>
<code> </code><code>if</code> <code>((tableDesc != </code><code>null</code><code>) && tableDesc.isNonNative()) {</code>
<code> </code><code>return</code> <code>super</code><code>.getSplits(job, numSplits);</code>
<code> </code><code>String inputFormatClassName = inputFormatClass.getName();</code>
<code> </code><code>if</code> <code>(</code><code>this</code><code>.mrwork != </code><code>null</code> <code>&& !</code><code>this</code><code>.mrwork.getHadoopSupportsSplittable()) {</code>
<code> </code><code>if</code> <code>(inputFormat </code><code>instanceof</code> <code>TextInputFormat) {</code>
<code> </code><code>if</code> <code>((</code><code>new</code> <code>CompressionCodecFactory(job)).getCodec(path) != </code><code>null</code><code>)</code>
<code>//在未開啟hive.hadoop.supports.splittable.combineinputformat(MAPREDUCE-1597)參數情況下,對于TextInputFormat并且為壓縮則采用HiveInputFormat分片算法</code>
<code> </code><code>return</code> <code>super</code><code>.getSplits(job, numSplits);</code>
<code> </code><code>}</code>
<code> </code><code>//對于連接配接式同上</code>
<code> </code><code>if</code> <code>(inputFormat </code><code>instanceof</code> <code>SymlinkTextInputFormat) {</code>
<code> </code><code>CombineFilter f = </code><code>null</code><code>;</code>
<code> </code><code>boolean</code> <code>done = </code><code>false</code><code>;</code>
<code>Path filterPath = path;</code>
<code>//由參數hive.mapper.cannot.span.multiple.partitions控制,預設false;如果沒true,則對每一個partition建立一個pool,以下省略為true的處理;對于同一個表的同一個檔案格式的split建立一個pool為combine做準備;</code>
<code> </code><code>if</code> <code>(!mrwork.isMapperCannotSpanPartns()) {</code>
<code> </code><code>opList = HiveFileFormatUtils.doGetWorksFromPath(</code>
<code> </code><code>pathToAliases, aliasToWork, filterPath);</code>
<code> </code><code>f = poolMap.get(</code><code>new</code> <code>CombinePathInputFormat(opList, inputFormatClassName));</code>
<code> </code><code>if</code> <code>(!done) {</code>
<code> </code><code>if</code> <code>(f == </code><code>null</code><code>) {</code>
<code> </code><code>f = </code><code>new</code> <code>CombineFilter(filterPath);</code>
<code> </code><code>combine.createPool(job, f);</code>
<code> </code><code>} </code><code>else</code> <code>{</code>
<code> </code><code>f.addPath(filterPath);</code>
<code>if</code> <code>(!mrwork.isMapperCannotSpanPartns()) {</code>
<code>//到這裡才調用combine的分片算法,繼承了org.apache.hadoop.mapred.lib.CombineFileInputFormat extends 新版本CombineFileInputformat</code>
<code> </code><code>iss = Arrays.asList(combine.getSplits(job, </code><code>1</code><code>));</code>
<code>//對于sample查詢特殊處理</code>
<code> </code><code>if</code> <code>(mrwork.getNameToSplitSample() != </code><code>null</code> <code>&& !mrwork.getNameToSplitSample().isEmpty()) {</code>
<code> </code><code>iss = sampleSplits(iss);</code>
<code>//封裝結果傳回</code>
<code> </code><code>for</code> <code>(InputSplitShim is : iss) {</code>
<code> </code><code>CombineHiveInputSplit csplit = </code><code>new</code> <code>CombineHiveInputSplit(job, is);</code>
<code> </code><code>result.add(csplit);</code>
<code> </code><code>return</code> <code>result.toArray(</code><code>new</code> <code>CombineHiveInputSplit[result.size()]);</code>
<code> </code><code>}</code>
具體combine的getSplits算法如下:
<code>public</code> <code>List<InputSplit> getSplits(JobContext job)</code>
<code> </code><code>throws</code> <code>IOException {</code>
<code> </code><code>//決定切分的幾個參數</code>
<code> </code><code>if</code> <code>(minSplitSizeNode != </code><code>0</code><code>) {</code>
<code> </code><code>minSizeNode = minSplitSizeNode;</code>
<code> </code><code>} </code><code>else</code> <code>{</code>
<code> </code><code>minSizeNode = conf.getLong(SPLIT_MINSIZE_PERNODE, </code><code>0</code><code>);</code>
<code> </code><code>if</code> <code>(minSplitSizeRack != </code><code>0</code><code>) {</code>
<code> </code><code>minSizeRack = minSplitSizeRack;</code>
<code> </code><code>minSizeRack = conf.getLong(SPLIT_MINSIZE_PERRACK, </code><code>0</code><code>);</code>
<code> </code><code>if</code> <code>(maxSplitSize != </code><code>0</code><code>) {</code>
<code> </code><code>maxSize = maxSplitSize;</code>
<code> </code><code>maxSize= = conf.getLong(</code><code>"mapreduce.input.fileinputformat.split.maxsize"</code><code>, </code><code>0</code><code>);</code>
<code> </code><code>for</code> <code>(MultiPathFilter onepool : pools) {</code>
<code> </code><code>ArrayList<Path> myPaths = </code><code>new</code> <code>ArrayList<Path>();</code>
<code> </code><code>// create splits for all files in this pool.</code>
<code> </code><code>getMoreSplits(job, myPaths.toArray(</code><code>new</code> <code>Path[myPaths.size()]),</code>
<code> </code><code>maxSize, minSizeNode, minSizeRack, splits);</code>
跳到getMoreSplits:主要是填充如下資料結構,
<code>// all blocks for all the files in input set</code>
<code> </code><code>OneFileInfo[] files;</code>
<code> </code><code>// mapping from a rack name to the list of blocks it has</code>
<code> </code><code>HashMap<String, List<OneBlockInfo>> rackToBlocks = </code><code>new</code> <code>HashMap<String, List<OneBlockInfo>>();</code>
<code> </code><code>// mapping from a block to the nodes on which it has replicas</code>
<code> </code><code>HashMap<OneBlockInfo, String[]> blockToNodes = </code><code>new</code> <code>HashMap<OneBlockInfo, String[]>();</code>
<code> </code><code>// mapping from a node to the list of blocks that it contains</code>
<code> </code><code>HashMap<String, List<OneBlockInfo>> nodeToBlocks = </code><code>new</code> <code>HashMap<String, List<OneBlockInfo>>();</code>
大概流程則是(這裡blockInfo生成略過不表,可以參考MAPREDUCE-2046):
1.首先處理每個Datanode的blockInfo,先按照>=maxsplitsize來切分split,剩餘的再按照blockinfo>=minSplitSizeNode切分,其餘的等和rack的其餘blockinfo進行合并
2.其次對每個Rack進行處理:先按照>=maxsplitsize來切分split,剩餘的再按照blockinfo>=minSplitSizeRack切分,其餘的等和overflow的其餘blockinfo進行合并
3.對于overflow blockInfo直接根據maxsplitsize來進行切分.
其餘影響MAP數的參數比較好了解了:
1.影響在MAPREDUCE後是否會啟動MAP進行檔案合并
hive.merge.mapfiles,hive.merge.mapredfiles,hive.merge.size.per.task(default=256 * 1000 * 1000),hive.merge.smallfiles.avgsize(default=16 * 1000 * 1000)
2.影響是否存在skew開啟多MAP:
hive.groupby.skewindata=false:
當該參數有true時會生成2個MR:
第一個MR的分區鍵是grouping key+distinct key,通過hash配置設定到reduce進行第一次聚合操作
第二個MR的分區鍵則是grouping key進行第二次聚合;(2個MR的sort key都是grouping key+distinct key)
<a href="https://issues.apache.org/jira/browse/HIVE-5118" target="_blank">https://issues.apache.org/jira/browse/HIVE-5118</a>
hive.optimize.skewjoin=false
hive.optimize.skewjoin.compiletime=false
hive.skewjoin.key=100000
hive.skewjoin.mapjoin.map.tasks=10000
hive.skewjoin.mapjoin.min.split=33554432
3.mapreduce參數,是否開啟map speculative
4.bucket table.
對于MAP/REDUCE的性能分析放到下一篇再說吧
本文轉自MIKE老畢 51CTO部落格,原文連結:http://blog.51cto.com/boylook/1316432,如需轉載請自行聯系原作者