天天看点

03_用Socket模拟分布式计算(模拟Driver、Executor、RDD)

/*
*
*  1. Executor(server)
*     说明
*         负责 执行计算任务
*  2. Driver(client)
*     说明
*         负责 发送计算任务 给Executor
*  3. Task
*     负责 组织数据 和 计算逻辑
* */

// 模拟分布式计算
package TestOne {

  import java.io.{ObjectInputStream, ObjectOutputStream}
  import java.net.{InetAddress, ServerSocket, Socket}

  // 负责 执行计算任务
  object Executor extends App {
    //1. 创建ServerSocket,指定端口号
    private val serverSocket = new ServerSocket(8899)

    //2. 获取Socket实例
    println("开启Executor 等待计算任务")
    private val socket: Socket = serverSocket.accept

    //3. 读取计算任务
    private val objectInputStream = new ObjectInputStream(socket.getInputStream)
    private val readObject: Task = objectInputStream.readObject.asInstanceOf[Task]
    println(s"计算结果 : ${readObject.calculate}")


    //4. 关闭资源
    objectInputStream.close
    socket.close
    serverSocket.close
    println("计算完成")


  }

  // 负责 发送计算任务 给Executor
  object Driver extends App {
    //1. 创建Socket对象,指明服务器端ip和端口号
    private val address: InetAddress = InetAddress.getByName("127.0.0.1")
    private val socket = new Socket(address, 8899)

    //2. 获取输出流
    private val objectOutputStream = new ObjectOutputStream(socket.getOutputStream)

    //3. 写入数据
    objectOutputStream.writeObject(new Task)

    //4. 关闭资源
    objectOutputStream.close
    socket.close
    println("发送完毕")


  }

  // 负责 组织数据 和 计算逻辑
  class Task extends Serializable {
    //1. 组织数据
    val list: List[Int] = List(1, 2, 3, 4)

    //2. 计算逻辑 匿名函数
    val logic = (x: Int) => x * 2

    //3. 计算
    def calculate = {
      list.map(logic)
    }


  }


}

// 模拟分布式计算(1个Driver 2个Executor)
package TestTwo {

  import java.io.{ObjectInputStream, ObjectOutputStream}
  import java.net.{InetAddress, ServerSocket, Socket}

  // 负责 执行计算任务
  object Executor1 extends App {
    //1. 创建ServerSocket,指定端口号
    private val serverSocket = new ServerSocket(8898)

    //2. 获取Socket实例
    println("开启Executor1 等待计算任务")
    private val socket: Socket = serverSocket.accept

    //3. 读取计算任务
    private val objectInputStream = new ObjectInputStream(socket.getInputStream)
    private val readObject = objectInputStream.readObject.asInstanceOf[SubTask]
    println(s"计算结果 : ${readObject.calculate}")


    //4. 关闭资源
    objectInputStream.close
    socket.close
    serverSocket.close
    println("计算完成")


  }

  object Executor2 extends App {
    //1. 创建ServerSocket,指定端口号
    private val serverSocket = new ServerSocket(8899)

    //2. 获取Socket实例
    println("开启Executor2 等待计算任务")
    private val socket: Socket = serverSocket.accept

    //3. 读取计算任务
    private val objectInputStream = new ObjectInputStream(socket.getInputStream)
    private val readObject = objectInputStream.readObject.asInstanceOf[SubTask]
    println(s"计算结果 : ${readObject.calculate}")


    //4. 关闭资源
    objectInputStream.close
    socket.close
    serverSocket.close
    println("计算完成")


  }


  // 负责 发送计算任务 给Executor
  object Driver extends App {
    //1. 创建Socket对象,指明服务器端ip和端口号
    private val address: InetAddress = InetAddress.getByName("127.0.0.1")
    private val socket1 = new Socket(address, 8898)
    private val socket2 = new Socket(address, 8899)

    //2. 获取输出流
    private val objectOutputStream1 = new ObjectOutputStream(socket1.getOutputStream)
    private val objectOutputStream2 = new ObjectOutputStream(socket2.getOutputStream)

    //3. 写入数据
    private val task = new Task()
    private val subTask1 = new SubTask
    subTask1.list = task.list.take(2)
    private val subTask2 = new SubTask
    subTask2.list = task.list.reverse.take(2)
    objectOutputStream1.writeObject(subTask1)
    objectOutputStream2.writeObject(subTask2)

    //4. 关闭资源
    objectOutputStream1.close
    objectOutputStream2.close
    socket1.close
    socket2.close
    println("发送完毕")


  }

  // 负责 读取数据 和对数据分区
  class Task extends Serializable {
    //1. 组织数据
    var list: List[Int] = List(1, 2, 3, 4)
  }

  // 负责 封装计算逻辑
  class SubTask extends Serializable {
    //1. 组织数据
    var list: List[Int] = _

    //2. 计算逻辑 匿名函数
    val logic = (x: Int) => x * 2

    //3. 计算
    def calculate = {
      list.map(logic)
    }


  }


}