天天看点

Dubbo服务发布端启动流程配置承载初始化远程服务发布(暴露)本地服务发布(暴露)

服务发布端启动流程

  • 配置承载初始化
  • 远程服务发布(暴露)
    • 1.ServiceConfig#export
    • 2.ServiceConfigBase#shouldDelay
    • 3.ServiceConfig#doExport
    • 4.ServiceConfig#doExportUrls(重要)
    • 5.ConfigValidationUtils#loadRegistries(待完善)
    • 6.doExportUrlsFor1Protocol(重要)
    • 7.RegistryProtocol#export(重要)
    • 8.RegistryProtocol#doLocalExport
    • 9.ProtocolFilterWrapper#export
    • 10.ProtocolListenerWrapper#export
    • 11.QosProtocolWrapper#export
    • 12.DubboProtocol#export(重要)
    • 13.DubboProtocol#openServer
    • 14.DubboProtocol#createServer
    • 15.Exchangers#bind
    • 16.HeaderExchanger#bind
    • 17.Transporters#bind
    • 18.NettyTransporter#bind
    • 19.NettyServer#doOpen
    • 20.RegistryProtocol#getRegistry
    • 21.ZookeeperRegistryFactory#getRegistry
    • 22.ZookeeperRegistryFactory#createRegistry
    • 23.FailbackRegistry#register(重要)
    • 小结
  • 本地服务发布(暴露)
    • 1.ServiceConfig#exportLocal
    • 2.InjvmProtocol#export

配置承载初始化

Dubbo框架会根据 优先级 对 配置信息 做 聚合处理,目前 默认覆盖策略 主要遵循以下几点规则:

  • -D传递给JVM参数优先级最高
  • 代码 或 XML配置优先级 次高
  • 配置文件优先级最低

Dubbo的配置 也会受到 provider的影响,这个属于 运行期 属性值影响,同样遵循以下几点规则:

  • 如果只有 provider段指定配置,则会 自动透传到客户端
  • 如果 客户端也配置了响应属性,则 服务端配置 会被覆盖

一般 不允许 透传的属性 都会在 ClusterUtils#mergeUrl中 进行 特殊处理

远程服务发布(暴露)

Dubbo服务发布端启动流程配置承载初始化远程服务发布(暴露)本地服务发布(暴露)

Dubbo框架 做服务发布(暴露) 分为两大部分

  1. 将 持有的 服务实例 通过代理 转换成 Invoker(1-6)
  2. 把Invoker 通过具体的协议 转换成 Exporter(7-结束)

Invoker 可以简单理解成 一个真实的 服务对象实例

Dubbo服务发布端启动流程配置承载初始化远程服务发布(暴露)本地服务发布(暴露)

1.ServiceConfig#export

@Override
    public synchronized void export() {
        if (bootstrap == null) {
            bootstrap = DubboBootstrap.getInstance();
            bootstrap.initialize();
        }

        checkAndUpdateSubConfigs();

        initServiceMetadata(provider);
        serviceMetadata.setServiceType(getInterfaceClass());
        serviceMetadata.setTarget(getRef());
        serviceMetadata.generateServiceKey();

        if (!shouldExport()) {
            return;
        }

        if (shouldDelay()) {
            DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
        } else {
            doExport();
        }

        exported();
    }

    private static final ScheduledExecutorService DELAY_EXPORT_EXECUTOR = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("DubboServiceDelayExporter", true));

           

如果没有设置延迟时间,则直接调用

doExport

方法发布服务

如果设置了延迟发布,则等时间过期后调用

doExport

方法来发布服务

2.ServiceConfigBase#shouldDelay

public boolean shouldDelay() {
        Integer delay = getDelay();
        return delay != null && delay > 0;
    }

    @Override
    public Integer getDelay() {
        return (delay == null && provider != null) ? provider.getDelay() : delay;
    }
           

是否有延迟是根据

provider.getDelay()

3.ServiceConfig#doExport

protected synchronized void doExport() {
   		// unexported应该是取消发布(暴露)
        if (unexported) {
            throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
        }
        if (exported) {
            return;
        }
        exported = true;

        if (StringUtils.isEmpty(path)) {
            path = interfaceName;
        }
        doExportUrls();
        bootstrap.setReady(true);
    }

	/**
     * Whether the provider has been exported
     */
    private transient volatile boolean exported;

    /**
     * The flag whether a service has unexported ,if the method unexported is invoked, the value is true
     */
    private transient volatile boolean unexported;

    /**
     * The service name
     */
    protected String path;
    
    /**
     * The interface name of the exported service
     */
    protected String interfaceName;
           

4.ServiceConfig#doExportUrls(重要)

private void doExportUrls() {
        ServiceRepository repository = ApplicationModel.getServiceRepository();
        ServiceDescriptor serviceDescriptor = repository.registerService(getInterfaceClass());
        // TODO
        repository.registerProvider(
                getUniqueServiceName(),
                ref,
                serviceDescriptor,
                this,
                serviceMetadata
        );

		// 获取 当前服务 对应的 注册中心实例
		// TODO
        List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);
       
        int protocolConfigNum = protocols.size();
        for (ProtocolConfig protocolConfig : protocols) {
        	// TODO
            String pathKey = URL.buildKey(getContextPath(protocolConfig)
                    .map(p -> p + "/" + path)
                    .orElse(path), group, version);
            // In case user specified path, register service one more time to map it to path.
            // TODO
            repository.registerService(pathKey, interfaceClass);
            doExportUrlsFor1Protocol(protocolConfig, registryURLs, protocolConfigNum);
        }
    }
           

5.ConfigValidationUtils#loadRegistries(待完善)

在Dubbo中,一个服务 可以被注册到 多个服务注册中心

6.doExportUrlsFor1Protocol(重要)

这个方法特别长

首先把参数封装为URL(在Dubbo里会把所有参数封装到一个URL里),然后 具体执行 服务导出

代码1解析MethodConfig对象 设置的 方法级别的配置 并保存到 参数map中

代码2用来判断调用类型,如果为泛型调用,则设置泛型类型(true、nativejava或bean方式)

代码5用来导出服务,Dubbo服务导出 分 本地导出 与 远程导出

本地导出使用了injvm协议,是一个伪协议,它不开启端口,不发起远程调用,只在JVM 内直接关联,但执行Dubbo的Filter链

在默认情况下,Dubbo同时支持本地导出与远程导出协议

可以通过ServiceConfig的setScope()方法设置,其中配置为none表示不导出服务,为remote表示只导出远程服务,为local表示只导出本地服务

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs, int protocolConfigNum) {
        String name = protocolConfig.getName();
        if (StringUtils.isEmpty(name)) {
            name = DUBBO;
        }

		// 1 解析MethodConfig数据
        Map<String, String> map = new HashMap<String, String>();
        map.put(SIDE_KEY, PROVIDER_SIDE);

        ServiceConfig.appendRuntimeParameters(map);
        // 读取其他配置信息到map,用后续构造URL
        AbstractConfig.appendParameters(map, getMetrics());
        AbstractConfig.appendParameters(map, getApplication());
        AbstractConfig.appendParameters(map, getModule());
        // remove 'default.' prefix for configs from ProviderConfig
        // appendParameters(map, provider, Constants.DEFAULT_KEY);
        AbstractConfig.appendParameters(map, provider);
        AbstractConfig.appendParameters(map, protocolConfig);
        AbstractConfig.appendParameters(map, this);
        MetadataReportConfig metadataReportConfig = getMetadataReportConfig();
        if (metadataReportConfig != null && metadataReportConfig.isValid()) {
            map.putIfAbsent(METADATA_KEY, REMOTE_METADATA_STORAGE_TYPE);
        }
        if (CollectionUtils.isNotEmpty(getMethods())) {
            for (MethodConfig method : getMethods()) {
                AbstractConfig.appendParameters(map, method, method.getName());
                String retryKey = method.getName() + RETRY_SUFFIX;
                if (map.containsKey(retryKey)) {
                    String retryValue = map.remove(retryKey);
                    if (FALSE_VALUE.equals(retryValue)) {
                        map.put(method.getName() + RETRIES_SUFFIX, ZERO_VALUE);
                    }
                }
                List<ArgumentConfig> arguments = method.getArguments();
                if (CollectionUtils.isNotEmpty(arguments)) {
                    for (ArgumentConfig argument : arguments) {
                        // convert argument type
                        if (argument.getType() != null && argument.getType().length() > 0) {
                            Method[] methods = interfaceClass.getMethods();
                            // visit all methods
                            if (methods.length > 0) {
                                for (int i = 0; i < methods.length; i++) {
                                    String methodName = methods[i].getName();
                                    // target the method, and get its signature
                                    if (methodName.equals(method.getName())) {
                                        Class<?>[] argtypes = methods[i].getParameterTypes();
                                        // one callback in the method
                                        if (argument.getIndex() != -1) {
                                            if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
                                                AbstractConfig
                                                        .appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                                            } else {
                                                throw new IllegalArgumentException(
                                                        "Argument config error : the index attribute and type attribute not match :index :" +
                                                                argument.getIndex() + ", type:" + argument.getType());
                                            }
                                        } else {
                                            // multiple callbacks in the method
                                            for (int j = 0; j < argtypes.length; j++) {
                                                Class<?> argclazz = argtypes[j];
                                                if (argclazz.getName().equals(argument.getType())) {
                                                    AbstractConfig.appendParameters(map, argument, method.getName() + "." + j);
                                                    if (argument.getIndex() != -1 && argument.getIndex() != j) {
                                                        throw new IllegalArgumentException(
                                                                "Argument config error : the index attribute and type attribute not match :index :" +
                                                                        argument.getIndex() + ", type:" + argument.getType());
                                                    }
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                        } else if (argument.getIndex() != -1) {
                            AbstractConfig.appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                        } else {
                            throw new IllegalArgumentException(
                                    "Argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
                        }

                    }
                }
            } // end of methods for
        }

		// 2 如果为泛型调用,设置 泛型类型
        if (ProtocolUtils.isGeneric(generic)) {
            map.put(GENERIC_KEY, generic);
            map.put(METHODS_KEY, ANY_VALUE);
        } else {
        	// 3 正常调用设置拼接URL的参数
            String revision = Version.getVersion(interfaceClass, version);
            if (revision != null && revision.length() > 0) {
                map.put(REVISION_KEY, revision);
            }

            String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
            if (methods.length == 0) {
                logger.warn("No method found in service interface " + interfaceClass.getName());
                map.put(METHODS_KEY, ANY_VALUE);
            } else {
                map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
            }
        }

        /**
         * Here the token value configured by the provider is used to assign the value to ServiceConfig#token
         */
        if (ConfigUtils.isEmpty(token) && provider != null) {
            token = provider.getToken();
        }

        if (!ConfigUtils.isEmpty(token)) {
            if (ConfigUtils.isDefault(token)) {
                map.put(TOKEN_KEY, UUID.randomUUID().toString());
            } else {
                map.put(TOKEN_KEY, token);
            }
        }
        //init serviceMetadata attachments
        serviceMetadata.getAttachments().putAll(map);

        // export service
        // 4 拼接URL对象
        String host = findConfigedHosts(protocolConfig, registryURLs, map);
        Integer port = findConfigedPorts(protocolConfig, name, map, protocolConfigNum);
        URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);

        // You can customize Configurator to append extra parameters
        if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                .hasExtension(url.getProtocol())) {
            url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                    .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
        }

		// 5 导出服务 本地服务,远程服务
        String scope = url.getParameter(SCOPE_KEY);
        // don't export when none is configured
        // 如果scope为SCOPE_NONE,则不导出服务
        if (!SCOPE_NONE.equalsIgnoreCase(scope)) {

            // export to local if the config is not remote (export to remote only when config is remote)
            // 如果scope不是SCOPE_LOCAL,则 导出 本地服务
            if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
                exportLocal(url);
            }
            // export to remote if the config is not local (export to local only when config is local)
			// 如果scope不是SCOPE_LOCAL,则 导出 远程服务
            if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
            	// 如果有服务中心注册地址
                if (CollectionUtils.isNotEmpty(registryURLs)) {
                    for (URL registryURL : registryURLs) {
                        if (SERVICE_REGISTRY_PROTOCOL.equals(registryURL.getProtocol())) {
                            url = url.addParameterIfAbsent(SERVICE_NAME_MAPPING_KEY, "true");
                        }

                        //if protocol is only injvm ,not register
                        if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
                            continue;
                        }
                        url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
                        URL monitorUrl = ConfigValidationUtils.loadMonitor(this, registryURL);
                        if (monitorUrl != null) {
                        	// 如果配置了监控地址,则服务调用信息会上报
                            url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
                        }
                        if (logger.isInfoEnabled()) {
                            if (url.getParameter(REGISTER_KEY, true)) {
                                logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " +
                                        registryURL);
                            } else {
                                logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                            }
                        }

                        // For providers, this is used to enable custom proxy to generate invoker
                        String proxy = url.getParameter(PROXY_KEY);
                        if (StringUtils.isNotEmpty(proxy)) {
                            registryURL = registryURL.addParameter(PROXY_KEY, proxy);
                        }

						// 通过 动态代理 转换成Invoker
						// registryURL存储的是 注册中心地址
						// 使用export作为key 追加 服务(的)元数据信息
                        Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass,
                                registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

						// 把Invoker转换成Exporter
                        Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
                } else {
                    if (logger.isInfoEnabled()) {
                        logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                    }
                    // 没有注册中心,直接发布(暴露)服务
                    Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                    Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
                    exporters.add(exporter);
                }
				
				// 元数据存储
                MetadataUtils.publishServiceDefinition(url);
            }
        }
        this.urls.add(url);
    }
           

如果使用MetadataReportConfig设置了元数据存储信息,则将元数据 保存到 指定配置中心

在Dubbo 2.7.0中 对 服务(的)元数据进行了改造,其把原来都保存到服务注册中心的元数据进行了分类存储,注册中心 将只用于 存储 关键服务信息,比如 服务提供者地址列表、完整的接口定义等

Dubbo 2.7.0使用更专业的配置中心,如Nacos、Apollo、Consul和Etcd等,提供更灵活、更丰富的配置规则,包括服务和应用不同粒度的配置、更丰富的路由规则和集中式管理的动态参数规则等

重点看这几行

Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, 
	registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = 
	new DelegateProviderMetaDataInvoker(invoker, this);

Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
exporters.add(exporter);

// PROXY_FACTORY
private static final ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

// PROTOCOL
private static final Protocol PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
           

ProxyFactory 和 Protocol 都是 扩展接口的 适配器类

执行代码

PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url)

实际上是首先执行扩展接口ProxyFactory 的 适配器类ProxyFactory$Adaptive 的

getInvoker()

方法

getInvoker()

方法内部 根据 URL里的 (参数)proxy的类型 选择 具体的 代理工厂,这里默认的proxy类型为javassist

所以又调用了JavassistProxyFactory的

getInvoker()

方法获取了代理类

Dubbo服务发布端启动流程配置承载初始化远程服务发布(暴露)本地服务发布(暴露)

getExtensionLoader

方法返回的是ExtensionLoader<T>

getAdaptiveExtension

方法 是 通过动态代理 生成 扩展接口ProxyFactory 的 适配器类ProxyFactory$Adaptive 的 实例对象

JavassistProxyFactory的

getInvoker()

方法的代码如下:

public class JavassistProxyFactory extends AbstractProxyFactory {

    @Override
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }

    @Override
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }

}
           

JavassistProxyFactory首先把 服务实现类 转换为 Wrapper 类,是为了减少反射的调用,这里返回的是 AbstractProxyInvoker对象,其内部重写

doInvoke()

方法,并委托给Wrapper 实现具体功能

到这里就完成了 服务提供方实现类 到 Invoker 的转换

在子类中实现invokeMethod方法,方法体内 会为每个ref方法 都做 方法名和方法参数匹配校验,匹配成功则直接可调用

对比看一下JdkProxyFactory

public class JdkProxyFactory extends AbstractProxyFactory {

    @Override
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces, new InvokerInvocationHandler(invoker));
    }

    @Override
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                Method method = proxy.getClass().getMethod(methodName, parameterTypes);
                return method.invoke(proxy, arguments);
            }
        };
    }

}
           

JdkProxyFactory 是通过反射获取真实对象的方法,然后调用

JavassistProxyFactory 相比 JdkProxyFactory 减少了反射开销

另外,当执行

protocol.export(wrapperInvoker)

方法的时候,实际调用了Protocol 的适配器类 Protocol$Adaptive 的

export()

方法

如果为远程服务发布(暴露),则 其内部 根据 URL中Protocol的类型为registry,会选择Protocol的实现类RegistryProtocol

如果为本地服务发布(暴露),则 其内部 根据URL中Protocol的类型为injvm,会选择Protocol的实现类InjvmProtocol

发布的时候Protocol的类型在哪里确定的?

在ConfigValidationUtils.loadRegistries方法中的

extractRegistryType(url)

另外,上面说到的"URL中Protocol的类型"的URL,指的是invoker中的url

由于Dubbo SPI 的扩展点使用了Wrapper自动增强,这里就使用了ProtocolFilterWrapper、ProtocolListenerWrapper、QosProtocolWrapper 对其进行了增强,所以需要一层层调用才会调用到RegistryProtocol的

export()

方法

7.RegistryProtocol#export(重要)

Dubbo服务发布端启动流程配置承载初始化远程服务发布(暴露)本地服务发布(暴露)
@Override
    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        URL registryUrl = getRegistryUrl(originInvoker);
        // url to export locally
        URL providerUrl = getProviderUrl(originInvoker);

        // Subscribe the override data
        // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
        //  the same service. Because the subscribed is cached key with the name of the service, it causes the
        //  subscription information to cover.
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);

        providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
        // export invoker
        // 重要!!!
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

        // url to registry
        // 创建注册中心实例,重要!!!
        final Registry registry = getRegistry(originInvoker);
        final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);

        // decide if we need to delay publish
        boolean register = providerUrl.getParameter(REGISTER_KEY, true);
        if (register) {
        	// 注册 服务(的)元数据,重要!!!
            registry.register(registeredProviderUrl);
        }

        // register stated url on provider model
        registerStatedUrl(registryUrl, registeredProviderUrl, register);


        exporter.setRegisterUrl(registeredProviderUrl);
        exporter.setSubscribeUrl(overrideSubscribeUrl);

        // Deprecated! Subscribe to override rules in 2.6.x or before.
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

        notifyExport(exporter);
        //Ensure that a new exporter instance is returned every time export
        return new DestroyableExporter<>(exporter);
    }
           

注意registryUrl 、providerUrl、registeredProviderUrl几个变量值的不同

Dubbo服务发布端启动流程配置承载初始化远程服务发布(暴露)本地服务发布(暴露)

8.RegistryProtocol#doLocalExport

RegistryProtocol的

doLocalExport()

方法内 调用了 Protocol的适配器类Protocol$Adaptive,这里 URL内的 协议类型是dubbo,所以 返回的SPI扩展实现类 是 DubboProtocol,由于DubboProtocol也被Wrapper类增强了,所以也是一层层调用后,才执行到上面时序图中的步骤8,即调用DubboProtocol的

export()

方法

private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
        String key = getCacheKey(originInvoker);

        return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
            Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
            return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
        });
    }
           

9.ProtocolFilterWrapper#export

@Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        if (UrlUtils.isRegistry(invoker.getUrl())) {
            return protocol.export(invoker);
        }
        // 先构造 拦截器链 然后触发Dubbo协议 暴露
        return protocol.export(buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
    }
           

在触发 Dubbo协议 暴露(服务)之前 先对 服务Invoker 做了一层拦截器构建,在 加载所有拦截器时 会过滤 只对provider生效的数据

传入到

buildInvokerChain

方法中的参数是:

String SERVICE_FILTER_KEY = "service.filter";

String PROVIDER = "provider";

看一下

buildInvokerChain

方法

private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        Invoker<T> last = invoker;
        // 根据url + service.filter + provider 获得 扩展接口Filter的 多个 自动激活实现类 的 实例对象
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);

        if (!filters.isEmpty()) {
            for (int i = filters.size() - 1; i >= 0; i--) {
                final Filter filter = filters.get(i);
                last = new FilterNode<T>(invoker, last, filter);
            }
        }

        return last;
    }
           

获取 真实服务ref 对应的 Invoker 并 挂载到 整个拦截器链的尾部,然后 逐层 包裹 其他拦截器

10.ProtocolListenerWrapper#export

@Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    	// 如果 URl 协议是 registry 则不处理,直接转换invoker
        if (UrlUtils.isRegistry(invoker.getUrl())) {
            return protocol.export(invoker);
        }
        // 否则对 Exporter 进行增强,这里是 获取 SPI接口-ExporterListener的实现类 作为入参
        return new ListenerExporterWrapper<T>(protocol.export(invoker),
                Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
                        .getActivateExtension(invoker.getUrl(), EXPORTER_LISTENER_KEY)));
    }
           

11.QosProtocolWrapper#export

QoS(Quality of Service,服务质量)指 一个网络 能够 利用 各种基础技术,为 指定的网络通信 提供 更好的服务能力,是网络的一种安全机制, 是用来 解决 网络延迟和阻塞等问题 的 一种技术

dubbo为用户提供类似的网络服务 用来online和offline service 来解决 网络延迟,阻塞等问题

@Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        if (UrlUtils.isRegistry(invoker.getUrl())) {
        	// 启动Qos 服务,其中通过Cas 判断是否已经开启过,确保只启动一次
            startQosServer(invoker.getUrl());
            return protocol.export(invoker);
        }
        return protocol.export(invoker);
    }
           

可以通过

qos.enable

参数控制是否开启

看一下

startQosServer

方法

private void startQosServer(URL url) {
        try {
            if (!hasStarted.compareAndSet(false, true)) {
                return;
            }

            boolean qosEnable = url.getParameter(QOS_ENABLE, true);
            if (!qosEnable) {
                logger.info("qos won't be started because it is disabled. " +
                        "Please check dubbo.application.qos.enable is configured either in system property, " +
                        "dubbo.properties or XML/spring-boot configuration.");
                return;
            }

            String host = url.getParameter(QOS_HOST);
            int port = url.getParameter(QOS_PORT, QosConstants.DEFAULT_PORT);
            boolean acceptForeignIp = Boolean.parseBoolean(url.getParameter(ACCEPT_FOREIGN_IP, "false"));
            Server server = Server.getInstance();
            server.setHost(host);
            server.setPort(port);
            server.setAcceptForeignIp(acceptForeignIp);
            server.start();

        } catch (Throwable throwable) {
            logger.warn("Fail to start qos server: ", throwable);
        }
    }


public interface QosConstants {

    String QOS_ENABLE = "qos.enable";

    String QOS_HOST = "qos.host";

    String QOS_PORT = "qos.port";

    String ACCEPT_FOREIGN_IP = "qos.accept.foreign.ip";
}
           

9-11 阅读参考,上面时序图中的顺序并不是真是的顺序:

  • https://blog.csdn.net/qq_36882793/article/details/114970511
  • https://blog.csdn.net/luzhensmart/article/details/108548095

12.DubboProtocol#export(重要)

@Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();

        // export service.
        // 根据 服务分组、版本、接口、端口 构造key
        String key = serviceKey(url);
        // 转换成 DubboExporter
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        // 保存到缓存exporterMap中
        exporterMap.addExportMap(key, exporter);

        //export an stub service for dispatching event
        Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }

            }
        }

		// 服务初次发布(暴露) 会 创建 监听服务器
        openServer(url);
        optimizeSerialization(url);

        return exporter;
    }
           

将Invoker 转换为 DubboExporter 对象

并且 把DubboExporter 保存到了 缓存exporterMap里(在 服务提供方 处理请求时 会从exporterMap中获取出来)

然后执行上面时序图中步骤10的

openServer()

方法

13.DubboProtocol#openServer

先来看一下openServer及后续流程的时序图

Dubbo服务发布端启动流程配置承载初始化远程服务发布(暴露)本地服务发布(暴露)
private void openServer(URL url) {
        // find server.
        // 提供者机器的地址
        String key = url.getAddress();
        //client can export a service which's only for server to invoke
        // 只有 服务端 提供 才会 启动监听
        boolean isServer = url.getParameter(IS_SERVER_KEY, true);
        if (isServer) {
            ProtocolServer server = serverMap.get(key);
            if (server == null) {
                synchronized (this) {
                    server = serverMap.get(key);
                    if (server == null) {
                        serverMap.put(key, createServer(url));
                    }
                }
            } else {
                // server supports reset, use together with override
                server.reset(url);
            }
        }
    }
           

首先是获取 当前机器地址信息(ip:port)并作为key,然后判断当前是否为服务提供端

如果是,则以此key 为key 查看缓存serverMap中 是否有 对应的Server,如果没有则调用

createServer

方法来创建,否则 返回 缓存中的value

由于每个机器的ip:port是唯一的,所以 多个不同服务 启动时 只有 第一个会被创建,后面的服务都是直接从缓存中返回的

14.DubboProtocol#createServer

private ProtocolServer createServer(URL url) {
        url = URLBuilder.from(url)
                // send readonly event when server closes, it's enabled by default
                .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
                // enable heartbeat by default
                .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
                .addParameter(CODEC_KEY, DubboCodec.NAME)
                .build();
        String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);

        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);
        }

        ExchangeServer server;
        try {
       		// 创建NettyServer 并 初始化Handler
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }

        str = url.getParameter(CLIENT_KEY);
        if (str != null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }

        return new DubboProtocolServer(server);
    }
           

15.Exchangers#bind

Exchangers位于交换层,倒数第三层

public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        return getExchanger(url).bind(url, handler);
    }
           

看一下

getExchanger

方法

public static Exchanger getExchanger(URL url) {
        String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
        return getExchanger(type);
    }

    String EXCHANGER_KEY = "exchanger";
	String DEFAULT_EXCHANGER = "header";

    public static Exchanger getExchanger(String type) {
        return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
    }
           

获取url中key=exchanger的参数值,作为Exchanger的类型,Exchanger默认类型为header

实际上就是通过

getExtension

方法获取 扩展接口Exchanger 的 实现类 HeaderExchanger

然后调用 HeaderExchanger#bind

16.HeaderExchanger#bind

@Override
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }
           

17.Transporters#bind

Transporters位于传输层,倒数第二层

public static RemotingServer bind(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handlers == null || handlers.length == 0) {
            throw new IllegalArgumentException("handlers == null");
        }
        ChannelHandler handler;
        if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
        return getTransporter().bind(url, handler);
    }
           

getTransporter方法如下,就是获取 扩展接口Transporter的 适配器类 Transporter$Adaptive

public static Transporter getTransporter() {
        return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
    }
           

然后 Transporter$Adaptive 的bind 获取URL中的server和transporter两个参数值,确定使用哪一个Transporter的具体实现了,默认是NettyTransporter,所以,最终调用的是NettyTransporter#bind方法

18.NettyTransporter#bind

@Override
    public RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException {
        return new NettyServer(url, handler);
    }
           

NettyServer 的构造函数如下:

public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
        super(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME), ChannelHandlers.wrap(handler, url));
    }


    public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, handler);
        localAddress = getUrl().toInetSocketAddress();

        String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
        int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
        if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
            bindIp = ANYHOST_VALUE;
        }
        bindAddress = new InetSocketAddress(bindIp, bindPort);
        this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);
        try {
        	// 重要!!!
            doOpen();
            if (logger.isInfoEnabled()) {
                logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
            }
        } catch (Throwable t) {
            throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
                    + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
        }
        executor = executorRepository.createExecutorIfAbsent(url);
    }

    public AbstractEndpoint(URL url, ChannelHandler handler) {
        super(url, handler);
        this.codec = getChannelCodec(url);
        this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT);
    }
           

19.NettyServer#doOpen

@Override
    protected void doOpen() throws Throwable {
    	// 创建ServerBootstrap
        bootstrap = new ServerBootstrap();

		// 设置Netty的boss线程池 和 worker线程池
        bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss");
        workerGroup = NettyEventLoopFactory.eventLoopGroup(
                getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                "NettyServerWorker");

		// 配置NettyServer,添加 handler 到 管线
        final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
        channels = nettyServerHandler.getChannels();

        boolean keepalive = getUrl().getParameter(KEEP_ALIVE_KEY, Boolean.FALSE);

        bootstrap.group(bossGroup, workerGroup)
                .channel(NettyEventLoopFactory.serverSocketChannelClass())
                .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .childOption(ChannelOption.SO_KEEPALIVE, keepalive)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        // FIXME: should we use getTimeout()?
                        // 添加handler到 接收链接 的 管线
                        int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                        if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
                            ch.pipeline().addLast("negotiation",
                                    SslHandlerInitializer.sslServerHandler(getUrl(), nettyServerHandler));
                        }
                        // 重要!!!
                        ch.pipeline()
                                .addLast("decoder", adapter.getDecoder())
                                .addLast("encoder", adapter.getEncoder())
                                // 心跳检查
                                .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                                .addLast("handler", nettyServerHandler);
                    }
                });
        // bind
        // 绑定本地端口,并启动监听服务
        ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
        channelFuture.syncUninterruptibly();
        channel = channelFuture.channel();

    }
           

至此,服务提供方的NettyServer已经启动了

这里需要注意是,将 NettyServerHandler 和 编解码Handler 注册到了 每个接收链接 的 通道(channel)管理(动词)的管线中

20.RegistryProtocol#getRegistry

回到7-RegistryProtocol#export方法,继续往下看,在doLocalExport之后,就是getRegistry 和 register两步

先看一下时序图

Dubbo服务发布端启动流程配置承载初始化远程服务发布(暴露)本地服务发布(暴露)

步骤2 用来获取 服务注册中心,其中RegistryFactory为扩展接口,所以这里 通过适配器类 来确定 RegistryFactory的扩展实现 为 ZookeeperRegistryFactory

然后后者(ZookeeperRegistryFactory)内部调用

createRegistry()

方法创建了一个ZookeeperRegistry作为ZooKeeper注册中心

protected Registry getRegistry(final Invoker<?> originInvoker) {
        URL registryUrl = getRegistryUrl(originInvoker);
        return getRegistry(registryUrl);
    }

    protected Registry getRegistry(URL url) {
        try {
            return registryFactory.getRegistry(url);
        } catch (Throwable t) {
            LOGGER.error(t.getMessage(), t);
            throw t;
        }
    }
           

RegistryFactory 也是一个 扩展接口,对应的适配器类 是 RegistryFactory$Adaptive,会根据URL中的protocol参数获取具体的扩展实现,默认是DubboRegistryFactory

然后执行的是DubboRegistryFactory#getRegistry

如果设置protocol参数值=zookeeper,则会执行ZookeeperRegistryFactory#getRegistry

21.ZookeeperRegistryFactory#getRegistry

ZookeeperRegistryFactory继承了AbstractRegistryFactory

ZookeeperRegistryFactory#getRegistry 其实执行的是 AbstractRegistryFactory#getRegistry

@Override
    public Registry getRegistry(URL url) {

        Registry defaultNopRegistry = getDefaultNopRegistryIfDestroyed();
        if (null != defaultNopRegistry) {
            return defaultNopRegistry;
        }

        url = URLBuilder.from(url)
                .setPath(RegistryService.class.getName())
                .addParameter(INTERFACE_KEY, RegistryService.class.getName())
                .removeParameters(EXPORT_KEY, REFER_KEY)
                .build();
        String key = createRegistryCacheKey(url);
        // Lock the registry access process to ensure a single instance of the registry
        // 独占锁 保证 同时 只有一个线程实例 创建 服务注册实例
        LOCK.lock();
        try {
            // double check
            // fix https://github.com/apache/dubbo/issues/7265.
            defaultNopRegistry = getDefaultNopRegistryIfDestroyed();
            if (null != defaultNopRegistry) {
                return defaultNopRegistry;
            }

            Registry registry = REGISTRIES.get(key);
            if (registry != null) {
                return registry;
            }
            //create registry by spi/ioc
            // 创建服务注册中心
            registry = createRegistry(url);
            if (registry == null) {
                throw new IllegalStateException("Can not create registry " + url);
            }
            REGISTRIES.put(key, registry);
            return registry;
        } finally {
            // Release the lock
            // 释放锁
            LOCK.unlock();
        }
    }

    protected static final ReentrantLock LOCK = new ReentrantLock();
	protected static final Map<String, Registry> REGISTRIES = new HashMap<>();

           

22.ZookeeperRegistryFactory#createRegistry

@Override
    public Registry createRegistry(URL url) {
        return new ZookeeperRegistry(url, zookeeperTransporter);
    }
           

23.FailbackRegistry#register(重要)

RegistryProtocol#export中

registry.register

调用的是FailbackRegistry#register

FailbackRegistry#register中会调用

super.register(url);

,也就是AbstractRegistryFactory#register

AbstractRegistryFactory#register仅仅是将注册地址 添加到 注册中心列表中

然后会执行

doRegister

方法,对应到ZookeeperRegistry#doRegister,主要的操作就是zkClient.create,将服务的url创建到zk的节点上

和上面时序图中的7、8步骤略有不同

相关内容见下面小结链接

小结

ExchangeServer、Exchangers、Transport 相关内容请见

  • Dubbo-Exchanger
  • Dubbo-Transport
  • Dubbo-RemotingServer

ZooKeeper注册中心相关内容请见

  • Dubbo注册中心(二)-ZooKeeper
  • Dubbo注册中心(七)-AbstractRegistry及相关内容
  • Dubbo注册中心(五)-FailbackRegistry+重试机制

本地服务发布(暴露)

很多使用Dubbo框架的应用 可能存在 同一个JVM暴露了远程服务,同时 同一个JVM内部 又引用了自身服务的情况

Dubbo会默认 把远程服务 用injvm协议 再暴露一份,这样 消费方 直接消费同一JVM内部的服务,避免了 跨网络进行远程通信

1.ServiceConfig#exportLocal

private void exportLocal(URL url) {
        URL local = URLBuilder.from(url)
                .setProtocol(LOCAL_PROTOCOL)
                .setHost(LOCALHOST_VALUE)
                .setPort(0)
                .build();
        Exporter<?> exporter = PROTOCOL.export(
                PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local));
        exporters.add(exporter);
        logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry url : " + local);
    }

    String LOCAL_PROTOCOL = "injvm";

    String LOCALHOST_VALUE = "127.0.0.1";
           

上面的

export

方法会走到InjvmProtocol#export

InjvmProtocol这个协议比较特殊,不会做端口打开操作,仅仅是 把服务 保存到 内存中

2.InjvmProtocol#export

export方法直接返回InjvmExporter实例对象

@Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        String serviceKey = invoker.getUrl().getServiceKey();
        InjvmExporter<T> tInjvmExporter = new InjvmExporter<>(invoker, serviceKey, exporterMap);
        exporterMap.addExportMap(serviceKey, tInjvmExporter);
        return tInjvmExporter;
    }
           
InjvmExporter(Invoker<T> invoker, String key, DelegateExporterMap delegateExporterMap) {
        super(invoker);
        this.key = key;
        this.delegateExporterMap = delegateExporterMap;
    }