(一) 什么是MapReduce?
(1).概念
官网上原话翻译成中文这么说的:Hadoop MapReduce是一个用于轻松编写应用程序的软件框架,它以可靠的容错方式在大型群集(数千个节点)的商品硬件上并行处理海量数据(多TB数据集).
关键词:软件框架 可靠性 高容错 处理海量数据
说白了,其实mapreduce就是把大量数据通过分而治之的核心思想来实现分布式计算框架,核心架构map和reduce任务。
(2) Map任务
2.1) 读取文档里面的数据内容,inputSplit将数据内容解析成k-v键值对,每对键值对调用一次map函数;
2.2) 实现自己的map函数,k-v键值对处理并输出到中间结果;
2.3) 对k-v键值对进行分区(partitioner)
2.4) 按照分区对k-v进行处理(排序和合并),
a.对相同key的value值放入一个list集合里面;
b.将不同分区的key相同的键值对放入同一个reduce处理。
2.5) 对mapper输出的结果进行归约并传入reduce(mapper任务的输出就是reduce任务的输入)
(3) Reduce任务
3.1)存在多个reduce的情况下,多mapper任务输出的结果按照不同分区复制到不同的reduce下,单个更简单;
3.2)reduce对mapper的输入结果进行合并、排序,然后在调用自己实现的reduce()方法来对k-v进行处理,得到新的
k-v键值对输出;
3.3)把通过自定义的reduce()处理后的结果存入文档;
(二) MapReduce的执行原理
MapReduce 运行的时候,会通过Mapper 运行的任务读取HDFS 中的数据文件,然后调用自己的方法,处理数据,最后输出;
Reducer 任务会接收Mapper 任务输出的数据,作为自己的输入数据,调用自己的方法,最后输出到HDFS 的文件中,如下图
2.1) Mapper任务执行流程
每个Mapper 任务是一个java 进程,它会读取HDFS 中的文件,解析成很多的键值对,经过我们覆盖的map 方法处理后,转换为很多的键值对再输出。
整个Mapper 任务的处理过程又可以分为以下几个阶段,如图
①:把输入文件按照一定的标准分片(InputSplit),每个输入片的大小是固定的。默认情况下,输入片(InputSplit)的大小与数据块(Block)的大小是相同的;
如果数据块(Block)的大小是默认值64MB,输入文件有两个,一个是32MB,一个是72MB;么小的文件是一个输入片,大文件会分为两个数据块,
那么是两个输入片。一共产生三个输入片;每一个输入片由一个Mapper 进程处理。(ps:Hadoop1.0数据块block默认64M,Hadoop2.0以后默认都是128M)
②:对输入片中的记录按照一定的规则解析成键值对。有个默认规则是把每一行文本内容解析成键值对。“键”是每一行的起始位置(单位是字节),“值”是本行的文本内容。
③:每一对键值对都会调用一次map()来对输出数据做一些处理,并且每一次调用map()都会产生0到多对键值对| ps :这里的map()就是我们自己覆盖实现的。
④:对键值对按照一定算法进行分区,分区是指划分key的空间,分区的总数对应job的reduce数,也就是说分区和reduce是一一对应的,
默认只有一个分区,分区名为HashPartitioner。(ps:分区的实质其实就是数据归类,为了减轻reduce的压力)
⑤:对④步骤中各个分区内部根据key来进行排序,对key-value键值对进行分组。
⑥:对分区中的数据进行规约。(ps:减少到reduce的键值对数据量,作业时间变短,效率提高)
2.2) Reduce任务执行流程
Reducer 任务接收Mapper 任务的输出,归约处理后写入到HDFS 中,步骤图所示:
①:将各个分区的mapper输出的键值对传入到reduce里面;
②:将reduce里面的k-v键值对进行合并,接下来对其合并结果进行排序;
③:对执行好①、②操作后的键值对结果输出到指定位置。(ps:如HDFS文件系统)
2.3) 一个mapreduce的job简单原理图
ps:<key1,value1>{input} ------>map------> <key2, value2>{combine} ------> <key2,value2>----->reduce-----><key3,value3>{output}
(三) 单词计数的例子展示
3.1)单词计数结构原理模拟图
①Combiner 节点负责完成上面提到的将同一个map中相同的key进行合并,避免重复传输,从而减少传输中的通信开销
②Partitioner节点负责将map产生的中间结果进行划分,确保相同的key到达同一个reduce节点.
③
3.2)代码实例展示
ps:博主使用的开发工具是IDE,开发语言是java,当然有兴趣也使用其他语言去实现!
①:先来张wordcount类简略图
②:来分析下每个方法,这里就偷懒,就直接贴代码
public class WordCount {
private final static String INPUT_PATH = "hdfs://master:9000/input"; //输入路径
private final static String OUTPUT_PATH = "hdfs://master:9000/output"; //输出路径,必须是不存在的
private static Configuration getConfiguration() {
Configuration conf = new Configuration();
return conf;
}
static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);//表示单词出现次数
private Text word = new Text();//某个单词
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//表示每次处理一行数据,并且按空格分割
StringTokenizer st = new StringTokenizer(value.toString());
while (st.hasMoreTokens()) {
word.set(st.nextToken());
context.write(word, one);//写入上下文中,用于后续传入reduce
}
}
}
static class MyReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable(0); //某单词出现的总次数
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException,
InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//1.创建Configuration
Configuration cf = new Configuration();
//2.准备环境
Path outputPath = new Path(OUTPUT_PATH);
Path inputPath = new Path(INPUT_PATH);
FileSystem fs = FileSystem.get(getConfiguration());
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
//3.创建job
Job job = new Job(cf, WordCount.class.getSimpleName());
job.setJarByClass(WordCount.class);//打成jar
FileInputFormat.setInputPaths(job, inputPath);//读取HDFS文件路径
//设置map相关
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//设置reduce相关
job.setReducerClass(MyReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//设置job的输出环境
FileOutputFormat.setOutputPath(job, outputPath);
boolean flag = job.waitForCompletion(true);
System.exit(flag ? 0 : 1);
}
}
第一步:打包我们上面的类大jar(ps:是jar包不是war包)
第二步:启动hadoop集群(ps:这里怎么配置集群我就不多说了,不懂的问度娘,网上一大堆)
第三步:新建输入路径并上传资源文件到HDFS,与java代码里面的input输出路径一一对应;
第四步:运行jar,获得结果
PS:MapReduce大概就这样了,等有机会更深入的了解,再来继续更新......