Netty 在服務端與用戶端的網絡通信中,使用的是異步雙向通信(雙工)的方式,即用戶端和服務端可以互相主動發請求給對方,發消息後不會同步等響應。這樣就會有一下問題:
- 如何識别消息是請求還是響應?
- 請求如何正确對應到響應?
1. 如何識别消息是請求還是響應
為了識别消息類型是請求或者響應,我們在消息中加入了
messageType
的屬性,在上文我們也提到,這個消息類型在自定義協定的頭部,他有幾種類型:請求、響應、心跳,我們先來說說請求、響應。
public enum MessageType {
/**
* 普通請求
*/
REQUEST((byte) 1),
/**
* 普通響應
*/
RESPONSE((byte) 2),
/**
* 心跳
*/
HEARTBEAT((byte) 3),
;
private final byte value;
}
請求(Request)的核心字段如下:
public class RpcRequest {
/**
* 接口名
*/
private String interfaceName;
/**
* 方法名
*/
private String methodName;
/**
* 參數清單
*/
private Object[] params;
/**
* 參數類型清單
*/
private Class<?>[] paramTypes;
/**
* 接口版本
*/
private String version;
}
響應(Response)的核心字段如下:
public class RpcResponse<T> {
/**
* 請求id
*/
private long requestId;
/**
* 響應碼
*/
private Integer code;
/**
* 提示消息
*/
private String message;
/**
* 響應資料
*/
private T data;
}
發送消息的時候,按照消息類型和結構體,将資料組裝好,寫到 channel 即可。接收消息則要先解碼,從消息頭拿到消息類型,根據消息類型來反序列化到對應的結構體。
2. 請求如何正确對應到響應
流程圖如下:
有幾個關鍵點:
- 用戶端請求之後拿到 Future
- 有一個
存放未響應的請求,Map
Key: RequestId,Value: Future
- 服務端響應的資料中,包含了用戶端的 RequestId,這是對應的關鍵
- 響應的結果會被
監聽到,根據響應的 RequestId 取出對應的 FutureNettyClientHandler.channelRead0
- 将結果寫到對應的 Future 中
- 用戶端通過
擷取到資料future.get()
1) 用戶端發請求
代碼如下:
public class NettyInvoker extends AbstractInvoker {
private final NettyClient nettyClient = NettyClient.getInstance();
@Override
protected RpcResult doInvoke(RpcRequest request, URL selected) throws RpcException {
// 擷取 Channel
Channel channel = nettyClient.getChannel(socketAddress);
// 構造一個空 Future
CompletableFuture<RpcResponse<?>> resultFuture = new CompletableFuture<>();
// 建構 RPC 消息,此處會建構 requestId
RpcMessage rpcMessage = buildRpcMessage(request);
// 将 request 和 Future 對應放到 Map 中
UnprocessedRequests.put(rpcMessage.getRequestId(), resultFuture);
// 送出請求
channel.writeAndFlush(rpcMessage);
// 傳回結果
return new AsyncResult(resultFuture);
}
// ...
}
傳回的
AsyncResult
隻是
future
的包裝。
public class AsyncResult implements RpcResult {
private final CompletableFuture<?> future;
public AsyncResult(CompletableFuture<?> future) {
this.future = future;
}
}
2) 請求暫存
這個存儲未響應的請求在
ccx-rpc
中是
UnprocessedRequests
類在管理:
public class UnprocessedRequests {
private static final Map<Long, CompletableFuture<RpcResponse<?>>> FUTURE_MAP = new ConcurrentHashMap<>();
public static void put(long requestId, CompletableFuture<RpcResponse<?>> future) {
FUTURE_MAP.put(requestId, future);
}
}
3) 服務端響應資料監聽
使用 Netty 的 Handler 監聽服務端響應的資料,當有資料響應,則調用
UnprocessedRequests.complete
寫入。
public class NettyClientHandler extends SimpleChannelInboundHandler<RpcMessage> {
@Override
protected void channelRead0(ChannelHandlerContext context, RpcMessage requestMsg) {
RpcResponse<?> response = (RpcResponse<?>) requestMsg.getData();
UnprocessedRequests.complete(response);
}
}
UnprocessedRequests.complete
實際上就是找出并删除對應的請求,然後将資料寫入:
future.complete(rpcResponse)
public class UnprocessedRequests {
private static final Map<Long, CompletableFuture<RpcResponse<?>>> FUTURE_MAP = new ConcurrentHashMap<>();
/**
* 完成響應
*
* @param rpcResponse 響應内容
*/
public static void complete(RpcResponse<?> rpcResponse) {
CompletableFuture<RpcResponse<?>> future = FUTURE_MAP.remove(rpcResponse.getRequestId());
if (future != null) {
future.complete(rpcResponse);
} else {
throw new IllegalStateException("future is null. rpcResponse=" + JSONUtil.toJsonStr(rpcResponse));
}
}
}
最後通過
AsyncResult.getData
可以擷取到資料。
public class AsyncResult implements RpcResult {
private final CompletableFuture<?> future;
public AsyncResult(CompletableFuture<?> future) {
this.future = future;
}
@Override
public Object getData() {
try {
return future.get();
} catch (InterruptedException | ExecutionException e) {
log.error("getData error.", e);
}
return null;
}
}
總結
Netty 網絡通信是異步雙工的,我們需要用正确 Request-Response 模型讓用戶端和服務端正确互動。
-
如何區分請求或響應?
在消息中,可以加入 messageType 字段用來區分是請求或者響應。
-
如何把請求和響應對應?
發出的請求需要用 RequestId 标記并用 Map 存起來。服務端收到請求之後,将 RequestId 原封不動寫到響應結果中。用戶端收到響應結果後,拿出 RequestId 找到對應的 Future 并寫入結果。
ccx-rpc 代碼已經開源
Github:https://github.com/chenchuxin/ccx-rpc
Gitee:https://gitee.com/imccx/ccx-rpc