下面對調用鍊的方法進行簡要的分析, 疑難的部分會重點分析。同時也會根據 建立代理 的分層逐個進行分析。
線上常用的是訂閱方式,本文隻分析訂閱方式的發送請求流程。
訂閱方式的發送請求前半部分
InvokerInvocationHandler.invoke(Object, Method, Object[])
動态代理handler執行方法,擷取方法名和參數建立RpcInvocation
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
if (method.getDeclaringClass() == Object.class) {//如果方法所屬的類是Object ,不執行遠端請求,執行Object原本的方法
return method.invoke(invoker, args);
}//如果 是toString,hashCode,equals 這些方法,執行本地方法
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return invoker.toString();
}
if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
return invoker.hashCode();
}
if ("equals".equals(methodName) && parameterTypes.length == 1) {
return invoker.equals(args[0]);
} //RpcInvocation封裝了方法名,參數類型,參數值
return invoker.invoke(new RpcInvocation(method, args)).recreate();
}
RpcInvocation構造方法分析
建立執行參數資料對象。
public RpcInvocation(String methodName, Class<?>[] parameterTypes, Object[] arguments, Map<String, String> attachments, Invoker<?> invoker) {
this.methodName = methodName;//方法名
this.parameterTypes = parameterTypes == null ? new Class<?>[0] : parameterTypes;//參數類型
this.arguments = arguments == null ? new Object[0] : arguments;//參數object
this.attachments = attachments == null ? new HashMap<String, String>() : attachments;//附加參數 指派空map
this.invoker = invoker;// invoker ==null
}
建立RpcInvocation,存入 方法名,參數類型,參數數組。附加參數 指派空map,invoker ==null。
動态代理層:擷取方法名和參數建立執行參數資料對象
InvokerInvocationHandler後面部分,下面詳細分析
MockClusterInvoker<T>.invoke(Invocation) : 轉發 ,MockClusterInvoker實作了模拟調用,測試功能。
FailoverClusterInvoker<T>(AbstractClusterInvoker<T>).invoke(Invocation) :執行路由,然後擷取loadbalance執行個體
FailoverClusterInvoker<T>.doInvoke(Invocation, List<Invoker<T>>, LoadBalance) :執行 負載均衡,失敗重試
RegistryDirectory$InvokerDelegete<T>(InvokerWrapper<T>).invoke(Invocation) //轉發
cluster層總結: 即雲層, 實作了 失敗重試,負載均衡,路由功能
訂閱方式發送請求後半部分
注意: ProtocolFilterWrapper.invoke 包含了多步鍊式調用,沒有詳細畫出。
按照調用順序 分析過濾鍊層
ProtocolFilterWrapper$1.invoke(Invocation):調用鍊實作
ProtocolFilterWrapper$1是 在組裝 filter鍊時使用的匿名内部類(如 new Interface(){})
ConsumerContextFilter.invoke(Invoker<?>, Invocation) //儲存上下文資料, 友善監控擷取
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
RpcContext.getContext()//擷取上下文對象,RpcContext存在ThreadLocal中
.setInvoker(invoker)//存入 invoker
.setInvocation(invocation)//存入invocation
.setLocalAddress(NetUtils.getLocalHost(), 0)//存入目前機器ip
.setRemoteAddress(invoker.getUrl().getHost(),
invoker.getUrl().getPort());// 存入遠端服務ip 和端口号
if (invocation instanceof RpcInvocation) {// 在invocation中存入現階段的invoker
((RpcInvocation) invocation).setInvoker(invoker);//invoker type is ProtocolFilterWrapper$1
}
try {
return invoker.invoke(invocation);
} finally {
RpcContext.getContext().clearAttachments();
}
}
ProtocolFilterWrapper$1.invoke(Invocation) //filter鍊 處理
MonitorFilter.invoke(Invoker<?>, Invocation)//擷取監控資料
ProtocolFilterWrapper$1.invoke(Invocation)//filter鍊 處理
FutureFilter.invoke(Invoker<?>, Invocation) //尾端調用鍊,執行invoke(),同步或異步執行回調,或抛出異常
public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation);
fireInvokeCallback(invoker, invocation);// 觸發 執行前回調
// need to configure if there's return value before the invocation in order to help invoker to judge if it's
// necessary to return future.
Result result = invoker.invoke(invocation);
//觸發執行後回調
if (isAsync) {//異步回調處理。為執行Future添加一個響應回調。
asyncCallback(invoker, invocation);
} else {//有異常抛出,沒異常執行回調
syncCallback(invoker, invocation, result);
}
return result;
}
ListenerInvokerWrapper<T>.invoke(Invocation)//轉發
filter按照以下順序執行
- ProtocolFilterWrapper
- ConsumerContextFilter
- MonitorFilter
- FutureFilter
ProtocolFilterWrapper:組裝調用鍊,實作AOP功能, 實作鍊式調用。
FutureFilter:尾端調用鍊,執行invoke(),同步或異步執行回調,或抛出異常
MonitorFilter:收集監控資料, 如 服務名,方法名,調用時間,并發數
ConsumerContextFilter:記錄調用的臨時狀态 友善監控擷取
過濾鍊層總結:鍊式執行多個過濾器, 收集監控資料,執行傳回結果的 同步或異步執行回調,或抛出異常
下面開始分析協定層
DubboInvoker<T>(AbstractInvoker<T>).invoke(Invocation) //為RpcInvocation添加attachment
public Result invoke(Invocation inv) throws RpcException {
//...
RpcInvocation invocation = (RpcInvocation) inv;
invocation.setInvoker(this);// 更換invoker, type DubboInvoker
if (attachment != null && attachment.size() > 0) {
invocation.addAttachmentsIfAbsent(attachment);//添加附加參數 attachment={interface=tuling.dubbo.server.UserService, timeout=2147483647}
}
Map<String, String> context = RpcContext.getContext().getAttachments();
if (context != null) {// context 是空的
invocation.addAttachmentsIfAbsent(context);
}
if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) {//如果配置了異步,invocation 附加參數添加異步key
invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
}
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);//如果配置了異步,添加執行ID。
try {
return doInvoke(invocation);
} catch (InvocationTargetException e) { // biz exception
//反射異常需要特殊處理
Throwable te = e.getTargetException();
if (te == null) {
return new RpcResult(e);
} else {
if (te instanceof RpcException) {
((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
}
return new RpcResult(te);
}
} catch (RpcException e) {//自定義異常處理
if (e.isBiz()) {
return new RpcResult(e);
} else {
throw e;
}
} catch (Throwable e) {//未知異常 封裝并傳回。
return new RpcResult(e);
}
}
DubboInvoker<T>.doInvoke(Invocation)//多個client輪詢,根據參數 執行 單向,異步,同步請求。
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());//path=tuling.dubbo.server.UserService
inv.setAttachment(Constants.VERSION_KEY, version);//version=0.0.0
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {//多個輪詢
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);//擷取是否是異步
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);//擷取是否是單向
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
if (isOneway) {//單向執行 不記錄future
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {//異步執行
ResponseFuture future = currentClient.request(inv, timeout);
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));// 将 future 存入目前RpcContext中
return new RpcResult();//傳回空結果
} else {//同步執行
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();//等待響應 get 抛出TimeoutException
}
} catch (TimeoutException e) {//組裝 timeout異常資訊
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
協定層:多個client輪詢,執行 單向,異步,同步請求。
ReferenceCountExchangeClient.request(Object, int)//轉發
HeaderExchangeClient.request(Object, int) //轉發
HeaderExchangeChannel.request(Object, int) channel 類型是NettyClient
建立Request,指派版本号,是否雙向,請求data(類型 是RpcInvocation),建立DefaultFuture 傳入channel,requst執行個體,逾時時間,以request id為key緩存 future,為同步請求,執行future.get()等待響應做準備.
交換層:生成請求對象,實作同步等待機制。
網絡傳輸實作層分析
NettyClient(AbstractPeer).send(Object) //擷取 sent屬性,它辨別 是否等待發送完成
NettyClient(AbstractClient).send(Object, boolean) /沒有連接配接則建立連接配接
NettyChannel.send(Object, boolean) //調用 NioClientSocketChannel執行write
網絡傳輸實作層:沒有連接配接就建立連接配接,然後發送資料。
異常處理分析
交換層和網絡傳輸實作層抛出RemotingException,異常由NettyChannel.send(Object, boolean) 産生
DubboInvoker<T>.doInvoke(Invocation) 将 RemotingException封裝成RpcException
發送資料 NettyHandler 寫事件處理
DubboProtocol$1(ChannelHandlerAdapter).sent(Channel, Object) :轉發
HeaderExchangeHandler.sent(Channel, Object) :轉發
DecodeHandler(AbstractChannelHandlerDelegate).sent(Channel, Object) :轉發
AllChannelHandler(WrappedChannelHandler).sent(Channel, Object) :轉發
HeartbeatHandler.sent(Channel, Object) :儲存最後發出資料時間戳
MultiMessageHandler(AbstractChannelHandlerDelegate).sent(Channel, Object) :轉發
NettyClient(AbstractPeer).sent(Channel, Object):轉發
NettyHandler.writeRequested(ChannelHandlerContext, MessageEvent):轉發
層層調用 沒幹啥有用的事
精華總結:
動态代理層:擷取方法名和參數建立執行參數資料對象
過濾鍊層總結:鍊式執行多個過濾器, 收集監控資料,執行傳回結果的 同步或異步執行回調,或抛出異常
協定層:多個client輪詢,實作 單向,異步,同步請求。
交換層:生成請求對象,實作同步等待機制。
client層:沒有連接配接就建立連接配接,然後發送資料。