天天看点

dubbo源码学习四-服务注册以及服务提供者总结

文章参考官方文档以及丁威老师的文章。前面我们已经知道服务暴露分为本地服务暴露和远程服务暴露,同时远程服务暴露又分为:进行服务暴露、服务注册到注册中心、服务订阅。而服务暴露的过程中,其实是进行相关配置的完善,进行相关协议的适配,然后适配到服务器,从而进行createServer操作,而采用默认的dubbo协议时,走的就是NettyServer,进行bind操作之后,进入到NettyServer的创建包括两种线程池:boss和worker线程池,添加到ChannelFactory中.,同时创建PipeFactory工厂进行bind操作到Netty框架中创建服务。

RegistryProtocol#export

@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
   /**===============注册到注册中心**+==================**/
        // url to registry
        //获取Registry
        final Registry registry = getRegistry(originInvoker);
        //拿到url进行注册  注册服务提供者url
        final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
        // decide if we need to delay publish
        //延迟暴露的时候需要
        boolean register = providerUrl.getParameter(REGISTER_KEY, true);
        //注册
        if (register) {
            //注册服务
            register(registryUrl, registeredProviderUrl);      
  }
        // Deprecated! Subscribe to override rules in 2.6.x or before.
        /**==================进行订阅操作===============**/
        //向注册中心进行订阅Overrider数据
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
        //设置注册服务提供者url、设置获取订阅服务url
        exporter.setRegisterUrl(registeredProviderUrl);
        exporter.setSubscribeUrl(overrideSubscribeUrl);
        //notify暴露
        notifyExport(exporter);
        //Ensure that a new exporter instance is returned every time export
        //确保这是一个每次暴露的都是一个新的暴露服务实例
        //创建并返回 DestroyableExporter
        return new DestroyableExporter<>(exporter);
 }           

复制

服务注册:

1.拿到注册中心的url和注册服务提供url

2.是否进行延迟暴露

3.进行服务注册

4.拿到注册服务、注册url、注册服务提供者,而获取注册中心是最重要的

5.获取注册中心getRegistry(registryUrl),拿到服务提供者的相关信息

1)首先进行SPI适配,进行销毁操作校验,然后进行url构建,构建之后,创建注册中心缓存key

2)通过key去获取缓存,如果缓存不存在,则直接创建注册中心Regsitry实例

(1)创建注册中心,注册中心构造函数,而创建的过程其实是一个连接connect+监听的过程,然后启动注册中心

(2)可以看到注册中心zookeeper的实现底层用到了cas+线程池+监听+定时任务实现的

3)放入到缓存中,以便下次使用,创建注册中心实例采用了ReentrantLock来实现

6.完善服务提供者的相关信息

这里我们重点了解服务注册到注册中心:

RegistryProtocol#register

//注册服务 注册url、注册服务提供者url
public void register(URL registryUrl, URL registeredProviderUrl) {
    /**======================重要 getRegistry(registryUrl)===============**/
    //获取Registry注册中心  重要
    Registry registry = registryFactory.getRegistry(registryUrl);
    //注册服务
    registry.register(registeredProviderUrl);
    //服务提供者相关信息
    ProviderModel model = ApplicationModel.getProviderModel(registeredProviderUrl.getServiceKey());
    model.addStatedUrl(new ProviderModel.RegisterStatedURL(
            registeredProviderUrl,
            registryUrl,
            true
    ));
}           

复制

RegistryFactory#getRegistry(URL url)

//SPI 默认dubbo
@SPI("dubbo")
public interface RegistryFactory {
    //默认dubboProtocol协议,进入AbstractRegistryFactory
    @Adaptive({"protocol"})
    Registry getRegistry(URL url);

}           

复制

AbstractRegistryFactory#getRegistry

/**-=================zookeeper注册中心 ==================**/
//进行zookeeper注册中心进行分析
@Override
public Registry getRegistry(URL url) {
    //如果已经进行销毁,则进行警告,说服务已经被销毁,返回
    if (destroyed.get()) {
        LOGGER.warn("All registry instances have been destroyed, failed to fetch any instance. " +
                "Usually, this means no need to try to do unnecessary redundant resource clearance, all registries has been taken care of.");
        return DEFAULT_NOP_REGISTRY;
    }
    //url信息构建
    url = URLBuilder.from(url)
            .setPath(RegistryService.class.getName())
            .addParameter(INTERFACE_KEY, RegistryService.class.getName())
            .removeParameters(EXPORT_KEY, REFER_KEY)
            .build();
    //创建注册中心缓存key
    String key = createRegistryCacheKey(url);
    // Lock the registry access process to ensure a single instance of the registry
    LOCK.lock();
    try {
        //访问缓存
        Registry registry = REGISTRIES.get(key);
        if (registry != null) {
            return registry;
        }
        //create registry by spi/ioc
        //缓存未命中,创建Registry实例  重要
        registry = createRegistry(url);
        if (registry == null) {
            throw new IllegalStateException("Can not create registry " + url);
        }
        //写入缓存中
        REGISTRIES.put(key, registry);
        return registry;
    } finally {
        // Release the lock
        LOCK.unlock();
    }
}           

复制

ZookeeperRegistryFactory#createRegistry

//创建注册中心
@Override
public Registry createRegistry(URL url) {
    //创建ZookeeperRegistry
    return new ZookeeperRegistry(url, zookeeperTransporter);
}           

复制

ZookeeperRegistry#ZookeeperRegistry构造函数

//注册中心构造函数
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
    //调用FailbackRegistry的构造方法,而FailbackRegistry
    //又调用AbstractRegistry,可以看到底层结构包含时间轮HashWheelTimer
    super(url);
    if (url.isAnyHost()) {
        throw new IllegalStateException("registry address == null");
    }
    //获取组名,默认为dubbo
    String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
    if (!group.startsWith(PATH_SEPARATOR)) {
        //group="/"+group
        group = PATH_SEPARATOR + group;
    }
    this.root = group;
    //创建zookeeper客户端,默认为CuratorZookeeperTransporter,重点
    zkClient = zookeeperTransporter.connect(url);
    //添加状态监听器
    zkClient.addStateListener((state) -> {
        if (state == StateListener.RECONNECTED) {
            logger.warn("Trying to fetch the latest urls, in case there're provider changes during connection loss.\n" +
                    " Since ephemeral ZNode will not get deleted for a connection lose, " +
                    "there's no need to re-register url of this instance.");
            ZookeeperRegistry.this.fetchLatestAddresses();
        } else if (state == StateListener.NEW_SESSION_CREATED) {
            logger.warn("Trying to re-register urls and re-subscribe listeners of this instance to registry...");
            try {
                ZookeeperRegistry.this.recover();
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        } else if (state == StateListener.SESSION_LOST) {
            logger.warn("Url of this instance will be deleted from registry soon. " +
                    "Dubbo client will try to re-register once a new session is created.");
        } else if (state == StateListener.SUSPENDED) {

        } else if (state == StateListener.CONNECTED) {

        }
    });
}           

复制

ZookeeperTransporter#connect

//默认curator
@SPI("curator")
public interface ZookeeperTransporter {
    @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
    ZookeeperClient connect(URL url);
}           

复制

AbstractZookeeperTransporter#connect

//适配zookeeper,进行connect
//这个方法用于创建zookeeper客户端,创建好zookeeper客户端,
// 意味着注册中心的创建过程就结束了。
@Override
public ZookeeperClient connect(URL url) {
    ZookeeperClient zookeeperClient;
    // address format: {[username:password@]address}
    //地址格式化:地址列表 {[username:password@]address}
    List<String> addressList = getURLBackupAddress(url);
    // The field define the zookeeper server , including protocol, host, port, username, password
    //字段定义zookeeper服务器,包含协议、host、端口、用户名、密码
    if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null && zookeeperClient.isConnected()) {
        logger.info("find valid zookeeper client from the cache for address: " + url);
        return zookeeperClient;
    }
    // avoid creating too many connections, so add lock
    synchronized (zookeeperClientMap) {
        if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null && zookeeperClient.isConnected()) {
            logger.info("find valid zookeeper client from the cache for address: " + url);
            return zookeeperClient;
        }
        //重点关注  创建zookeeper客户端
        zookeeperClient = createZookeeperClient(url);
        logger.info("No valid zookeeper client found from cache, therefore create a new client for url. " + url);
        writeToClientMap(addressList, zookeeperClient);
    }
    return zookeeperClient;
}           

复制

AbstractZookeeperTransporter#createZookeeperClient

//创建zookeeper客户端
protected abstract ZookeeperClient createZookeeperClient(URL url);           

复制

CuratorZookeeperTransporter#createZookeeperClient

//创建zookeeper客户端
@Override
public ZookeeperClient createZookeeperClient(URL url) {
    return new CuratorZookeeperClient(url);
}           

复制

CuratorZookeeperClient# CuratorZookeeperClient

//创建CuratorZookeeper客户端
public CuratorZookeeperClient(URL url) {
    //url信息 注册中心url
    super(url);
    //创建CuratorFramework构造器
    try {
        int timeout = url.getParameter(TIMEOUT_KEY, DEFAULT_CONNECTION_TIMEOUT_MS);
        int sessionExpireMs = url.getParameter(ZK_SESSION_EXPIRE_KEY, DEFAULT_SESSION_TIMEOUT_MS);
        CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
                .connectString(url.getBackupAddress())
                .retryPolicy(new RetryNTimes(1, 1000))
                .connectionTimeoutMs(timeout)
                .sessionTimeoutMs(sessionExpireMs);
        String authority = url.getAuthority();
        if (authority != null && authority.length() > 0) {
            builder = builder.authorization("digest", authority.getBytes());
        }
        //构建curatorFramework实例
        client = builder.build();
        //添加监听器
        client.getConnectionStateListenable().addListener(new CuratorConnectionStateListener(url));
        //启动客户端
        client.start();
        //阻塞直到连接成功
        boolean connected = client.blockUntilConnected(timeout, TimeUnit.MILLISECONDS);
        if (!connected) {
            throw new IllegalStateException("zookeeper not connected");
        }
    } catch (Exception e) {
        throw new IllegalStateException(e.getMessage(), e);
    }
}           

复制

CuratorFrameworkImpl#start:

public void start() {
    this.log.info("Starting");
    //进行状态判断,如果启动则抛异常
    if (!this.state.compareAndSet(CuratorFrameworkState.LATENT, CuratorFrameworkState.STARTED)) {
        throw new IllegalStateException("Cannot be started more than once");
    } else {
        try {
            //连接状态管理启动
            this.connectionStateManager.start();
            //创建连接状态监听对象
            ConnectionStateListener listener = new ConnectionStateListener() {
                //状态改变
                public void stateChanged(CuratorFramework client, ConnectionState newState) {
                    //已经连接或者重连状态,设置为错误连接
                    if (ConnectionState.CONNECTED == newState || ConnectionState.RECONNECTED == newState) {
                        CuratorFrameworkImpl.this.logAsErrorConnectionErrors.set(true);
                    }

                }
            };
            //添加监听
            this.getConnectionStateListenable().addListener(listener);
            //启动客户端
            this.client.start();
            //定时任务
            this.executorService = Executors.newSingleThreadScheduledExecutor(this.threadFactory);
            //进行任务操作
            this.executorService.submit(new Callable<Object>() {
                public Object call() throws Exception {
                    CuratorFrameworkImpl.this.backgroundOperationsLoop();
                    return null;
                }
            });
            if (this.ensembleTracker != null) {
                this.ensembleTracker.start();
            }

            this.log.info(this.schemaSet.toDocumentation());
        } catch (Exception var2) {
            ThreadUtils.checkInterrupted(var2);
            this.handleBackgroundOperationException((OperationAndData)null, var2);
        }

    }
}           

复制

下面我们来总结一下服务提供者的相关操作:

dubbo服务注册的原理:

首先在ServiceConfig中可以看到export方法同步方法,进行服务暴露操作,其中:首先进行延迟暴露操作校验,如果要进行延迟暴露操作,则会进行延迟定时任务,否则进行服务暴露。

进行服务提供暴露:首先进行url的组装,组装好url之后,进行服务的暴露:可以分为本地服务暴露Injvm、远程暴露以及直连操作。

本地服务暴露:进行适配在InjvmProtocol进行服务暴露。

远程服务暴露:首先会进行RegistryProtocol中我们可以看到服务暴露、服务注册、服务订阅、服务notify等操作,而服务暴露可进入DubboProtocol中,看到其进行暴露的操作:export操作,完善服务暴露信息,进行服务启动,openServer,进行服务器的启动,首先进行双重校验,首先进行缓存查看,如果没有,则进行createServer操作,进行适配,经过一系列的bind操作,如果采用默认的服务器,Netty,会进行服务的启动,在NettyServer中,我们可以看到创建服务的过程:两种线程池:boss、worker以及PipelineFactory的创建。

然后将服务信息注册到注册中心:

如果以zookeeper为注册中心,进入RegistryProtocol的export方法中的register(regsitryUrl,registryProviderUrl)方法,获取注册中心实例,进行服务注册。获取服务实例和创建服务器一样,都会先到缓存中去取,如果没有,则进行创建注册中心createRegistry,以zookeeper为注册中心为例,重点关注ZookeepTransporter的connect方法,其底层是基于CuratorFrameworkImpl的start方法实现,其会进行状态判断,如果启动,则抛异常,如果没有启动,则进行连接状态启动、连接状态监听对象的创建、添加监听,启动客户端,启动定时任务,进行loop操作。

利用zookeeper的可视化工具,我们可以看到树状节点信息:服务提供者的url信息以及配置信息。首先我们可以通过源码知道其会进行缓存,因此,如果zookeeper宕机了,不会影响其原来的服务调用,而dubbo Admin中是看不到服务信息的。