/*
*
* 1. Executor(server)
* 说明
* 负责 执行计算任务
* 2. Driver(client)
* 说明
* 负责 发送计算任务 给Executor
* 3. Task
* 负责 组织数据 和 计算逻辑
* */
// 模拟分布式计算
package TestOne {
import java.io.{ObjectInputStream, ObjectOutputStream}
import java.net.{InetAddress, ServerSocket, Socket}
// 负责 执行计算任务
object Executor extends App {
//1. 创建ServerSocket,指定端口号
private val serverSocket = new ServerSocket(8899)
//2. 获取Socket实例
println("开启Executor 等待计算任务")
private val socket: Socket = serverSocket.accept
//3. 读取计算任务
private val objectInputStream = new ObjectInputStream(socket.getInputStream)
private val readObject: Task = objectInputStream.readObject.asInstanceOf[Task]
println(s"计算结果 : ${readObject.calculate}")
//4. 关闭资源
objectInputStream.close
socket.close
serverSocket.close
println("计算完成")
}
// 负责 发送计算任务 给Executor
object Driver extends App {
//1. 创建Socket对象,指明服务器端ip和端口号
private val address: InetAddress = InetAddress.getByName("127.0.0.1")
private val socket = new Socket(address, 8899)
//2. 获取输出流
private val objectOutputStream = new ObjectOutputStream(socket.getOutputStream)
//3. 写入数据
objectOutputStream.writeObject(new Task)
//4. 关闭资源
objectOutputStream.close
socket.close
println("发送完毕")
}
// 负责 组织数据 和 计算逻辑
class Task extends Serializable {
//1. 组织数据
val list: List[Int] = List(1, 2, 3, 4)
//2. 计算逻辑 匿名函数
val logic = (x: Int) => x * 2
//3. 计算
def calculate = {
list.map(logic)
}
}
}
// 模拟分布式计算(1个Driver 2个Executor)
package TestTwo {
import java.io.{ObjectInputStream, ObjectOutputStream}
import java.net.{InetAddress, ServerSocket, Socket}
// 负责 执行计算任务
object Executor1 extends App {
//1. 创建ServerSocket,指定端口号
private val serverSocket = new ServerSocket(8898)
//2. 获取Socket实例
println("开启Executor1 等待计算任务")
private val socket: Socket = serverSocket.accept
//3. 读取计算任务
private val objectInputStream = new ObjectInputStream(socket.getInputStream)
private val readObject = objectInputStream.readObject.asInstanceOf[SubTask]
println(s"计算结果 : ${readObject.calculate}")
//4. 关闭资源
objectInputStream.close
socket.close
serverSocket.close
println("计算完成")
}
object Executor2 extends App {
//1. 创建ServerSocket,指定端口号
private val serverSocket = new ServerSocket(8899)
//2. 获取Socket实例
println("开启Executor2 等待计算任务")
private val socket: Socket = serverSocket.accept
//3. 读取计算任务
private val objectInputStream = new ObjectInputStream(socket.getInputStream)
private val readObject = objectInputStream.readObject.asInstanceOf[SubTask]
println(s"计算结果 : ${readObject.calculate}")
//4. 关闭资源
objectInputStream.close
socket.close
serverSocket.close
println("计算完成")
}
// 负责 发送计算任务 给Executor
object Driver extends App {
//1. 创建Socket对象,指明服务器端ip和端口号
private val address: InetAddress = InetAddress.getByName("127.0.0.1")
private val socket1 = new Socket(address, 8898)
private val socket2 = new Socket(address, 8899)
//2. 获取输出流
private val objectOutputStream1 = new ObjectOutputStream(socket1.getOutputStream)
private val objectOutputStream2 = new ObjectOutputStream(socket2.getOutputStream)
//3. 写入数据
private val task = new Task()
private val subTask1 = new SubTask
subTask1.list = task.list.take(2)
private val subTask2 = new SubTask
subTask2.list = task.list.reverse.take(2)
objectOutputStream1.writeObject(subTask1)
objectOutputStream2.writeObject(subTask2)
//4. 关闭资源
objectOutputStream1.close
objectOutputStream2.close
socket1.close
socket2.close
println("发送完毕")
}
// 负责 读取数据 和对数据分区
class Task extends Serializable {
//1. 组织数据
var list: List[Int] = List(1, 2, 3, 4)
}
// 负责 封装计算逻辑
class SubTask extends Serializable {
//1. 组织数据
var list: List[Int] = _
//2. 计算逻辑 匿名函数
val logic = (x: Int) => x * 2
//3. 计算
def calculate = {
list.map(logic)
}
}
}