Hadoop Learning (4): Notes on MapReduce
Some details to watch out for when working with MapReduce
If you package a MapReduce program and run it on Linux with the command java -cp xxx.jar MainClass and it fails, the cause is usually missing dependency jars.
Use the command hadoop jar xxx.jar ClassName instead. When you start the client main method on a cluster machine with hadoop jar xx.jar mr.wc.JobSubmitter, the hadoop jar command adds the jars and configuration files from that machine's Hadoop installation directory to the runtime classpath.
As a result, the new Configuration() statement in the client's main method loads those configuration files from the classpath, so parameters such as
fs.defaultFS, mapreduce.framework.name and yarn.resourcemanager.hostname
are picked up automatically, and all of the local Hadoop jars are referenced as well.
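If you launch the client with a plain java -cp command instead (so the cluster's *-site.xml files are not on the classpath), the same parameters can be set in code. A minimal sketch, assuming placeholder hostnames hdp-01 (NameNode) and hdp-02 (ResourceManager):
import org.apache.hadoop.conf.Configuration;

public class ClusterConfDemo {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // default file system, i.e. the HDFS NameNode address (hdp-01:9000 is a placeholder)
        conf.set("fs.defaultFS", "hdfs://hdp-01:9000");
        // submit jobs to YARN instead of the local runner
        conf.set("mapreduce.framework.name", "yarn");
        // hostname of the YARN ResourceManager (hdp-02 is a placeholder)
        conf.set("yarn.resourcemanager.hostname", "hdp-02");
        System.out.println(conf.get("fs.defaultFS"));
    }
}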
MapReduce jobs can also run locally, without being submitted to YARN: the job runs in standalone mode in a single JVM, simulating the map and reduce tasks with multiple threads.
Whether on Linux or Windows, jobs are submitted to this local runner by default. To have submissions on Linux go to YARN by default, configure the mapred-site.xml file under hadoop/etc/hadoop/:
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
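Conversely, even on a machine whose classpath does carry the cluster configuration, the local runner can be forced from code. A short sketch of the two relevant keys, placed in the client's main method before Job.getInstance(conf):
// run the whole job in a single local JVM, ignoring any YARN settings on the classpath
conf.set("mapreduce.framework.name", "local");
// read input and write output through the local file system instead of HDFS
conf.set("fs.defaultFS", "file:///");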
Key/value pairs: if the key or value type is your own class, that class must implement Writable. In the overridden write method you serialize the fields you want into the DataOutput parameter; the other overridden method, readFields, performs the deserialization.
Note that during deserialization Hadoop first constructs an object with the class's no-argument constructor and then calls readFields on it to restore the fields, so the class needs a no-arg constructor.
DataOutput is also a kind of stream, just wrapped by Hadoop; if you use it on your own, you have to back it with something like a FileOutputStream.
When writing a string to a DataOutput, use writeUTF("string"). This encoding prefixes the string with its length (two bytes), which avoids character-encoding ambiguity: when Hadoop decodes it, it first reads those two bytes to learn how long the string is. If you wrote write(str.getBytes()) instead, the reader would have no way of knowing how many bytes belong to the string.
In the reduce stage, when an object is written to HDFS its toString method is called, so you can override toString in your class to control the output format.
For example, the following class can be serialized by Hadoop:
package mapreduce2;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class FlowBean implements Writable {

    private int up;       // upstream traffic
    private int down;     // downstream traffic
    private int sum;      // total traffic
    private String phone; // phone number

    // no-arg constructor, required by Hadoop for deserialization
    public FlowBean() {
    }

    public FlowBean(int up, int down, String phone) {
        this.up = up;
        this.down = down;
        this.sum = up + down;
        this.phone = phone;
    }

    public int getUp() {
        return up;
    }

    public void setUp(int up) {
        this.up = up;
    }

    public int getDown() {
        return down;
    }

    public void setDown(int down) {
        this.down = down;
    }

    public int getSum() {
        return sum;
    }

    public void setSum(int sum) {
        this.sum = sum;
    }

    public String getPhone() {
        return phone;
    }

    public void setPhone(String phone) {
        this.phone = phone;
    }

    @Override
    public void readFields(DataInput di) throws IOException {
        // note: fields must be read in exactly the same order they were written
        this.up = di.readInt();
        this.down = di.readInt();
        this.sum = di.readInt();
        this.phone = di.readUTF();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.up);
        out.writeInt(this.down);
        out.writeInt(this.sum);
        out.writeUTF(this.phone);
    }

    @Override
    public String toString() {
        return "phone=" + this.phone + " total=" + this.sum;
    }
}
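To see the write/readFields contract and the length-prefixed writeUTF encoding in action, here is a standalone round-trip sketch (not part of the original post) that serializes a FlowBean to a byte array and restores it through the no-arg constructor:
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class FlowBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        // serialize: FlowBean.write() pushes the fields into the DataOutput stream
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        new FlowBean(500, 300, "1367788").write(out);

        // deserialize: build an empty bean via the no-arg constructor, then call readFields
        FlowBean copy = new FlowBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        // writeUTF stored a 2-byte length before the phone string, so the copy comes back intact
        System.out.println(copy); // phone=1367788 total=800
    }
}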
After a ReduceTask has processed all of its key groups, the framework also calls its cleanup method once.
Practical exercise: find the top N pages by total visit count.
Plan 1: use only one ReduceTask (the default) and exploit the cleanup method. In the reduce stage, instead of writing each result straight to HDFS, put it into a TreeMap;
then, after all reduce calls have finished, output the top N entries of the TreeMap to HDFS from cleanup.
package cn.edu360.mr.page.topn;

public class PageCount implements Comparable<PageCount> {

    private String page;
    private int count;

    public void set(String page, int count) {
        this.page = page;
        this.count = count;
    }

    public String getPage() {
        return page;
    }

    public void setPage(String page) {
        this.page = page;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }

    @Override
    public int compareTo(PageCount o) {
        // sort by count in descending order; break ties by page name so equal counts are not collapsed by the TreeMap
        return o.getCount() - this.count == 0 ? this.page.compareTo(o.getPage()) : o.getCount() - this.count;
    }
}
The Mapper class:
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class PageTopnMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] split = line.split(" ");
        // take the second field (the page) from each line and emit (page, 1)
        context.write(new Text(split[1]), new IntWritable(1));
    }
}
The Reducer class:
import java.io.IOException;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class PageTopnReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    TreeMap<PageCount, Object> treeMap = new TreeMap<>();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable value : values) {
            count += value.get();
        }
        PageCount pageCount = new PageCount();
        pageCount.set(key.toString(), count);
        // the TreeMap keeps the PageCount keys sorted, largest counts first
        treeMap.put(pageCount, null);
    }

    @Override
    protected void cleanup(Context context)
            throws IOException, InterruptedException {
        // in cleanup we can read the Configuration to find out how many entries to emit
        Configuration conf = context.getConfiguration();
        int topn = conf.getInt("top.n", 5);
        Set<Entry<PageCount, Object>> entrySet = treeMap.entrySet();
        int i = 0;
        for (Entry<PageCount, Object> entry : entrySet) {
            context.write(new Text(entry.getKey().getPage()), new IntWritable(entry.getKey().getCount()));
            i++;
            if (i == topn) return;
        }
    }
}
Then the JobSubmitter class. Note that it has to set up the Configuration, and there are several ways to do this.
The first way is to load a configuration file:
Configuration conf = new Configuration();
conf.addResource("xx-oo.xml");
and then, in the xx-oo.xml file, write:
<property>
<name>top.n</name>
<value>6</value>
</property>
The second way:
// set the value directly in code
conf.setInt("top.n", 3);
// or use an argument passed straight to the Java main program
conf.setInt("top.n", Integer.parseInt(args[0]));
The third way is to read the parameter from a properties file:
Properties props = new Properties();
props.load(JobSubmitter.class.getClassLoader().getResourceAsStream("topn.properties"));
conf.setInt("top.n", Integer.parseInt(props.getProperty("top.n")));
and then configure the parameter in topn.properties:
top.n=5
The JobSubmitter class, which by default runs the job in local simulation mode:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class JobSubmitter {

    public static void main(String[] args) throws Exception {

        /**
         * parse parameters by loading the *-site.xml files on the classpath
         */
        Configuration conf = new Configuration();
        conf.addResource("xx-oo.xml");

        /**
         * set parameters in code
         */
        //conf.setInt("top.n", 3);
        //conf.setInt("top.n", Integer.parseInt(args[0]));

        /**
         * read parameters from a properties file
         */
        /*Properties props = new Properties();
        props.load(JobSubmitter.class.getClassLoader().getResourceAsStream("topn.properties"));
        conf.setInt("top.n", Integer.parseInt(props.getProperty("top.n")));*/

        Job job = Job.getInstance(conf);

        job.setJarByClass(JobSubmitter.class);

        job.setMapperClass(PageTopnMapper.class);
        job.setReducerClass(PageTopnReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path("F:\\mrdata\\url\\input"));
        FileOutputFormat.setOutputPath(job, new Path("F:\\mrdata\\url\\output"));

        job.waitForCompletion(true);
    }
}
Extra Java knowledge
TreeMap: entries put into it are sorted automatically by key.
There are two ways to customize a TreeMap's ordering. The first is to pass in a Comparator:
import java.util.Comparator;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;

public class TreeMapTest {

    public static void main(String[] args) {

        // this example uses a FlowBean variant with a (phone, up, down) constructor
        // and getAmountFlow()/getPhone() getters
        TreeMap<FlowBean, String> tm1 = new TreeMap<>(new Comparator<FlowBean>() {
            @Override
            public int compare(FlowBean o1, FlowBean o2) {
                // if two beans have the same total traffic, compare by phone number
                if (o2.getAmountFlow() - o1.getAmountFlow() == 0) {
                    return o1.getPhone().compareTo(o2.getPhone());
                }
                // otherwise sort by total traffic in descending order
                return o2.getAmountFlow() - o1.getAmountFlow();
            }
        });

        FlowBean b1 = new FlowBean("1367788", 500, 300);
        FlowBean b2 = new FlowBean("1367766", 400, 200);
        FlowBean b3 = new FlowBean("1367755", 600, 400);
        FlowBean b4 = new FlowBean("1367744", 300, 500);

        tm1.put(b1, null);
        tm1.put(b2, null);
        tm1.put(b3, null);
        tm1.put(b4, null);

        // iterate over the TreeMap entries
        Set<Entry<FlowBean, String>> entrySet = tm1.entrySet();
        for (Entry<FlowBean, String> entry : entrySet) {
            System.out.println(entry.getKey() + "\t" + entry.getValue());
        }
    }
}
The second way is to have the key class itself implement the Comparable interface, as the PageCount class shown earlier does:
public class PageCount implements Comparable<PageCount> {

    private String page;
    private int count;

    public void set(String page, int count) {
        this.page = page;
        this.count = count;
    }

    public String getPage() {
        return page;
    }

    public void setPage(String page) {
        this.page = page;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }

    @Override
    public int compareTo(PageCount o) {
        // descending by count, ties broken by page name
        return o.getCount() - this.count == 0 ? this.page.compareTo(o.getPage()) : o.getCount() - this.count;
    }
}
Original article:
https://www.cnblogs.com/wpbing/archive/2019/07/25/11242866.html