天天看點

從零開始實作簡單 RPC 架構 8:網絡通信之 Request-Response 模型

Netty 在服務端與用戶端的網絡通信中,使用的是異步雙向通信(雙工)的方式,即用戶端和服務端可以互相主動發請求給對方,發消息後不會同步等響應。這樣就會有一下問題:

  1. 如何識别消息是請求還是響應?
  2. 請求如何正确對應到響應?

1. 如何識别消息是請求還是響應

為了識别消息類型是請求或者響應,我們在消息中加入了

messageType

的屬性,在上文我們也提到,這個消息類型在自定義協定的頭部,他有幾種類型:請求、響應、心跳,我們先來說說請求、響應。

public enum MessageType {
    /**
     * 普通請求
     */
    REQUEST((byte) 1),

    /**
     * 普通響應
     */
    RESPONSE((byte) 2),

    /**
     * 心跳
     */
    HEARTBEAT((byte) 3),
    ;
    private final byte value;
}
           

請求(Request)的核心字段如下:

public class RpcRequest {
    /**
     * 接口名
     */
    private String interfaceName;
    /**
     * 方法名
     */
    private String methodName;
    /**
     * 參數清單
     */
    private Object[] params;
    /**
     * 參數類型清單
     */
    private Class<?>[] paramTypes;
    /**
     * 接口版本
     */
    private String version;
}
           

響應(Response)的核心字段如下:

public class RpcResponse<T> {
    /**
     * 請求id
     */
    private long requestId;
    /**
     * 響應碼
     */
    private Integer code;
    /**
     * 提示消息
     */
    private String message;
    /**
     * 響應資料
     */
    private T data;
}
           

發送消息的時候,按照消息類型和結構體,将資料組裝好,寫到 channel 即可。接收消息則要先解碼,從消息頭拿到消息類型,根據消息類型來反序列化到對應的結構體。

2. 請求如何正确對應到響應

流程圖如下:

從零開始實作簡單 RPC 架構 8:網絡通信之 Request-Response 模型

有幾個關鍵點:

  1. 用戶端請求之後拿到 Future
  2. 有一個

    Map

    存放未響應的請求,

    Key: RequestId,Value: Future

  3. 服務端響應的資料中,包含了用戶端的 RequestId,這是對應的關鍵
  4. 響應的結果會被

    NettyClientHandler.channelRead0

    監聽到,根據響應的 RequestId 取出對應的 Future
  5. 将結果寫到對應的 Future 中
  6. 用戶端通過

    future.get()

    擷取到資料

1) 用戶端發請求

代碼如下:

public class NettyInvoker extends AbstractInvoker {

    private final NettyClient nettyClient = NettyClient.getInstance();

    @Override
    protected RpcResult doInvoke(RpcRequest request, URL selected) throws RpcException {
        // 擷取 Channel
        Channel channel = nettyClient.getChannel(socketAddress);
        // 構造一個空 Future
        CompletableFuture<RpcResponse<?>> resultFuture = new CompletableFuture<>();
        // 建構 RPC 消息,此處會建構 requestId
        RpcMessage rpcMessage = buildRpcMessage(request);
        // 将 request 和 Future 對應放到 Map 中
        UnprocessedRequests.put(rpcMessage.getRequestId(), resultFuture);
        // 送出請求
        channel.writeAndFlush(rpcMessage);
        // 傳回結果
        return new AsyncResult(resultFuture);
    }
    // ...
}
           

傳回的

AsyncResult

隻是

future

的包裝。

public class AsyncResult implements RpcResult {

    private final CompletableFuture<?> future;

    public AsyncResult(CompletableFuture<?> future) {
        this.future = future;
    }
}
           

2) 請求暫存

這個存儲未響應的請求在

ccx-rpc

中是

UnprocessedRequests

類在管理:

public class UnprocessedRequests {
    private static final Map<Long, CompletableFuture<RpcResponse<?>>> FUTURE_MAP = new ConcurrentHashMap<>();

    public static void put(long requestId, CompletableFuture<RpcResponse<?>> future) {
        FUTURE_MAP.put(requestId, future);
    }
}
           

3) 服務端響應資料監聽

使用 Netty 的 Handler 監聽服務端響應的資料,當有資料響應,則調用

UnprocessedRequests.complete

寫入。

public class NettyClientHandler extends SimpleChannelInboundHandler<RpcMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext context, RpcMessage requestMsg) {
        RpcResponse<?> response = (RpcResponse<?>) requestMsg.getData();
        UnprocessedRequests.complete(response);
    }
}
           

UnprocessedRequests.complete

實際上就是找出并删除對應的請求,然後将資料寫入:

future.complete(rpcResponse)

public class UnprocessedRequests {
    private static final Map<Long, CompletableFuture<RpcResponse<?>>> FUTURE_MAP = new ConcurrentHashMap<>();

    /**
     * 完成響應
     *
     * @param rpcResponse 響應内容
     */
    public static void complete(RpcResponse<?> rpcResponse) {
        CompletableFuture<RpcResponse<?>> future = FUTURE_MAP.remove(rpcResponse.getRequestId());
        if (future != null) {
            future.complete(rpcResponse);
        } else {
            throw new IllegalStateException("future is null. rpcResponse=" + JSONUtil.toJsonStr(rpcResponse));
        }
    }
}
           

最後通過

AsyncResult.getData

可以擷取到資料。

public class AsyncResult implements RpcResult {

    private final CompletableFuture<?> future;

    public AsyncResult(CompletableFuture<?> future) {
        this.future = future;
    }

    @Override
    public Object getData() {
        try {
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            log.error("getData error.", e);
        }
        return null;
    }
}
           

總結

Netty 網絡通信是異步雙工的,我們需要用正确 Request-Response 模型讓用戶端和服務端正确互動。

  1. 如何區分請求或響應?

    在消息中,可以加入 messageType 字段用來區分是請求或者響應。

  2. 如何把請求和響應對應?

    發出的請求需要用 RequestId 标記并用 Map 存起來。服務端收到請求之後,将 RequestId 原封不動寫到響應結果中。用戶端收到響應結果後,拿出 RequestId 找到對應的 Future 并寫入結果。

ccx-rpc 代碼已經開源

Github:https://github.com/chenchuxin/ccx-rpc

Gitee:https://gitee.com/imccx/ccx-rpc