版權說明: 本文章版權歸本人及部落格園共同所有,轉載請标明原文出處(http://www.cnblogs.com/mikevictor07/),以下内容為個人了解,僅供參考。
一、簡介
該執行個體統計國内各個站點的最高溫度(為節省篇幅隻以溫度為例,可稍作修改即可統計氣壓與風速),資料來源于彙總在NCDC的天氣氣球資料集中(包含世界大量資料集,該執行個體隻分析國内站點,資料對外公開,可下載下傳)。
二、資料準備與預處理
從NCDC下載下傳的天氣氣球資料集(ftp://ftp.ncdc.noaa.gov/pub/data/igra或http://www1.ncdc.noaa.gov/pub/data/igra/ , 壓縮為gz包)如下,可見并不适合Hadoop的MR子產品處理,需要進行預處理(本例下載下傳資料gz包總大小為293MB,解壓縮後為1.43GB):
#5928719630901009999 5
10 85000 1481B 202B-9999 190 20
10 70000 3139B 142B-9999 180 20
10 50000 5880B -55B-9999 60 30
10 40000 7605B -142B-9999 100 40
10 30000 9750B -255B-9999 100 70
#5928719630901129999 7
10 85000 1481B 215B-9999 320 20
10 70000 3142B 132B-9999 300 10
10 50000 5889B -35B-9999 50 30
10 40000 7620B -125B-9999 100 40
10 30000 9759B -275B-9999 70 60
10 20000 12561B -482B-9999 90 110
10 10000 16788B -785B-9999 90 100
首先需要閱讀下載下傳相關目錄的readme.txt,才能站點相關字段的含義,溫度資料已經*10(為了保留一位小數):
以該資料為例(其中的 9999一般代表資料缺失):
#5052719630901009999 5
10 85000 1314B 68B-9999-9999-9999
資料頭部 | |||||
辨別 | 站點編号 | 日期 | 觀察起始時間 | 觀察結束時間 | 記錄數 |
# | 50527 | 19630901 | 00 | 9999 | 5 |
資料記錄 | ||||||||||
Major Level | Minor Level | 氣壓(Pa)3-8 | 氣壓辨別 | 位勢高度(米)10-14 | 位勢高度辨別 | 溫度*10(16- 20位) | 溫度辨別 | 露點下降 | 風力方向 | 風速(m/s) |
1 | 85000 | 空格 | 1314 | B | 68 | -9999 |
由于MapReduce一行行處理資料,而該資料記錄部分依賴于資料頭部,MR對資料進行分區時對它們分開的可能性非常大,是以每條資料記錄部分必須加上頭部的部分資訊(根據需要确定),即預處理,對資料預處理的結果可輸出到本地,然後再拷貝至HDFS。
在本例中,資料頭部隻關注<站點編号>、<日期>,資料頭部與資料記錄形成的新格式如下:
預處理後的資料格式 | ||||||||||||
505271963090110 85000 1314B 68B-9999-9999-9999 | ||||||||||||
氣壓(Pa) | 位勢高度(米)10-14位 | 溫度*10 | ||||||||||
即如下面格式的新格式:
592871963090110 85000 1481B 202B-9999 190 20
592871963090110 70000 3139B 142B-9999 180 20
592871963090110 50000 5880B -55B-9999 60 30
三、資料拷貝至HDFS
資料從本地拷貝至HDFS可以通過編碼,也可使用eclipse的hadoop插件進行(該插件目前一般需要根據自己的環境編譯得到jar放入eclipse的plugins檔案夾下,過程稍微繁瑣),
當然也可以使用bin/hadoop工具copyFromLocal進行(不過需要先複制到叢集中的任意一台機器),本例中把資料存放在HDFS的 /weatherballoon 目錄下,以下代碼可供參考:
core-site.xml:不同的配置檔案友善本地測試和叢集切換,在MR程式調試的時候很有用
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://hbase-01:9000</value>
</property>
</configuration>
public class CopyFromLocalMain {
private static Configuration config = new Configuration();
public static void main(String[] args) throws Exception {
config.addResource("core-site.xml");
File inputDir = new File("d:/weatherballoon/input/");
String hdfsDir = "/weatherballoon/input/";
if (!inputDir.exists()) {
System.err.println(inputDir.getAbsolutePath() + " directory not exist .");
System.exit(-1);
}
File[] files = inputDir.listFiles();
if (files == null) {
System.err.println(inputDir.getAbsolutePath() + " directory is empty .");
System.exit(-1);
}
for (File file : files) {
copyFileToHdfs(file, hdfsDir + file.getName());
}
System.out.println("Copy finished, total: " + files.length);
}
public static void copyFileToHdfs(File local,String dest) throws IOException{
InputStream in = new BufferedInputStream(new FileInputStream(local));
FileSystem fs = FileSystem.get(config);
FSDataOutputStream out = fs.create(new Path(dest));
IOUtils.copyBytes(in, out, 4096, true);
out.close();
}
}
資料拷貝完畢後可通路HDFS的namenode檢視狀态(預設50070端口),本例狀态如下圖:
四、編寫MapReduce程式
目前的資料格式已經每行之間無依賴性,首先編輯Mapper部分,該部分用于把站點ID作為key的資料集存入OutputCollector中:
public class MaxTemperatureMapper
extends MapReduceBase implements Mapper<LongWritable, Text, Text, RecordValue>{
public static final int DATA_LENGTH = 49; //預處理後的資料行長度
@Override
public void map(LongWritable key, Text value,
OutputCollector<Text, RecordValue> output, Reporter reporter) throws IOException {
//505271963090110 85000 1314B 68B-9999-9999-9999
String line = value.toString();
if (line.length() != DATA_LENGTH) {
System.out.println("------------->Error record : " + line);
return;
}
String stationId = line.substring(0, 5);
String date = line.substring(5, 13);
String temp = line.substring(28, 33);
if (!missing(temp)) {
int temperature = Integer.parseInt(temp.trim());
output.collect(new Text(stationId), new RecordValue(date, temperature));
}
}
private boolean missing(String temp) {
return temp.equals("-9999");
}
}
Mapper中輸出的Value值為自定義的類型(即RecordValue),因為需要同時記錄日期和溫度,如果要自定義類型,則必須實作Writable(如Hadoop自帶的LongWritable,IntWritable,Text等)。
實作該接口使得對象能夠序列化在不同機器間傳輸(程序間采用RPC通信,Hadoop采用Avro序列化,其他比較流行的序列化架構有Apache Thrift和Google protocol buffers),
一般建議實作WritableComparable接口,該接口中有個compareTo 方法的實作對于MapReduce來說是比較重要的,用于基于鍵的中間結果排序。
也可以實作RawComparator ,即可在位元組流中排序,而不需要反序列化,減小額外開銷。
RecordValue的實作如下:
public class RecordValue implements WritableComparable<RecordValue>{
private String date;
private int temperature;
public RecordValue(){}
public RecordValue(String date, int temperature) {
this.date = date;
this.temperature = temperature;
}
@Override
public void write(DataOutput out) throws IOException {
out.write(date.getBytes());
out.writeInt(temperature);
}
@Override
public void readFields(DataInput in) throws IOException {
byte[] buff = new byte[8];
in.readFully(buff);
date = new String(buff);
temperature = in.readInt();
}
@Override
public int compareTo(RecordValue o) {
if (date.compareTo(o.getDate()) > 0 || temperature > o.getTemperature()) {
return 1;
}
return -1;
}
@Override
public String toString() {
return " -- " + date + "," + temperature;
}
//省略setter getter
}
Mapper部分需要做單元測試,成功後接下面編寫Reducer部分:
public class MaxTemperatureReducer extends MapReduceBase implements
Reducer<Text, RecordValue, Text, RecordValue> {
@Override
public void reduce(Text key, Iterator<RecordValue> values,
OutputCollector<Text, RecordValue> output, Reporter reporter)
throws IOException {
int maxValue = Integer.MIN_VALUE;
String date = "00000000";
while (values.hasNext()) {
RecordValue record = values.next();
int temp = record.getTemperature();
if (temp > maxValue) {
maxValue = temp;
date = record.getDate();
}
}
output.collect(key, new RecordValue(date, maxValue));
}
}
當Reduce部分單元測試成功後即可編寫驅動程式MaxTemperatureDriver,先用本地小資料集進行測試,配置檔案切換為本地配置,如:
public static void main(String[] args) throws IOException {
String inputPath = "file:///d:/weatherballoon/input/*";
String outputPath = "file:///d:/weatherballoon/output";
File out = new File("d:/weatherballoon/output");
if (out.exists()) {
FileUtils.forceDelete(out); //采用apache common io包
}
Configuration conf = new Configuration();
conf.addResource("core-site-local.xml");
JobConf job = new JobConf(conf);
job.setJobName("Max Temperature(NCDC)");
job.setMapperClass(MaxTemperatureMapper.class);
job.setReducerClass(MaxTemperatureReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(RecordValue.class);
FileInputFormat.addInputPath(job, new Path(inputPath));
FileOutputFormat.setOutputPath(job, new Path(outputPath));
JobClient.runJob(job);
}
運作程式,如果出錯則在本地容易查出錯誤地方,檢視輸出結構是否如何預期,下面是本例小部分資料集的輸出結果:
50527 -- 20040721,358
50557 -- 20100627,342
50603 -- 19730409,440
溫度已經乘以10,故對應的結果如下表格:
站點ID | 溫度(攝氏度) | |
20040721 | 35.8 | |
50557 | 20100627 | 34.2 |
50603 | 19730409 | 44.0 |
五、叢集運作
測試成功後可切換至叢集運作,更改MaxTemperatureDriver的main,如下:
public static void main(String[] args) throws IOException {
String inputPath = "/weatherballoon/input/";
String outputPath = "/weatherballoon/output";
Configuration conf = new Configuration();
JobConf job = new JobConf(conf);
job.setJobName("Max Temperature(NCDC)");
job.setMapperClass(MaxTemperatureMapper.class);
job.setReducerClass(MaxTemperatureReducer.class);
job.setJarByClass(MaxTemperatureDriver.class); //!important
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(RecordValue.class);
FileInputFormat.addInputPath(job, new Path(inputPath));
FileOutputFormat.setOutputPath(job, new Path(outputPath));
JobClient.runJob(job);
}
然後程式打包(編寫MANIFEST.MF):
Manifest-Version: 1.0
Class-Path: .
Main-Class: org.mike.hadoop.weatherballoon.MaxTemperatureDriver
eclipse->export->jar并選擇MANIFEST.MF檔案,把jar上傳到叢集任一節點,執行如下指令:
hadoop jar MaxTemperature.jar
運作如下圖:
成功後即可從HDFS拷貝結果檔案至本地檢視(或者直接hadoop dfs -cat也可以),本例得到的結果如下(列出小部分):
50527 -- 20040721,358
50557 -- 20100627,342
50603 -- 19730409,440
50745 -- 19990413,386
50774 -- 20100624,316
50834 -- 19920428,426
50953 -- 20010604,346
51076 -- 19931031,506
51133 -- 19870716,600
51156 -- 19860309,552
51232 -- 19800802,220
根據NCDN中igra-stations.txt檔案得到對應的站點整理後如下:
站點名稱 | 最高溫度 | ||
HAILAR | |||
NENJIANG | |||
CHIN-BARAG | |||
50745 | CHICHIHAR | 19990413 | 38.6 |
50774 | YICHUN | 20100624 | 31.6 |
50834 | TA KO TAI | 19920428 | 42.6 |
50953 | HARBIN | 20010604 | 34.6 |
51076 | ALTAY | 19931031 | 50.6 |
51133 | TA CHENG | 19870716 | 60 |
51156 | HOBOG SAIR | 19860309 | 55.2 |
51232 | BORDER STATION | 19800802 | 22 |
Finished ..