
Hadoop MapReduce 02: Custom WordCount Example

Creating the MapperTask

Create a Java class that extends the Mapper parent class.

The four generic type parameters of Mapper are explained in the comments below. Note that the map output travels over the network to the reducers, so every key and value type must be serializable.

/**
 * Note: the data is sent over the network between tasks, so every
 * key/value type must be serializable (hence Hadoop's Writable types).
 * 
 * KEYIN:    the byte offset at which each line is read (long -> LongWritable)
 * VALUEIN:  the content of each line that is read (String -> Text)
 * 
 * KEYOUT:   the key emitted after the user's processing (String -> Text)
 * VALUEOUT: the value emitted after the user's processing (Integer -> IntWritable)
 * @author 波波烤鸭
 *     [email protected]
 */
public class MyMapperTask extends Mapper<LongWritable, Text, Text, IntWritable> {
    /**
     * The business logic of the map phase goes in the map() method.
     * By default it is called once for every line read from the input.
     * @param key the offset at which the line was read
     * @param value the line of data that was read
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        // split the line into words on spaces
        String[] words = line.split(" ");
        for (String word : words) {
            // emit each word as the key with 1 as the value, so that
            // identical words are grouped together for the reduce phase
            context.write(new Text(word), new IntWritable(1));
        }
    }
}      
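All four type parameters are Hadoop serialization wrappers rather than plain Java types, because keys and values are written to disk and shipped across the network during the shuffle. As a hedged illustration of that contract (wordcount itself only needs the built-in types), a minimal custom value type would implement Writable; keys would additionally implement WritableComparable so they can be sorted. The class WordInfo below is made up purely for illustration:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

// Hypothetical custom value type, shown only to illustrate the
// serialization contract behind LongWritable/Text/IntWritable.
public class WordInfo implements Writable {
    private String word;
    private int count;

    // Writable implementations need a no-arg constructor,
    // since the framework instantiates them by reflection.
    public WordInfo() {}

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(word);
        out.writeInt(count);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // fields must be read in exactly the order they were written
        word = in.readUTF();
        count = in.readInt();
    }
}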

Creating the ReducerTask

Create a Java class that extends the Reducer parent class.

/**
 * KEYIN and VALUEIN correspond to the KEYOUT and VALUEOUT of the map phase.
 * 
 * KEYOUT:   the key type output by the reduce logic
 * VALUEOUT: the value type output by the reduce logic
 * @author 波波烤鸭
 *     [email protected]
 */
public class MyReducerTask extends Reducer<Text, IntWritable, Text, IntWritable>{

    /**
     * @param key a key output by the map phase
     * @param values all values from the map phase that share this key
     * @param context the job context used to emit results
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int count = 0;
        // sum up the occurrences of this word across all map outputs
        for (IntWritable value : values) {
            count += value.get();
        }
        context.write(key, new IntWritable(count));
    }
}      
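Because this reduce logic is a plain sum, which is commutative and associative, the same class can also double as a combiner that pre-aggregates map output on the map side and shrinks shuffle traffic. This is an optional addition to the driver, not part of the original job (the Combine input/output counters in the log below read 0 precisely because no combiner was set):

// optional: pre-aggregate on the map side; safe here only because
// summation gives the same result regardless of grouping order
job.setCombinerClass(MyReducerTask.class);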

Creating the Driver Class

package com.bobo.mr.wc;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WcTest {

    public static void main(String[] args) throws Exception {
        // create the configuration object (true = load the default resources)
        Configuration conf = new Configuration(true);
        
        // obtain the Job instance
        Job job = Job.getInstance(conf);
        // let Hadoop locate the jar containing this class
        job.setJarByClass(WcTest.class);
        
        // specify the Mapper and Reducer implementations
        job.setMapperClass(MyMapperTask.class);
        job.setReducerClass(MyReducerTask.class);
        
        // specify the output types of the map phase
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        
        // specify the final output types of the reduce phase
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        
        // the input and output paths are passed in as command-line arguments
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // submit the job, wait for completion, and exit with a matching status
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}      
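The WARN line in the job log below ("Implement the Tool interface and execute your application with ToolRunner to remedy this") points at a more idiomatic driver. A minimal sketch of that variant, reusing the same Mapper/Reducer classes; the class name WcToolTest is made up for illustration:

package com.bobo.mr.wc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Same job as WcTest, but ToolRunner parses generic options
// (-D key=value, -files, -libjars) before run() is invoked.
public class WcToolTest extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf());
        job.setJarByClass(WcToolTest.class);
        job.setMapperClass(MyMapperTask.class);
        job.setReducerClass(MyReducerTask.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new WcToolTest(), args));
    }
}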

Packaging and Deployment

Package the project as a jar with Maven.
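Assuming a standard Maven project layout, a plain package build is enough; the artifact name matches the jar used in the run command later:

mvn clean package

The jar then appears under target/hadoop-demo-0.0.1-SNAPSHOT.jar and can be copied to the cluster node, e.g. with scp.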


Upload and Test


Create the wordcount directories in HDFS and upload the test files:

hadoop fs -mkdir -p /hdfs/wordcount/input
hadoop fs -put a.txt b.txt /hdfs/wordcount/input/      
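The contents of a.txt and b.txt are not shown here; any plain text with space-separated words will do. A purely hypothetical example (the actual files contained the words listed in the final output below):

echo "hello java spring shell" > a.txt
echo "hello java spring shell" > b.txt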

Run the program:

hadoop jar hadoop-demo-0.0.1-SNAPSHOT.jar com.bobo.mr.wc.WcTest /hdfs/wordcount/input /hdfs/wordcount/output/      

Successful run:

[root@hadoop-node01 ~]# hadoop jar hadoop-demo-0.0.1-SNAPSHOT.jar com.bobo.mr.wc.WcTest /hdfs/wordcount/input /hdfs/wordcount/output/
19/04/03 16:56:43 INFO client.RMProxy: Connecting to ResourceManager at hadoop-node01/192.168.88.61:8032
19/04/03 16:56:46 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
19/04/03 16:56:48 INFO input.FileInputFormat: Total input paths to process : 2
19/04/03 16:56:49 INFO mapreduce.JobSubmitter: number of splits:2
19/04/03 16:56:51 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1554281786018_0001
19/04/03 16:56:52 INFO impl.YarnClientImpl: Submitted application application_1554281786018_0001
19/04/03 16:56:53 INFO mapreduce.Job: The url to track the job: http://hadoop-node01:8088/proxy/application_1554281786018_0001/
19/04/03 16:56:53 INFO mapreduce.Job: Running job: job_1554281786018_0001
19/04/03 16:57:14 INFO mapreduce.Job: Job job_1554281786018_0001 running in uber mode : false
19/04/03 16:57:14 INFO mapreduce.Job:  map 0% reduce 0%
19/04/03 16:57:38 INFO mapreduce.Job:  map 100% reduce 0%
19/04/03 16:57:56 INFO mapreduce.Job:  map 100% reduce 100%
19/04/03 16:57:57 INFO mapreduce.Job: Job job_1554281786018_0001 completed successfully
19/04/03 16:57:57 INFO mapreduce.Job: Counters: 50
    File System Counters
        FILE: Number of bytes read=181
        FILE: Number of bytes written=321388
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=325
        HDFS: Number of bytes written=87
        HDFS: Number of read operations=9
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters 
        Launched map tasks=2
        Launched reduce tasks=1
        Data-local map tasks=1
        Rack-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=46511
        Total time spent by all reduces in occupied slots (ms)=12763
        Total time spent by all map tasks (ms)=46511
        Total time spent by all reduce tasks (ms)=12763
        Total vcore-milliseconds taken by all map tasks=46511
        Total vcore-milliseconds taken by all reduce tasks=12763
        Total megabyte-milliseconds taken by all map tasks=47627264
        Total megabyte-milliseconds taken by all reduce tasks=13069312
    Map-Reduce Framework
        Map input records=14
        Map output records=14
        Map output bytes=147
        Map output materialized bytes=187
        Input split bytes=234
        Combine input records=0
        Combine output records=0
        Reduce input groups=10
        Reduce shuffle bytes=187
        Reduce input records=14
        Reduce output records=10
        Spilled Records=28
        Shuffled Maps =2
        Failed Shuffles=0
        Merged Map outputs=2
        GC time elapsed (ms)=1049
        CPU time spent (ms)=5040
        Physical memory (bytes) snapshot=343056384
        Virtual memory (bytes) snapshot=6182891520
        Total committed heap usage (bytes)=251813888
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters 
        Bytes Read=91
    File Output Format Counters 
        Bytes Written=87      

View the results:

[root@hadoop-node01 ~]# hadoop fs -cat /hdfs/wordcount/output/part-r-00000
ajax    1
bobo烤鸭  1
hello   2
java    2
mybatis 1
name    1
php 1
shell   2
spring  2
springmvc   1      
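The file part-r-00000 is named after the job's single reduce task (r = reduce); a job with N reducers writes N such files, alongside an empty _SUCCESS marker. The output directory can be listed with:

hadoop fs -ls /hdfs/wordcount/output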

OK~