天天看點

Hadoop Join

Hadoop 中的join分為三種

  • Reduce端join,适合于兩個大表
  • Map端join,适合一個大表和一個小表,小表放到 Distributed Cache 裡面
  • semi join,當join只用到其中一個表中的一小部分時

Reduce端join

  • 讀入兩個大表,對value按檔案進行标記
  • 在Reduce端收集屬于不同檔案的value到不同的list,對同一key的不同list中的value做笛卡爾積
  • Logger 用來記錄錯誤
  • Counter 用來統計想要的一些資料
  • Configuration 和 Context 用來傳遞資料
/**
 * Reduce-side join of two whitespace-delimited text tables on their first column.
 *
 * <p>The mapper tags every value with the table it came from ({@code u#} for t1,
 * {@code l#} for t2); the reducer separates the tagged values per key and emits
 * their cartesian product.
 *
 * <p>Usage: {@code ReduceJoin <input path> <output path> <t1 file name> <t2 file name>}
 */
public class ReduceJoin {
    private static final String DELIMITER = "\\s+";
    private static final Logger LOG = Logger.getLogger(ReduceJoin.class);

    /**
     * Tags each record with its source table and keys it by the first column.
     *
     * <p>t1 records must have exactly 2 fields, t2 records exactly 4 fields;
     * records with any other field count are logged and dropped.
     */
    public static class JoinMapper extends Mapper<Object, Text, Text, Text> {
        /** Full path of the file backing this task's input split. */
        private String path;
        /** Value of the {@code t1FileName} configuration key (may be null if unset). */
        private String t1;
        /** Value of the {@code t2FileName} configuration key (may be null if unset). */
        private String t2;

        /**
         * Resolves the split path and the two table names once per task instead of
         * once per record; none of them change while the task runs.
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit split = (FileSplit) context.getInputSplit();
            path = split.getPath().toString();
            Configuration conf = context.getConfiguration();
            t1 = conf.get("t1FileName");
            t2 = conf.get("t2FileName");
        }

        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // Trim first: a leading blank would otherwise produce an empty first field
            // and make a valid record fail the field-count check.
            String line = value.toString().trim();
            if (line.isEmpty()) {
                return;
            }
            String[] values = line.split(DELIMITER);
            // NOTE(review): the table is detected by substring match on the full path, so
            // a t1 name that also occurs inside the t2 path is classified as t1.
            // The null guards avoid a NullPointerException when a name is not configured.
            if (t1 != null && path.contains(t1)) {
                if (values.length != 2) {
                    LOG.error("t1 Number of Fields Error");
                    return;
                }
                context.getCounter("MapStage", "t1 read records").increment(1);
                context.write(new Text(values[0]), new Text("u#" + values[1]));
            } else if (t2 != null && path.contains(t2)) {
                if (values.length != 4) {
                    LOG.error("t2 Number of Fields Error");
                    return;
                }
                context.getCounter("MapStage", "t2 read records").increment(1);
                context.write(new Text(values[0]), new Text("l#" + values[2] + "\t" + values[3]));
            } else {
                context.getCounter("MapStage", "map filtered records").increment(1);
            }
        }
    }

    /**
     * Splits the tagged values of one key into per-table lists and writes every
     * (t1 value, t2 value) combination, tab-separated.
     */
    public static class JoinReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException,
                InterruptedException {
            List<String> t1 = new ArrayList<String>();
            List<String> t2 = new ArrayList<String>();
            for (Text value : values) {
                // Limit 2: only the first '#' separates tag from payload, so a payload
                // that itself contains '#' is kept intact instead of being dropped.
                String[] fields = value.toString().split("#", 2);
                if (fields.length != 2) {
                    continue;
                }
                if ("u".equals(fields[0])) {
                    t1.add(fields[1]);
                } else if ("l".equals(fields[0])) {
                    t2.add(fields[1]);
                }
            }
            for (String it1 : t1) {
                for (String it2 : t2) {
                    context.write(key, new Text(it1 + "\t" + it2));
                }
            }
        }
    }

    /**
     * Configures and runs the join job.
     *
     * @param args input path, output path, t1 file name, t2 file name
     * @throws Exception if job setup or execution fails
     */
    public static void main(String[] args) throws Exception {
        if (args.length != 4) {
            // Fail loudly: a silent return looks like a successful run to the caller.
            System.err.println("Usage: ReduceJoin <input path> <output path> <t1 file name> <t2 file name>");
            System.exit(2);
        }
        Configuration conf = new Configuration();
        conf.set("t1FileName", args[2]);
        conf.set("t2FileName", args[3]);
        Job job = new Job(conf, "join");
        job.setJarByClass(ReduceJoin.class);
        job.setMapperClass(JoinMapper.class);
        job.setReducerClass(JoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Map端join

  • 适用于一大一小兩個表
  • 小表裝進 Distributed Cache 裡
/**
 * Map-side join: the small table is shipped through the distributed cache under
 * the local name {@code t1}, loaded into memory by every mapper, and joined
 * against the large table while it is being read.
 *
 * <p>Usage: {@code MapJoin <input path> <output path>}
 */
public class MapJoin {
    private static final Logger LOG = Logger.getLogger(MapJoin.class);

    /**
     * Loads the cached small table into a hash map in {@link #setup} and emits one
     * joined record per large-table line whose first column is present in that map.
     */
    protected static class MapJoinMapper extends Mapper<Object,Text,Text,Text>{
        /** Small table: first column -> second column. */
        private Map<String,String> map = new HashMap<String,String>();

        /**
         * Reads the local file {@code t1} into {@link #map}.
         *
         * <p>Blank and malformed lines are skipped (malformed ones are counted) so a
         * single bad line no longer discards the remainder of the cache file, and the
         * reader is closed on every exit path.
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            BufferedReader br = new BufferedReader(new FileReader("t1"));
            try {
                String line;
                while ((line = br.readLine()) != null) {
                    line = line.trim();
                    if (line.isEmpty()) {
                        continue;
                    }
                    String[] fields = line.split("\\s+");
                    if (fields.length != 2) {
                        context.getCounter("MapStage","Input Record Fields Count Error").increment(1);
                        continue;
                    }
                    map.put(fields[0], fields[1]);
                }
            } finally {
                br.close();
            }
        }

        /**
         * Joins one large-table record (expected: exactly 4 whitespace-separated
         * fields) with the cached small table; non-matching keys are dropped.
         */
        @Override
        protected void map(Object key, Text value, Context context)throws IOException, InterruptedException {
            String line = value.toString().trim();
            if (line.isEmpty()) {
                return;
            }
            String[] fields = line.split("\\s+");
            if (fields.length != 4) {
                context.getCounter("ReduceStage","Map output Record Fields Count Error").increment(1);
                // Stop here: indexing fields[2]/fields[3] on a short record would throw
                // ArrayIndexOutOfBoundsException and fail the whole task.
                return;
            }
            String joined = map.get(fields[0]);
            if (joined != null) {
                context.write(new Text(fields[0]), new Text(joined + "\t" + fields[2] + "\t" + fields[3]));
            }
        }
    }

    /**
     * Configures and runs the map-only join job.
     *
     * @param args input path, output path
     * @throws Exception if job setup or execution fails
     */
    public static void main(String[] args) throws Exception{
        if (args.length < 2) {
            System.err.println("Usage: MapJoin <input path> <output path>");
            System.exit(2);
        }
        Configuration conf = new Configuration();
        DistributedCache.addCacheFile(new URI("hdfs://namenode/user/zhanghu/cache/t1#t1"), conf);
        Job job = new Job(conf,"MapJoin");
        job.setJarByClass(MapJoin.class);
        job.setMapperClass(MapJoinMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // The join is finished in the mapper; without this the job still runs a
        // shuffle plus an identity reduce, which is the cost a map-side join avoids.
        job.setNumReduceTasks(0);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true)?0:1);
    }
}

Semi Join

  • 在map端進行資料過濾,只傳輸參與join的資料,減少shuffle階段網絡傳輸量
  • 前提是存在于Logs中的UserId字段可以被放入到Cache中
  • 實作方法
    • 首先對右表中的UserId字段進行去重,儲存在UniqueUsers
    • 利用DistributedCache去除User表中UserId不在右表中的資料
/**
 * De-duplication job: writes each distinct value of the first column of the
 * input (records with exactly 4 whitespace-separated fields) once.
 *
 * <p>Usage: {@code RemoveDuplicates <input path> <output path>}
 */
public class RemoveDuplicates {
    /**
     * Collects the distinct first-column values seen by this task in memory and
     * emits them once in {@link #cleanup}.
     */
    public static class RemoveDuplicatesMapper extends Mapper<Object, Text, Text, NullWritable> {
        Set<Text> set = new HashSet<Text>();

        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // Trim first so a leading blank does not create an empty first field and
            // cause a valid 4-field record to be rejected.
            String line = value.toString().trim();
            String[] fields = line.split("\\s+");
            if (fields.length != 4) {
                return;
            }
            set.add(new Text(fields[0]));
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            for (Text value : set) {
                context.write(value, NullWritable.get());
            }
        }
    }

    /**
     * Emits every key exactly once, collapsing duplicates produced by different
     * map tasks. The input value type is {@code NullWritable}, matching what
     * {@link RemoveDuplicatesMapper} writes.
     */
    public static class RemoveDuplicatesReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException,
                InterruptedException {
            context.write(key, NullWritable.get());
        }
    }

    /**
     * Configures and runs the de-duplication job.
     *
     * @param args input path, output path
     * @throws Exception if job setup or execution fails
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: RemoveDuplicates <input path> <output path>");
            System.exit(2);
        }
        Configuration conf = new Configuration();
        Job job = new Job(conf, "RemoveDuplicates");
        job.setJarByClass(RemoveDuplicates.class);
        job.setMapperClass(RemoveDuplicatesMapper.class);
        job.setReducerClass(RemoveDuplicatesReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true)?0:1);
    }
}
/**
 * Semi join: a reduce-side join in which t1 records whose key is not listed in
 * the cached {@code UniqueUsers} file are dropped in the mapper, before the
 * shuffle. Apart from that filter it follows the same tagging scheme
 * ({@code u#} for t1, {@code l#} for t2) as a plain reduce-side join.
 *
 * <p>Usage: {@code SemiJoin <input path> <output path>}
 */
public class SemiJoin {
    /**
     * Loads the set of keys from the local file {@code UniqueUsers} and tags and
     * emits t1 records (2 fields, key must be in the set) and t2 records (4 fields).
     */
    public static class SemiJoinMapper extends Mapper<Object,Text,Text,Text>{
        Set<String> set = new HashSet<String>();
        /** Full path of the file backing this task's input split. */
        private String path;

        /**
         * Reads the non-blank lines of {@code UniqueUsers} into {@link #set} and
         * resolves the split path once per task. The reader is closed on every
         * exit path.
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            BufferedReader br = new BufferedReader(new FileReader("UniqueUsers"));
            try {
                String line;
                while ((line = br.readLine()) != null) {
                    String id = line.trim();
                    if (!id.isEmpty()) {
                        set.add(id);
                    }
                }
            } finally {
                br.close();
            }
            FileSplit split = (FileSplit) context.getInputSplit();
            path = split.getPath().toString();
        }

        @Override
        protected void map(Object key, Text value, Context context)throws IOException, InterruptedException {
            // Trim first so a leading blank does not shift the fields by one.
            String line = value.toString().trim();
            String[] fields = line.split("\\s+");
            if (path.contains("t1")) {
                if (fields.length != 2) {
                    return;
                }
                // Semi-join filter: only keys known to exist on the other side are shuffled.
                if (set.contains(fields[0])) {
                    context.write(new Text(fields[0]), new Text("u#" + fields[1]));
                }
            } else if (path.contains("t2")) {
                if (fields.length != 4) {
                    return;
                }
                context.write(new Text(fields[0]), new Text("l#" + fields[2] + "\t" + fields[3]));
            } else {
                context.getCounter("MapStage","Invalid Records").increment(1);
            }
        }
    }

    /**
     * Splits the tagged values of one key into per-table lists and writes every
     * (t1 value, t2 value) combination, tab-separated.
     */
    public static class SemiJoinReducer extends Reducer<Text,Text,Text,Text>{
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException {
            // The lists must be per-call: one reducer instance handles many keys, and
            // lists kept as instance fields would carry values over from earlier keys
            // and join them with the current one.
            List<String> listT1 = new ArrayList<String>();
            List<String> listT2 = new ArrayList<String>();
            for (Text value : values) {
                // Limit 2 keeps a payload containing '#' intact; the length check
                // guards against values that carry no tag separator at all.
                String[] fields = value.toString().split("#", 2);
                if (fields.length != 2) {
                    continue;
                }
                if ("u".equals(fields[0])) {
                    listT1.add(fields[1]);
                } else if ("l".equals(fields[0])) {
                    listT2.add(fields[1]);
                }
            }
            for (String t1 : listT1) {
                for (String t2 : listT2) {
                    context.write(key, new Text(t1 + "\t" + t2));
                }
            }
        }
    }

    /**
     * Configures and runs the semi-join job.
     *
     * @param args input path, output path
     * @throws Exception if job setup or execution fails
     */
    public static void main(String[] args) throws Exception{
        if (args.length < 2) {
            System.err.println("Usage: SemiJoin <input path> <output path>");
            System.exit(2);
        }
        Configuration conf = new Configuration();
        DistributedCache.addCacheFile(new URI("/user/zhanghu/cache/UniqueUsers#UniqueUsers"),conf);
        Job job = new Job(conf,"SemiJoin");
        job.setJarByClass(SemiJoin.class);
        job.setMapperClass(SemiJoinMapper.class);
        job.setReducerClass(SemiJoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true)?0:1);
    }
}

改進方案

  • 第二步中還是用到了ReduceJoin,所以還是需要傳輸較多資料
  • 前提經過過濾後的使用者表可以被全部放入到cache中
  • 實作方案
    • 對右表中的UserID字段進行去重,儲存在UniqueUsers中
    • 以UniqueUsers作為cache對Users表進行過濾,得到FilteredUsers
    • 以FilteredUsers作為cache,與UserLog進行Map端連接
  • 改進方案的特點
    • 優點:三個步驟全部只有Map,沒有Shuffle階段,完全并行
    • 缺點:需要啟動三個作業,且要多次讀入Cache,如果Cache比較大得不償失

繼續閱讀