
MapReduce join operations

The orders table:
+-------------------+-------------+------+-----+---------+----------------+
| Field             | Type        | Null | Key | Default | Extra          |
+-------------------+-------------+------+-----+---------+----------------+
| order_id          | int(11)     | NO   | PRI | NULL    | auto_increment |
| order_date        | datetime    | NO   |     | NULL    |                |
| order_customer_id | int(11)     | NO   |     | NULL    |                |
| order_status      | varchar(45) | NO   |     | NULL    |                |
+-------------------+-------------+------+-----+---------+----------------+
The customers table:
+-------------------+--------------+------+-----+---------+----------------+
| Field             | Type         | Null | Key | Default | Extra          |
+-------------------+--------------+------+-----+---------+----------------+
| customer_id       | int(11)      | NO   | PRI | NULL    | auto_increment |
| customer_fname    | varchar(45)  | NO   |     | NULL    |                |
| customer_lname    | varchar(45)  | NO   |     | NULL    |                |
| customer_email    | varchar(45)  | NO   |     | NULL    |                |
| customer_password | varchar(45)  | NO   |     | NULL    |                |
| customer_street   | varchar(255) | NO   |     | NULL    |                |
| customer_city     | varchar(45)  | NO   |     | NULL    |                |
| customer_state    | varchar(45)  | NO   |     | NULL    |                |
| customer_zipcode  | varchar(45)  | NO   |     | NULL    |                |
+-------------------+--------------+------+-----+---------+----------------+
           

Both tables carry the customer id (order_customer_id in orders, customer_id in customers), so the customer id is the natural join key.

Join in the reduce phase (reduce-side join)

In the map phase, the mapper inspects the path of its InputSplit to tell which source file the current split comes from, and builds the intermediate value accordingly.

In the reduce phase, intermediate values that share the same key are merged into the final output objects.

Implementation:

The entity class CustomerOrders serves both as the intermediate value emitted by the mappers and as the final output value.

package com.orderAndCustomer.entry;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class CustomerOrders implements Writable {
    private String customer_id;
    private String orderId;
    private String name;
    private String orderStatus;
//    tag used to tell which source file a record came from
    private String flag;

    public String getCustomer_id() {
        return customer_id;
    }

//    plain JavaBean setters (void return type) so that BeanUtils.copyProperties
//    in the reducer can discover and copy the properties
    public void setCustomer_id(String customer_id) {
        this.customer_id = customer_id;
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getOrderStatus() {
        return orderStatus;
    }

    public void setOrderStatus(String orderStatus) {
        this.orderStatus = orderStatus;
    }

    public String getFlag() {
        return flag;
    }

    public void setFlag(String flag) {
        this.flag = flag;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.customer_id);
        out.writeUTF(this.orderId);
        out.writeUTF(this.name);
        out.writeUTF(this.orderStatus);
        out.writeUTF(this.flag);
    }

    @Override
    public String toString() {
        return customer_id + '\t' +
                orderId + '\t' +
                name + '\t' +
                orderStatus + '\t' +
                flag;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
//        fields must be read back in exactly the order they were written
        this.customer_id = in.readUTF();
        this.orderId = in.readUTF();
        this.name = in.readUTF();
        this.orderStatus = in.readUTF();
        this.flag = in.readUTF();
    }
}
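
Since write() and readFields() must handle the fields in exactly the same order, a quick local round-trip check helps catch ordering mistakes early. A minimal sketch using plain java.io streams (the field values are arbitrary test data, not taken from the datasets above):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class CustomerOrdersRoundTrip {
    public static void main(String[] args) throws IOException {
        CustomerOrders original = new CustomerOrders();
        original.setCustomer_id("1");
        original.setOrderId("100");
        original.setName("SomeName");
        original.setOrderStatus("CLOSED");
        original.setFlag("1");

        // serialize with write() ...
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));

        // ... then read it back with readFields(); the printed line should match the original
        CustomerOrders copy = new CustomerOrders();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
        System.out.println(copy);
    }
}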
           

mapper:

Note: the FileSplit imported here must be org.apache.hadoop.mapreduce.lib.input.FileSplit; importing the older org.apache.hadoop.mapred.FileSplit instead leads to a ClassCastException on the cast in setup().

package com.orderAndCustomer.mapper;

import com.orderAndCustomer.entry.CustomerOrders;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class ReduceJoinMapper extends Mapper<LongWritable, Text,Text, CustomerOrders> {
//    name of the source file the current split belongs to
    String name = "";
//    reused output value
    CustomerOrders customerOrders = new CustomerOrders();
//    reused output key
    Text text = new Text();
    // each map task calls setup() once, then map() for every record in its split
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
//        get the InputSplit this map task is processing
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
//        get the name of the source file
        name = inputSplit.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//        連個源檔案都是","為分割符
        String[] split = value.toString().split(",");
//        order.csv的處理邏輯,标簽為1
        if (name.startsWith("order")){
            customerOrders.setOrderId(split[0]);
            customerOrders.setOrderStatus(split[3]);
            customerOrders.setCustomer_id(split[2]);
            customerOrders.setFlag("1");
            customerOrders.setName("");
        }else {         //customer的處理邏輯,标簽為2
            customerOrders.setCustomer_id(split[0]);
            customerOrders.setName(split[1]);
            customerOrders.setFlag("0");
            customerOrders.setOrderId("");
            customerOrders.setOrderStatus("");
            customerOrders.setCustomer_id("");
        }
//        customer_id作為key
        text.set(customerOrders.getCustomer_id());
        context.write(text, customerOrders);
    }
}
           

reducer:

package com.orderAndCustomer.reduce;

import com.orderAndCustomer.entry.CustomerOrders;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;

//CustomerOrders is the output key and already carries everything we want to print; the value does not matter, so NullWritable is used
public class ReduceJoinReducer extends Reducer<Text, CustomerOrders, CustomerOrders, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<CustomerOrders> values, Context context) throws IOException, InterruptedException {
//        one customer record joins with many order records (one-to-many)
//        cuBean holds the single customer-side record for this key
        CustomerOrders cuBean = new CustomerOrders();
//        orderBeans collects the order-side records for this key
        List<CustomerOrders> orderBeans = new ArrayList<>();
//        iterate over values: records tagged "1" (orders) are copied into orderBeans,
//        the record tagged "0" (customer) is copied into cuBean
        for (CustomerOrders bean : values) {
            if ("1".equals(bean.getFlag())){
                CustomerOrders orderBean=new CustomerOrders();
                try {
                    BeanUtils.copyProperties(orderBean,bean);
                    orderBeans.add(orderBean);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }
            }else {
                try {
                    BeanUtils.copyProperties(cuBean,bean);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }
            }
        }
//        enrich each order with the customer fields for this id and emit it
        for (CustomerOrders orderBean:orderBeans){
            orderBean.setName(cuBean.getName());
            context.write(orderBean,NullWritable.get());
        }
    }
}
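
One detail worth spelling out: the reducer copies each tagged record into a fresh CustomerOrders before storing it, because Hadoop reuses a single object instance for every element of the values iterable. Adding the iterated object to the list directly would leave the list full of references to that one reused instance, all showing the fields of the last record read. A sketch of the anti-pattern, for contrast only (not part of the code above):

// WRONG: every list entry points to the same reused 'bean' instance
for (CustomerOrders bean : values) {
    if ("1".equals(bean.getFlag())) {
        orderBeans.add(bean);   // no defensive copy -> all entries end up identical
    }
}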
           

driver

package com.orderAndCustomer.driver;

import com.orderAndCustomer.entry.CustomerOrders;
import com.orderAndCustomer.mapper.ReduceJoinMapper;
import com.orderAndCustomer.reduce.ReduceJoinReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class ReduceJoinDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        Configuration configuration=new Configuration();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(ReduceJoinDriver.class);

        job.setMapperClass(ReduceJoinMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(CustomerOrders.class);

        job.setReducerClass(ReduceJoinReducer.class);
        job.setOutputKeyClass(CustomerOrders.class);
        job.setOutputValueClass(NullWritable.class);


        FileInputFormat.addInputPath(job,new Path("/home/jarvis/Desktop/data"));
        Path path=new Path("/home/jarvis/Desktop/orderAndCus");
//        delete the output directory if it already exists
        FileSystem fs =FileSystem.get(new URI(path.toString()),configuration);
        if (fs.exists(path)){
            fs.delete(path,true);
        }
        FileOutputFormat.setOutputPath(job,path);
        job.waitForCompletion(true);
    }
}
           

Conclusion: a reduce-side join moves a large amount of data from the map side to the reduce side during the shuffle, which is inefficient. Because the merging itself happens in the reducers, the reduce side carries most of the load while the map nodes sit largely idle, and the reduce phase is very prone to data skew.
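
To put a number on that shuffle cost, the job's built-in counters can be read once it finishes. A minimal sketch against the reduce-side driver above (TaskCounter is the standard org.apache.hadoop.mapreduce.TaskCounter enum):

import org.apache.hadoop.mapreduce.TaskCounter;

// after job.waitForCompletion(true) in ReduceJoinDriver:
long mapOutputRecords = job.getCounters()
        .findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getValue();
long shuffledBytes = job.getCounters()
        .findCounter(TaskCounter.REDUCE_SHUFFLE_BYTES).getValue();
System.out.println("map output records = " + mapOutputRecords
        + ", reduce shuffle bytes = " + shuffledBytes);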

Solution: merge the data on the map side.

Join in the map phase (map-side join)

The orders table has 68,883 rows.

The customers table has 12,435 rows, small enough to hold in memory on every map task.

Cache the smaller table on each map task and do the join there. This moves work onto the map side, reduces the data pressure on the reduce side, and keeps data skew to a minimum.

  • Concretely (the two steps are combined in the sketch after this list):
    1. In the driver, register the file with the distributed cache:
      job.addCacheFile(new URI(""));	// ships an ordinary file to every task node

    2. In the mapper's setup(), read the cached file into an in-memory collection:
      URI[] cacheFiles = context.getCacheFiles();
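
The two pieces fit together as below. The cached file is localized to each task node and made available in the task's working directory under its base name, which is why it can be opened with a plain FileInputStream. A compact sketch (the full mapper and driver follow):

// in the driver (same path that MapJoinDriver uses later):
job.addCacheFile(new URI("/home/jarvis/Desktop/data/customers.csv"));

// in Mapper.setup():
URI[] cacheFiles = context.getCacheFiles();
String fileName = new Path(cacheFiles[0]).getName();
try (BufferedReader reader = new BufferedReader(
        new InputStreamReader(new FileInputStream(fileName)))) {
    String line;
    while ((line = reader.readLine()) != null) {
        String[] split = line.split(",");
        cusMap.put(split[0], split[1]);   // customer_id -> customer_fname
    }
}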
                 

mapper:

package com.orderAndCustomer.mapper;

import com.orderAndCustomer.entry.CustomerOrders;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.*;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

public class MapperJoinMapper extends Mapper<LongWritable, Text, NullWritable, CustomerOrders> {
    Map<String,String> cusMap =new HashMap<>();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
//        look up the file registered in the distributed cache by the driver
        URI[] cacheFiles = context.getCacheFiles();
//        the cached file is available in the task's working directory under its base name
        String filename = new Path(cacheFiles[0]).getName();
//        open the cached customers file
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new FileInputStream(filename)));
//        current line read from the file
        String line;
//        read the whole cached file
        while (StringUtils.isNotEmpty(line = bufferedReader.readLine())) {
            String[] split = line.split(",");
//            store customer_id -> customer_fname in cusMap
            cusMap.put(split[0], split[1]);
        }
//        close the stream
        bufferedReader.close();
    }
//    reused output value object
    CustomerOrders customerOrders =new CustomerOrders();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//        every record here comes from the orders file; split it and attach the cached customer data for its id, so no reduce phase is needed
        String[] fields =value.toString().split(",");
        customerOrders.setCustomer_id(fields[2]);
        customerOrders.setOrderId(fields[0]);
        customerOrders.setOrderStatus(fields[3]);
        customerOrders.setName(cusMap.get(fields[2]));
        customerOrders.setFlag("1");
        context.write(NullWritable.get(), customerOrders);

    }
}
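
Note that map() assumes every order has a matching customer in the cached file; on a miss, cusMap.get(fields[2]) returns null and the literal text "null" would show up in the output. A small defensive variation (a sketch, not part of the original code):

// fall back to an empty name when the order's customer_id is not in the cache
customerOrders.setName(cusMap.getOrDefault(fields[2], ""));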
           

reduce

There is no reducer: the join is completed entirely in the mappers, so the driver below runs the job map-only (setNumReduceTasks(0)).
           

driver

package com.orderAndCustomer.driver;

import com.orderAndCustomer.entry.CustomerOrders;
import com.orderAndCustomer.mapper.MapperJoinMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class MapJoinDriver {
    public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(MapJoinDriver.class);

        job.setMapperClass(MapperJoinMapper.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(CustomerOrders.class);
//        map-only job: no reduce phase
        job.setNumReduceTasks(0);

//        cache the small customers table on every task node
        job.addCacheFile(new URI("/home/jarvis/Desktop/data/customers.csv"));
//        input: only orders.csv; the customers data comes from the cache
        FileInputFormat.setInputPaths(job,new Path("/home/jarvis/Desktop/data/orders.csv"));
        Path path =new Path("/home/jarvis/Desktop/orderAndCus");
//        delete the output directory if it already exists
        FileSystem fs =FileSystem.get(new URI(path.toString()),configuration);
        if (fs.exists(path)){
            fs.delete(path,true);
        }
//        output
        FileOutputFormat.setOutputPath(job,path);
        job.waitForCompletion(true);

    }
}
           
