In Spark, an actor mechanism similar to Akka's is implemented on top of Netty. An EndpointData plays roughly the same role as an actor does in Akka:
private class EndpointData(
    val name: String,
    val endpoint: RpcEndpoint,
    val ref: NettyRpcEndpointRef) {
  val inbox = new Inbox(ref, endpoint)
}
An EndpointData consists of four members: name, endpoint, ref, and inbox. name is the actor's name; endpoint holds the logic that processes incoming remote messages; ref is the reference a remote caller targets when making a call; inbox is the mailbox where messages addressed to the endpoint are queued until they are processed.
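The Inbox itself is essentially a per-endpoint message queue. A condensed sketch of its core, following the Spark source (the real class also handles OnStart/OnStop lifecycle messages, a stopped flag, and concurrency controls):

private[netty] class Inbox(val endpointRef: NettyRpcEndpointRef, val endpoint: RpcEndpoint) {
  // Pending messages for this endpoint, drained later by a Dispatcher MessageLoop.
  protected val messages = new java.util.LinkedList[InboxMessage]()

  // Called by Dispatcher.postMessage(); the real method also drops
  // messages once the inbox has been stopped.
  def post(message: InboxMessage): Unit = this.synchronized {
    messages.add(message)
  }
}

The snippet below, from launchTasks() in CoarseGrainedSchedulerBackend, shows these pieces from the sending side: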
val executorData = executorDataMap(task.executorId)
executorData.freeCores -= scheduler.CPUS_PER_TASK

logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
  s"${executorData.executorHost}.")

executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
As an example, when the driver launches a task on an executor, CoarseGrainedSchedulerBackend uses the task's executorId to look up the ExecutorData registered for that executor, then calls send() on its executorEndpoint ref; that single call completes a remote invocation. The task itself is serialized and wrapped in a LaunchTask message, which corresponds to an action in the actor model.
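LaunchTask itself is a small case class (defined in CoarseGrainedClusterMessage.scala in the Spark source); the serialized task travels as its payload:

case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage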
The ref used here was serialized by the remote executor when it registered itself with the driver, and deserialized on the driver side. Its send() method wraps the payload in a RequestMessage:
override def send(message: Any): Unit = {
  require(message != null, "Message is null")
  nettyEnv.send(new RequestMessage(nettyEnv.address, this, message))
}
A RequestMessage carries the caller's remote address, the ref itself (which lets the remote side locate the concrete endpoint to invoke), and the actual message content.
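In the Spark version this walkthrough follows, RequestMessage is essentially a plain container, roughly (later versions add serialization helpers):

private[netty] case class RequestMessage(
    senderAddress: RpcAddress,      // address of the sending NettyRpcEnv
    receiver: NettyRpcEndpointRef,  // identifies the target endpoint by name
    content: Any)                   // the actual payload, e.g. a LaunchTask

NettyRpcEnv then hands the message to postToOutbox() on the sending side: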
private def postToOutbox(receiver: NettyRpcEndpointRef, message: OutboxMessage): Unit = {
  if (receiver.client != null) {
    // A Netty client for this remote address already exists: send directly.
    message.sendWith(receiver.client)
  } else {
    require(receiver.address != null,
      "Cannot send message to client endpoint with no listen address.")
    // Get or atomically create the Outbox for this remote address.
    val targetOutbox = {
      val outbox = outboxes.get(receiver.address)
      if (outbox == null) {
        val newOutbox = new Outbox(this, receiver.address)
        val oldOutbox = outboxes.putIfAbsent(receiver.address, newOutbox)
        if (oldOutbox == null) {
          newOutbox
        } else {
          oldOutbox
        }
      } else {
        outbox
      }
    }
    if (stopped.get) {
      // It's possible that we put `targetOutbox` after stopping. So we need to clean it.
      outboxes.remove(receiver.address)
      targetOutbox.stop()
    } else {
      targetOutbox.send(message)
    }
  }
}
Once the message is wrapped, postToOutbox() checks whether a Netty client for the target address has already been created. If it has, the message is sent through it directly; otherwise an Outbox is created (or reused) for that address to buffer the outgoing data. The Outbox maintains a queue of pending messages, and once the client for that address finishes initializing, the queued messages are flushed through it.
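A condensed sketch of the buffering pattern Outbox implements (field names follow the real class, but connection setup, failure handling, and stop() semantics are omitted):

private[netty] class Outbox(nettyEnv: NettyRpcEnv, val address: RpcAddress) {
  // Messages waiting for the Netty client to become available.
  private val messages = new java.util.LinkedList[OutboxMessage]()
  private var client: TransportClient = null

  def send(message: OutboxMessage): Unit = {
    synchronized { messages.add(message) }
    drainOutbox()  // flushes immediately if the client already exists
  }

  // In the real class this is also invoked by the asynchronous connect task
  // once it has created the TransportClient.
  private def drainOutbox(): Unit = synchronized {
    if (client != null) {
      var message = messages.poll()
      while (message != null) {
        message.sendWith(client)
        message = messages.poll()
      }
    }
  }
}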
Now switch to the executor side and look at how it listens for and handles these remote calls.
Remote calls are handled mainly by the Dispatcher class, which is created and wired up while NettyRpcEnv is initialized; NettyRpcEnv itself is initialized as part of the overall SparkEnv initialization.
private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)

private val transportContext = new TransportContext(transportConf,
  new NettyRpcHandler(dispatcher, this, streamManager))
The Dispatcher is created and wrapped inside a NettyRpcHandler, which becomes part of the TransportContext. Later, when the Netty server is initialized, the NettyRpcHandler ends up being invoked from the channelRead() method overridden in the ChannelHandler:
// TransportChannelHandler
@Override
public void channelRead(ChannelHandlerContext ctx, Object request) throws Exception {
  if (request instanceof RequestMessage) {
    requestHandler.handle((RequestMessage) request);
  } else if (request instanceof ResponseMessage) {
    responseHandler.handle((ResponseMessage) request);
  } else {
    ctx.fireChannelRead(request);
  }
}
// TransportRequestHandler
private void processRpcRequest(final RpcRequest req) {
  try {
    rpcHandler.receive(reverseClient, req.body().nioByteBuffer(), new RpcResponseCallback() {
      @Override
      public void onSuccess(ByteBuffer response) {
        respond(new RpcResponse(req.requestId, new NioManagedBuffer(response)));
      }

      @Override
      public void onFailure(Throwable e) {
        respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
      }
    });
  } catch (Exception e) {
    logger.error("Error while invoking RpcHandler#receive() on RPC id " + req.requestId, e);
    respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
  } finally {
    req.body().release();
  }
}
In channelRead(), once the message type has been determined, the request is handed to the TransportRequestHandler, whose processRpcRequest() (shown above) finally calls receive() on the NettyRpcHandler:
override def receive(
    client: TransportClient,
    message: ByteBuffer,
    callback: RpcResponseCallback): Unit = {
  val messageToDispatch = internalReceive(client, message)
  dispatcher.postRemoteMessage(messageToDispatch, callback)
}
In NettyRpcHandler, internalReceive() first deserializes the bytes back into a RequestMessage; Dispatcher's postRemoteMessage() is then called to route the message to the concrete endpoint for execution.
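In the Spark source, postRemoteMessage() essentially wraps the Netty callback in a RemoteNettyRpcCallContext (so the endpoint can reply) and delegates to the private postMessage(), keyed by the receiver's name:

def postRemoteMessage(message: RequestMessage, callback: RpcResponseCallback): Unit = {
  val rpcCallContext =
    new RemoteNettyRpcCallContext(nettyEnv, callback, message.senderAddress)
  val rpcMessage = RpcMessage(message.senderAddress, message.content, rpcCallContext)
  postMessage(message.receiver.name, rpcMessage, (e) => callback.onFailure(e))
}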
private def postMessage(
    endpointName: String,
    message: InboxMessage,
    callbackIfStopped: (Exception) => Unit): Unit = {
  val error = synchronized {
    val data = endpoints.get(endpointName)
    if (stopped) {
      Some(new RpcEnvStoppedException())
    } else if (data == null) {
      Some(new SparkException(s"Could not find $endpointName."))
    } else {
      data.inbox.post(message)
      receivers.offer(data)
      None
    }
  }
  // We don't need to call `onStop` in the `synchronized` block
  error.foreach(callbackIfStopped)
}
Dispatcher completes the dispatch of a remote call in postMessage(): the name carried by the remote ref is used to look up the corresponding EndpointData in the local name-to-EndpointData map. If one is found, the message is posted to that endpoint's inbox, where it waits for the endpoint to pick it up and process it.
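The map and queue used above are two fields of Dispatcher, approximately as declared in the Spark source:

// name -> EndpointData registry, populated by registerRpcEndpoint()
private val endpoints: ConcurrentMap[String, EndpointData] =
  new ConcurrentHashMap[String, EndpointData]

// EndpointData instances that have unprocessed messages in their inbox
private val receivers = new LinkedBlockingQueue[EndpointData]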
The Dispatcher also maintains a thread pool whose threads each run a MessageLoop.
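The pool is created during Dispatcher construction, roughly as follows (the thread count defaults to the number of usable cores and can be overridden via spark.rpc.netty.dispatcher.numThreads):

private val threadpool: ThreadPoolExecutor = {
  val availableCores =
    if (numUsableCores > 0) numUsableCores else Runtime.getRuntime.availableProcessors()
  val numThreads = nettyEnv.conf.getInt("spark.rpc.netty.dispatcher.numThreads",
    math.max(2, availableCores))
  val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")
  for (i <- 0 until numThreads) {
    pool.execute(new MessageLoop)
  }
  pool
}

Each MessageLoop repeatedly drains the shared receivers queue: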
private class MessageLoop extends Runnable {
  override def run(): Unit = {
    try {
      while (true) {
        try {
          val data = receivers.take()
          if (data == PoisonPill) {
            // Put PoisonPill back so that other MessageLoops can see it.
            receivers.offer(PoisonPill)
            return
          }
          data.inbox.process(Dispatcher.this)
        } catch {
          case NonFatal(e) => logError(e.getMessage, e)
        }
      }
    } catch {
      case ie: InterruptedException => // exit
    }
  }
}
A MessageLoop continually takes an EndpointData that has pending, unprocessed messages from the receivers queue and calls its inbox's process() method, which in turn invokes the endpoint's receive() method so that the handling logic matching the message type is executed.
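Inside process(), messages are matched on their type; a condensed sketch (the real method also handles OnStart, OnStop, and connection events, and guards each call with safelyCall()):

def process(dispatcher: Dispatcher): Unit = {
  var message = this.synchronized { messages.poll() }
  while (message != null) {
    message match {
      case RpcMessage(senderAddress, content, context) =>
        // Two-way call: the endpoint replies through the RpcCallContext.
        endpoint.receiveAndReply(context).applyOrElse[Any, Unit](content,
          _ => throw new SparkException(s"Unsupported message $message"))
      case OneWayMessage(senderAddress, content) =>
        // Fire-and-forget call, e.g. the LaunchTask sent via send() above.
        endpoint.receive.applyOrElse[Any, Unit](content,
          _ => throw new SparkException(s"Unsupported message $message"))
    }
    message = this.synchronized { messages.poll() }
  }
}

As the final piece, the executor side registers its own endpoint at startup: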
env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
  env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
The server side registers an EndpointData with the Dispatcher via RpcEnv's setupEndpoint() method, so that the message dispatch described above can find the matching EndpointData, i.e. the actor that handles the remote call.
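To tie it all together, here is a minimal, hypothetical endpoint; EchoEndpoint and the "echo" name are not part of Spark, and rpcEnv is assumed to be an already-created RpcEnv:

import org.apache.spark.rpc.{RpcEndpoint, RpcEnv}

// Hypothetical endpoint for illustration only.
class EchoEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
  // Invoked by a MessageLoop thread when a one-way message reaches the inbox.
  override def receive: PartialFunction[Any, Unit] = {
    case msg: String => println(s"echo: $msg")
  }
}

val echoRef = rpcEnv.setupEndpoint("echo", new EchoEndpoint(rpcEnv))
echoRef.send("hello")  // travels Outbox -> Netty -> Dispatcher -> Inbox -> receive()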