天天看點

MapReduce入門詳解(三)

個人部落格原文連結

join操作

左外連接配接(map)-JoinMapSideMR

問題描述:

将兩個檔案中每行的内容拼接到一個檔案中

思路分析:

準備好兩個map,firstMapper和joinMapper,firstMapper負責擷取檔案内容,joinMapper負責拼接檔案内容。利用Job開啟兩個firstMapper任務,擷取到兩個檔案的内容,然後再開啟一個joinMapper任務負責拼接擷取到的兩個檔案。

注:不常用map端的連接配接操作,推薦reduce端的連接配接操作

public class JoinMapSideMR extends Configured implements Tool {

    public static class FirstStepMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            if(!value.toString().equals("")) {
                context.write(value, NullWritable.get());
            }
        }
    }
    //讀取連接配接好的資料的mapper
    public static class JoinMapper extends Mapper<Text, TupleWritable, Text, Text>{
        @Override
        protected void map(Text key, TupleWritable value, Context context) throws IOException, InterruptedException {
            String v = StreamSupport.stream(value.spliterator(), false).map(s -> ((Text) s).toString())
                    .collect(Collectors.joining("|"));
            context.write(key,new Text(v));
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Path inpath1 = new Path(conf.get("inpath1"));
        Path inpath2 = new Path(conf.get("inpath2"));
        Path mr1 = new Path(conf.get("mr1"));
        Path mr2 = new Path(conf.get("mr2"));
        Path outpath = new Path(conf.get("outpath"));
        //------------------------
        Job job1 = Job.getInstance(conf,"first_step1_xj");
        job1.setJarByClass(this.getClass());
        job1.setMapperClass(FirstStepMapper.class);
        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(NullWritable.class);
        job1.setReducerClass(Reducer.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(NullWritable.class);
        TextInputFormat.addInputPath(job1,inpath1);
        TextOutputFormat.setOutputPath(job1,mr1);
        FileOutputFormat.setOutputCompressorClass(job1,new GzipCodec().getClass());
        //------------------------
        Job job2 = Job.getInstance(conf,"first_step2_xj");
        job2.setJarByClass(this.getClass());
        job2.setMapperClass(FirstStepMapper.class);
        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputValueClass(NullWritable.class);
        job2.setReducerClass(Reducer.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(NullWritable.class);
        TextInputFormat.addInputPath(job2,inpath2);
        TextOutputFormat.setOutputPath(job2,mr2);
        FileOutputFormat.setOutputCompressorClass(job2,new GzipCodec().getClass());
        //------------------------
        Job job3 = Job.getInstance(conf,"map_join_xj");
        job3.setJarByClass(this.getClass());
        job3.setMapperClass(JoinMapper.class);
        job3.setMapOutputKeyClass(Text.class);
        job3.setMapOutputValueClass(Text.class);
        job3.setNumReduceTasks(0);
        job3.getConfiguration().set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");
        String expr = CompositeInputFormat.compose("inner", KeyValueTextInputFormat.class, mr1, mr2);
        job3.getConfiguration().set("mapreduce.join.expr",expr);
        job3.setInputFormatClass(CompositeInputFormat.class);
        TextOutputFormat.setOutputPath(job3,outpath);
        List<Job> list = new ArrayList();
        list.add(job1);
        list.add(job2);
        list.add(job3);
        for (Job job : list) {
            boolean succ = job.waitForCompletion(true);
            if(!succ){
                System.out.println(job.getJobName()+":"+ job.getJobState().getValue());
                break;
            }
        }
        return 0;
    }

    public static void main(String[] args)throws Exception {
        ToolRunner.run(new JoinMapSideMR(),args);
    }
}
           

左外連接配接(reduce)-JoinReduceSideMR

問題描述:

将兩個檔案中每行的内容拼接到一個檔案中

思路分析:

準備好兩個map,fistMapper和SecondMapper,兩個map的key的輸出類型都為複合類型,包含id和tag,另外準備兩個類自定義分組和分區規則,隻根據id來分組和分區。是以,這兩個map的輸出結果就會進入到同一個reduce中,最後在reduce中完成拼接操作。

複合類型-ArtistIDTag

public class ArtistIDTag implements WritableComparable<ArtistIDTag> {

    private Text ArtistID = new Text(); // id
    private IntWritable Tag = new IntWritable(); // 标記

    public ArtistIDTag() {
    }

    public ArtistIDTag(Text artistID, IntWritable tag) {
        this.ArtistID = new Text(artistID.toString());
        this.Tag = new IntWritable(tag.get());
    }

    public Text getArtistID() {
        return ArtistID;
    }

    public void setArtistID(Text artistID) {
        this.ArtistID = new Text(artistID.toString());
    }

    public IntWritable getTag() {
        return Tag;
    }

    public void setTag(IntWritable tag) {
        this.Tag = new IntWritable(tag.get());
    }

    @Override
    public int compareTo(ArtistIDTag o) {
        return this.ArtistID.compareTo(o.ArtistID)==0 ? this.Tag.compareTo(o.Tag) : this.ArtistID.compareTo(o.ArtistID);
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        ArtistID.write(dataOutput);
        Tag.write(dataOutput);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        ArtistID.readFields(dataInput);
        Tag.readFields(dataInput);
    }
}
           

自定義分區規則-ArtistPartitioner

public class ArtistPartitioner extends Partitioner<ArtistIDTag, Text> {
    @Override
    public int getPartition(ArtistIDTag artistIDTag, Text text, int i) {
        return Math.abs(artistIDTag.getArtistID().hashCode()*127)%i;
    }
}
           

自定義分組規則-ArtistGroupComparator

public class ArtistGroupComparator extends WritableComparator{
    public ArtistGroupComparator() {
        super(ArtistIDTag.class,true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        ArtistIDTag at1 = (ArtistIDTag) a;
        ArtistIDTag at2 = (ArtistIDTag) b;
        return at1.getArtistID().compareTo(at2.getArtistID());
    }
}
           

連接配接-JoinReduceSideMR

public class JoinReduceSideMR extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new JoinReduceSideMR(),args);
    }

    public static class FirstMapper extends Mapper<LongWritable,Text,ArtistIDTag,Text>{
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            Stream.of(value.toString()).filter(s -> s.length()>0).forEach(ExceptionConsumer.of(s -> {
                        String id = s.substring(0,s.indexOf(","));
                        String info = s.substring(s.indexOf(",")+1,s.length());
                        context.write(new ArtistIDTag(new Text(id),new IntWritable(0)),new Text(info));
                    }));
        }
    }

    public static class SecondMapper extends Mapper<LongWritable,Text,ArtistIDTag,Text>{
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            Stream.of(value.toString()).filter(s -> s.length()>0).forEach(ExceptionConsumer.of(s -> {
                        String id = s.substring(0,s.indexOf(","));
                        String info = s.substring(s.indexOf(",")+1,s.length());
                        context.write(new ArtistIDTag(new Text(id),new IntWritable(1)),new Text(info));
                    }));
        }
    }

    public static class JoinReducer extends Reducer<ArtistIDTag,Text,Text,Text>{
        @Override
        protected void reduce(ArtistIDTag key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            Iterator<Text> ite = values.iterator();
            String name = ite.next().toString();
            while (ite.hasNext()){
                Text count = ite.next();
                String info = name.toString() + "|" + count.toString();
                context.write(key.getArtistID(),new Text(info));
            }

        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "join_reduce_xj");
        job.setJarByClass(this.getClass());

        // 多任務輸入
        MultipleInputs.addInputPath(job,new Path(conf.get("inpath1")),TextInputFormat.class,FirstMapper.class);
        MultipleInputs.addInputPath(job,new Path(conf.get("inpath2")),TextInputFormat.class,SecondMapper.class);

        job.setMapOutputKeyClass(ArtistIDTag.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(JoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setOutputFormatClass(TextOutputFormat.class);

        TextOutputFormat.setOutputPath(job,new Path(conf.get("outpath")));

        // 設定分區規則
        job.setPartitionerClass(ArtistPartitioner.class);
        // 設定分組規則
        job.setGroupingComparatorClass(ArtistGroupComparator.class);

        return job.waitForCompletion(true)? 0 : 1;
    }
}

           

DB互動操作

讀取-DBtoHdfsMR

問題描述:

從mysql資料庫中讀取資料,并輸出到hdfs

思路分析:

  1. 準備好一個實作了DBWritable接口的複合類型,在該類型中定義的屬性分别對應資料庫中的列名。
  2. 将該複合類型作為map階段輸入的value的類型即可。
  3. 讓叢集加載jdbc驅動類。
  4. 設定配置資訊,連接配接到資料庫。
  5. 将輸入類型設定為DBInputFormat。

    複合類型-YearStationTempDB

    注:輸入操作要實作WritableComparable接口,這裡是讀操作可以删除。

public class YearStationTempDB implements DBWritable,WritableComparable<YearStationTempDB> {

    private int year; // 年份
    private String station; // 氣象站編号
    private int temperature; // 氣溫

    public YearStationTempDB() {
    }

    public YearStationTempDB(int year, String station, int temperature) {
        this.year = year;
        this.station = station;
        this.temperature = temperature;
    }

    @Override
    public void write(PreparedStatement prep) throws SQLException {
        prep.setInt(1,year);
        prep.setString(2,station);
        prep.setInt(3,temperature);
    }

    @Override
    public void readFields(ResultSet rs) throws SQLException {
        this.year = rs.getInt("year");
        this.station = rs.getString("station");
        this.temperature = rs.getInt("temperature");
    }

    public int getYear() {
        return year;
    }

    public void setYear(int year) {
        this.year = year;
    }

    public String getStation() {
        return station;
    }

    public void setStation(String station) {
        this.station = station;
    }

    public int getTemperature() {
        return temperature;
    }

    public void setTemperature(int temperature) {
        this.temperature = temperature;
    }

    @Override
    public String toString() {
        return year + "," + station +"," + temperature;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(year);
        dataOutput.writeUTF(station);
        dataOutput.writeInt(temperature);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        year = dataInput.readInt();
        station = dataInput.readUTF();
        temperature = dataInput.readInt();
    }

    @Override
    public int compareTo(YearStationTempDB o) {
        return this.year - o.year == 0 ? this.station.compareTo(o.station) : this.year - o.year;
    }
}
           

讀取-DBtoHdfsMR

public class DBtoHdfsMR extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new DBtoHdfsMR(),args);
    }

    public static class DBMapper extends Mapper<LongWritable,YearStationTempDB,LongWritable,Text>{
        @Override
        protected void map(LongWritable key, YearStationTempDB value, Context context) throws IOException, InterruptedException {
            context.write(key,new Text(value.toString()));
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf,"bdtohdfs_xj");
        job.setJarByClass(this.getClass());

        job.setMapperClass(DBMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);

        // 如何讓叢集加載jdbc驅動類
        // 1 将jar放入share/hadoop/yarn/下會自動上傳jar到叢集
        // 2 把jar放入叢集中lib目錄下,重新開機叢集
        // 3 job.addFileToClassPath(),需要把jar包上傳到hdfs
        //job.addFileToClassPath(new Path("hdfs://172.16.0.4:9000/data/mysql-connector-java-5.1.38.jar"));

        // 連接配接資料庫
        DBConfiguration.configureDB(job.getConfiguration(),"com.mysql.jdbc.Driver",
                "jdbc:mysql://172.16.0.100:3306/hadoop","hadoop","hadoop");


        job.setInputFormatClass(DBInputFormat.class); // 設定為DB輸入類型
        job.setOutputFormatClass(TextOutputFormat.class);

        // year = 2000是條件,表示輸入year=2000的資料
        DBInputFormat.setInput(job,YearStationTempDB.class,"station_tbl","year = 2000","","year","station","temperature");
        TextOutputFormat.setOutputPath(job,new Path(conf.get("outpath")));
        job.setNumReduceTasks(1);
        return job.waitForCompletion(true)? 0 : 1;
    }
}

           

輸入-HdfstoDBMR

問題描述:

從hdfs讀取資料,并輸出到mysql資料庫

思路分析:

  1. 準備好一個實作了WritableComparable接口的複合類型,在該類型中定義的屬性分别對應資料庫中的列名,并重寫compareTo()方法。
  2. 将該複合類型作為reduce階段輸出的key的類型即可。
  3. 讓叢集加載jdbc驅動類。
  4. 設定配置資訊,連接配接到資料庫。
  5. 将輸出類型設定為DBOutputFormat。
public class HdfstoDBMR extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new HdfstoDBMR(),args);
    }

    public static class HTDMapper extends Mapper<LongWritable,Text,YearStation,IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            NcdcRecordParser parser = new NcdcRecordParser();
            parser.parse(()->value.toString()).ifPresent(ExceptionConsumer.of(
                    p->{
                        int year = p.getYear();
                        String stationId = p.getStationId();
                        int temp = p.getAirTemperature();
                        YearStation ys = new YearStation(year+"",stationId);
                        context.write(ys,new IntWritable(temp));
                    }
            ));
        }
    }

    public static class HTDReducer extends Reducer<YearStation,IntWritable,YearStationTempDB,NullWritable>{
        @Override
        protected void reduce(YearStation key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            Stream<IntWritable> stream = StreamSupport.stream(values.spliterator(), false);
            Integer max = stream.map(s -> s.get()).reduce(0, (x, y) -> Math.max(x, y));
            int y = Integer.parseInt(key.getYear().toString());
            YearStationTempDB yst = new YearStationTempDB(y,key.getStationid().toString(),max);
            context.write(yst,NullWritable.get());
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf,"hdfstodb_xj");
        job.setJarByClass(this.getClass());

        job.setMapperClass(HTDMapper.class);
        job.setMapOutputKeyClass(YearStation.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setReducerClass(HTDReducer.class);
        job.setOutputKeyClass(YearStationTempDB.class);
        job.setOutputValueClass(NullWritable.class);

        // 如何讓叢集加載jdbc驅動類
        // 1 将jar放入share/hadoop/yarn/下會自動上傳jar到叢集
        // 2 把jar放入叢集中lib目錄下,重新開機叢集
        // 3 job.addFileToClassPath(),需要把jar包上傳到hdfs
        //job.addFileToClassPath(new Path("hdfs://172.16.0.4:9000/data/mysql-connector-java-5.1.38.jar"));

        // 連接配接資料庫
        DBConfiguration.configureDB(job.getConfiguration(),"com.mysql.jdbc.Driver",
                "jdbc:mysql://172.16.0.100:3306/hadoop","hadoop","hadoop");

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(DBOutputFormat.class);

        TextInputFormat.addInputPath(job,new Path(conf.get("inpath")));
        DBOutputFormat.setOutput(job,"max_tmp_xj","year","station","temperature");
        return job.waitForCompletion(true)? 0 : 1;
    }
}

           

輸入類型-InputFormat

常見輸入類型

  1. TextInputFormat:按行擷取字元串資料
    job.setInputFormatClass(TextInputFormat.class);
    TextInputFormat.addInputPath(job,new Path(conf.get("inpath")));
               
  2. CombineTextInputFormat:将多個輸入檔案壓縮成一個檔案,避免開啟多個map
    job.setInputFormatClass(CombineTextInputFormat.class);
    CombineFileInputFormat.addInputPath(job,new Path(conf.get("inpath")));
               
  3. KeyValueTextInputFormat:按key-value形式擷取資料
    conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ","); // 設定key-value的分割符,隻識别第一個分隔符
     job1.setInputFormatClass(KeyValueTextInputFormat.class);
     KeyValueTextInputFormat.addInputPath(job1, new Path(conf.get("input")));
               
  4. DBInputFormat:從資料庫中擷取資料
    job.setInputFormatClass(DBInputFormat.class); // 設定為DB輸入類型
    // year = 2000是條件,表示輸入year=2000的資料
    DBInputFormat.setInput(job,YearStationTempDB.class,"station_tbl","year = 2000","","year","station","temperature");
               

自定義輸入類型

思路分析:

  1. 建立一個解析類繼承RecordReader
  2. 在解析類中完成擷取資料的邏輯
  3. 建立一個自定義輸入類型的類繼承FileInputFormat
  4. 在自定義輸入類型的類中重寫方法createRecordReader()
  5. 在該方法中建立解析類的對象并調用initialize()方法進行初始化,最後傳回該對象。
  6. 完成,可在其它類中調用該自定義輸入類型

解析類-YearStationRecordReader

public class YearStationRecordReader extends RecordReader<YearStation, IntWritable> {
    private LineRecordReader reader = new LineRecordReader();
    private NcdcRecordParser parser = new NcdcRecordParser();
    private YearStation ys = new YearStation();
    private IntWritable tmp = new IntWritable();

    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException {
        reader.initialize(inputSplit,taskAttemptContext);
    }

    @Override
    public boolean nextKeyValue() throws IOException {
        do {
            // 判斷是否有下一個值
            if (!reader.nextKeyValue()) {
                return false;
            }
            // 擷取并解析目前值
            Text line = reader.getCurrentValue();
            parser.parse(line.toString());
        }while (!parser.isValidTemperature()); // 如果氣溫傳回值為false則繼續循環下一個
        int year = parser.getYear();
        int tmp = parser.getAirTemperature();
        String station = parser.getStationId();
        ys.setYear(new Text(year+""));
        ys.setStationid(new Text(station));
        this.tmp.set(tmp);
        return true;
    }

    @Override
    public YearStation getCurrentKey() throws IOException, InterruptedException {
        return this.ys;
    }

    @Override
    public IntWritable getCurrentValue() throws IOException, InterruptedException {
        return this.tmp;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return reader.getProgress();
    }

    @Override
    public void close() throws IOException {
        reader.close();
    }
}
           

自定義輸入類型類-YearStationInputFormat

// 利用FileInputFormat資料分片功能,實作自定義輸入類型
public class YearStationInputFormat extends FileInputFormat<YearStation, IntWritable> {
    @Override
    public RecordReader createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
            throws IOException {
        YearStationRecordReader reader = new YearStationRecordReader();
        reader.initialize(inputSplit,taskAttemptContext);
        return reader;
    }
}
           

複合類型-YearStation

public class YearStation implements WritableComparable<YearStation> {
    private Text year = new Text(); // 年份
    private Text stationid = new Text(); //氣象站id

    public YearStation() {
    }

    public YearStation(Text year, Text stationid) {
        this.year = new Text(year.toString());
        this.stationid = new Text(stationid.toString());
    }

    public YearStation(String year, String stationid) {
        this.year = new Text(year);
        this.stationid = new Text(stationid);
    }

    @Override
    public int compareTo(YearStation o) {
        return this.year.compareTo(o.year)==0 ? this.stationid.compareTo(o.stationid) : this.year.compareTo(o.year);
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        year.write(dataOutput);
        stationid.write(dataOutput);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        year.readFields(dataInput);
        stationid.readFields(dataInput);
    }

    public Text getYear() {
        return year;
    }

    public void setYear(Text year) {
        this.year = new Text(year.toString());
    }

    public Text getStationid() {
        return stationid;
    }

    public void setStationid(Text stationid) {
        this.stationid = new Text(stationid.toString());
    }

    @Override
    public String toString() {
        return year.toString()+"\t"+stationid.toString();
    }
}
           

測試自定義類型-MaxTmpByYearStationMR

public class MaxTmpByYearStationMR extends Configured implements Tool {
    public static class MTBYSMapper extends Mapper<YearStation, IntWritable, YearStation,IntWritable> {
        @Override
        protected void map(YearStation key, IntWritable value, Context context) throws IOException, InterruptedException {
            context.write(key,value);
        }
    }
    public static class MTBYSReducer extends Reducer<YearStation, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(YearStation key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            Optional<Integer> max = StreamSupport.stream(values.spliterator(), false)
                    .map(e -> e.get()).reduce((x, y) -> Math.max(x, y));
            context.write(new Text(key.toString()), new IntWritable(max.get()));
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "MaxTmpByYS_xj");
        job.setJarByClass(this.getClass());

        job.setMapperClass(MTBYSMapper.class);
        job.setMapOutputKeyClass(YearStation.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setReducerClass(MTBYSReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setInputFormatClass(YearStationInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        YearStationInputFormat.addInputPath(job,new Path(conf.get("inpath")));
        TextOutputFormat.setOutputPath(job,new Path(conf.get("outpath")));
        return job.waitForCompletion(true)? 0 : 1;
    }
    public static void main(String[] args) throws Exception{
        ToolRunner.run(new MaxTmpByYearStationMR(),args);
    }
}
           

JobControl

簡述:

如果MapReduce中需要用到多個job,而且多個job之間需要設定一些依賴關系,比如Job3需要依賴于Job2,Job2依賴于Job1,這就要用到JobControl。

代碼執行個體:

Job getSDJob = Job.getInstance(conf, "get_sd_job_xj");
getSDJob.setJarByClass(GetSimilarityDegree.class);
// 3 為任務裝配mapper類
getSDJob.setMapperClass(GetSimilarityDegree.GSDMapper.class);
getSDJob.setMapOutputKeyClass(Text.class);
getSDJob.setMapOutputValueClass(DoubleWritable.class);
// 5 配置資料輸入路徑
TextInputFormat.addInputPath(getSDJob, new Path("src/train_bin"));
// 6 配置結果輸出路徑
TextOutputFormat.setOutputPath(getSDJob, new Path("src/name_sd"));

Job sortBySDJob = Job.getInstance(conf, "sortBySDJob");
sortBySDJob.setJarByClass(SortedByDegree.class);
// 3 為任務裝配mapper類
sortBySDJob.setMapperClass(SortedByDegree.SBDMapper.class);
sortBySDJob.setMapOutputKeyClass(TagDegree.class);
sortBySDJob.setMapOutputValueClass(NullWritable.class);
// 4 為任務裝配reducer類
sortBySDJob.setReducerClass(SortedByDegree.SBDReducer.class);
sortBySDJob.setOutputKeyClass(Text.class);
sortBySDJob.setOutputValueClass(DoubleWritable.class);
// 5 配置資料輸入路徑
TextInputFormat.addInputPath(sortBySDJob, new Path("src/name_sd"));
// 6 配置結果輸出路徑
TextOutputFormat.setOutputPath(sortBySDJob, new Path("src/name_sd_sorted"));

Job getFKJob = Job.getInstance(conf, "getFKJob");
getFKJob.setJarByClass(GetFirstK.class);
// 3 為任務裝配mapper類
getFKJob.setMapperClass(GetFirstK.GFKMapper.class);
getFKJob.setMapOutputKeyClass(TagDegree.class);
getFKJob.setMapOutputValueClass(IntWritable.class);
// 4 為任務裝配reducer類
getFKJob.setReducerClass(GetFirstK.GFKReducer.class);
getFKJob.setOutputKeyClass(Text.class);
getFKJob.setOutputValueClass(Text.class);
// 5 配置資料輸入路徑
TextInputFormat.addInputPath(getFKJob, new Path("src/name_sd_sorted"));
// 6 配置結果輸出路徑
TextOutputFormat.setOutputPath(getFKJob, new Path("src/gfk_res"));
getFKJob.setGroupingComparatorClass(GFKGroupComparator.class);

Job getLRJob = Job.getInstance(conf, "getLRJob");
getLRJob.setJarByClass(GetLastResult.class);
// 3 為任務裝配mapper類
getLRJob.setMapperClass(GetLastResult.GLRMapper.class);
getLRJob.setMapOutputKeyClass(Text.class);
getLRJob.setMapOutputValueClass(TagAvgNum.class);
// 4 為任務裝配reducer類
getLRJob.setReducerClass(GetLastResult.GLRReducer.class);
getLRJob.setOutputKeyClass(Text.class);
getLRJob.setOutputValueClass(NullWritable.class);
// 5 配置資料輸入路徑
TextInputFormat.addInputPath(getLRJob, new Path("src/gfk_res"));
// 6 配置結果輸出路徑
TextOutputFormat.setOutputPath(getLRJob, new Path("src/last_res"));

ControlledJob getSD = new ControlledJob(getSDJob.getConfiguration());
ControlledJob sortBySD = new ControlledJob(sortBySDJob.getConfiguration());
ControlledJob getFK = new ControlledJob(getFKJob.getConfiguration());
ControlledJob getLR = new ControlledJob(getLRJob.getConfiguration());
// 添加依賴
getLR.addDependingJob(getFK);
getFK.addDependingJob(sortBySD);
sortBySD.addDependingJob(getSD);

JobControl con = new JobControl("test");
con.addJob(getSD);
con.addJob(sortBySD);
con.addJob(getFK);
con.addJob(getLR);

Thread t = new Thread(con);
t.start();

while (true) {
    if (con.allFinished()) {
        System.out.println("圖檔識别完畢,請檢視結果");
        System.exit(0);
    }
}
           

繼續閱讀