Hadoop 中的join分為三種
- Reduce端join,适合于兩個大表
- Map端join,适合一個大表和一個小表,小表放到 Distributed Cache 裡面
- semi join 當join隻用到其中一個表中的一小部分時
Reduce端join
- 讀入兩個大表,對value按檔案進行标記
- 在Reduce端收集屬于不同檔案的value到不同的list,對同一key的不同list中的value做笛卡爾積
- Logger 用來記錄錯誤
- Counter 用來統計想要關注的一些資料
- configuration context用來傳遞資料
/**
 * Reduce-side join of two whitespace-delimited tables.
 *
 * <p>The mapper tags every record with the table it came from ({@code "u#"} for t1,
 * {@code "l#"} for t2) and emits it under the join key (the first column). The reducer
 * separates the tagged values of one key into two lists and writes their Cartesian product.
 *
 * <p>Usage: {@code ReduceJoin <input path> <output path> <t1 file name> <t2 file name>}
 */
public class ReduceJoin {
    private static final String DELIMITER = "\\s+";
    /** Separates the source-table tag from the payload in map output values. */
    private static final String TAG_SEPARATOR = "#";
    private static final Logger LOG = Logger.getLogger(ReduceJoin.class);

    /** Tags each input record with its source table and keys it by the first column. */
    public static class JoinMapper extends Mapper<Object, Text, Text, Text> {
        // Both flags are derived from the task's input split and configuration, which do not
        // change between map() calls, so they are computed once in setup().
        private boolean fromT1;
        private boolean fromT2;

        /**
         * Resolves which table this task's input split belongs to.
         *
         * @throws IOException if "t1FileName" or "t2FileName" is missing from the configuration
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            String t1 = conf.get("t1FileName");
            String t2 = conf.get("t2FileName");
            if (t1 == null || t2 == null) {
                // Fail with a clear message instead of a NullPointerException on the first record.
                throw new IOException("Configuration keys t1FileName and t2FileName must both be set");
            }
            FileSplit split = (FileSplit) context.getInputSplit();
            String path = split.getPath().toString();
            fromT1 = path.contains(t1);
            // t1 takes precedence when the path matches both names, as in an if / else-if chain.
            fromT2 = !fromT1 && path.contains(t2);
        }

        /**
         * Emits (joinKey, taggedPayload) for well-formed records; blank lines are skipped and
         * records with an unexpected number of fields are logged and dropped.
         */
        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // Trim before splitting so leading whitespace does not produce an empty first field.
            String line = value.toString().trim();
            if (line.isEmpty()) {
                return;
            }
            String[] values = line.split(DELIMITER);
            if (fromT1) {
                if (values.length != 2) {
                    LOG.error("t1 Number of Fields Error");
                    return;
                }
                context.getCounter("MapStage", "t1 read records").increment(1);
                context.write(new Text(values[0]), new Text("u" + TAG_SEPARATOR + values[1]));
            } else if (fromT2) {
                if (values.length != 4) {
                    LOG.error("t2 Number of Fields Error");
                    return;
                }
                context.getCounter("MapStage", "t2 read records").increment(1);
                // The second column of t2 is not part of the join output.
                context.write(new Text(values[0]),
                        new Text("l" + TAG_SEPARATOR + values[2] + "\t" + values[3]));
            } else {
                context.getCounter("MapStage", "map filtered records").increment(1);
            }
        }
    }

    /** Writes the Cartesian product of the t1 and t2 payloads that share a key. */
    public static class JoinReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException,
                InterruptedException {
            List<String> t1 = new ArrayList<String>();
            List<String> t2 = new ArrayList<String>();
            for (Text value : values) {
                // Limit of 2: only the first separator delimits the tag, so a payload that
                // itself contains '#' is kept intact instead of being discarded.
                String[] fields = value.toString().split(TAG_SEPARATOR, 2);
                if (fields.length != 2) {
                    continue;
                }
                if (fields[0].equals("u")) {
                    t1.add(fields[1]);
                } else if (fields[0].equals("l")) {
                    t2.add(fields[1]);
                }
                // Values with an unknown tag are ignored.
            }
            for (String it1 : t1) {
                for (String it2 : t2) {
                    context.write(key, new Text(it1 + "\t" + it2));
                }
            }
        }
    }

    /**
     * Configures and runs the join job.
     *
     * @param args input path, output path, t1 file name, t2 file name
     */
    public static void main(String[] args) throws Exception {
        if (args.length != 4) {
            // Report the problem instead of exiting silently with a success status.
            System.err.println("Usage: ReduceJoin <input path> <output path> <t1 file name> <t2 file name>");
            System.exit(2);
        }
        Configuration conf = new Configuration();
        conf.set("t1FileName", args[2]);
        conf.set("t2FileName", args[3]);
        Job job = new Job(conf, "join");
        job.setJarByClass(ReduceJoin.class);
        job.setMapperClass(JoinMapper.class);
        job.setReducerClass(JoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Map端join
- 适用于一大一小兩個表
- 小表裝進 Distributed Cache 裡
/**
 * Map-side join of a large table against a small table.
 *
 * <p>The small table is registered as a cache file in {@link #main} and read by every mapper
 * from a local file named {@code t1} into an in-memory map. Each record of the large input is
 * then joined against that map inside {@code map()}.
 *
 * <p>Usage: {@code MapJoin <input path> <output path>}
 */
public class MapJoin {
    private static final Logger LOG = Logger.getLogger(MapJoin.class);

    /** Joins each input record against the in-memory copy of the small table. */
    protected static class MapJoinMapper extends Mapper<Object, Text, Text, Text> {
        /** Small table: first column -> second column. */
        private Map<String, String> map = new HashMap<String, String>();

        /**
         * Loads the small table from the local file {@code t1}.
         *
         * <p>Blank and malformed lines are skipped (malformed ones are counted) so that a single
         * bad line does not silently truncate the table.
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // NOTE(review): "t1" presumably resolves to the cache file registered in main()
            // through the "#t1" URI fragment; confirm against the cluster's cache configuration.
            BufferedReader br = new BufferedReader(new FileReader("t1"));
            try {
                String line;
                while ((line = br.readLine()) != null) {
                    line = line.trim();
                    if (line.isEmpty()) {
                        continue;
                    }
                    String[] fields = line.split("\\s+");
                    if (fields.length != 2) {
                        context.getCounter("MapStage", "Input Record Fields Count Error").increment(1);
                        continue;
                    }
                    map.put(fields[0], fields[1]);
                }
            } finally {
                // Close the reader even when reading fails part-way through.
                br.close();
            }
        }

        /**
         * Emits (key, smallTableValue TAB col3 TAB col4) for every well-formed record whose
         * first column is present in the small table.
         */
        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString().trim();
            if (line.isEmpty()) {
                return;
            }
            String[] fields = line.split("\\s+");
            if (fields.length != 4) {
                context.getCounter("ReduceStage", "Map output Record Fields Count Error").increment(1);
                // Stop here: indexing fields[2] / fields[3] below would fail on short records.
                return;
            }
            String joined = map.get(fields[0]);
            if (joined != null) {
                context.write(new Text(fields[0]), new Text(joined + "\t" + fields[2] + "\t" + fields[3]));
            }
        }
    }

    /**
     * Registers the small table as a cache file, then configures and runs the job.
     *
     * @param args input path, output path
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: MapJoin <input path> <output path>");
            System.exit(2);
        }
        Configuration conf = new Configuration();
        DistributedCache.addCacheFile(new URI("hdfs://namenode/user/zhanghu/cache/t1#t1"), conf);
        Job job = new Job(conf, "MapJoin");
        job.setJarByClass(MapJoin.class);
        job.setMapperClass(MapJoinMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Semi Join
- 在map端進行資料過濾,隻傳輸參與join的資料,減少shuffle階段網絡傳輸量
- 前提是存在于Logs中的UserId字段可以被放入到Cache中
- 實作方法
- 首先對右表中的UserId字段進行去重,儲存在UniqueUsers
- 利用DistributedCache去除User表中UserId不在右表中的資料
/**
 * De-duplication job: emits each distinct value of the first column of the input exactly once.
 *
 * <p>The mapper collects the first column of every four-field record into an in-memory set and
 * writes the set in {@code cleanup()}; the reducer writes each key it receives once.
 *
 * <p>Usage: {@code RemoveDuplicates <input path> <output path>}
 */
public class RemoveDuplicates {

    /** Collects the distinct first-column values seen by this map task. */
    public static class RemoveDuplicatesMapper extends Mapper<Object, Text, Text, NullWritable> {
        /** Distinct keys seen so far; flushed in {@link #cleanup}. */
        private final Set<Text> set = new HashSet<Text>();

        /** Records the first column of well-formed (four-field) lines; other lines are ignored. */
        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = line.split("\\s+");
            if (fields.length != 4) {
                return;
            }
            set.add(new Text(fields[0]));
        }

        /** Writes every distinct key collected by this task. */
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            for (Text value : set) {
                context.write(value, NullWritable.get());
            }
        }
    }

    /**
     * Writes each key once, which removes duplicates coming from different map tasks.
     *
     * <p>The input value type is {@link NullWritable}, matching what the mapper emits.
     */
    public static class RemoveDuplicatesReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException,
                InterruptedException {
            context.write(key, NullWritable.get());
        }
    }

    /**
     * Configures and runs the de-duplication job.
     *
     * @param args input path, output path
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: RemoveDuplicates <input path> <output path>");
            System.exit(2);
        }
        Configuration conf = new Configuration();
        Job job = new Job(conf, "RemoveDuplicates");
        job.setJarByClass(RemoveDuplicates.class);
        job.setMapperClass(RemoveDuplicatesMapper.class);
        job.setReducerClass(RemoveDuplicatesReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
/**
 * Semi join: a reduce-side join in which the mapper first drops t1 (user) records whose key
 * does not appear in the pre-computed set of distinct keys read from the local file
 * {@code UniqueUsers}. Apart from that filter it works like a plain reduce-side join.
 *
 * <p>Usage: {@code SemiJoin <input path> <output path>}
 */
public class SemiJoin {

    /** Filters t1 records against the cached key set and tags records by source table. */
    public static class SemiJoinMapper extends Mapper<Object, Text, Text, Text> {
        /** Keys loaded from the {@code UniqueUsers} file. */
        private final Set<String> set = new HashSet<String>();

        /** Loads the non-blank lines of the local file {@code UniqueUsers} into the key set. */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // NOTE(review): "UniqueUsers" presumably resolves to the cache file registered in
            // main() through the "#UniqueUsers" URI fragment; confirm on the target cluster.
            BufferedReader br = new BufferedReader(new FileReader("UniqueUsers"));
            try {
                String line;
                while ((line = br.readLine()) != null) {
                    String userId = line.trim();
                    if (!userId.isEmpty()) {
                        set.add(userId);
                    }
                }
            } finally {
                // Close the reader even when reading fails part-way through.
                br.close();
            }
        }

        /**
         * Emits tagged records keyed by the first column: {@code "u#"} for t1 records whose key
         * is in the cached set, {@code "l#"} for t2 records. Records with an unexpected field
         * count are dropped; records from any other path are counted as invalid.
         */
        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            FileSplit split = (FileSplit) context.getInputSplit();
            // The source table is detected by a substring match on the whole path, so a
            // directory name containing "t1" or "t2" also matches.
            String path = split.getPath().toString();
            // Trim before splitting so leading whitespace does not produce an empty first field.
            String[] fields = value.toString().trim().split("\\s+");
            if (path.contains("t1")) {
                if (fields.length != 2) {
                    return;
                }
                if (set.contains(fields[0])) {
                    context.write(new Text(fields[0]), new Text("u#" + fields[1]));
                }
            } else if (path.contains("t2")) {
                if (fields.length != 4) {
                    return;
                }
                context.write(new Text(fields[0]), new Text("l#" + fields[2] + "\t" + fields[3]));
            } else {
                context.getCounter("MapStage", "Invalid Records").increment(1);
            }
        }
    }

    /** Writes the Cartesian product of the t1 and t2 payloads that share a key. */
    public static class SemiJoinReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // The lists must be local to each call: as instance fields they kept the values of
            // previously reduced keys and joined them into every later key's output.
            List<String> listT1 = new ArrayList<String>();
            List<String> listT2 = new ArrayList<String>();
            for (Text value : values) {
                // Limit of 2: only the first '#' separates the tag, so a payload containing
                // '#' is kept whole.
                String[] fields = value.toString().split("#", 2);
                if (fields.length != 2) {
                    // No payload after the tag; skip instead of indexing past the array.
                    continue;
                }
                if ("u".equals(fields[0])) {
                    listT1.add(fields[1]);
                } else if ("l".equals(fields[0])) {
                    listT2.add(fields[1]);
                }
            }
            for (String t1 : listT1) {
                for (String t2 : listT2) {
                    context.write(key, new Text(t1 + "\t" + t2));
                }
            }
        }
    }

    /**
     * Registers the distinct-key file as a cache file, then configures and runs the job.
     *
     * @param args input path, output path
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: SemiJoin <input path> <output path>");
            System.exit(2);
        }
        Configuration conf = new Configuration();
        DistributedCache.addCacheFile(new URI("/user/zhanghu/cache/UniqueUsers#UniqueUsers"), conf);
        Job job = new Job(conf, "SemiJoin");
        job.setJarByClass(SemiJoin.class);
        job.setMapperClass(SemiJoinMapper.class);
        job.setReducerClass(SemiJoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
改進方案
- 第二步中還是用到了ReduceJoin是以還是需要傳輸較多資料
- 前提經過過濾後的使用者表可以被全部放入到cache中
- 實作方案
- 對右表中的UserID字段進行去重,儲存在UniqueUsers中
- 以UniqueUsers作為cache對Users表進行過濾,得到FilteredUsers
- 以FilteredUsers作為cache,與UserLog進行Map端連接配接
- 改進方案的特點
- 優點:三個步驟全部隻有Map,沒有Shuffle階段,完全并行
- 缺點:需要啟動三個作業,且要多次讀入Cache,如果Cache比較大得不償失