天天看點

dubbo源碼分析4-consumer發送請求

下面對調用鍊的方法進行簡要的分析, 疑難的部分會重點分析。同時也會根據  建立代理 的分層逐個進行分析。

線上常用的是訂閱方式,本文隻分析訂閱方式的發送請求流程。 

訂閱方式的發送請求前半部分

dubbo源碼分析4-consumer發送請求

InvokerInvocationHandler.invoke(Object, Method, Object[])         

動态代理handler執行方法,擷取方法名和參數建立RpcInvocation

   public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

        String methodName = method.getName();

        Class<?>[] parameterTypes = method.getParameterTypes();

        if (method.getDeclaringClass() == Object.class) {//如果方法所屬的類是Object ,不執行遠端請求,執行Object原本的方法

            return method.invoke(invoker, args);

        }//如果 是toString,hashCode,equals 這些方法,執行本地方法

        if ("toString".equals(methodName) && parameterTypes.length == 0) {

            return invoker.toString();

        }

        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {

            return invoker.hashCode();

        }

        if ("equals".equals(methodName) && parameterTypes.length == 1) {

            return invoker.equals(args[0]);

        } //RpcInvocation封裝了方法名,參數類型,參數值

        return invoker.invoke(new RpcInvocation(method, args)).recreate();

    }

RpcInvocation構造方法分析

 建立執行參數資料對象。

   public RpcInvocation(String methodName, Class<?>[] parameterTypes, Object[] arguments, Map<String, String> attachments, Invoker<?> invoker) {

        this.methodName = methodName;//方法名

        this.parameterTypes = parameterTypes == null ? new Class<?>[0] : parameterTypes;//參數類型

        this.arguments = arguments == null ? new Object[0] : arguments;//參數object

        this.attachments = attachments == null ? new HashMap<String, String>() : attachments;//附加參數 指派空map

        this.invoker = invoker;// invoker ==null

    }

建立RpcInvocation,存入 方法名,參數類型,參數數組。附加參數 指派空map,invoker ==null。

動态代理層:擷取方法名和參數建立執行參數資料對象

InvokerInvocationHandler後面部分,下面詳細分析

MockClusterInvoker<T>.invoke(Invocation) :     轉發 ,MockClusterInvoker實作了模拟調用,測試功能。

FailoverClusterInvoker<T>(AbstractClusterInvoker<T>).invoke(Invocation) :執行路由,然後擷取loadbalance執行個體

FailoverClusterInvoker<T>.doInvoke(Invocation, List<Invoker<T>>, LoadBalance) :執行 負載均衡,失敗重試    

RegistryDirectory$InvokerDelegete<T>(InvokerWrapper<T>).invoke(Invocation) //轉發

  cluster層總結: 即雲層, 實作了 失敗重試,負載均衡,路由功能

訂閱方式發送請求後半部分

dubbo源碼分析4-consumer發送請求

注意: ProtocolFilterWrapper.invoke 包含了多步鍊式調用,沒有詳細畫出。

按照調用順序 分析過濾鍊層

ProtocolFilterWrapper$1.invoke(Invocation):調用鍊實作

ProtocolFilterWrapper$1是 在組裝 filter鍊時使用的匿名内部類(如 new  Interface(){})

ConsumerContextFilter.invoke(Invoker<?>, Invocation) //儲存上下文資料,     友善監控擷取 

  public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {

        RpcContext.getContext()//擷取上下文對象,RpcContext存在ThreadLocal中

                .setInvoker(invoker)//存入 invoker

                .setInvocation(invocation)//存入invocation

                .setLocalAddress(NetUtils.getLocalHost(), 0)//存入目前機器ip

                .setRemoteAddress(invoker.getUrl().getHost(),

                        invoker.getUrl().getPort());// 存入遠端服務ip 和端口号

        if (invocation instanceof RpcInvocation) {// 在invocation中存入現階段的invoker 

            ((RpcInvocation) invocation).setInvoker(invoker);//invoker type is ProtocolFilterWrapper$1

        }

        try {

            return invoker.invoke(invocation);

        } finally {

            RpcContext.getContext().clearAttachments();

        }

    }    

ProtocolFilterWrapper$1.invoke(Invocation) //filter鍊 處理

MonitorFilter.invoke(Invoker<?>, Invocation)//擷取監控資料    

ProtocolFilterWrapper$1.invoke(Invocation)//filter鍊 處理

FutureFilter.invoke(Invoker<?>, Invocation) //尾端調用鍊,執行invoke(),同步或異步執行回調,或抛出異常        

    public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {

        final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation);

        fireInvokeCallback(invoker, invocation);// 觸發 執行前回調

        // need to configure if there's return value before the invocation in order to help invoker to judge if it's

        // necessary to return future.

        Result result = invoker.invoke(invocation);

//觸發執行後回調

        if (isAsync) {//異步回調處理。為執行Future添加一個響應回調。

            asyncCallback(invoker, invocation);

        } else {//有異常抛出,沒異常執行回調

            syncCallback(invoker, invocation, result);

        }

        return result;

    }

ListenerInvokerWrapper<T>.invoke(Invocation)//轉發    

filter按照以下順序執行

  1. ProtocolFilterWrapper
  2. ConsumerContextFilter     
  3. MonitorFilter  
  4. FutureFilter    

ProtocolFilterWrapper:組裝調用鍊,實作AOP功能, 實作鍊式調用。

FutureFilter:尾端調用鍊,執行invoke(),同步或異步執行回調,或抛出異常

MonitorFilter:收集監控資料, 如 服務名,方法名,調用時間,并發數

ConsumerContextFilter:記錄調用的臨時狀态     友善監控擷取

過濾鍊層總結:鍊式執行多個過濾器, 收集監控資料,執行傳回結果的 同步或異步執行回調,或抛出異常

下面開始分析協定層

DubboInvoker<T>(AbstractInvoker<T>).invoke(Invocation) //為RpcInvocation添加attachment

   public Result invoke(Invocation inv) throws RpcException {

//...

        RpcInvocation invocation = (RpcInvocation) inv;

        invocation.setInvoker(this);// 更換invoker, type DubboInvoker

        if (attachment != null && attachment.size() > 0) {    

            invocation.addAttachmentsIfAbsent(attachment);//添加附加參數 attachment={interface=tuling.dubbo.server.UserService, timeout=2147483647}

        }

        Map<String, String> context = RpcContext.getContext().getAttachments();

        if (context != null) {// context 是空的

            invocation.addAttachmentsIfAbsent(context);

        }

        if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) {//如果配置了異步,invocation 附加參數添加異步key

            invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());

        }

        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);//如果配置了異步,添加執行ID。

        try {

            return doInvoke(invocation);

        } catch (InvocationTargetException e) { // biz exception

//反射異常需要特殊處理

            Throwable te = e.getTargetException();

            if (te == null) {

                return new RpcResult(e);

            } else {

                if (te instanceof RpcException) {

                    ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);

                }

                return new RpcResult(te);

            }

        } catch (RpcException e) {//自定義異常處理

            if (e.isBiz()) {

                return new RpcResult(e);

            } else {

                throw e;

            }

        } catch (Throwable e) {//未知異常 封裝并傳回。

            return new RpcResult(e);

        }

    }    

DubboInvoker<T>.doInvoke(Invocation)//多個client輪詢,根據參數 執行 單向,異步,同步請求。

    protected Result doInvoke(final Invocation invocation) throws Throwable {

        RpcInvocation inv = (RpcInvocation) invocation;

        final String methodName = RpcUtils.getMethodName(invocation);

        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());//path=tuling.dubbo.server.UserService

        inv.setAttachment(Constants.VERSION_KEY, version);//version=0.0.0

        ExchangeClient currentClient;

        if (clients.length == 1) {

            currentClient = clients[0];

        } else {//多個輪詢

            currentClient = clients[index.getAndIncrement() % clients.length];

        }

        try {

            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);//擷取是否是異步

            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);//擷取是否是單向

            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);

            if (isOneway) {//單向執行 不記錄future

                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);

                currentClient.send(inv, isSent);

                RpcContext.getContext().setFuture(null);

                return new RpcResult();

            } else if (isAsync) {//異步執行

                ResponseFuture future = currentClient.request(inv, timeout);

                RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));// 将 future 存入目前RpcContext中

                return new RpcResult();//傳回空結果

            } else {//同步執行

                RpcContext.getContext().setFuture(null);

                return (Result) currentClient.request(inv, timeout).get();//等待響應  get 抛出TimeoutException

            }

        } catch (TimeoutException e) {//組裝 timeout異常資訊

            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);

        } catch (RemotingException e) {

            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);

        }

    }

協定層:多個client輪詢,執行 單向,異步,同步請求。

ReferenceCountExchangeClient.request(Object, int)//轉發    

HeaderExchangeClient.request(Object, int) //轉發

HeaderExchangeChannel.request(Object, int) channel 類型是NettyClient

建立Request,指派版本号,是否雙向,請求data(類型 是RpcInvocation),建立DefaultFuture 傳入channel,requst執行個體,逾時時間,以request id為key緩存 future,為同步請求,執行future.get()等待響應做準備.

交換層:生成請求對象,實作同步等待機制。

網絡傳輸實作層分析

NettyClient(AbstractPeer).send(Object) //擷取 sent屬性,它辨別 是否等待發送完成    

NettyClient(AbstractClient).send(Object, boolean) /沒有連接配接則建立連接配接

NettyChannel.send(Object, boolean)     //調用 NioClientSocketChannel執行write

網絡傳輸實作層:沒有連接配接就建立連接配接,然後發送資料。

異常處理分析 

交換層和網絡傳輸實作層抛出RemotingException,異常由NettyChannel.send(Object, boolean) 産生

DubboInvoker<T>.doInvoke(Invocation) 将    RemotingException封裝成RpcException

發送資料 NettyHandler 寫事件處理

DubboProtocol$1(ChannelHandlerAdapter).sent(Channel, Object) :轉發    

HeaderExchangeHandler.sent(Channel, Object) :轉發

DecodeHandler(AbstractChannelHandlerDelegate).sent(Channel, Object) :轉發    

AllChannelHandler(WrappedChannelHandler).sent(Channel, Object)     :轉發

HeartbeatHandler.sent(Channel, Object) :儲存最後發出資料時間戳    

MultiMessageHandler(AbstractChannelHandlerDelegate).sent(Channel, Object)    :轉發    

NettyClient(AbstractPeer).sent(Channel, Object):轉發    

NettyHandler.writeRequested(ChannelHandlerContext, MessageEvent):轉發    

層層調用 沒幹啥有用的事

精華總結:

動态代理層:擷取方法名和參數建立執行參數資料對象

過濾鍊層總結:鍊式執行多個過濾器, 收集監控資料,執行傳回結果的 同步或異步執行回調,或抛出異常

協定層:多個client輪詢,實作 單向,異步,同步請求。

交換層:生成請求對象,實作同步等待機制。

client層:沒有連接配接就建立連接配接,然後發送資料。

繼續閱讀