
Finding Common Friends (a Hadoop Solution)

Test data

100,200 300 400 500 600 700 800
200,100 300 400 500 700
300,100 200 400 500 600 700
400,100 200 300 700 800
500,100 200 300 700 800
600,100 300
700,100 200 300 400 500 800
800,100 400 500 700
           

Note: in the test data, the value before the comma is a user ID and the values after the comma are that user's friends. Friendship is mutual: if user 100 lists 200 as a friend, then user 200 must also list 100 as a friend. Also note that the mapper below splits the friend list on a tab character, so in the actual input file the friends after the comma are tab-separated (they are shown here with single spaces).

Analysis

To find common friends, the friend lists of every pair of users have to be compared. The map phase brings the friend lists of each pair of users together; the reduce phase then computes the common friends of each pair.

Map phase: for each input line, emit a key made up of the user and one of his friends, with the user's entire friend list as the value. For example, user 100 produces the following map output:

100,200 200 300 400 500 600 700 800
100,300 200 300 400 500 600 700 800
100,400 200 300 400 500 600 700 800
100,500 200 300 400 500 600 700 800
100,600 200 300 400 500 600 700 800
100,700 200 300 400 500 600 700 800
100,800 200 300 400 500 600 700 800
           

Finally, the complete map output is (the two user IDs in each key are arranged in ascending order):

100,200 200 300 400 500 600 700 800
100,300 200 300 400 500 600 700 800
100,400 200 300 400 500 600 700 800
100,500 200 300 400 500 600 700 800
100,600 200 300 400 500 600 700 800
100,700 200 300 400 500 600 700 800
100,800 200 300 400 500 600 700 800

100,200 100 300 400 500 700
200,300 100 300 400 500 700
200,400 100 300 400 500 700
200,500 100 300 400 500 700
200,700 100 300 400 500 700

100,300 100 200 400 500 600 700
200,300 100 200 400 500 600 700
300,400 100 200 400 500 600 700
300,500 100 200 400 500 600 700
300,600 100 200 400 500 600 700
300,700 100 200 400 500 600 700
...
           

At this point you can see that the same key appears more than once, and the values under it are the friend lists of the two users named in the key, so both users' friend lists are brought together under a single key. The grouping and merging that happen during the shuffle need no code of our own; the default behavior is sufficient. Because friendship is mutual, each pair key receives exactly two values, one from each user's input line. The final reduce input is therefore:

100,200 (200 300 400 500 600 700 800),(100 300 400 500 700)
100,300 (200 300 400 500 600 700 800),(100 200 400 500 600 700)
...
           

Reduce phase: compare the two friend lists in each record and collect the user IDs that appear in both; these are the common friends of the two users named in the key.
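
Before looking at the full MapReduce implementation, the core of this reduce step can be sketched in plain Java. This is an illustration only; the class and variable names below are made up and it does not run inside Hadoop:

import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

// Illustration: intersect the two friend lists that the reducer receives for key 100,200
public class IntersectSketch {
    public static void main(String[] args) {
        Set<String> friendsOf100 = new TreeSet<String>(
                Arrays.asList("200", "300", "400", "500", "600", "700", "800"));
        Set<String> friendsOf200 = new TreeSet<String>(
                Arrays.asList("100", "300", "400", "500", "700"));
        friendsOf100.retainAll(friendsOf200);  // keep only the elements present in both sets
        System.out.println(friendsOf100);      // prints [300, 400, 500, 700]
    }
}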

Solution

Map phase

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SameFrMapper extends Mapper<LongWritable, Text, Text, Text>{

    @Override
    protected void map(LongWritable key, Text value,Context context)
        throws IOException, InterruptedException {
        String line = value.toString();
        // st[0] is the user ID, st[1] is the tab-separated friend list
        String[] st = line.split(",");
        String[] friends = st[1].split("\t");
        String friendsline = st[1];
        // emit one record per (user, friend) pair, keyed with the two IDs in
        // sorted order and the user's full friend list as the value
        for (int i = 0; i < friends.length; i++) {
            context.write(bulidSortedKey(st[0], friends[i]), new Text(friendsline));
        }
    }
    // Build the key with the two user IDs ordered from small to large
    protected Text bulidSortedKey(String user1,String user2){
        int u1 = Integer.parseInt(user1);
        int u2 = Integer.parseInt(user2);
        if(u1 < u2){
            return new Text(user1 + "," + user2);
        }else{
            return new Text(user2 + "," + user1);
        }
    }
}
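
A quick way to sanity-check the key ordering outside the cluster is a small throwaway class; the name SortedKeyCheck below is hypothetical and not part of the job:

// Hypothetical helper: verify that bulidSortedKey always puts the smaller ID first
public class SortedKeyCheck extends SameFrMapper {
    public static void main(String[] args) {
        SortedKeyCheck check = new SortedKeyCheck();
        System.out.println(check.bulidSortedKey("300", "100"));  // prints 100,300
        System.out.println(check.bulidSortedKey("100", "200"));  // prints 100,200
    }
}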
           

Reduce phase

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SameFrReducer extends Reducer<Text, Text, Text, Text>{

    @Override
    protected void reduce(Text key, Iterable<Text> value,Context context) throws IOException,
        InterruptedException {
         List<String> list = new ArrayList<String>();
         Set<String> user1set = new TreeSet<String>();
         Set<String> user2set = new TreeSet<String>();
         for (Text t : value) {
             list.add(t.toString());
        }
        // the two values under this key are the full friend lists of the two users named in the key
        String[] user1list = list.get(0).split("\t");
        String[] user2list = list.get(1).split("\t");
        for (int i = 0; i < user1list.length; i++) {
            user1set.add(user1list[i]);
        }
        for (int i = 0; i < user2list.length; i++) {
            user2set.add(user2list[i]);
        }
        Set<String> result = intersect(user1set, user2set);
        Iterator<String> it = result.iterator();
        StringBuffer sb = new StringBuffer();
        while (it.hasNext()) {
            sb.append(it.next().toString()+"\t");           
        }
        context.write(key,new Text(sb.toString()));
    }

    // Find the common friends by comparing the two sets
    protected Set<String> intersect(Set<String> smallset, Set<String> largeset){
        Set<String> result = new TreeSet<String>();
        // iterate over the smaller set and probe the larger one for better performance
        for (String x : smallset) {
            if (largeset.contains(x)) {
                result.add(x);
            }
        }
        return result;
    }
}
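
Similarly, the intersect helper can be exercised on its own with a small throwaway class (IntersectCheck is a hypothetical name, not part of the job); here it is fed the friend lists of users 300 and 700 from the test data:

import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper: check intersect() against the 300,700 pair of the test data
public class IntersectCheck extends SameFrReducer {
    public static void main(String[] args) {
        Set<String> friendsOf300 = new TreeSet<String>(
                Arrays.asList("100", "200", "400", "500", "600", "700"));
        Set<String> friendsOf700 = new TreeSet<String>(
                Arrays.asList("100", "200", "300", "400", "500", "800"));
        // expected: [100, 200, 400, 500], matching the 300,700 line of the job output
        System.out.println(new IntersectCheck().intersect(friendsOf300, friendsOf700));
    }
}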
           

Testing with the Eclipse Hadoop plugin

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.bj.samefriends.SameFrMapper;
import com.bj.samefriends.SameFrReducer;

public class JobsRun {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://node1:9000");
        conf.set("mapred.job.tracker", "node1:9001");
        conf.set("mapred.jar", "C:\\Users\\Administrator\\Desktop\\sameFriends.jar");
        try {
            Job job = new Job(conf);
            job.setJarByClass(JobsRun.class);
            job.setMapperClass(SameFrMapper.class);
            job.setReducerClass(SameFrReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setNumReduceTasks(2);  // two reduce tasks, so the output is split across two part files

            FileInputFormat.addInputPath(job, new Path("/user/root/input/"));
            FileOutputFormat.setOutputPath(job, new Path("/user/root/output/"));

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}
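
Two practical notes when submitting the job this way: the jar pointed to by mapred.jar must be re-exported from the project before each run so that the cluster receives the current mapper and reducer classes, and FileOutputFormat will fail the job if the output directory already exists, so /user/root/output has to be removed between runs. Alternatively, the exported jar can be submitted on the cluster itself with the hadoop jar command instead of going through the Eclipse plugin.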
           

The output is as follows (the concatenation of the two reducers' output files, each of which is sorted by key):

100,200 300 400 500 700 
100,400 200 300 700 800 
100,600 300 
100,800 400 500 700 
200,300 100 400 500 700 
200,500 100 300 700 
200,700 100 300 400 500 
300,400 100 200 700 
300,600 100 
400,700 100 200 300 800 
500,800 100 700 
700,800 100 400 500 
100,300 200 400 500 600 700 
100,500 200 300 700 800 
100,700 200 300 400 500 800 
200,400 100 300 700 
300,500 100 200 700 
300,700 100 200 400 500 
400,800 100 700 
500,700 100 200 300 800 
           
