天天看点

使用Mapreduce案例编写用于统计文本中单词出现的次数的案例、mapreduce本地运行等,Combiner使用及其相关的知识,流量统计案例和流量总和以及流量排序案例,自定义Partitioner

工程结构:

使用Mapreduce案例编写用于统计文本中单词出现的次数的案例、mapreduce本地运行等,Combiner使用及其相关的知识,流量统计案例和流量总和以及流量排序案例,自定义Partitioner

在整个案例过程中,代码如下:

wordcountmapper的代码如下:

package cn.toto.bigdata.mr.wc;

import java.io.ioexception;

import org.apache.hadoop.io.intwritable;

import org.apache.hadoop.io.longwritable;

import org.apache.hadoop.io.text;

import org.apache.hadoop.mapreduce.mapper;

/**

 * 这里的mapper是hadoop-mapreduce-client-core-2.8.0.jar中的内容

 * mapper<keyin, valuein, keyout, valueout>

 * keyin     :是指框架读取到的数据的key的类型,在默认的inputformat下,读到的key是一行文本的起始偏移量,所以key的类型是long

 * valuein   :是指框架读取到的数据的value的类型,在默认的inputformat下,读到的value是一行文本的内容,所以value的类型是string

 * keyout    :是指用户自定义逻辑方法返回的数据中key的类型,由用户业务逻辑决定,在此wordcount程序中,我们输出的key是单词,所以是string

 * valueout  :是指用户自定义逻辑方法返回的数据中value的类型,由用户业务逻辑决定,在此wordcount程序中,我们输出的value是单词的数量,所以是integer

 *

 * 但是,string,long等jdk中自带的数据类型,在序列化是,效率比较低,hadoop为了提高序列化效率,自定义了一套序列化框架,

 * 所以,在hadoop的程序中,如果该数据需要进行序列化(写磁盘,或者网络传输),就一定要用实现了hadoop序列化框架的数据类型

 * long       ----> longwritable

 * string     ----> text

 * integer    ----> intwritable

 * null       ----> nullwritable

 */

public class wordcountmapper extends mapper<longwritable, text, text, intwritable> {

         /**

          * 这就是mapreduce框架中一个主体运行程序maptask所要调用的用户业务逻辑方法

          * maptask会驱动inputformat去读取数据(keyin,valuein),每读到一个kv对,就传入这个用户写的map方法中调用一次

          * 在默认的inputformat实现中,此处的一个key就是一行的起始偏移量,value就是一行的内容

          */

         @override

         protected void map(longwritable key, text value, mapper<longwritable, text, text, intwritable>.context context)

                            throws ioexception, interruptedexception {

                   string line = value.tostring();

                   string[] words = line.split(" ");

                   for (string word : words) {

                            context.write(new text(word), new intwritable(1));

                   }

         }

}

wordcountreducer的代码如下:

import org.apache.hadoop.mapreduce.reducer;

public class wordcountreducer extends reducer<text, intwritable, text, intwritable> {

         /** reducetask在调我们写的reduce方法

                   reducetask应该收到了前一阶段(map阶段)中所有maptask输出的数据中的一部分

                   (数据的key.hashcode%reducetask数==本reductask号)

                   reducetask将这些收到kv数据拿来处理时,是这样调用我们的reduce方法的:

                          先将自己收到的所有的kv对按照k分组(根据k是否相同)

                          将某一组kv中的第一个kv中的k传给reduce方法的key变量,把这一组kv中所有的v用一个迭代器传给reduce方法的变量values

                    */

         protected void reduce(text key, iterable<intwritable> values,

                            reducer<text, intwritable, text, intwritable>.context context) throws ioexception, interruptedexception {

                   int count = 0;

                   for(intwritable v : values) {

                            count += v.get();

                   context.write(key, new intwritable(count));

wordcountdriver的代码如下:

import org.apache.hadoop.conf.configuration;

import org.apache.hadoop.fs.path;

import org.apache.hadoop.mapreduce.job;

import org.apache.hadoop.mapreduce.lib.input.fileinputformat;

import org.apache.hadoop.mapreduce.lib.input.textinputformat;

import org.apache.hadoop.mapreduce.lib.output.fileoutputformat;

import org.apache.hadoop.mapreduce.lib.output.textoutputformat;

 * 本类是客户端用来指定wordcount job程序运行时所需要的很多参数:

 * 比如,指定用哪个组件作为数据读取器、数据结果输出器

 *     指定用哪个类作为map阶段的业务逻辑类,哪个类作为reduce阶段的业务逻辑类

 *     指定wordcount job程序的jar包所在路径

 *     ....

 *     运行前准备工作

 *     1、将当前的工程导出成wordcount.jar

 *     2、准备/home/toto/software/wordcount/input/a.txt 其中a.txt中的内容类似:

 *                  the true

                nobility is

                in being

                superior to

                your previous

                self guess

      3、将 /home/toto/software/wordcount通过 hadoop fs -put wordcount /wordcount 上传到hdfs中

 *    

 *     以及其他各种需要的参数

 *     hadoop jar wordcount.jar cn.toto.bigdata.mr.wc.wordcountdriver

 *     上面的命令等同:

 *     java -cp wordcount.jar cn.toto.bigdata.mr.wc.wordcountdriver

 *     上面的含义是通过hadoop jar将hadoop classpath的jar都拷贝到应用中,并且指定执行cn.toto.bigdata.mr.wc.wordcountdriver

 *     

 *     最后查看结果的方式是:hadoop fs -cat /wordcount/output/part-r-00000,通过这个命令可以查看查看到

public class wordcountdriver {

   public static void main(string[] args) throws exception {

      configuration conf = new configuration();

      conf.set("fs.defaultfs", "hdfs://hadoop:9000");

      /*conf.set("mapreduce.framework.name", "yarn");

      conf.set("yarn.resourcemanager.hostname", "mini1");*/

      job job = job.getinstance(conf);

      //告诉框架,我们的程序所在jar包的路径

      // job.setjar("c:/wordcount.jar");

      job.setjarbyclass(wordcountdriver.class);

      //告诉框架,我们的程序所用的mapper类和reducer类

      job.setmapperclass(wordcountmapper.class);

      job.setreducerclass(wordcountreducer.class);

      //告诉框架,我们的mapperreducer输出的数据类型

      job.setmapoutputkeyclass(text.class);

      job.setmapoutputvalueclass(intwritable.class);

      job.setoutputkeyclass(text.class);

      job.setoutputvalueclass(intwritable.class);

      // 告诉框架,我们的数据读取、结果输出所用的format组件

      // textinputformat是mapreduce框架中内置的一种读取文本文件的输入组件

      job.setinputformatclass(textinputformat.class);

      job.setoutputformatclass(textoutputformat.class);

      // 告诉框架,我们要处理的文件在哪个路径下,注意若hdfs中已经有了/wordcount/input/这个文件,说明

      fileinputformat.setinputpaths(job, new path("/wordcount/input/"));

      // 告诉框架,我们的处理结果要输出到哪里去

      fileoutputformat.setoutputpath(job, new path("/wordcount/output/"));深

      boolean res = job.waitforcompletion(true);

      system.exit(res?0:1);

   }

运行前的准备工作:

运行前准备工作

1、将当前的工程导出成wordcount.jar

2、准备/home/toto/software/wordcount/input/a.txt 其中a.txt中的内容类似:

                   the true

   3、将 /home/toto/software/wordcount通过 hadoop fs -put wordcount /wordcount 上传到hdfs中

最后,可以执行的命令是:

hadoop jar wordcount.jar cn.toto.bigdata.mr.wc.wordcountdriver

执行后的效果如下:

使用Mapreduce案例编写用于统计文本中单词出现的次数的案例、mapreduce本地运行等,Combiner使用及其相关的知识,流量统计案例和流量总和以及流量排序案例,自定义Partitioner
使用Mapreduce案例编写用于统计文本中单词出现的次数的案例、mapreduce本地运行等,Combiner使用及其相关的知识,流量统计案例和流量总和以及流量排序案例,自定义Partitioner

b:使用wordcount本地运行,并且使用combiner的案例(主要改变是在wordcountdriver中),代码如下:

准备工作:

在e盘下准备e:/wordcount/input/a.txt,其中的内容如下:

右键运行上面的代码,进入:

e:\wordcount\output\part-r-00000中看结果,结果内容如下:

经过上面的所有步骤之后,程序已经编写完成

总结:

(1)combiner是mr程序中mapper和reducer之外的一种组件

(2)combiner组件的父类就是reducer

(3)combiner和reducer的区别在于运行的位置:

combiner是在每一个maptask所在的节点运行

reducer是接收全局所有mapper的输出结果;

(4) combiner的意义就是对每一个maptask的输出进行局部汇总,以减小网络传输量

具体实现步骤:

    1、 自定义一个combiner继承reducer,重写reduce方法

    2、 在job中设置: job.setcombinerclass(customcombiner.class)

(5) combiner能够应用的前提是不能影响最终的业务逻辑

而且,combiner的输出kv应该跟reducer的输入kv类型要对应起来

<a target="_blank"></a>

combiner的使用要非常谨慎

因为combiner在mapreduce过程中可能调用也肯能不调用,可能调一次也可能调多次

所以:combiner使用的原则是:有或没有都不能影响业务逻辑

===============================================================================

 流量统计和自定义类实现序列化案例:

运行条件模拟:

1、配置环境变量为hadoop_home=e:\learntempfolder\hadoop-2.7.3

 2、从csdn资源上下载支持win10版本的:e:\learntempfolder\hadoop-2.7.3\bin\winutils.exe 和 e:\learntempfolder\hadoop-2.7.3\bin\hadoop.dll

界面效果如下:

使用Mapreduce案例编写用于统计文本中单词出现的次数的案例、mapreduce本地运行等,Combiner使用及其相关的知识,流量统计案例和流量总和以及流量排序案例,自定义Partitioner

3、准备要处理的资料:

使用Mapreduce案例编写用于统计文本中单词出现的次数的案例、mapreduce本地运行等,Combiner使用及其相关的知识,流量统计案例和流量总和以及流量排序案例,自定义Partitioner

http_20130313143750.dat 数据文件的具体内容如:

4、先运行flowsum(右键执行java程序)

运行生成的文件为e:\learntempfolder\flow\output\part-r-00000,内容如下:

5、运行flowsumsort(注意不要删除上面的part-r-00000)

运行后产生的文件内容是:

当然,我们也可以一次性求和并运算出结果输出到指定的文件目录中,代码如下:

到"e:/flow/sortout/"目录下,查看结果:

使用Mapreduce案例编写用于统计文本中单词出现的次数的案例、mapreduce本地运行等,Combiner使用及其相关的知识,流量统计案例和流量总和以及流量排序案例,自定义Partitioner

即:

6、为不同的手机号设置分区,让不同的手机号在不同的文件中。方法如下:

a:下面是自定义分区,自定分区的代码如下:

b:测试一下自定义分区:

c:运行所需的准备:

数据文件:

使用Mapreduce案例编写用于统计文本中单词出现的次数的案例、mapreduce本地运行等,Combiner使用及其相关的知识,流量统计案例和流量总和以及流量排序案例,自定义Partitioner

文件内容如下:

运行后的结果如下:

使用Mapreduce案例编写用于统计文本中单词出现的次数的案例、mapreduce本地运行等,Combiner使用及其相关的知识,流量统计案例和流量总和以及流量排序案例,自定义Partitioner

part-r-00001中内容:

使用Mapreduce案例编写用于统计文本中单词出现的次数的案例、mapreduce本地运行等,Combiner使用及其相关的知识,流量统计案例和流量总和以及流量排序案例,自定义Partitioner
使用Mapreduce案例编写用于统计文本中单词出现的次数的案例、mapreduce本地运行等,Combiner使用及其相关的知识,流量统计案例和流量总和以及流量排序案例,自定义Partitioner
使用Mapreduce案例编写用于统计文本中单词出现的次数的案例、mapreduce本地运行等,Combiner使用及其相关的知识,流量统计案例和流量总和以及流量排序案例,自定义Partitioner

等等