天天看點

dubbo 源碼學習筆記 (三) —— dubbo引用服務的過程

歡迎通路我的個人部落格休息的風

與dubbo服務釋出相對的,是引用服務進行調用的過程,這個很多步驟都是與服務釋出相對的,但是也有特有的地方,比如,負載均衡 ,叢集容錯等。這篇部落格,我們主要關注dubbo服務調用的一個核心過程。

dubbo服務調用的主要過程:将調用資訊注冊到zk上-> 通知RegistryDirectory重新整理可用服務清單->重新整理過程中,新服務會與netty服務端建立連接配接,并封裝到DubboInvoker中。-> 選擇失敗政策通過負載均衡算法,選擇服務端具體哪個服務去執行 -> 通過netty傳回執行結果。

Dubbo服務調用過程圖如下:(看不清,請點選新的頁簽進行檢視)  

dubbo 源碼學習筆記 (三) —— dubbo引用服務的過程

在ReferenceConfig.init的方法裡,會把配置資訊封閉成一個map,然後去建構一個代理類。在建構這個代理類,會用DubboProtocol擷取DubboInvoker。

private T createProxy(Map<String, String> map) {
    // 省略一些代碼。。。
    // 通過注冊中心配置拼裝URL
            List<URL> us = loadRegistries(false);
            if (us != null && us.size() > 0) {
                for (URL u : us) {
                    URL monitorUrl = loadMonitor(u);
                    if (monitorUrl != null) {
                        map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                    }
                    urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                }
            }
            if (urls == null || urls.size() == 0) {
                throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
            }
        }

        if (urls.size() == 1) {
            //會調用RegistryProtocol.refer
            invoker = refprotocol.refer(interfaceClass, urls.get(0));
        } else {
            List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
           //省略很多代碼
    // 建立服務代理,使用JavassistProxyFactory去建立
    return (T) proxyFactory.getProxy(invoker);
}      

在RegistryProtocol裡,會建立一個RegistryDirectory,這個對象儲存的調用服務的資訊,并且也是一個監聽。

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    directory.setRegistry(registry);
    directory.setProtocol(protocol);
    URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
    if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
            && url.getParameter(Constants.REGISTER_KEY, true)) {
        //調用zk去真實的注冊
        registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                Constants.CHECK_KEY, String.valueOf(false)));
    }
    //最終也是調用zk去訂閱監聽,監聽器是RegistryDirectory
    directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
            Constants.PROVIDERS_CATEGORY
                    + "," + Constants.CONFIGURATORS_CATEGORY
                    + "," + Constants.ROUTERS_CATEGORY));
    return cluster.join(directory);
}      

在真正注冊到zk上,注冊監聽器到zk相應路徑上,會調用RegistryDirectory的notify通知方法,去擷取可用的服務清單。

public synchronized void notify(List<URL> urls) {
    List<URL> invokerUrls = new ArrayList<URL>();
    List<URL> routerUrls = new ArrayList<URL>();
    List<URL> configuratorUrls = new ArrayList<URL>();
   //初始化值 
    //省略代碼
    // configurators
    if (configuratorUrls != null && configuratorUrls.size() > 0) {
        this.configurators = toConfigurators(configuratorUrls);
    }
    // routers
    if (routerUrls != null && routerUrls.size() > 0) {
        List<Router> routers = toRouters(routerUrls);
        if (routers != null) { // null - do nothing
            setRouters(routers);
        }
    }
    List<Configurator> localConfigurators = this.configurators; // local reference
    // 合并override參數
    this.overrideDirectoryUrl = directoryUrl;
    if (localConfigurators != null && localConfigurators.size() > 0) {
        for (Configurator configurator : localConfigurators) {
            this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
        }
    }
    // providers  重新整理服務清單
    refreshInvoker(invokerUrls);
}      

其中調用refershInvoker方法時,會去調用toInvokers把URl清單轉換為Invoker清單。

private void refreshInvoker(List<URL> invokerUrls) {
    if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
            && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
        this.forbidden = true; // 禁止通路
        this.methodInvokerMap = null; // 置空清單
        destroyAllInvokers(); // 關閉所有Invoker
    } else {
        this.forbidden = false; // 允許通路
        Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
        if (invokerUrls.size() == 0 && this.cachedInvokerUrls != null) {
            invokerUrls.addAll(this.cachedInvokerUrls);
        } else {
            this.cachedInvokerUrls = new HashSet<URL>();
            this.cachedInvokerUrls.addAll(invokerUrls);//緩存invokerUrls清單,便于交叉對比
        }
        if (invokerUrls.size() == 0) {
            return;
        }
        Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// 将URL清單轉成Invoker清單
        Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // 換方法名映射Invoker清單
        //省略一些代碼
        。。。。。。。
    }
}      

在将這些url轉換為invoker時,會使用DubboProtocol去建立與netty服務端的連接配接。

private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
    Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
    if (urls == null || urls.size() == 0) {
        return newUrlInvokerMap;
    }
    Set<String> keys = new HashSet<String>();
    String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
    for (URL providerUrl : urls) {
       //封裝url資訊
        //省略一些代碼
        // 緩存key為沒有合并消費端參數的URL,不管消費端如何合并參數,如果服務端URL發生變化,則重新refer
        Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
        Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
        if (invoker == null) { // 緩存中沒有,重新refer
            try {
                boolean enabled = true;
                if (url.hasParameter(Constants.DISABLED_KEY)) {
                    enabled = !url.getParameter(Constants.DISABLED_KEY, false);
                } else {
                    enabled = url.getParameter(Constants.ENABLED_KEY, true);
                }
                if (enabled) {
                    //服務可以就使用DubboProtocol去建立連接配接,封裝DubboInvoker
                    invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl);
                }      

在DubboProtocol.refer裡,會去建立netty用戶端連接配接。

public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
    // create rpc invoker.
    DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
    invokers.add(invoker);
    return invoker;
}      

最後是傳回一個DubboInvoker對象,在這之前會調用getClients先去擷取或建立用戶端連接配接。是共享連接配接就擷取之前的加接,不是的話就建立新的連接配接。

private ExchangeClient initClient(URL url) {

    //設定一些參數,比如心跳機制等
    // client type setting.
    String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));

    String version = url.getParameter(Constants.DUBBO_VERSION_KEY);
    boolean compatible = (version != null && version.startsWith("1.0."));
    url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() && compatible ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
    //預設開啟heartbeat
    url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));

   //省略一些代碼

    ExchangeClient client;
    try {
        //設定連接配接應該是lazy的 
        if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
            client = new LazyConnectExchangeClient(url, requestHandler);
        } else {
            //跟服務釋出相對,具體調用HeaderExchanger.connect
            client = Exchangers.connect(url, requestHandler);
        }      

在HeaderExchanger裡,也是通過具體的NettyTransports連接配接去建立一個NettyClient,在doOpen方法裡,建立netty連接配接,熟悉netty的應該對這段代碼不陌生

protected void doOpen() throws Throwable {
    NettyHelper.setNettyLoggerFactory();
    bootstrap = new ClientBootstrap(channelFactory);
    // config
    // @see org.jboss.netty.channel.socket.SocketChannelConfig
    bootstrap.setOption("keepAlive", true);
    bootstrap.setOption("tcpNoDelay", true);
    bootstrap.setOption("connectTimeoutMillis", getTimeout());
    final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        public ChannelPipeline getPipeline() {
            //使用适配,用DubboCodec去編碼解碼
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
            ChannelPipeline pipeline = Channels.pipeline();
            pipeline.addLast("decoder", adapter.getDecoder());
            pipeline.addLast("encoder", adapter.getEncoder());
            //工作線程的處理handler
            pipeline.addLast("handler", nettyHandler);
            return pipeline;
        }
    });
}      

在RegistryDirectory中,緩存了可用的服務清單Invoker,之後具體使用哪個服務去調用,就看選擇的負載均衡政策了。在 RegistryProtocol.doRefer裡,會去執行“cluster.join(directory)”,這裡的cluster看是使用哪種失敗政策。預設會調用FailoverCluster.join。在FailoverClusterInvoker.doInvoke裡,會去選擇具體要調用哪參機器上的哪個服務。

public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    List<Invoker<T>> copyinvokers = invokers;
    checkInvokers(copyinvokers, invocation);
    int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
    if (len <= 0) {
        len = 1;
    }
    // retry loop.
    RpcException le = null; // last exception.
    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
    Set<String> providers = new HashSet<String>(len);
    for (int i = 0; i < len; i++) {
        //重試時,進行重新選擇,避免重試時invoker清單已發生變化.
        //注意:如果清單發生了變化,那麼invoked判斷會失效,因為invoker示例已經改變
        if (i > 0) {
            checkWhetherDestroyed();
            copyinvokers = list(invocation);
            //重新檢查一下
            checkInvokers(copyinvokers, invocation);
        }
        //使用loadbalance負載均衡選擇哪個服務invoker去調用
        Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
        invoked.add(invoker);
        RpcContext.getContext().setInvokers((List) invoked);
        try {
            //調用執行調用鍊
            Result result = invoker.invoke(invocation);
            
            //省略一些代碼。。。。      

最後真正調用服務端的DubboProtocol.requestHandler.reply去處理,

private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

    public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
        if (message instanceof Invocation) {
            Invocation inv = (Invocation) message;
            //從DubboExporter裡擷取Invoker
            Invoker<?> invoker = getInvoker(channel, inv);
            //如果是callback 需要處理高版本調用低版本的問題
            if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
               //callback的處理,省略。。。。
            }
            RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
            //反射調用,真正用戶端方法調用的地方
            return invoker.invoke(inv);
        }      

處理結果再通過netty傳回到用戶端resutlt中。

到此,一個服務調用過程就基本完成了。與之前的dubbo 源碼學習筆記 (二) —— dubbo釋出服務的過程相對應,形成dubbo服務釋出調用的整個過程。這裡總結一下整體過程:

dubbo 源碼學習筆記 (三) —— dubbo引用服務的過程

dubbo工作原理分為服務釋出和服務引用兩個核心過程。

1、服務釋出的時候,DubboProtocol将調用鍊封裝為DubboExporter,放入到netty服務端工作線程池中

2、URL配置資訊注冊到zk注冊中心,并注冊override監聽,觸發訂閱。

3、服務引用時,也将服務引用的資訊封裝成URL并注冊到zk注冊中心,同時監聽category、providers、configurators、routers這四個目錄節點(才能感覺服務上下線的變化)。将這些資訊包裝成DubboInvoker用戶端的調用鍊,傳回代理。

4、用戶端使用代理進行調用時,經過負載均衡,選擇其中一個服務的URL,根據URl資訊與netty建立連接配接,發送Invocation到netty服務端;服務端在工作線程池中找一個線程,處理Invocation,并把RpcResult結果傳回給用戶端;用戶端接收解析RpcResult,擷取處理結果。

這樣整個過程就基本結束。了解這一整個過程,也就相當于了解了dubbo整體的原理。也就能從大的方向把握這一架構,當然,dubbo架構還有很多值得學習研究的地方。在之後的部落格中會繼續分析。