天天看點

8.2 MapReduce 中的序列化(一)

任務目的

  • 了解序列化和反序列化的概念和作用
  • 了解 Java 的序列化和反序列化實作
  • 可以自定義對象實作 MapReduce 架構的序列化

任務清單

  • 任務1:序列化概述
  • 任務2:Java 序列化
  • 任務3:實作 MapReduce 架構的序列化

詳細任務步驟

任務1:序列化概述

  • 序列化(Serialization):是指把結構化對象(Object)轉化為位元組流(ByteStream)。
  • 反序列化(Deserialization):是序列化的逆過程。即把位元組流轉回結構化對象。
8.2 MapReduce 中的序列化(一)

圖1

 

  **為什麼需要序列化和反序列化? **

  總的來說可以歸結為以下幾點:

  (1)永久性儲存對象,儲存對象的位元組序列到本地檔案或者資料庫中;

  (2)通過序列化以位元組流的形式使對象 在網絡中進行傳遞和接收;

  (3)通過序列化在程序間傳遞對象。

  Java 的序列化是一個重量級序列化架構(Serializable),一個對象被序列化後,會附帶很多額外的資訊(各種校 驗資訊、header、繼承體系等),不便于在網絡中高效傳輸;是以,Hadoop 自己開發了一套序列化機制 (Writable),精簡,高效。 Hadoop 中的序列化架構已經對基本類型和 null 提供了序列化的實作了。分别是:

Java 類型 Hadoop Writable 類型
byte ByteWritable
short ShortWritable
int IntWritable
long LongWritable
float FloatWritable
double DoubleWritable
string Text
null NullWritable

任務2:Java 序列化

  Java 序列化的過程是與平台無關的,也就是說一個 Java 對象可以在一個平台上序列化之後傳輸到另外一個平台上進行反序列化。

Serializable。隻有實作了 ​

​java.io.Serializable​

​​ 接口的類可以進行序列化/反序列化,其中 ​

​java.io.Serializable​

​ 接口是一個标記接口,标記接口不含有任何成員和方法,标記接口的作用是标記一組類,這些類都具有相同的特定的功能,常見的标記接口還有 Cloneable。

2.1 實作 Serializable 接口

  一個類的對象要想序列化成功,必須滿足兩個條件:

  (1)該類必須實作 ​

​java.io.Serializable​

​ 對象;

  (2)該類的所有屬性必須是可序列化的。聲明為 ​

​static​

​​ 和 ​

​transient​

​​ 類型的成員資料不能被序列化。因為 ​

​static​

​​ 代表類的狀态,​

​transient​

​ 代表對象的臨時資料。

  詳細代碼如下所示:

package com.hongyaa.java.serializable;

import java.io.Serializable;

/**
 * 定義一個Student類,實作 Serializable 接口
 */
public class Student implements Serializable {

  /**
   * 序列化ID
   */
  private static final long serialVersionUID = -7037627544448704690L;
  private int id;
  private String name;
  private int age;
  //該屬性聲明為暫時的,是以不可序列化
  private transient String sex;

  public Student() {
    super();
  }

  public Student(int id, String name, int {
    super();
    this.id = id;
    this.name = name;
    this.age = age;
    this.sex = sex;
  }

  public int getId() {
    return id;
  }

  public void setId(int {
    this.id = id;
  }

  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }

  public int getAge() {
    return age;
  }

  public void setAge(int {
    this.age = age;
  }

  public String getSex() {
    return sex;
  }

  public void setSex(String sex) {
    this.sex = sex;
  }

  @Override
  public String toString() {
    return "id=" + id + ", name=" + name + ", age=" + age + ", sex="      

2.2 序列化

  那麼我們如何将此類的對象序列化後儲存到磁盤上呢?

  (1)建立一個 ​

​ObjectOutputStream​

​​ 輸出流 ​

​oos​

  (2)調用此輸出流 ​

​oos​

​​ 的 ​

​writeObject()​

​ 方法寫對象

8.2 MapReduce 中的序列化(一)

圖2

 

  詳細代碼如下所示:

package com.hongyaa.java.serializable;

import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

/**
 * 序列化Student對象
 * 
 *
 */
public class SerializeTest {
  public static void main(String[] args) {
    Student stu = new Student(1, "cendy", 22, "female");
    try {
      // ObjectOutputStream 對象輸出流,将Student對象存儲到/root/software/student.ser,完成對Student對象的序列化操作
      FileOutputStream fos = new FileOutputStream("/root/software/student.ser");
      ObjectOutputStream oos = new ObjectOutputStream(fos);
      // 序列化一個對象,并将它發送到輸出流
      oos.writeObject(stu);
      oos.close();
      fos.close();
      System.out.println("Serialized data is saved in /root/software/student.ser");
    } catch (FileNotFoundException e) {
      e.printStackTrace();
    } catch      

  執行結果如下所示:

8.2 MapReduce 中的序列化(一)

圖3

 

  顯示将 Student 的一個執行個體對象序列化到了/root/software/student.ser檔案中,我們可以進入/root/software目錄下進行你驗證,發現建立了student.ser檔案。如下圖所示:

8.2 MapReduce 中的序列化(一)

圖4

 

2.3 反序列化

  我們如從文本檔案中将此對象的位元組序列恢複成 Student 對象呢?

  (1)建立一個 ​

​ObjectInputStream​

​​ 輸入流 ​

​ois​

  (2)調用此輸入流 ​

​ois​

​​ 的 ​

​readObject()​

​ 方法讀取對象

8.2 MapReduce 中的序列化(一)

圖5

 

  詳細代碼如下所示:

package com.hognyaa.java.serializable;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.ObjectInputStream;

/**
 * 反序列化Student對象
 *
 */
public class DeserializeTest {
  public static void main(String[] args) {
    Student stu = null;
    try {
      FileInputStream fis = new FileInputStream("/root/software/student.ser");
      ObjectInputStream ois = new ObjectInputStream(fis);
      // 從流中取出下一個對象,并将對象反序列化
      // 傳回值類型為Object,是以需要将它轉換成合适的資料類型,這裡轉換成Student類型
      stu = (Student) ois.readObject();
      ois.close();
      fis.close();
    } catch (FileNotFoundException e) {
      e.printStackTrace();
    } catch (ClassNotFoundException e) {
      System.out.println("Student class not found!!!");
      e.printStackTrace();
    } catch (IOException e) {
      e.printStackTrace();
    }
    System.out.println("Deserialized Student...");
    System.out.println(stu);
  }
}      

  執行結果如下所示:

8.2 MapReduce 中的序列化(一)

圖6

 

  從執行結果中可以看出,sex 的值輸出為null,這是因為該屬性聲明為暫時的,是以它是不可序列化的資料,也沒有儲存在 student.ser 中。在反序列化時,該屬性的值 “female” 也就沒有,而是 null。

任務3:實作 MapReduce 架構的序列化

Writable 接口的類,Writable 接口定義了兩個方法:

  (1)使用 ​

​write(DataOutput out)​

​ 方法将資料寫入到二進制資料流中

  (2)使用 ​

​readFields(DataInput in)​

​ 方法從二進制資料流中讀取資料

  以流量統計項目案例為例:

  (1)資料樣例

13726238888 2481 24681 
13560436666 1116 954 
13726230503 2481 24681 
13826544101 264 0 
13926435656 132 1512 
13926251106 240 0 
18211575961 1527 2106      

  (2)字段釋義

字段中文釋義 字段英文釋義 資料類型
手機号 phone String
上行流量 upflow Long
下行流量 downflow Long

  (3)項目需求一

  統計每一個使用者(手機号)所耗費的總上行流量、總下行流量、總流量。

  期望輸出資料格式:

13480253104 2494800 2494800 4989600      

  下面是進行了序列化和反序列化的 FlowBean 類:

package com.hongyaa.sum; 
import java.io.DataInput; 
import java.io.DataOutput; 
import java.io.IOException; 
import org.apache.hadoop.io.Writable; 

public class FlowBean implements Writable { 

    private long upFlow; private long downFlow; 
    private long sumFlow; // 序列化架構在反序列化操作建立對象執行個體時會調用無參構造器 
    
    public FlowBean() { 
        super(); 
    } 

    // 為了對象資料的初始化友善,加入一個帶參的構造函數     
    public FlowBean(long upFlow, long downFlow) { 
        super(); 
        this.upFlow = upFlow; 
        this.downFlow = downFlow; 
        this.sumFlow = upFlow + downFlow; 
    } 
    public long getUpFlow() { 
        return upFlow; 
    }
    public void setUpFlow(long upFlow) { 
        this.upFlow = upFlow; 
    } 
    public long getDownFlow() { 
        return downFlow; 
    } 
    public void setDownFlow(long downFlow) { 
        this.downFlow = downFlow; 
    } 
    public long getSumFlow() { 
        return sumFlow; 
    } 
    public void setSumFlow(long sumFlow) { 
        this.sumFlow = sumFlow; 
    } 

    // 序列化方法 
    @Override 
    public void write(DataOutput out) throws IOException { 
        out.writeLong(upFlow); 
        out.writeLong(downFlow); 
        out.writeLong(sumFlow); 
    } 

    // 反序列化方法 
    // 注意:字段的反序列化順序與序列化時的順序保持一緻,并且參數類型和個數也一緻 
    @Override 
    public void readFields(DataInput in) throws IOException { 
        this.upFlow = in.readLong(); 
        this.downFlow = in.readLong(); 
        this.sumFlow = in.readLong(); 
    } 
    @Override public String toString() { 
        return upFlow + "\t" + downFlow + "\t" + sumFlow; 
    } 
}