天天看点

eureka-client服务启动

我们在看流程前看下下面的一个类图,其中client的流程基本都在DiscoveryClient的构造方法中,而eureka-server的逻辑大部分在PeerAwareInstanceRegistryImpl和PeerEurekaNodes中。

eureka-client服务启动

这里再介绍下InstanceInfo和EurekClient的区别, 主要用于标示这个应用的实例信息,如应用名称、consumer(provider)等,而EurekClient就是一个客户端的信息,如地址、注册信息等

那么eureka-client启动之前,我们先理一下它应该需要做那些事情。

1、告诉eureka-server(注册)。

2、拉取eureka-server上面的注册信息缓存到本地,实现高可用。

3、同步client的状态给eureka-server(心跳等)。

一、首先介绍下client向eureka-server注册的实现:

那么我们先看eureka-client的注册是怎么实现的,并不是直接注册的,而是通过后台线程是实现的,看下代码:

//DiscoveryClient.java
 @Inject
    DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                    Provider<BackupRegistry> backupRegistryProvider) {
            //代码省略...
            // default size of 2 - 1 each for heartbeat and cacheRefresh
            scheduler = Executors.newScheduledThreadPool(2,
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-%d")
                            .setDaemon(true)
                            .build());
        //代码省略...
        // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
        initScheduledTasks();
        //代码省略...
    }           
private void initScheduledTasks() {
        if (clientConfig.shouldFetchRegistry()) {
            //省略代码...
            // InstanceInfo replicator创建 应用实例信息复制器
            instanceInfoReplicator = new InstanceInfoReplicator(
                    this,
                    instanceInfo,
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                    2); // burstSize

    //省略代码...
           //开启应用实例信息复制器          
          instanceInfoReplicator.start(clientConfig.
            getInitialInstanceInfoReplicationIntervalSeconds());
        
    }           

在看下instanceInfoReplicator是如何实现注册的,继续看它的start方法:

public void start(int initialDelayMs) {
        //
        if (started.compareAndSet(false, true)) {
            instanceInfo.setIsDirty();  // for initial register
            //定时任务
            Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
            //onDemandUpdate方法使用
            scheduledPeriodicRef.set(next);
        }
    }           

看它的run方法:

public void run() {
        try {
            //刷新实例信息
            discoveryClient.refreshInstanceInfo();
            Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
            //是否已经注册过,
            if (dirtyTimestamp != null) {
                discoveryClient.register();
                instanceInfo.unsetIsDirty(dirtyTimestamp);
            }
        } catch (Throwable t) {
            logger.warn("There was a problem with the instance info replicator", t);
        } finally {
            //schedule
            Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }           

到这里终于看到register方法了,不过在register之前有个refreshInstanceInfo方法,是用于刷新应用实例信息,跟进代码后我们发现了这些应用实例信息的 

hostName

 、 

ipAddr等的

属性的变化。因为IP变了后服务是无法知晓的,只能这个定时任务是随时变化。

下面我们看register方法,代码如下:

boolean register() throws Throwable {
        logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
        EurekaHttpResponse<Void> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
        } catch (Exception e) {
            logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
            throw e;
        }
        if (logger.isInfoEnabled()) {
            logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
        }
        return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
    }           

无非就是调用 

AbstractJerseyEurekaHttpClient#register(...)

 方法,请求 Eureka-Server 的 

apps/${APP_NAME}

 接口,参数为 InstanceInfo ,实现注册实例信息的注册。

补上一点,当refreshInstanceInfo方法刷到status变化时会被唤醒notify(),然后调用onDemandUpdate()告诉server修改对应的client信息。

statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                @Override
                public String getId() {
                    return "statusChangeListener";
                }

                @Override
                public void notify(StatusChangeEvent statusChangeEvent) {
                    if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                            InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                        // log at warn level if DOWN was involved
                        logger.warn("Saw local status change event {}", statusChangeEvent);
                    } else {
                        logger.info("Saw local status change event {}", statusChangeEvent);
                    }
                    //状态变化,通过server
                    instanceInfoReplicator.onDemandUpdate();
                }
            };           
public boolean onDemandUpdate() {
        if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
            if (!scheduler.isShutdown()) {
                scheduler.submit(new Runnable() {
                    @Override
                    public void run() {
                        Future latestPeriodic = scheduledPeriodicRef.get();
                        if (latestPeriodic != null && !latestPeriodic.isDone()) {
                            //取消任务
                            latestPeriodic.cancel(false);
                        }
                        //再次拉起
                        InstanceInfoReplicator.this.run();
                    }
                });
                return true;
            //代码省略。。。
    }           

二、下面我们看下server端是如何接受这个注册请求的,请求的路径是:"apps/" + info.getAppName(),也就是apps/${APP_NAME}。

是在com.netflix.eureka.resources.ApplicationResource中

ApplicationResource#addInstance()
    @POST
    @Consumes({"application/json", "application/xml"})
    public Response addInstance(InstanceInfo info,
                                @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
        logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
        // validate that the instanceinfo contains all the necessary required fields
        //省略验证的代码。。。
        registry.register(info, "true".equals(isReplication));
        return Response.status(204).build();  // 204 to be backwards compatible
    }           
@Override
    public void register(final InstanceInfo info, final boolean isReplication) {
        int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
        if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
            leaseDuration = info.getLeaseInfo().getDurationInSecs();
        }
        //注册isReplication用来区分是eureka-server节点同步还是直接注册的
        super.register(info, leaseDuration, isReplication);
        //同步到对应的节点
        replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
    }           

最终进入了AbstractInstanceRegistry中的registry的Map中,

private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry = new ConcurrentHashMap<>();

不过其中使用了Lease(契约),非常nice(有兴趣可以自行了解下~),心跳、下线后续会补充上来。

有什么不对的地方还请帮忙指正!

继续阅读