工程结构:
在整个案例过程中,代码如下:
wordcountmapper的代码如下:
package cn.toto.bigdata.mr.wc;
import java.io.ioexception;
import org.apache.hadoop.io.intwritable;
import org.apache.hadoop.io.longwritable;
import org.apache.hadoop.io.text;
import org.apache.hadoop.mapreduce.mapper;
/**
* 这里的mapper是hadoop-mapreduce-client-core-2.8.0.jar中的内容
* mapper<keyin, valuein, keyout, valueout>
* keyin :是指框架读取到的数据的key的类型,在默认的inputformat下,读到的key是一行文本的起始偏移量,所以key的类型是long
* valuein :是指框架读取到的数据的value的类型,在默认的inputformat下,读到的value是一行文本的内容,所以value的类型是string
* keyout :是指用户自定义逻辑方法返回的数据中key的类型,由用户业务逻辑决定,在此wordcount程序中,我们输出的key是单词,所以是string
* valueout :是指用户自定义逻辑方法返回的数据中value的类型,由用户业务逻辑决定,在此wordcount程序中,我们输出的value是单词的数量,所以是integer
*
* 但是,string,long等jdk中自带的数据类型,在序列化是,效率比较低,hadoop为了提高序列化效率,自定义了一套序列化框架,
* 所以,在hadoop的程序中,如果该数据需要进行序列化(写磁盘,或者网络传输),就一定要用实现了hadoop序列化框架的数据类型
* long ----> longwritable
* string ----> text
* integer ----> intwritable
* null ----> nullwritable
*/
public class wordcountmapper extends mapper<longwritable, text, text, intwritable> {
/**
* 这就是mapreduce框架中一个主体运行程序maptask所要调用的用户业务逻辑方法
* maptask会驱动inputformat去读取数据(keyin,valuein),每读到一个kv对,就传入这个用户写的map方法中调用一次
* 在默认的inputformat实现中,此处的一个key就是一行的起始偏移量,value就是一行的内容
*/
@override
protected void map(longwritable key, text value, mapper<longwritable, text, text, intwritable>.context context)
throws ioexception, interruptedexception {
string line = value.tostring();
string[] words = line.split(" ");
for (string word : words) {
context.write(new text(word), new intwritable(1));
}
}
}
wordcountreducer的代码如下:
import org.apache.hadoop.mapreduce.reducer;
public class wordcountreducer extends reducer<text, intwritable, text, intwritable> {
/** reducetask在调我们写的reduce方法
reducetask应该收到了前一阶段(map阶段)中所有maptask输出的数据中的一部分
(数据的key.hashcode%reducetask数==本reductask号)
reducetask将这些收到kv数据拿来处理时,是这样调用我们的reduce方法的:
先将自己收到的所有的kv对按照k分组(根据k是否相同)
将某一组kv中的第一个kv中的k传给reduce方法的key变量,把这一组kv中所有的v用一个迭代器传给reduce方法的变量values
*/
protected void reduce(text key, iterable<intwritable> values,
reducer<text, intwritable, text, intwritable>.context context) throws ioexception, interruptedexception {
int count = 0;
for(intwritable v : values) {
count += v.get();
context.write(key, new intwritable(count));
wordcountdriver的代码如下:
import org.apache.hadoop.conf.configuration;
import org.apache.hadoop.fs.path;
import org.apache.hadoop.mapreduce.job;
import org.apache.hadoop.mapreduce.lib.input.fileinputformat;
import org.apache.hadoop.mapreduce.lib.input.textinputformat;
import org.apache.hadoop.mapreduce.lib.output.fileoutputformat;
import org.apache.hadoop.mapreduce.lib.output.textoutputformat;
* 本类是客户端用来指定wordcount job程序运行时所需要的很多参数:
* 比如,指定用哪个组件作为数据读取器、数据结果输出器
* 指定用哪个类作为map阶段的业务逻辑类,哪个类作为reduce阶段的业务逻辑类
* 指定wordcount job程序的jar包所在路径
* ....
* 运行前准备工作
* 1、将当前的工程导出成wordcount.jar
* 2、准备/home/toto/software/wordcount/input/a.txt 其中a.txt中的内容类似:
* the true
nobility is
in being
superior to
your previous
self guess
3、将 /home/toto/software/wordcount通过 hadoop fs -put wordcount /wordcount 上传到hdfs中
*
* 以及其他各种需要的参数
* hadoop jar wordcount.jar cn.toto.bigdata.mr.wc.wordcountdriver
* 上面的命令等同:
* java -cp wordcount.jar cn.toto.bigdata.mr.wc.wordcountdriver
* 上面的含义是通过hadoop jar将hadoop classpath的jar都拷贝到应用中,并且指定执行cn.toto.bigdata.mr.wc.wordcountdriver
*
* 最后查看结果的方式是:hadoop fs -cat /wordcount/output/part-r-00000,通过这个命令可以查看查看到
public class wordcountdriver {
public static void main(string[] args) throws exception {
configuration conf = new configuration();
conf.set("fs.defaultfs", "hdfs://hadoop:9000");
/*conf.set("mapreduce.framework.name", "yarn");
conf.set("yarn.resourcemanager.hostname", "mini1");*/
job job = job.getinstance(conf);
//告诉框架,我们的程序所在jar包的路径
// job.setjar("c:/wordcount.jar");
job.setjarbyclass(wordcountdriver.class);
//告诉框架,我们的程序所用的mapper类和reducer类
job.setmapperclass(wordcountmapper.class);
job.setreducerclass(wordcountreducer.class);
//告诉框架,我们的mapperreducer输出的数据类型
job.setmapoutputkeyclass(text.class);
job.setmapoutputvalueclass(intwritable.class);
job.setoutputkeyclass(text.class);
job.setoutputvalueclass(intwritable.class);
// 告诉框架,我们的数据读取、结果输出所用的format组件
// textinputformat是mapreduce框架中内置的一种读取文本文件的输入组件
job.setinputformatclass(textinputformat.class);
job.setoutputformatclass(textoutputformat.class);
// 告诉框架,我们要处理的文件在哪个路径下,注意若hdfs中已经有了/wordcount/input/这个文件,说明
fileinputformat.setinputpaths(job, new path("/wordcount/input/"));
// 告诉框架,我们的处理结果要输出到哪里去
fileoutputformat.setoutputpath(job, new path("/wordcount/output/"));深
boolean res = job.waitforcompletion(true);
system.exit(res?0:1);
}
运行前的准备工作:
运行前准备工作
1、将当前的工程导出成wordcount.jar
2、准备/home/toto/software/wordcount/input/a.txt 其中a.txt中的内容类似:
the true
3、将 /home/toto/software/wordcount通过 hadoop fs -put wordcount /wordcount 上传到hdfs中
最后,可以执行的命令是:
hadoop jar wordcount.jar cn.toto.bigdata.mr.wc.wordcountdriver
执行后的效果如下:
b:使用wordcount本地运行,并且使用combiner的案例(主要改变是在wordcountdriver中),代码如下:
准备工作:
在e盘下准备e:/wordcount/input/a.txt,其中的内容如下:
右键运行上面的代码,进入:
e:\wordcount\output\part-r-00000中看结果,结果内容如下:
经过上面的所有步骤之后,程序已经编写完成
总结:
(1)combiner是mr程序中mapper和reducer之外的一种组件
(2)combiner组件的父类就是reducer
(3)combiner和reducer的区别在于运行的位置:
combiner是在每一个maptask所在的节点运行
reducer是接收全局所有mapper的输出结果;
(4) combiner的意义就是对每一个maptask的输出进行局部汇总,以减小网络传输量
具体实现步骤:
1、 自定义一个combiner继承reducer,重写reduce方法
2、 在job中设置: job.setcombinerclass(customcombiner.class)
(5) combiner能够应用的前提是不能影响最终的业务逻辑
而且,combiner的输出kv应该跟reducer的输入kv类型要对应起来
<a target="_blank"></a>
combiner的使用要非常谨慎
因为combiner在mapreduce过程中可能调用也肯能不调用,可能调一次也可能调多次
所以:combiner使用的原则是:有或没有都不能影响业务逻辑
===============================================================================
流量统计和自定义类实现序列化案例:
运行条件模拟:
1、配置环境变量为hadoop_home=e:\learntempfolder\hadoop-2.7.3
2、从csdn资源上下载支持win10版本的:e:\learntempfolder\hadoop-2.7.3\bin\winutils.exe 和 e:\learntempfolder\hadoop-2.7.3\bin\hadoop.dll
界面效果如下:
3、准备要处理的资料:
http_20130313143750.dat 数据文件的具体内容如:
4、先运行flowsum(右键执行java程序)
运行生成的文件为e:\learntempfolder\flow\output\part-r-00000,内容如下:
5、运行flowsumsort(注意不要删除上面的part-r-00000)
运行后产生的文件内容是:
当然,我们也可以一次性求和并运算出结果输出到指定的文件目录中,代码如下:
到"e:/flow/sortout/"目录下,查看结果:
即:
6、为不同的手机号设置分区,让不同的手机号在不同的文件中。方法如下:
a:下面是自定义分区,自定分区的代码如下:
b:测试一下自定义分区:
c:运行所需的准备:
数据文件:
文件内容如下:
运行后的结果如下:
part-r-00001中内容:
等等