
How Spark uses Netty to implement Akka's actor model

Inside Spark, Netty is used to implement an actor mechanism similar to Akka's.

In Spark, an EndpointData is the analogue of an actor in Akka.

private class EndpointData(
    val name: String,              // the name this "actor" is registered under
    val endpoint: RpcEndpoint,     // processes the messages delivered to it
    val ref: NettyRpcEndpointRef) {  // the reference remote callers use to reach it
  val inbox = new Inbox(ref, endpoint)  // mailbox holding its pending messages
}
           

An EndpointData is made up of four members: name, endpoint, ref, and inbox.

Here, name is the name of this actor; endpoint does the actual processing of remote messages; ref is the target object a client holds when making a remote call; and inbox is the mailbox that stores incoming messages until the endpoint processes them.
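To make these roles concrete, here is a minimal sketch of a custom endpoint. EchoEndpoint, Ping and Pong are made-up names for this sketch; RpcEndpoint, RpcCallContext and RpcEnv are Spark's real traits, but they are private[spark], so the sketch assumes it is compiled under the org.apache.spark package, just like Spark's own sources.

package org.apache.spark.rpcdemo

import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEnv}

case class Ping(id: Int)
case class Pong(id: Int)

class EchoEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
  // Fire-and-forget messages taken out of the inbox land here,
  // the counterpart of an Akka actor's receive.
  override def receive: PartialFunction[Any, Unit] = {
    case Ping(id) => println(s"got ping $id")
  }

  // Messages sent with ask() land here and are answered through the context.
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case Ping(id) => context.reply(Pong(id))
  }
}

The snippet below, taken from CoarseGrainedSchedulerBackend, shows the driver using such a ref to reach an executor's endpoint.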

val executorData = executorDataMap(task.executorId)
executorData.freeCores -= scheduler.CPUS_PER_TASK

logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
  s"${executorData.executorHost}.")

executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
           

As an example, when the driver dispatches a task to an executor, CoarseGrainedSchedulerBackend uses the executorId the task must run on to look up the corresponding executor's data, takes the endpoint ref stored in it, and calls its send() method; that single call completes a remote invocation. The task data being dispatched is serialized and wrapped in a LaunchTask message, which plays the same role as a message sent to an actor in the actor model.
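The ref is used much like an Akka ActorRef: send() corresponds to tell (!) and ask() to ask (?). A small hypothetical sketch follows; executorRef is a placeholder, and since RpcEndpointRef is private[spark] the code is again assumed to live under the org.apache.spark package.

package org.apache.spark.rpcdemo

import scala.concurrent.Future

import org.apache.spark.rpc.RpcEndpointRef

object RefUsageSketch {
  def demo(executorRef: RpcEndpointRef): Unit = {
    // one-way, fire-and-forget: delivered to the endpoint's receive()
    executorRef.send("fire-and-forget message")

    // request/response: delivered to receiveAndReply(), answered asynchronously
    val reply: Future[String] = executorRef.ask[String]("a question")
  }
}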

The ref itself was serialized by the remote executor when it registered with the driver and was deserialized on the driver side. In its send() method it builds a RequestMessage.

override def send(message: Any): Unit = {
  require(message != null, "Message is null")
  nettyEnv.send(new RequestMessage(nettyEnv.address, this, message))
}
           

A RequestMessage contains the remote address of the caller, the ref itself, which the remote side uses to locate the concrete endpoint to act on, and the concrete message described above.
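Conceptually, the message that goes over the wire can be pictured like this (a simplified, hypothetical rendering; the real RequestMessage class lives in org.apache.spark.rpc.netty and holds the sender's address, the receiving ref and the payload):

// Field types are simplified so the sketch stands alone.
case class RequestMessageSketch(
    senderAddress: String,  // the caller's address, so the remote side knows who sent it
    receiverName: String,   // the endpoint name carried by the ref, used for lookup
    content: Any)           // the payload itself, e.g. LaunchTask(serializedTask)

The postToOutbox() method shown next decides whether such a message can be written to an existing Netty client right away or has to be buffered first.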

private def postToOutbox(receiver: NettyRpcEndpointRef, message: OutboxMessage): Unit = {
  if (receiver.client != null) {
    message.sendWith(receiver.client)
  } else {
    require(receiver.address != null,
      "Cannot send message to client endpoint with no listen address.")
    val targetOutbox = {
      val outbox = outboxes.get(receiver.address)
      if (outbox == null) {
        val newOutbox = new Outbox(this, receiver.address)
        val oldOutbox = outboxes.putIfAbsent(receiver.address, newOutbox)
        if (oldOutbox == null) {
          newOutbox
        } else {
          oldOutbox
        }
      } else {
        outbox
      }
    }
    if (stopped.get) {
      // It's possible that we put `targetOutbox` after stopping. So we need to clean it.
      outboxes.remove(receiver.address)
      targetOutbox.stop()
    } else {
      targetOutbox.send(message)
    }
  }
}
           

Once the message is wrapped, and if the call is a remote one, Spark checks whether the Netty client for the target address has already been created. If the client exists, the message is sent directly; otherwise an Outbox is created for that client to buffer the data waiting to be sent. The Outbox maintains a queue of pending messages, and once the corresponding Netty client has finished initializing, the buffered messages are sent through it.
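The buffering logic boils down to the following sketch (MiniOutbox is a made-up class, not Spark's Outbox; it only illustrates the queue-until-connected idea):

import java.util.concurrent.ConcurrentLinkedQueue

class MiniOutbox[C](sendWith: (C, Any) => Unit) {
  private val pending = new ConcurrentLinkedQueue[Any]()
  @volatile private var client: Option[C] = None

  def send(msg: Any): Unit = client match {
    case Some(c) => sendWith(c, msg)   // client already created: send directly
    case None    => pending.offer(msg) // still connecting: queue the message
  }

  // Invoked once the underlying Netty client has finished initializing.
  def onClientReady(c: C): Unit = {
    client = Some(c)
    var m = pending.poll()
    while (m != null) { sendWith(c, m); m = pending.poll() }
  }
}

Spark's real Outbox additionally has to deal with the RpcEnv being stopped, which is what the stopped branch in postToOutbox() above handles.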

Now move over to the executor to see how it listens for and handles these remote calls.

Remote calls are handled mainly by the Dispatcher class, which is created and wired up during the initialization of NettyRpcEnv; NettyRpcEnv itself is initialized as part of the initialization of the whole SparkEnv.

private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)

private val transportContext = new TransportContext(transportConf,
  new NettyRpcHandler(dispatcher, this, streamManager))
           

The Dispatcher is initialized and wrapped inside a NettyRpcHandler, which then becomes part of the TransportContext; later, when the Netty server is initialized, calls into this NettyRpcHandler are wired into the channelRead() method overridden on the ChannelHandler.

@Override
public void channelRead(ChannelHandlerContext ctx, Object request) throws Exception {
  if (request instanceof RequestMessage) {
    requestHandler.handle((RequestMessage) request);
  } else if (request instanceof ResponseMessage) {
    responseHandler.handle((ResponseMessage) request);
  } else {
    ctx.fireChannelRead(request);
  }
}

private void processRpcRequest(final RpcRequest req) {
  try {
    rpcHandler.receive(reverseClient, req.body().nioByteBuffer(), new RpcResponseCallback() {
      @Override
      public void onSuccess(ByteBuffer response) {
        respond(new RpcResponse(req.requestId, new NioManagedBuffer(response)));
      }

      @Override
      public void onFailure(Throwable e) {
        respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
      }
    });
  } catch (Exception e) {
    logger.error("Error while invoking RpcHandler#receive() on RPC id " + req.requestId, e);
    respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
  } finally {
    req.body().release();
  }
}
           

In channelRead(), once the type of the incoming message has been determined, the request is handed to the request handler, and an RPC request ultimately reaches the NettyRpcHandler through its receive() method, shown below.

override def receive(
    client: TransportClient,
    message: ByteBuffer,
    callback: RpcResponseCallback): Unit = {
  val messageToDispatch = internalReceive(client, message)
  dispatcher.postRemoteMessage(messageToDispatch, callback)
}
           

In NettyRpcHandler, internalReceive() first deserializes the bytes into a RequestMessage; Dispatcher's postRemoteMessage() is then called to locate the concrete endpoint and have the message processed there.

private def postMessage(
    endpointName: String,
    message: InboxMessage,
    callbackIfStopped: (Exception) => Unit): Unit = {
  val error = synchronized {
    val data = endpoints.get(endpointName)
    if (stopped) {
      Some(new RpcEnvStoppedException())
    } else if (data == null) {
      Some(new SparkException(s"Could not find $endpointName."))
    } else {
      data.inbox.post(message)
      receivers.offer(data)
      None
    }
  }
  // We don't need to call `onStop` in the `synchronized` block
  error.foreach(callbackIfStopped)
}
           

Dispatcher.postMessage() performs the actual dispatch of the remote call. Using the name carried by the remote ref, it looks up the corresponding endpoint in its local map from name to EndpointData; if one is found, the message is posted to that endpoint's inbox, where it waits for the endpoint to pick it up and process it.

The Dispatcher maintains a thread pool whose threads run the MessageLoop.

private class MessageLoop extends Runnable {
  override def run(): Unit = {
    try {
      while (true) {
        try {
          val data = receivers.take()
          if (data == PoisonPill) {
            // Put PoisonPill back so that other MessageLoops can see it.
            receivers.offer(PoisonPill)
            return
          }
          data.inbox.process(Dispatcher.this)
        } catch {
          case NonFatal(e) => logError(e.getMessage, e)
        }
      }
    } catch {
      case ie: InterruptedException => // exit
    }
  }
}
           

A MessageLoop keeps taking, one at a time, EndpointData entries that have pending but not yet processed messages, and calls their inbox's process() method; process() in turn invokes the endpoint's receive() method, where the concrete logic for that type of remote call is selected and executed.
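Putting the executor-side pieces together, the Dispatcher / Inbox / MessageLoop machinery reduces to the following sketch (MiniDispatcher, MiniInbox and the fixed pool size of 2 are made up; Spark derives its pool size from the usable cores passed to the Dispatcher):

import java.util.concurrent.{ConcurrentHashMap, Executors, LinkedBlockingQueue}

object MiniDispatcher {
  final class MiniInbox(handler: Any => Unit) {
    private val messages = new LinkedBlockingQueue[Any]()
    def post(m: Any): Unit = messages.offer(m)
    def process(): Unit = {
      var m = messages.poll()
      while (m != null) { handler(m); m = messages.poll() }
    }
  }

  private val endpoints = new ConcurrentHashMap[String, MiniInbox]()
  private val receivers = new LinkedBlockingQueue[MiniInbox]()  // inboxes with pending work
  private val pool = Executors.newFixedThreadPool(2)

  // setupEndpoint: register a named handler, i.e. an "actor"
  def register(name: String, handler: Any => Unit): Unit =
    endpoints.put(name, new MiniInbox(handler))

  // postMessage: look the endpoint up by name and enqueue the message
  def postMessage(name: String, msg: Any): Unit = {
    val inbox = endpoints.get(name)
    if (inbox != null) { inbox.post(msg); receivers.offer(inbox) }
  }

  // each worker thread plays the role of a MessageLoop
  def start(): Unit = (1 to 2).foreach { _ =>
    pool.execute(new Runnable {
      override def run(): Unit = while (true) receivers.take().process()
    })
  }
}

Endpoints get into the Dispatcher's registry in the first place through registration, as the following registration of the executor backend shows.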

env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
  env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
           

The server side registers an EndpointData with the Dispatcher through RpcEnv's setupEndpoint() method, so that the message dispatch described above can find the matching EndpointData, that is, the actor that processes the remote call.
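As a closing sketch, this is roughly how an endpoint is registered on one side and reached from the other. setupEndpoint() and setupEndpointRef() are the actual RpcEnv methods; the endpoint body, host name and port are made up, and since these classes are private[spark] the code is again assumed to live under the org.apache.spark package.

package org.apache.spark.rpcdemo

import org.apache.spark.rpc.{RpcAddress, RpcEndpoint, RpcEndpointRef, RpcEnv}

object RegistrationSketch {
  // Server side: register an endpoint; the Dispatcher creates the EndpointData
  // (and its Inbox) and hands back a ref to it.
  def serverSide(env: RpcEnv): RpcEndpointRef =
    env.setupEndpoint("Echo", new RpcEndpoint {
      override val rpcEnv: RpcEnv = env
      override def receive: PartialFunction[Any, Unit] = {
        case msg => println(s"got $msg")
      }
    })

  // Client side: resolve a ref by remote address and endpoint name,
  // then use it like an ActorRef.
  def clientSide(env: RpcEnv): Unit = {
    val ref = env.setupEndpointRef(RpcAddress("server-host", 7077), "Echo")
    ref.send("hello")
  }
}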
