Join operations in MapReduce
The orders table:
+-------------------+-------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+-------------------+-------------+------+-----+---------+----------------+
| order_id | int(11) | NO | PRI | NULL | auto_increment |
| order_date | datetime | NO | | NULL | |
| order_customer_id | int(11) | NO | | NULL | |
| order_status | varchar(45) | NO | | NULL | |
+-------------------+-------------+------+-----+---------+----------------+
The customers table:
+-------------------+--------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+-------------------+--------------+------+-----+---------+----------------+
| customer_id | int(11) | NO | PRI | NULL | auto_increment |
| customer_fname | varchar(45) | NO | | NULL | |
| customer_lname | varchar(45) | NO | | NULL | |
| customer_email | varchar(45) | NO | | NULL | |
| customer_password | varchar(45) | NO | | NULL | |
| customer_street | varchar(255) | NO | | NULL | |
| customer_city | varchar(45) | NO | | NULL | |
| customer_state | varchar(45) | NO | | NULL | |
| customer_zipcode | varchar(45) | NO | | NULL | |
+-------------------+--------------+------+-----+---------+----------------+
Both tables carry customer_id, so customer_id is the natural choice of join key.
Reduce-side join
In the map phase, the mapper inspects the path of its InputSplit to tell which source file a record comes from, tags the intermediate value accordingly, and emits it under the join key. In the reduce phase, all intermediate values that share a key are merged into the final joined objects.
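For a concrete (hypothetical) pair of records: the orders.csv line 1,2013-07-25 00:00:00.0,11599,CLOSED is emitted under key 11599 with an order-side value tagged flag=1, while the customers.csv line 11599,Mary,Malone,... is emitted under the same key with a customer-side value tagged flag=0. The reducer for key 11599 receives both and copies the customer's name into every order record.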
Implementation:
The entity class CustomerOrders serves as the final output value and also as the intermediate value.
package com.orderAndCustomer.entry;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class CustomerOrders implements Writable {
    private String customer_id;
    private String orderId;
    private String name;
    private String orderStatus;
    // tag that records which source file an object came from
    private String flag;

    public String getCustomer_id() {
        return customer_id;
    }

    // plain JavaBeans setters (void return) so that introspection-based tools
    // such as BeanUtils.copyProperties in the reducer can discover them
    public void setCustomer_id(String customer_id) {
        this.customer_id = customer_id;
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getOrderStatus() {
        return orderStatus;
    }

    public void setOrderStatus(String orderStatus) {
        this.orderStatus = orderStatus;
    }

    public String getFlag() {
        return flag;
    }

    public void setFlag(String flag) {
        this.flag = flag;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.customer_id);
        out.writeUTF(this.orderId);
        out.writeUTF(this.name);
        out.writeUTF(this.orderStatus);
        out.writeUTF(this.flag);
    }

    @Override
    public String toString() {
        return customer_id + '\t' +
                orderId + '\t' +
                name + '\t' +
                orderStatus + '\t' +
                flag;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // fields must be read in exactly the order write() wrote them
        this.customer_id = in.readUTF();
        this.orderId = in.readUTF();
        this.name = in.readUTF();
        this.orderStatus = in.readUTF();
        this.flag = in.readUTF();
    }
}
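Because write() and readFields() must stay perfectly symmetric (reading fields in a different order, or skipping one, silently corrupts records during the shuffle), a quick local round trip is a cheap sanity check. A minimal sketch; the test class name is hypothetical:

import com.orderAndCustomer.entry.CustomerOrders;

import java.io.*;

public class CustomerOrdersRoundTrip {
    public static void main(String[] args) throws IOException {
        CustomerOrders before = new CustomerOrders();
        before.setCustomer_id("11599");
        before.setOrderId("1");
        before.setName("Mary");
        before.setOrderStatus("CLOSED");
        before.setFlag("1");

        // serialize with write(), then deserialize into a fresh object with readFields()
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        before.write(new DataOutputStream(buffer));
        CustomerOrders after = new CustomerOrders();
        after.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

        // the two printed lines must be identical; a mismatch means the field order drifted
        System.out.println(before);
        System.out.println(after);
    }
}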
mapper:
Note: the FileSplit being cast to must be imported from org.apache.hadoop.mapreduce.lib.input.FileSplit (the new MapReduce API), not from the older org.apache.hadoop.mapred package.
package com.orderAndCustomer.mapper;

import com.orderAndCustomer.entry.CustomerOrders;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class ReduceJoinMapper extends Mapper<LongWritable, Text, Text, CustomerOrders> {
    // name of the source file this map task is reading
    String name = "";
    // reused output value
    CustomerOrders customerOrders = new CustomerOrders();
    // reused output key
    Text text = new Text();

    // each map task runs setup() once before map() is called for every record
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // the InputSplit this task is processing
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        // name of the file the split was cut from
        name = inputSplit.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // both source files are comma-delimited
        String[] split = value.toString().split(",");
        if (name.startsWith("order")) {
            // orders.csv record, tagged "1"
            customerOrders.setOrderId(split[0]);
            customerOrders.setOrderStatus(split[3]);
            customerOrders.setCustomer_id(split[2]);
            customerOrders.setFlag("1");
            customerOrders.setName("");
        } else {
            // customers.csv record, tagged "0"
            customerOrders.setCustomer_id(split[0]);
            customerOrders.setName(split[1]);
            customerOrders.setFlag("0");
            customerOrders.setOrderId("");
            customerOrders.setOrderStatus("");
        }
        // customer_id is the join key
        text.set(customerOrders.getCustomer_id());
        context.write(text, customerOrders);
    }
}
reducer:
package com.orderAndCustomer.reduce;

import com.orderAndCustomer.entry.CustomerOrders;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;

// CustomerOrders is the output key and carries the whole joined record,
// so the output value carries nothing and can be NullWritable
public class ReduceJoinReducer extends Reducer<Text, CustomerOrders, CustomerOrders, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<CustomerOrders> values, Context context) throws IOException, InterruptedException {
        // one customer record joins to many order records;
        // cuBean holds the single customer-side record for this key
        CustomerOrders cuBean = new CustomerOrders();
        // all order-side records for this key
        List<CustomerOrders> orderBeans = new ArrayList<>();
        // iterate over values; records tagged "1" are copied into orderBeans
        for (CustomerOrders bean : values) {
            if ("1".equals(bean.getFlag())) {
                CustomerOrders orderBean = new CustomerOrders();
                try {
                    BeanUtils.copyProperties(orderBean, bean);
                    orderBeans.add(orderBean);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    e.printStackTrace();
                }
            } else {
                try {
                    BeanUtils.copyProperties(cuBean, bean);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    e.printStackTrace();
                }
            }
        }
        // attach the customer's name to every order sharing this customer_id
        for (CustomerOrders orderBean : orderBeans) {
            orderBean.setName(cuBean.getName());
            context.write(orderBean, NullWritable.get());
        }
    }
}
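One detail worth calling out: Hadoop reuses a single CustomerOrders instance for every element of values, which is why each order-side record is copied into a fresh object before being stored; adding the bean reference itself would leave orderBeans full of pointers to the last record read. If the reflection-based BeanUtils dependency is unwanted, a plain manual copy does the same job (a sketch; the helper name is hypothetical):

// manual copy of a reused Hadoop value object, equivalent to the
// BeanUtils.copyProperties(...) calls in the reducer above
private static CustomerOrders copyOf(CustomerOrders src) {
    CustomerOrders dst = new CustomerOrders();
    dst.setCustomer_id(src.getCustomer_id());
    dst.setOrderId(src.getOrderId());
    dst.setName(src.getName());
    dst.setOrderStatus(src.getOrderStatus());
    dst.setFlag(src.getFlag());
    return dst;
}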
driver:
package com.orderAndCustomer.driver;

import com.orderAndCustomer.entry.CustomerOrders;
import com.orderAndCustomer.mapper.ReduceJoinMapper;
import com.orderAndCustomer.reduce.ReduceJoinReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class ReduceJoinDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(ReduceJoinDriver.class);

        job.setMapperClass(ReduceJoinMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(CustomerOrders.class);

        job.setReducerClass(ReduceJoinReducer.class);
        job.setOutputKeyClass(CustomerOrders.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.addInputPath(job, new Path("/home/jarvis/Desktop/data"));
        Path path = new Path("/home/jarvis/Desktop/orderAndCus");
        // delete the output directory if it already exists,
        // otherwise FileOutputFormat refuses to start the job
        FileSystem fs = FileSystem.get(new URI(path.toString()), configuration);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job, path);

        job.waitForCompletion(true);
    }
}
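Run against the hypothetical sample records shown earlier, the output directory would contain one tab-separated line per order, in the format produced by CustomerOrders.toString():

11599	1	Mary	CLOSED	1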
Conclusion: a reduce-side join pushes every record of both tables through the shuffle, so a large volume of data moves between the map and reduce sides and efficiency is poor. Because the merge happens only in the reduce phase, the reducers carry almost all of the processing load while the map nodes stay lightly loaded and resources go underused; worse, the reduce phase is highly prone to data skew, since every record for a hot key lands on a single reducer.
Solution: do the merge on the map side.
Map-side join
The orders table has 68,883 rows; the customers table has only 12,435.
By caching the smaller table on every map task, the business logic can be applied up front: the map side takes on more work, no join data has to reach the reduce side, and data skew is reduced as far as possible.
- Concrete steps (the full code follows below):
- In the driver, register the file so it is cached on every task node:
job.addCacheFile(new URI("")); // distributes an ordinary file to each task's working directory
- In the mapper's setup() phase, read the cached file into an in-memory collection:
URI[] cacheFiles = context.getCacheFiles();
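To make the flow concrete (same hypothetical sample lines as before): after setup() has run, the cached customers.csv line 11599,Mary,Malone,... leaves cusMap holding the entry "11599" -> "Mary"; when map() later sees the order line 1,2013-07-25 00:00:00.0,11599,CLOSED, it looks the name up in memory and emits 11599, 1, Mary, CLOSED directly, with no shuffle involved.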
mapper:
package com.orderAndCustomer.mapper;

import com.orderAndCustomer.entry.CustomerOrders;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.*;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

public class MapperJoinMapper extends Mapper<LongWritable, Text, NullWritable, CustomerOrders> {
    // customer_id -> customer first name, built once per task from the cached file
    Map<String, String> cusMap = new HashMap<>();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // URIs of the files registered with job.addCacheFile()
        URI[] cacheFiles = context.getCacheFiles();
        // cached files are linked into the task's working directory,
        // so the bare file name is enough to open them
        String filename = new Path(cacheFiles[0]).getName();
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new FileInputStream(filename)));
        String line;
        // load the whole customers file into cusMap
        while (StringUtils.isNotEmpty(line = bufferedReader.readLine())) {
            String[] split = line.split(",");
            // store id -> name as the lookup table
            cusMap.put(split[0], split[1]);
        }
        bufferedReader.close();
    }

    // reused output value
    CustomerOrders customerOrders = new CustomerOrders();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // every record is an orders.csv line; split it and attach the customer
        // name looked up from cusMap, so no reduce phase is needed
        String[] fields = value.toString().split(",");
        customerOrders.setCustomer_id(fields[2]);
        customerOrders.setOrderId(fields[0]);
        customerOrders.setOrderStatus(fields[3]);
        customerOrders.setName(cusMap.get(fields[2]));
        customerOrders.setFlag("1");
        context.write(NullWritable.get(), customerOrders);
    }
}
reducer:
There is none; the join is already complete when map() emits, so this is a map-only job.
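Since no reducer is configured, it is worth saying so explicitly: otherwise Hadoop still runs the default identity reducer with one reduce task, and the map output goes through a pointless shuffle. One extra line in the driver (not present in the original code) avoids that:

job.setNumReduceTasks(0); // map-only job: mapper output is written straight to the output directory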
driver:
package com.orderAndCustomer.driver;

import com.orderAndCustomer.entry.CustomerOrders;
import com.orderAndCustomer.mapper.MapperJoinMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class MapJoinDriver {
    public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(MapJoinDriver.class);
        job.setMapperClass(MapperJoinMapper.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(CustomerOrders.class);
        // distribute the small table to every map task
        job.addCacheFile(new URI("/home/jarvis/Desktop/data/customers.csv"));
        // input: only the big table is read through MapReduce
        FileInputFormat.setInputPaths(job, new Path("/home/jarvis/Desktop/data/orders.csv"));
        Path path = new Path("/home/jarvis/Desktop/orderAndCus");
        // delete the output directory if it already exists
        FileSystem fs = FileSystem.get(new URI(path.toString()), configuration);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        // output
        FileOutputFormat.setOutputPath(job, path);
        job.waitForCompletion(true);
    }
}