Spark uses Netty to implement an actor-like mechanism similar to Akka's. An EndpointData in Spark plays roughly the same role as an actor in Akka.
private class EndpointData(
    val name: String,
    val endpoint: RpcEndpoint,
    val ref: NettyRpcEndpointRef) {
  val inbox = new Inbox(ref, endpoint)
}
An EndpointData is made up of four members: name, endpoint, ref, and inbox. name is the actor's name; endpoint implements the handling of incoming remote messages; ref is the target reference a client holds when it makes a remote call; and inbox is the mailbox where incoming messages are queued until the endpoint processes them.
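To make the actor analogy concrete, here is a minimal sketch of an RpcEndpoint, the component that EndpointData wraps. EchoEndpoint is a hypothetical example, not a class from the Spark codebase, and since RpcEndpoint is private[spark], a real implementation has to live inside the Spark source tree (as CoarseGrainedExecutorBackend does):

import org.apache.spark.rpc.{RpcEndpoint, RpcEnv}

// Hypothetical minimal endpoint: overriding receive is all that is needed to
// handle one-way messages delivered through a ref's send().
class EchoEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
  override def receive: PartialFunction[Any, Unit] = {
    case msg: String => println(s"echo: $msg")
  }
}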
val executorData = executorDataMap(task.executorId)
executorData.freeCores -= scheduler.CPUS_PER_TASK
logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
  s"${executorData.executorHost}.")
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
As an example, when the driver dispatches a task to an executor, CoarseGrainedSchedulerBackend uses the executorId the task is assigned to in order to look up the corresponding EndpointData, and calling send() on that EndpointData's ref completes a remote invocation. The task itself is serialized and wrapped in a LaunchTask message, which corresponds to a message type under the actor model.
The ref was serialized by the remote executor when it registered itself with the driver, and deserialized on the driver side. Its send() method constructs a RequestMessage:
override def send(message: Any): Unit = {
  require(message != null, "Message is null")
  nettyEnv.send(new RequestMessage(nettyEnv.address, this, message))
}
The RequestMessage contains the caller's remote address, the ref itself (so the remote side can locate the concrete endpoint to operate on), and the actual message described above.
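For reference, the shape of RequestMessage is roughly the following (paraphrased from the Spark source rather than quoted verbatim; the field comments are mine):

// Roughly the shape of RequestMessage (paraphrased, not a verbatim copy):
private[netty] class RequestMessage(
    val senderAddress: RpcAddress,      // the caller's address
    val receiver: NettyRpcEndpointRef,  // lets the remote Dispatcher resolve the endpoint by name
    val content: Any)                   // the payload, e.g. LaunchTask(...)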
private def postToOutbox(receiver: NettyRpcEndpointRef, message: OutboxMessage): Unit = {
  if (receiver.client != null) {
    message.sendWith(receiver.client)
  } else {
    require(receiver.address != null,
      "Cannot send message to client endpoint with no listen address.")
    val targetOutbox = {
      val outbox = outboxes.get(receiver.address)
      if (outbox == null) {
        val newOutbox = new Outbox(this, receiver.address)
        val oldOutbox = outboxes.putIfAbsent(receiver.address, newOutbox)
        if (oldOutbox == null) {
          newOutbox
        } else {
          oldOutbox
        }
      } else {
        outbox
      }
    }
    if (stopped.get) {
      // It's possible that we put `targetOutbox` after stopping. So we need to clean it.
      outboxes.remove(receiver.address)
      targetOutbox.stop()
    } else {
      targetOutbox.send(message)
    }
  }
}
Once the message has been wrapped, if it is a remote call, the code checks whether a Netty client for the target remote address has already been created. If it has, the message is sent directly; otherwise an Outbox is created for that address to buffer the outgoing data. The Outbox maintains a queue of messages waiting to be sent, and once the client for that address finishes initializing, the queued messages are flushed through it.
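The buffering pattern can be summarized in a few lines. The sketch below is illustrative only (SimpleOutbox and its members are invented names, not Spark's Outbox), assuming a sendNow function that stands in for a ready Netty client:

import java.util.concurrent.ConcurrentLinkedQueue

// Illustrative sketch of the Outbox idea: queue messages until the client is
// ready, then drain the queue through it. The real Outbox additionally
// synchronizes send() against the connect transition.
class SimpleOutbox[M](sendNow: M => Unit) {
  private val pending = new ConcurrentLinkedQueue[M]()
  @volatile private var connected = false

  def send(message: M): Unit =
    if (connected) sendNow(message)
    else pending.add(message)  // buffered until onClientReady() fires

  // Called once the Netty client for this address has been created.
  def onClientReady(): Unit = {
    connected = true
    drain()
  }

  private def drain(): Unit = {
    var m = pending.poll()
    while (m != null) { sendNow(m); m = pending.poll() }
  }
}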
Now turn to the executor side to see how it listens for and handles these remote calls.
Remote calls are handled mainly by the Dispatcher class, which is created and wrapped during the initialization of NettyRpcEnv; NettyRpcEnv itself is initialized as part of the initialization of the whole SparkEnv.
private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)

private val transportContext = new TransportContext(transportConf,
  new NettyRpcHandler(dispatcher, this, streamManager))
The Dispatcher is initialized and wrapped inside a NettyRpcHandler, and the NettyRpcHandler becomes part of the transport context. Later, when the Netty server is initialized, the call into this NettyRpcHandler is wired into the channelRead() method that the ChannelHandler overrides.
@Override
public void channelRead(ChannelHandlerContext ctx, Object request) throws Exception {
  if (request instanceof RequestMessage) {
    requestHandler.handle((RequestMessage) request);
  } else if (request instanceof ResponseMessage) {
    responseHandler.handle((ResponseMessage) request);
  } else {
    ctx.fireChannelRead(request);
  }
}
private void processRpcRequest(final RpcRequest req) {
  try {
    rpcHandler.receive(reverseClient, req.body().nioByteBuffer(), new RpcResponseCallback() {
      @Override
      public void onSuccess(ByteBuffer response) {
        respond(new RpcResponse(req.requestId, new NioManagedBuffer(response)));
      }

      @Override
      public void onFailure(Throwable e) {
        respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
      }
    });
  } catch (Exception e) {
    logger.error("Error while invoking RpcHandler#receive() on RPC id " + req.requestId, e);
    respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
  } finally {
    req.body().release();
  }
}
In channelRead(), once the incoming message has been identified as a RequestMessage, it goes to the request handler, whose processRpcRequest() ultimately hands it to the receive() method of the NettyRpcHandler.
override def receive(
    client: TransportClient,
    message: ByteBuffer,
    callback: RpcResponseCallback): Unit = {
  val messageToDispatch = internalReceive(client, message)
  dispatcher.postRemoteMessage(messageToDispatch, callback)
}
In NettyRpcHandler, internalReceive() first deserializes the bytes back into a RequestMessage, and then the Dispatcher's postRemoteMessage() method is called to locate the concrete endpoint that should execute it.
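postRemoteMessage() itself is small; paraphrased from the Spark source (details may differ between versions), it looks roughly like this, with postMessage() below doing the actual routing:

// Paraphrased from the Spark source (may differ slightly across versions):
// wrap the payload plus the reply callback into an RpcMessage, then route it
// by the name of the receiving endpoint.
def postRemoteMessage(message: RequestMessage, callback: RpcResponseCallback): Unit = {
  val rpcCallContext =
    new RemoteNettyRpcCallContext(nettyEnv, callback, message.senderAddress)
  val rpcMessage = RpcMessage(message.senderAddress, message.content, rpcCallContext)
  postMessage(message.receiver.name, rpcMessage, (e) => callback.onFailure(e))
}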
private def postMessage(
    endpointName: String,
    message: InboxMessage,
    callbackIfStopped: (Exception) => Unit): Unit = {
  val error = synchronized {
    val data = endpoints.get(endpointName)
    if (stopped) {
      Some(new RpcEnvStoppedException())
    } else if (data == null) {
      Some(new SparkException(s"Could not find $endpointName."))
    } else {
      data.inbox.post(message)
      receivers.offer(data)
      None
    }
  }
  // We don't need to call `onStop` in the `synchronized` block
  error.foreach(callbackIfStopped)
}
The Dispatcher completes the dispatch of a remote call in postMessage(). Using the name carried by the remote ref, it looks up the corresponding endpoint in the local map from names to EndpointData instances. If one is found, the message is posted to that endpoint's inbox, where it waits to be picked up and processed.
The Dispatcher maintains a thread pool that runs MessageLoop threads.
private class MessageLoop extends Runnable {
  override def run(): Unit = {
    try {
      while (true) {
        try {
          val data = receivers.take()
          if (data == PoisonPill) {
            // Put PoisonPill back so that other MessageLoops can see it.
            receivers.offer(PoisonPill)
            return
          }
          data.inbox.process(Dispatcher.this)
        } catch {
          case NonFatal(e) => logError(e.getMessage, e)
        }
      }
    } catch {
      case ie: InterruptedException => // exit
    }
  }
}
MessageLoop将會不斷從所有有消息但是還未處理的EndPointData集合中依次取出,調用其inbox信箱的process()方法調用endpoint的receive()方法根據遠端調用的事件類型選擇具體的邏輯處理對應的遠端調用。
env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
  env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
The server side registers an EndpointData with the Dispatcher through RpcEnv's setupEndpoint() method, so that the message dispatch described above can find the corresponding EndpointData, which is exactly the actor that handles the remote call.
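Putting the pieces together, registration and dispatch reduce to a name-keyed map plus a ready queue. The sketch below reuses the SimpleInbox stand-in from above and, like it, uses invented names rather than Spark's actual Dispatcher:

import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}

// Invented, simplified stand-in for Spark's Dispatcher: setupEndpoint() maps a
// name to an inbox, post() enqueues a message and marks that inbox as ready,
// and message-loop threads would take() from `receivers` and call process().
class SimpleDispatcher {
  private case class Registered(name: String, inbox: SimpleInbox)
  private val endpoints = new ConcurrentHashMap[String, Registered]()
  private val receivers = new LinkedBlockingQueue[Registered]()

  def setupEndpoint(name: String, receive: PartialFunction[Any, Unit]): Unit = {
    val data = Registered(name, new SimpleInbox(name, receive))
    if (endpoints.putIfAbsent(name, data) != null) {
      throw new IllegalArgumentException(s"There is already an endpoint called $name")
    }
  }

  // Mirrors postMessage(): resolve by name, post to the inbox, mark it ready.
  def post(name: String, message: Any): Unit = {
    val data = endpoints.get(name)
    require(data != null, s"Could not find $name.")
    data.inbox.post(message)
    receivers.offer(data)
  }
}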