天天看點

控制Hive MAP個數詳解

Hive的MAP數或者說MAPREDUCE的MAP數是由誰來決定的呢?inputsplit size,那麼對于每一個inputsplit size是如何計算出來的,這是做MAP數調整的關鍵.

HADOOP給出了Inputformat接口用于描述輸入資料的格式,其中一個關鍵的方法就是getSplits,對輸入的資料進行分片.

Hive對InputFormat進行了封裝:

<a href="http://blog.51cto.com/attachment/201310/213505502.png" target="_blank"></a>

而具體采用的實作是由參數hive.input.format來決定的,主要使用2中類型HiveInputFormat和CombineHiveInputFormat.

對于HiveInputFormat來說:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

<code>public</code> <code>InputSplit[] getSplits(JobConf job, </code><code>int</code> <code>numSplits) </code><code>throws</code> <code>IOException {</code>

<code>    </code><code>//掃描每一個分區</code>

<code>    </code><code>for</code> <code>(Path dir : dirs) {</code>

<code>      </code><code>PartitionDesc part = getPartitionDescFromPath(pathToPartitionInfo, dir);</code>

<code>    </code><code>//擷取分區的輸入格式</code>

<code>      </code><code>Class inputFormatClass = part.getInputFileFormatClass();</code>

<code>      </code><code>InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);</code>

<code>    </code><code>//按照相應格式的分片算法擷取分片</code>

<code>    </code><code>//注意:這裡的Inputformat隻是old version API:org.apache.hadoop.mapred而不是org.apache.hadoop.mapreduce,是以不能采用新的API,否則在查詢時會報異常:Input format must implement InputFormat.差別就是新的API的計算inputsplit size(Math.max(minSize, Math.min(maxSize, blockSize))和老的(Math.max(minSize, Math.min(goalSize, blockSize)))不一樣;</code>

<code>      </code><code>InputSplit[] iss = inputFormat.getSplits(newjob, numSplits / dirs.length);</code>

<code>      </code><code>for</code> <code>(InputSplit is : iss) {</code>

<code>    </code><code>//封裝結果,傳回</code>

<code>        </code><code>result.add(</code><code>new</code> <code>HiveInputSplit(is, inputFormatClass.getName()));</code>

<code>      </code><code>}</code>

<code>    </code><code>}</code>

<code>    </code><code>return</code> <code>result.toArray(</code><code>new</code> <code>HiveInputSplit[result.size()]);</code>

<code>}</code>

對于CombineHiveInputFormat來說的計算就比較複雜了:

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

<code>    </code><code>//加載CombineFileInputFormatShim,這個類繼承了org.apache.hadoop.mapred.lib.CombineFileInputFormat</code>

<code>    </code><code>CombineFileInputFormatShim combine = ShimLoader.getHadoopShims()</code>

<code>        </code><code>.getCombineFileInputFormat();</code>

<code>if</code> <code>(combine == </code><code>null</code><code>) {</code>

<code>//若為空則采用HiveInputFormat的方式,下同</code>

<code>      </code><code>return</code> <code>super</code><code>.getSplits(job, numSplits);</code>

<code>    </code><code>Path[] paths = combine.getInputPathsShim(job);</code>

<code>for</code> <code>(Path path : paths) {</code>

<code>//若是外部表,則按照HiveInputFormat方式分片</code>

<code>      </code><code>if</code> <code>((tableDesc != </code><code>null</code><code>) &amp;&amp; tableDesc.isNonNative()) {</code>

<code>        </code><code>return</code> <code>super</code><code>.getSplits(job, numSplits);</code>

<code>      </code><code>String inputFormatClassName = inputFormatClass.getName();</code>

<code>      </code><code>if</code> <code>(</code><code>this</code><code>.mrwork != </code><code>null</code> <code>&amp;&amp; !</code><code>this</code><code>.mrwork.getHadoopSupportsSplittable()) {</code>

<code>        </code><code>if</code> <code>(inputFormat </code><code>instanceof</code> <code>TextInputFormat) {</code>

<code>         </code><code>if</code> <code>((</code><code>new</code> <code>CompressionCodecFactory(job)).getCodec(path) != </code><code>null</code><code>)</code>

<code>//在未開啟hive.hadoop.supports.splittable.combineinputformat(MAPREDUCE-1597)參數情況下,對于TextInputFormat并且為壓縮則采用HiveInputFormat分片算法</code>

<code>                    </code><code>return</code> <code>super</code><code>.getSplits(job, numSplits);</code>

<code>        </code><code>}</code>

<code>    </code><code>//對于連接配接式同上</code>

<code>      </code><code>if</code> <code>(inputFormat </code><code>instanceof</code> <code>SymlinkTextInputFormat) {</code>

<code>      </code><code>CombineFilter f = </code><code>null</code><code>;</code>

<code>      </code><code>boolean</code> <code>done = </code><code>false</code><code>;</code>

<code>Path filterPath = path;</code>

<code>//由參數hive.mapper.cannot.span.multiple.partitions控制,預設false;如果沒true,則對每一個partition建立一個pool,以下省略為true的處理;對于同一個表的同一個檔案格式的split建立一個pool為combine做準備;</code>

<code>      </code><code>if</code> <code>(!mrwork.isMapperCannotSpanPartns()) {</code>

<code>        </code><code>opList = HiveFileFormatUtils.doGetWorksFromPath(</code>

<code>                   </code><code>pathToAliases, aliasToWork, filterPath);</code>

<code>        </code><code>f = poolMap.get(</code><code>new</code> <code>CombinePathInputFormat(opList, inputFormatClassName));</code>

<code>      </code><code>if</code> <code>(!done) {</code>

<code>        </code><code>if</code> <code>(f == </code><code>null</code><code>) {</code>

<code>          </code><code>f = </code><code>new</code> <code>CombineFilter(filterPath);</code>

<code>          </code><code>combine.createPool(job, f);</code>

<code>        </code><code>} </code><code>else</code> <code>{</code>

<code>          </code><code>f.addPath(filterPath);</code>

<code>if</code> <code>(!mrwork.isMapperCannotSpanPartns()) {</code>

<code>//到這裡才調用combine的分片算法,繼承了org.apache.hadoop.mapred.lib.CombineFileInputFormat extends 新版本CombineFileInputformat</code>

<code>      </code><code>iss = Arrays.asList(combine.getSplits(job, </code><code>1</code><code>));</code>

<code>//對于sample查詢特殊處理</code>

<code>    </code><code>if</code> <code>(mrwork.getNameToSplitSample() != </code><code>null</code> <code>&amp;&amp; !mrwork.getNameToSplitSample().isEmpty()) {</code>

<code>      </code><code>iss = sampleSplits(iss);</code>

<code>//封裝結果傳回</code>

<code>    </code><code>for</code> <code>(InputSplitShim is : iss) {</code>

<code>      </code><code>CombineHiveInputSplit csplit = </code><code>new</code> <code>CombineHiveInputSplit(job, is);</code>

<code>      </code><code>result.add(csplit);</code>

<code>    </code><code>return</code> <code>result.toArray(</code><code>new</code> <code>CombineHiveInputSplit[result.size()]);</code>

<code>  </code><code>}</code>

具體combine的getSplits算法如下:

<code>public</code> <code>List&lt;InputSplit&gt; getSplits(JobContext job)</code>

<code>    </code><code>throws</code> <code>IOException {</code>

<code>        </code><code>//決定切分的幾個參數</code>

<code>    </code><code>if</code> <code>(minSplitSizeNode != </code><code>0</code><code>) {</code>

<code>      </code><code>minSizeNode = minSplitSizeNode;</code>

<code>    </code><code>} </code><code>else</code> <code>{</code>

<code>      </code><code>minSizeNode = conf.getLong(SPLIT_MINSIZE_PERNODE, </code><code>0</code><code>);</code>

<code>    </code><code>if</code> <code>(minSplitSizeRack != </code><code>0</code><code>) {</code>

<code>      </code><code>minSizeRack = minSplitSizeRack;</code>

<code>      </code><code>minSizeRack = conf.getLong(SPLIT_MINSIZE_PERRACK, </code><code>0</code><code>);</code>

<code>    </code><code>if</code> <code>(maxSplitSize != </code><code>0</code><code>) {</code>

<code>      </code><code>maxSize = maxSplitSize;</code>

<code>      </code><code>maxSize= = conf.getLong(</code><code>"mapreduce.input.fileinputformat.split.maxsize"</code><code>, </code><code>0</code><code>);</code>

<code>        </code><code>for</code> <code>(MultiPathFilter onepool : pools) {</code>

<code>      </code><code>ArrayList&lt;Path&gt; myPaths = </code><code>new</code> <code>ArrayList&lt;Path&gt;();</code>

<code>      </code><code>// create splits for all files in this pool.</code>

<code>      </code><code>getMoreSplits(job, myPaths.toArray(</code><code>new</code> <code>Path[myPaths.size()]),</code>

<code>                    </code><code>maxSize, minSizeNode, minSizeRack, splits);</code>

跳到getMoreSplits:主要是填充如下資料結構,

<code>// all blocks for all the files in input set</code>

<code>    </code><code>OneFileInfo[] files;</code>

<code>    </code><code>// mapping from a rack name to the list of blocks it has</code>

<code>    </code><code>HashMap&lt;String, List&lt;OneBlockInfo&gt;&gt; rackToBlocks = </code><code>new</code> <code>HashMap&lt;String, List&lt;OneBlockInfo&gt;&gt;();</code>

<code>    </code><code>// mapping from a block to the nodes on which it has replicas</code>

<code>    </code><code>HashMap&lt;OneBlockInfo, String[]&gt; blockToNodes = </code><code>new</code> <code>HashMap&lt;OneBlockInfo, String[]&gt;();</code>

<code>    </code><code>// mapping from a node to the list of blocks that it contains</code>

<code>    </code><code>HashMap&lt;String, List&lt;OneBlockInfo&gt;&gt; nodeToBlocks = </code><code>new</code> <code>HashMap&lt;String, List&lt;OneBlockInfo&gt;&gt;();</code>

大概流程則是(這裡blockInfo生成略過不表,可以參考MAPREDUCE-2046):

1.首先處理每個Datanode的blockInfo,先按照&gt;=maxsplitsize來切分split,剩餘的再按照blockinfo&gt;=minSplitSizeNode切分,其餘的等和rack的其餘blockinfo進行合并

2.其次對每個Rack進行處理:先按照&gt;=maxsplitsize來切分split,剩餘的再按照blockinfo&gt;=minSplitSizeRack切分,其餘的等和overflow的其餘blockinfo進行合并

3.對于overflow blockInfo直接根據maxsplitsize來進行切分.

其餘影響MAP數的參數比較好了解了:

1.影響在MAPREDUCE後是否會啟動MAP進行檔案合并

hive.merge.mapfiles,hive.merge.mapredfiles,hive.merge.size.per.task(default=256 * 1000 * 1000),hive.merge.smallfiles.avgsize(default=16 * 1000 * 1000)

2.影響是否存在skew開啟多MAP:

hive.groupby.skewindata=false:

當該參數有true時會生成2個MR:

第一個MR的分區鍵是grouping key+distinct key,通過hash配置設定到reduce進行第一次聚合操作

第二個MR的分區鍵則是grouping key進行第二次聚合;(2個MR的sort key都是grouping key+distinct key)

<a href="https://issues.apache.org/jira/browse/HIVE-5118" target="_blank">https://issues.apache.org/jira/browse/HIVE-5118</a>

hive.optimize.skewjoin=false

hive.optimize.skewjoin.compiletime=false

hive.skewjoin.key=100000

hive.skewjoin.mapjoin.map.tasks=10000

hive.skewjoin.mapjoin.min.split=33554432

3.mapreduce參數,是否開啟map speculative

4.bucket table.

對于MAP/REDUCE的性能分析放到下一篇再說吧

本文轉自MIKE老畢 51CTO部落格,原文連結:http://blog.51cto.com/boylook/1316432,如需轉載請自行聯系原作者