天天看点

MapReduce详解(MR运行全流程,shuffle,分区,分片)本文行文逻辑MapReduce程序详解(即map中,reduce中)map前,reduce后详解MapReduce运行全流程(主要介绍map到reduce的其中过程,即shuffle流程)

文章目录

  • 本文行文逻辑
  • MapReduce程序详解(即map中,reduce中)
  • map前,reduce后详解
    • 分片详解
      • 什么是分片?
      • 为何要分片?
      • 分片大小计算?
      • 分片的读取规则
    • 控制maptask和reducetask数量
  • MapReduce运行全流程(主要介绍map到reduce的其中过程,即shuffle流程)
    • MR运行全流程中自定义部分
      • 自定义数据类型
      • 自定义分区
      • Combine
    • 分区详解

本文行文逻辑

分别从MR过程的前,中,后三个阶段对MR进行详细介绍
           
  • 1

MapReduce程序详解(即map中,reduce中)

一种分布式运算程序,分为map阶段和reduce阶段

Map阶段会有一个实体程序,不需要我们自己开发,用户只需要维护map方法就可以

默认情况下map程序读取一行数据(映射成key-value对是一行行的)就会调用一次map方法,而且会将这一行数据的偏移量作为key(即key一定是IntWritable),这一行数据的内容作为value返回给框架,然后由框架写出context.write(key,value)

Reduce 阶段会有一个实体程序,不要我们自己开发我们需要维护reduce方法

Reduce程序会接受map端输出的中间结果数据,而且相同的key的数据会到达同一个reduce实例中去,每个reduce实例会处理多个key的数据。Reduce程序会将自己收集的数据按照key相同进行分组,对一组数据调用一次reduce方法(按组调用reduce方法),并且将参数传给reduce(key,迭代器values,context),然后写出

map前,reduce后详解

MapReduce详解(MR运行全流程,shuffle,分区,分片)本文行文逻辑MapReduce程序详解(即map中,reduce中)map前,reduce后详解MapReduce运行全流程(主要介绍map到reduce的其中过程,即shuffle流程)

分片详解

什么是分片?

在进行map计算之前,map会根据输入文件计算输入分片(input split);每个输入分片(input split)针对一个map任务,输入分片存储的并非是数据本身,而是一个分片长度和一个记录数据的位置的数组。

逻辑概念,分片信息包括起始偏移量,分片大小,分片数据所在的块的信息,块所在的主机列表。

注意:分片是根据输入文件计算的,这些输入文件即是存储在hdfs上的那些个文件。

为何要分片?

答:方便多个Map任务并行处理,提高运作效率。

每一个分片对应着一个maptask,通过调整分片的大小可以调整maptask的数量,也就是调整map阶段的并行度。

分片大小计算?

下面是源程序中对分片大小的计算:

long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));//返回1

long maxSize = getMaxSplitSize(job);//返回long的最大值

long splitSize = computeSplitSize(blockSize, minSize, maxSize)

return Math.max(minSize, Math.min(maxSize, blockSize));

总计:先比较块大小和最大分片,选出其中较小的,然后将结果和最小分片比较,选出较大的。

//计算分片大小,实际的分片大小。

while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {} //分片有一个1.1倍的冗余

通过这个可以对实际分片大小进行设置,主要从最大分片大小和最小分片大小入手。

(1)FileInputFormat.setMinInputSplitSize(job,1000);

(2)FileInputFormat.setMaxInputSplitSize(job,1000000);

分片的读取规则

  1. 第一个分片从第一行开始读取,读到分片末尾,再读取下一个分片的第一行
  2. 既不是第一个分片也不是最后一个分片,第一行数据舍去,读到分片末尾,再继续读 取下一个分片的第一行数据
  3. 最后一个分片舍去第一行,读到分片末尾

控制maptask和reducetask数量

控制maptask数量:

1)maptask数量由分片数量决定,可设置maxsize,minsize,blocksize来控制分片的大小,进而控制分片数量

2)改变数据总量也可影响maptask数量

控制reducetask数量:

1)job.setNumReduceTasks(5); 直接设置reducetask数量

2)分区数和reducetask数量是一致的,可以调整分区数。

MapReduce运行全流程(主要介绍map到reduce的其中过程,即shuffle流程)

MapReduce详解(MR运行全流程,shuffle,分区,分片)本文行文逻辑MapReduce程序详解(即map中,reduce中)map前,reduce后详解MapReduce运行全流程(主要介绍map到reduce的其中过程,即shuffle流程)
  • org.apache.hadoop.mapred.OutputCollector(即文中的输出收集器),OutputCollector由Hadoop框架提供,负责收集Mapper和Reducer的输出数据,实现map函数和reduce函数时,只需要简单的将其输出的<key,value>对往OutputCollector中一丢即可,剩余的事情框架自会帮你处理好。

    可以理解为map()和reduce()不会每执行一次便写出,是需要积累的,具体流程可以查看底层原理进行了解。

  • 为什么要写入环形缓冲区?

    答: hadoop在执行MapReduce任务的时候,在map阶段,map()函数产生输出以后,并不是直接写入到磁盘中,而是先写入到了环形缓冲区,这样做的原因是无论缓冲区写入还是读出,速度都更快,效率更高

  • 写入磁盘缓冲区的时候,为什么只写入百分之80,那百分之20干啥的?

    答:这个内存缓冲区是有大小限制的,默认是100MB。当map task的输出结果很多时,就可能会撑爆内存,所以需要在一定条件下将缓冲区中的数据临时写入磁盘,然后重新利用这块缓冲区。这个从内存往磁盘写数据的过程被称为Spill,中文可译为溢写,字面意思很直观。这个溢写是由单独线程来完成,不影响往缓冲区写map结果的线程。溢写线程启动时不应该阻止map的结果输出,所以整个缓冲区有个溢写的比例spill.percent。这个比例默认是0.8,也就是当缓冲区的数据已经达到阈值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢写线程启动,锁定这80MB的内存,执行溢写过程。Map task的输出结果还可以往剩下的20MB内存中写,互不影响。

  • 分片数据和溢写磁盘文件的对应关系?

    通过上一条问题可知,环形缓冲区会重复利用。

    一个分片数据有可能会产生多个溢写磁盘文件。

  • 内/外部排序针对的是内存,在内存中的排序称之为内部排序,反之,在磁盘等位置的排序称为外部排序。

MR运行全流程中自定义部分

在MR运行全流程中,存在若干过程是设计人员可以进行自定义设计,即人为干预的。

包括:

  • TextInputFormat
  • map()函数
  • 自定义分区(注意:先分区后排序,且是在溢写之前进行分区)
  • 分区内自定义排序(注意:要在分区内进行排序)
  • combine()
  • 传入reduce()函数的key,我们可以自定义key类型,并定义什么样的key算是一样的
  • reduce( )
  • TextOutputFormat

自定义数据类型

1.要实现writable接口

2.读写顺序要一致

3.构造方法如果进行了重写,要显示定义无参的构造方法

4.重写toString方法

自定义分区

数据分发的策略

如何实现自定义分区?

前提要保证相同key的数据会发送到同一个reduce中,或者说是分到同一个分区中

Partitioner<key,value> ,这里的key和value的数据类型和map输出的数据类型保持一致

继承Patitioner这个类,重写分区方法getPartition(map输出key,map输出value,分区数量)

然后在job中设置使用我们自定义的分区方法进行数据分发

/**
  场景:将不同手机号前缀归向不同省份地区
*/
           

public class ProvincePartitioner extends Partitioner<Text,FlowBean>{

private static HashMap<String,Integer>pmap = new HashMap<>();

static{

pmap.put(“136”,0);

pmap.put(“137”,1);

pmap.put(“138”,2);

pmap.put(“139”,3);

}

@Override

public int getPartition(Text key,FlowBean flowBean,int numPartitions){

String prefix = key.toStrirg().substring(0,3); #截取手机号前三位

Integer partNum = pmap.get(prefix);

return(partNum==null?4:partNUm);

}

}

job.setPartitioner(ProvincePartitioner.class);

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

Combine

combine出现的原因,作用?

减少数据量,提高传输效率。将形如A 1 A 1 A1 转换成A 3,类似于map端的reduce。

注意:combine需要注意场合,不是什么MR都适用。

分区详解

分区的数量和reduce(或者叫reducetask)数量是一致的。

文章目录

  • 本文行文逻辑
  • MapReduce程序详解(即map中,reduce中)
  • map前,reduce后详解
    • 分片详解
      • 什么是分片?
      • 为何要分片?
      • 分片大小计算?
      • 分片的读取规则
    • 控制maptask和reducetask数量
  • MapReduce运行全流程(主要介绍map到reduce的其中过程,即shuffle流程)
    • MR运行全流程中自定义部分
      • 自定义数据类型
      • 自定义分区
      • Combine
    • 分区详解

本文行文逻辑

分别从MR过程的前,中,后三个阶段对MR进行详细介绍
           
  • 1

MapReduce程序详解(即map中,reduce中)

一种分布式运算程序,分为map阶段和reduce阶段

Map阶段会有一个实体程序,不需要我们自己开发,用户只需要维护map方法就可以

默认情况下map程序读取一行数据(映射成key-value对是一行行的)就会调用一次map方法,而且会将这一行数据的偏移量作为key(即key一定是IntWritable),这一行数据的内容作为value返回给框架,然后由框架写出context.write(key,value)

Reduce 阶段会有一个实体程序,不要我们自己开发我们需要维护reduce方法

Reduce程序会接受map端输出的中间结果数据,而且相同的key的数据会到达同一个reduce实例中去,每个reduce实例会处理多个key的数据。Reduce程序会将自己收集的数据按照key相同进行分组,对一组数据调用一次reduce方法(按组调用reduce方法),并且将参数传给reduce(key,迭代器values,context),然后写出

map前,reduce后详解

MapReduce详解(MR运行全流程,shuffle,分区,分片)本文行文逻辑MapReduce程序详解(即map中,reduce中)map前,reduce后详解MapReduce运行全流程(主要介绍map到reduce的其中过程,即shuffle流程)

分片详解

什么是分片?

在进行map计算之前,map会根据输入文件计算输入分片(input split);每个输入分片(input split)针对一个map任务,输入分片存储的并非是数据本身,而是一个分片长度和一个记录数据的位置的数组。

逻辑概念,分片信息包括起始偏移量,分片大小,分片数据所在的块的信息,块所在的主机列表。

注意:分片是根据输入文件计算的,这些输入文件即是存储在hdfs上的那些个文件。

为何要分片?

答:方便多个Map任务并行处理,提高运作效率。

每一个分片对应着一个maptask,通过调整分片的大小可以调整maptask的数量,也就是调整map阶段的并行度。

分片大小计算?

下面是源程序中对分片大小的计算:

long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));//返回1

long maxSize = getMaxSplitSize(job);//返回long的最大值

long splitSize = computeSplitSize(blockSize, minSize, maxSize)

return Math.max(minSize, Math.min(maxSize, blockSize));

总计:先比较块大小和最大分片,选出其中较小的,然后将结果和最小分片比较,选出较大的。

//计算分片大小,实际的分片大小。

while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {} //分片有一个1.1倍的冗余

通过这个可以对实际分片大小进行设置,主要从最大分片大小和最小分片大小入手。

(1)FileInputFormat.setMinInputSplitSize(job,1000);

(2)FileInputFormat.setMaxInputSplitSize(job,1000000);

分片的读取规则

  1. 第一个分片从第一行开始读取,读到分片末尾,再读取下一个分片的第一行
  2. 既不是第一个分片也不是最后一个分片,第一行数据舍去,读到分片末尾,再继续读 取下一个分片的第一行数据
  3. 最后一个分片舍去第一行,读到分片末尾

控制maptask和reducetask数量

控制maptask数量:

1)maptask数量由分片数量决定,可设置maxsize,minsize,blocksize来控制分片的大小,进而控制分片数量

2)改变数据总量也可影响maptask数量

控制reducetask数量:

1)job.setNumReduceTasks(5); 直接设置reducetask数量

2)分区数和reducetask数量是一致的,可以调整分区数。

MapReduce运行全流程(主要介绍map到reduce的其中过程,即shuffle流程)

MapReduce详解(MR运行全流程,shuffle,分区,分片)本文行文逻辑MapReduce程序详解(即map中,reduce中)map前,reduce后详解MapReduce运行全流程(主要介绍map到reduce的其中过程,即shuffle流程)
  • org.apache.hadoop.mapred.OutputCollector(即文中的输出收集器),OutputCollector由Hadoop框架提供,负责收集Mapper和Reducer的输出数据,实现map函数和reduce函数时,只需要简单的将其输出的<key,value>对往OutputCollector中一丢即可,剩余的事情框架自会帮你处理好。

    可以理解为map()和reduce()不会每执行一次便写出,是需要积累的,具体流程可以查看底层原理进行了解。

  • 为什么要写入环形缓冲区?

    答: hadoop在执行MapReduce任务的时候,在map阶段,map()函数产生输出以后,并不是直接写入到磁盘中,而是先写入到了环形缓冲区,这样做的原因是无论缓冲区写入还是读出,速度都更快,效率更高

  • 写入磁盘缓冲区的时候,为什么只写入百分之80,那百分之20干啥的?

    答:这个内存缓冲区是有大小限制的,默认是100MB。当map task的输出结果很多时,就可能会撑爆内存,所以需要在一定条件下将缓冲区中的数据临时写入磁盘,然后重新利用这块缓冲区。这个从内存往磁盘写数据的过程被称为Spill,中文可译为溢写,字面意思很直观。这个溢写是由单独线程来完成,不影响往缓冲区写map结果的线程。溢写线程启动时不应该阻止map的结果输出,所以整个缓冲区有个溢写的比例spill.percent。这个比例默认是0.8,也就是当缓冲区的数据已经达到阈值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢写线程启动,锁定这80MB的内存,执行溢写过程。Map task的输出结果还可以往剩下的20MB内存中写,互不影响。

  • 分片数据和溢写磁盘文件的对应关系?

    通过上一条问题可知,环形缓冲区会重复利用。

    一个分片数据有可能会产生多个溢写磁盘文件。

  • 内/外部排序针对的是内存,在内存中的排序称之为内部排序,反之,在磁盘等位置的排序称为外部排序。

MR运行全流程中自定义部分

在MR运行全流程中,存在若干过程是设计人员可以进行自定义设计,即人为干预的。

包括:

  • TextInputFormat
  • map()函数
  • 自定义分区(注意:先分区后排序,且是在溢写之前进行分区)
  • 分区内自定义排序(注意:要在分区内进行排序)
  • combine()
  • 传入reduce()函数的key,我们可以自定义key类型,并定义什么样的key算是一样的
  • reduce( )
  • TextOutputFormat

自定义数据类型

1.要实现writable接口

2.读写顺序要一致

3.构造方法如果进行了重写,要显示定义无参的构造方法

4.重写toString方法

自定义分区

数据分发的策略

如何实现自定义分区?

前提要保证相同key的数据会发送到同一个reduce中,或者说是分到同一个分区中

Partitioner<key,value> ,这里的key和value的数据类型和map输出的数据类型保持一致

继承Patitioner这个类,重写分区方法getPartition(map输出key,map输出value,分区数量)

然后在job中设置使用我们自定义的分区方法进行数据分发

/**
  场景:将不同手机号前缀归向不同省份地区
*/
           

继续阅读