天天看點

Hadoop - 國内各站點最高溫度、氣壓和風速統計

版權說明:  本文章版權歸本人及部落格園共同所有,轉載請标明原文出處(http://www.cnblogs.com/mikevictor07/),以下内容為個人了解,僅供參考。

一、簡介

    該執行個體統計國内各個站點的最高溫度(為節省篇幅隻以溫度為例,可稍作修改即可統計氣壓與風速),資料來源于彙總在NCDC的天氣氣球資料集中(包含世界大量資料集,該執行個體隻分析國内站點,資料對外公開,可下載下傳)。

二、資料準備與預處理

    從NCDC下載下傳的天氣氣球資料集(ftp://ftp.ncdc.noaa.gov/pub/data/igra或http://www1.ncdc.noaa.gov/pub/data/igra/ , 壓縮為gz包)如下,可見并不适合Hadoop的MR子產品處理,需要進行預處理(本例下載下傳資料gz包總大小為293MB,解壓縮後為1.43GB):

#5928719630901009999   5
10 85000  1481B  202B-9999  190   20
10 70000  3139B  142B-9999  180   20
10 50000  5880B  -55B-9999   60   30
10 40000  7605B -142B-9999  100   40
10 30000  9750B -255B-9999  100   70
#5928719630901129999   7
10 85000  1481B  215B-9999  320   20
10 70000  3142B  132B-9999  300   10
10 50000  5889B  -35B-9999   50   30
10 40000  7620B -125B-9999  100   40
10 30000  9759B -275B-9999   70   60
10 20000 12561B -482B-9999   90  110
10 10000 16788B -785B-9999   90  100      

首先需要閱讀下載下傳相關目錄的readme.txt,才能站點相關字段的含義,溫度資料已經*10(為了保留一位小數):

以該資料為例(其中的 9999一般代表資料缺失):

#5052719630901009999 5

10 85000 1314B 68B-9999-9999-9999

資料頭部
辨別 站點編号 日期 觀察起始時間 觀察結束時間 記錄數
# 50527 19630901 00 9999 5
資料記錄
Major Level Minor Level 氣壓(Pa)3-8 氣壓辨別 位勢高度(米)10-14 位勢高度辨別 溫度*10(16- 20位) 溫度辨別 露點下降 風力方向 風速(m/s)
1 85000 空格 1314 B 68 -9999

 由于MapReduce一行行處理資料,而該資料記錄部分依賴于資料頭部,MR對資料進行分區時對它們分開的可能性非常大,是以每條資料記錄部分必須加上頭部的部分資訊(根據需要确定),即預處理,對資料預處理的結果可輸出到本地,然後再拷貝至HDFS。

    在本例中,資料頭部隻關注<站點編号>、<日期>,資料頭部與資料記錄形成的新格式如下:

預處理後的資料格式
505271963090110 85000  1314B   68B-9999-9999-9999
氣壓(Pa) 位勢高度(米)10-14位 溫度*10

即如下面格式的新格式:

592871963090110 85000  1481B  202B-9999  190   20
592871963090110 70000  3139B  142B-9999  180   20
592871963090110 50000  5880B  -55B-9999   60   30      

三、資料拷貝至HDFS

    資料從本地拷貝至HDFS可以通過編碼,也可使用eclipse的hadoop插件進行(該插件目前一般需要根據自己的環境編譯得到jar放入eclipse的plugins檔案夾下,過程稍微繁瑣),

當然也可以使用bin/hadoop工具copyFromLocal進行(不過需要先複制到叢集中的任意一台機器),本例中把資料存放在HDFS的 /weatherballoon 目錄下,以下代碼可供參考:

core-site.xml:不同的配置檔案友善本地測試和叢集切換,在MR程式調試的時候很有用

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
     <property>
         <name>fs.default.name</name>
         <value>hdfs://hbase-01:9000</value>
     </property>
</configuration>
      
public class CopyFromLocalMain {
    
    private static Configuration config = new Configuration();
    
    public static void main(String[] args) throws Exception {
        config.addResource("core-site.xml");
        File inputDir = new File("d:/weatherballoon/input/");
        String hdfsDir = "/weatherballoon/input/";
        
        if (!inputDir.exists()) {
            System.err.println(inputDir.getAbsolutePath() + " directory not exist .");
            System.exit(-1);
        }
        File[] files = inputDir.listFiles();
        if (files == null) {
            System.err.println(inputDir.getAbsolutePath() + " directory is empty .");
            System.exit(-1);
        }
        
        for (File file : files) {
            copyFileToHdfs(file, hdfsDir + file.getName());
        }
        System.out.println("Copy finished, total: " + files.length);
    }

    public static void copyFileToHdfs(File local,String dest) throws IOException{
        InputStream in = new BufferedInputStream(new FileInputStream(local));
        FileSystem fs = FileSystem.get(config);
        FSDataOutputStream out = fs.create(new Path(dest));
        IOUtils.copyBytes(in, out, 4096, true);
        out.close();
    }
}      

資料拷貝完畢後可通路HDFS的namenode檢視狀态(預設50070端口),本例狀态如下圖:

Hadoop - 國内各站點最高溫度、氣壓和風速統計

四、編寫MapReduce程式

    目前的資料格式已經每行之間無依賴性,首先編輯Mapper部分,該部分用于把站點ID作為key的資料集存入OutputCollector中:

public class MaxTemperatureMapper 
    extends MapReduceBase implements Mapper<LongWritable, Text, Text, RecordValue>{
    
    public static final int DATA_LENGTH = 49; //預處理後的資料行長度
    
    @Override
    public void map(LongWritable key, Text value,
            OutputCollector<Text, RecordValue> output, Reporter reporter) throws IOException {
        
        //505271963090110 85000  1314B   68B-9999-9999-9999
        String line = value.toString();
        if (line.length() != DATA_LENGTH) {
            System.out.println("------------->Error record : " + line);
            return;
        }
        
        String stationId = line.substring(0, 5);
        String date = line.substring(5, 13);
        String temp = line.substring(28, 33);
        if (!missing(temp)) {
            int temperature = Integer.parseInt(temp.trim());
            output.collect(new Text(stationId), new RecordValue(date, temperature));
        }
    }
    
    private boolean missing(String temp) {
        return temp.equals("-9999");
    }
    
}      

    Mapper中輸出的Value值為自定義的類型(即RecordValue),因為需要同時記錄日期和溫度,如果要自定義類型,則必須實作Writable(如Hadoop自帶的LongWritable,IntWritable,Text等)。

    實作該接口使得對象能夠序列化在不同機器間傳輸(程序間采用RPC通信,Hadoop采用Avro序列化,其他比較流行的序列化架構有Apache Thrift和Google protocol buffers),

一般建議實作WritableComparable接口,該接口中有個compareTo 方法的實作對于MapReduce來說是比較重要的,用于基于鍵的中間結果排序。

也可以實作RawComparator ,即可在位元組流中排序,而不需要反序列化,減小額外開銷。

RecordValue的實作如下:

public class RecordValue implements WritableComparable<RecordValue>{
    
    private String date;
    private int temperature;
    
    public RecordValue(){}
    public RecordValue(String date, int temperature) {
        this.date = date;
        this.temperature = temperature;
    }
    @Override
    public void write(DataOutput out) throws IOException {
        out.write(date.getBytes());
        out.writeInt(temperature);
    }
    @Override
    public void readFields(DataInput in) throws IOException {
        byte[] buff = new byte[8];
        in.readFully(buff);
        date = new String(buff);
        temperature = in.readInt();
    }
    @Override
    public int compareTo(RecordValue o) {
        if (date.compareTo(o.getDate()) > 0 || temperature > o.getTemperature()) {
            return 1;
        }
        return -1;
    }
    @Override
    public String toString() {
        return " -- " + date + "," + temperature;
    }
    
    //省略setter getter
}      

Mapper部分需要做單元測試,成功後接下面編寫Reducer部分:

public class MaxTemperatureReducer extends MapReduceBase implements
        Reducer<Text, RecordValue, Text, RecordValue> {

    @Override
    public void reduce(Text key, Iterator<RecordValue> values,
            OutputCollector<Text, RecordValue> output, Reporter reporter)
            throws IOException {
        
        int maxValue = Integer.MIN_VALUE;
        String date = "00000000";
        while (values.hasNext()) {
            RecordValue record = values.next();
            int temp = record.getTemperature();
            if (temp > maxValue) {
                maxValue = temp;
                date = record.getDate();
            }
        }
        output.collect(key, new RecordValue(date, maxValue));
    }
    
}      

當Reduce部分單元測試成功後即可編寫驅動程式MaxTemperatureDriver,先用本地小資料集進行測試,配置檔案切換為本地配置,如:

public static void main(String[] args) throws IOException {
        
        String inputPath = "file:///d:/weatherballoon/input/*";
        String outputPath = "file:///d:/weatherballoon/output";
        
        File out = new File("d:/weatherballoon/output");
        if (out.exists()) {
            FileUtils.forceDelete(out); //采用apache common io包
        }
        
        Configuration conf = new Configuration();
        conf.addResource("core-site-local.xml");
        
        JobConf job = new JobConf(conf);
        job.setJobName("Max Temperature(NCDC)");
        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);
        
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(RecordValue.class);
        
        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        JobClient.runJob(job);
    }      

運作程式,如果出錯則在本地容易查出錯誤地方,檢視輸出結構是否如何預期,下面是本例小部分資料集的輸出結果:

50527 -- 20040721,358

50557 -- 20100627,342

50603 -- 19730409,440

溫度已經乘以10,故對應的結果如下表格:

站點ID 溫度(攝氏度)
20040721 35.8
50557 20100627 34.2
50603 19730409 44.0

五、叢集運作

測試成功後可切換至叢集運作,更改MaxTemperatureDriver的main,如下:

public static void main(String[] args) throws IOException {
        
        String inputPath = "/weatherballoon/input/";
        String outputPath = "/weatherballoon/output";
        
        Configuration conf = new Configuration();
        
        JobConf job = new JobConf(conf);
        job.setJobName("Max Temperature(NCDC)");
        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);
        job.setJarByClass(MaxTemperatureDriver.class); //!important
        
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(RecordValue.class);
        
        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        JobClient.runJob(job);
    }      

然後程式打包(編寫MANIFEST.MF):

Manifest-Version: 1.0
Class-Path: .
Main-Class: org.mike.hadoop.weatherballoon.MaxTemperatureDriver

      

eclipse->export->jar并選擇MANIFEST.MF檔案,把jar上傳到叢集任一節點,執行如下指令:

hadoop jar MaxTemperature.jar

運作如下圖:

Hadoop - 國内各站點最高溫度、氣壓和風速統計

成功後即可從HDFS拷貝結果檔案至本地檢視(或者直接hadoop dfs -cat也可以),本例得到的結果如下(列出小部分):

50527    -- 20040721,358
50557    -- 20100627,342
50603    -- 19730409,440
50745    -- 19990413,386
50774    -- 20100624,316
50834    -- 19920428,426
50953    -- 20010604,346
51076    -- 19931031,506
51133    -- 19870716,600
51156    -- 19860309,552
51232    -- 19800802,220      

根據NCDN中igra-stations.txt檔案得到對應的站點整理後如下:

站點名稱 最高溫度
HAILAR
NENJIANG
CHIN-BARAG
50745 CHICHIHAR 19990413 38.6
50774 YICHUN 20100624 31.6
50834 TA KO TAI 19920428 42.6
50953 HARBIN 20010604 34.6
51076 ALTAY 19931031 50.6
51133 TA CHENG 19870716 60
51156 HOBOG SAIR 19860309 55.2
51232 BORDER STATION 19800802 22

Finished ..