天天看点

8.2 MapReduce 中的序列化(一)

任务目的

  • 了解序列化和反序列化的概念和作用
  • 理解 Java 的序列化和反序列化实现
  • 可以自定义对象实现 MapReduce 框架的序列化

任务清单

  • 任务1:序列化概述
  • 任务2:Java 序列化
  • 任务3:实现 MapReduce 框架的序列化

详细任务步骤

任务1:序列化概述

  • 序列化(Serialization):是指把结构化对象(Object)转化为字节流(ByteStream)。
  • 反序列化(Deserialization):是序列化的逆过程。即把字节流转回结构化对象。
8.2 MapReduce 中的序列化(一)

图1

 

  **为什么需要序列化和反序列化? **

  总的来说可以归结为以下几点:

  (1)永久性保存对象,保存对象的字节序列到本地文件或者数据库中;

  (2)通过序列化以字节流的形式使对象 在网络中进行传递和接收;

  (3)通过序列化在进程间传递对象。

  Java 的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校 验信息、header、继承体系等),不便于在网络中高效传输;所以,Hadoop 自己开发了一套序列化机制 (Writable),精简,高效。 Hadoop 中的序列化框架已经对基本类型和 null 提供了序列化的实现了。分别是:

Java 类型 Hadoop Writable 类型
byte ByteWritable
short ShortWritable
int IntWritable
long LongWritable
float FloatWritable
double DoubleWritable
string Text
null NullWritable

任务2:Java 序列化

  Java 序列化的过程是与平台无关的,也就是说一个 Java 对象可以在一个平台上序列化之后传输到另外一个平台上进行反序列化。

Serializable。只有实现了 ​

​java.io.Serializable​

​​ 接口的类可以进行序列化/反序列化,其中 ​

​java.io.Serializable​

​ 接口是一个标记接口,标记接口不含有任何成员和方法,标记接口的作用是标记一组类,这些类都具有相同的特定的功能,常见的标记接口还有 Cloneable。

2.1 实现 Serializable 接口

  一个类的对象要想序列化成功,必须满足两个条件:

  (1)该类必须实现 ​

​java.io.Serializable​

​ 对象;

  (2)该类的所有属性必须是可序列化的。声明为 ​

​static​

​​ 和 ​

​transient​

​​ 类型的成员数据不能被序列化。因为 ​

​static​

​​ 代表类的状态,​

​transient​

​ 代表对象的临时数据。

  详细代码如下所示:

package com.hongyaa.java.serializable;

import java.io.Serializable;

/**
 * 定义一个Student类,实现 Serializable 接口
 */
public class Student implements Serializable {

  /**
   * 序列化ID
   */
  private static final long serialVersionUID = -7037627544448704690L;
  private int id;
  private String name;
  private int age;
  //该属性声明为暂时的,因此不可序列化
  private transient String sex;

  public Student() {
    super();
  }

  public Student(int id, String name, int {
    super();
    this.id = id;
    this.name = name;
    this.age = age;
    this.sex = sex;
  }

  public int getId() {
    return id;
  }

  public void setId(int {
    this.id = id;
  }

  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }

  public int getAge() {
    return age;
  }

  public void setAge(int {
    this.age = age;
  }

  public String getSex() {
    return sex;
  }

  public void setSex(String sex) {
    this.sex = sex;
  }

  @Override
  public String toString() {
    return "id=" + id + ", name=" + name + ", age=" + age + ", sex="      

2.2 序列化

  那么我们如何将此类的对象序列化后保存到磁盘上呢?

  (1)创建一个 ​

​ObjectOutputStream​

​​ 输出流 ​

​oos​

  (2)调用此输出流 ​

​oos​

​​ 的 ​

​writeObject()​

​ 方法写对象

8.2 MapReduce 中的序列化(一)

图2

 

  详细代码如下所示:

package com.hongyaa.java.serializable;

import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

/**
 * 序列化Student对象
 * 
 *
 */
public class SerializeTest {
  public static void main(String[] args) {
    Student stu = new Student(1, "cendy", 22, "female");
    try {
      // ObjectOutputStream 对象输出流,将Student对象存储到/root/software/student.ser,完成对Student对象的序列化操作
      FileOutputStream fos = new FileOutputStream("/root/software/student.ser");
      ObjectOutputStream oos = new ObjectOutputStream(fos);
      // 序列化一个对象,并将它发送到输出流
      oos.writeObject(stu);
      oos.close();
      fos.close();
      System.out.println("Serialized data is saved in /root/software/student.ser");
    } catch (FileNotFoundException e) {
      e.printStackTrace();
    } catch      

  执行结果如下所示:

8.2 MapReduce 中的序列化(一)

图3

 

  显示将 Student 的一个实例对象序列化到了/root/software/student.ser文件中,我们可以进入/root/software目录下进行你验证,发现创建了student.ser文件。如下图所示:

8.2 MapReduce 中的序列化(一)

图4

 

2.3 反序列化

  我们如从文本文件中将此对象的字节序列恢复成 Student 对象呢?

  (1)创建一个 ​

​ObjectInputStream​

​​ 输入流 ​

​ois​

  (2)调用此输入流 ​

​ois​

​​ 的 ​

​readObject()​

​ 方法读取对象

8.2 MapReduce 中的序列化(一)

图5

 

  详细代码如下所示:

package com.hognyaa.java.serializable;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.ObjectInputStream;

/**
 * 反序列化Student对象
 *
 */
public class DeserializeTest {
  public static void main(String[] args) {
    Student stu = null;
    try {
      FileInputStream fis = new FileInputStream("/root/software/student.ser");
      ObjectInputStream ois = new ObjectInputStream(fis);
      // 从流中取出下一个对象,并将对象反序列化
      // 返回值类型为Object,因此需要将它转换成合适的数据类型,这里转换成Student类型
      stu = (Student) ois.readObject();
      ois.close();
      fis.close();
    } catch (FileNotFoundException e) {
      e.printStackTrace();
    } catch (ClassNotFoundException e) {
      System.out.println("Student class not found!!!");
      e.printStackTrace();
    } catch (IOException e) {
      e.printStackTrace();
    }
    System.out.println("Deserialized Student...");
    System.out.println(stu);
  }
}      

  执行结果如下所示:

8.2 MapReduce 中的序列化(一)

图6

 

  从执行结果中可以看出,sex 的值输出为null,这是因为该属性声明为暂时的,所以它是不可序列化的数据,也没有保存在 student.ser 中。在反序列化时,该属性的值 “female” 也就没有,而是 null。

任务3:实现 MapReduce 框架的序列化

Writable 接口的类,Writable 接口定义了两个方法:

  (1)使用 ​

​write(DataOutput out)​

​ 方法将数据写入到二进制数据流中

  (2)使用 ​

​readFields(DataInput in)​

​ 方法从二进制数据流中读取数据

  以流量统计项目案例为例:

  (1)数据样例

13726238888 2481 24681 
13560436666 1116 954 
13726230503 2481 24681 
13826544101 264 0 
13926435656 132 1512 
13926251106 240 0 
18211575961 1527 2106      

  (2)字段释义

字段中文释义 字段英文释义 数据类型
手机号 phone String
上行流量 upflow Long
下行流量 downflow Long

  (3)项目需求一

  统计每一个用户(手机号)所耗费的总上行流量、总下行流量、总流量。

  期望输出数据格式:

13480253104 2494800 2494800 4989600      

  下面是进行了序列化和反序列化的 FlowBean 类:

package com.hongyaa.sum; 
import java.io.DataInput; 
import java.io.DataOutput; 
import java.io.IOException; 
import org.apache.hadoop.io.Writable; 

public class FlowBean implements Writable { 

    private long upFlow; private long downFlow; 
    private long sumFlow; // 序列化框架在反序列化操作创建对象实例时会调用无参构造器 
    
    public FlowBean() { 
        super(); 
    } 

    // 为了对象数据的初始化方便,加入一个带参的构造函数     
    public FlowBean(long upFlow, long downFlow) { 
        super(); 
        this.upFlow = upFlow; 
        this.downFlow = downFlow; 
        this.sumFlow = upFlow + downFlow; 
    } 
    public long getUpFlow() { 
        return upFlow; 
    }
    public void setUpFlow(long upFlow) { 
        this.upFlow = upFlow; 
    } 
    public long getDownFlow() { 
        return downFlow; 
    } 
    public void setDownFlow(long downFlow) { 
        this.downFlow = downFlow; 
    } 
    public long getSumFlow() { 
        return sumFlow; 
    } 
    public void setSumFlow(long sumFlow) { 
        this.sumFlow = sumFlow; 
    } 

    // 序列化方法 
    @Override 
    public void write(DataOutput out) throws IOException { 
        out.writeLong(upFlow); 
        out.writeLong(downFlow); 
        out.writeLong(sumFlow); 
    } 

    // 反序列化方法 
    // 注意:字段的反序列化顺序与序列化时的顺序保持一致,并且参数类型和个数也一致 
    @Override 
    public void readFields(DataInput in) throws IOException { 
        this.upFlow = in.readLong(); 
        this.downFlow = in.readLong(); 
        this.sumFlow = in.readLong(); 
    } 
    @Override public String toString() { 
        return upFlow + "\t" + downFlow + "\t" + sumFlow; 
    } 
}