天天看點

eureka-client服務啟動

我們在看流程前看下下面的一個類圖,其中client的流程基本都在DiscoveryClient的構造方法中,而eureka-server的邏輯大部分在PeerAwareInstanceRegistryImpl和PeerEurekaNodes中。

eureka-client服務啟動

這裡再介紹下InstanceInfo和EurekClient的差別, 主要用于标示這個應用的執行個體資訊,如應用名稱、consumer(provider)等,而EurekClient就是一個用戶端的資訊,如位址、注冊資訊等

那麼eureka-client啟動之前,我們先理一下它應該需要做那些事情。

1、告訴eureka-server(注冊)。

2、拉取eureka-server上面的注冊資訊緩存到本地,實作高可用。

3、同步client的狀态給eureka-server(心跳等)。

一、首先介紹下client向eureka-server注冊的實作:

那麼我們先看eureka-client的注冊是怎麼實作的,并不是直接注冊的,而是通過背景線程是實作的,看下代碼:

//DiscoveryClient.java
 @Inject
    DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                    Provider<BackupRegistry> backupRegistryProvider) {
            //代碼省略...
            // default size of 2 - 1 each for heartbeat and cacheRefresh
            scheduler = Executors.newScheduledThreadPool(2,
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-%d")
                            .setDaemon(true)
                            .build());
        //代碼省略...
        // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
        initScheduledTasks();
        //代碼省略...
    }           
private void initScheduledTasks() {
        if (clientConfig.shouldFetchRegistry()) {
            //省略代碼...
            // InstanceInfo replicator建立 應用執行個體資訊複制器
            instanceInfoReplicator = new InstanceInfoReplicator(
                    this,
                    instanceInfo,
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                    2); // burstSize

    //省略代碼...
           //開啟應用執行個體資訊複制器          
          instanceInfoReplicator.start(clientConfig.
            getInitialInstanceInfoReplicationIntervalSeconds());
        
    }           

在看下instanceInfoReplicator是如何實作注冊的,繼續看它的start方法:

public void start(int initialDelayMs) {
        //
        if (started.compareAndSet(false, true)) {
            instanceInfo.setIsDirty();  // for initial register
            //定時任務
            Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
            //onDemandUpdate方法使用
            scheduledPeriodicRef.set(next);
        }
    }           

看它的run方法:

public void run() {
        try {
            //重新整理執行個體資訊
            discoveryClient.refreshInstanceInfo();
            Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
            //是否已經注冊過,
            if (dirtyTimestamp != null) {
                discoveryClient.register();
                instanceInfo.unsetIsDirty(dirtyTimestamp);
            }
        } catch (Throwable t) {
            logger.warn("There was a problem with the instance info replicator", t);
        } finally {
            //schedule
            Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }           

到這裡終于看到register方法了,不過在register之前有個refreshInstanceInfo方法,是用于重新整理應用執行個體資訊,跟進代碼後我們發現了這些應用執行個體資訊的 

hostName

 、 

ipAddr等的

屬性的變化。因為IP變了後服務是無法知曉的,隻能這個定時任務是随時變化。

下面我們看register方法,代碼如下:

boolean register() throws Throwable {
        logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
        EurekaHttpResponse<Void> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
        } catch (Exception e) {
            logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
            throw e;
        }
        if (logger.isInfoEnabled()) {
            logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
        }
        return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
    }           

無非就是調用 

AbstractJerseyEurekaHttpClient#register(...)

 方法,請求 Eureka-Server 的 

apps/${APP_NAME}

 接口,參數為 InstanceInfo ,實作注冊執行個體資訊的注冊。

補上一點,當refreshInstanceInfo方法刷到status變化時會被喚醒notify(),然後調用onDemandUpdate()告訴server修改對應的client資訊。

statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                @Override
                public String getId() {
                    return "statusChangeListener";
                }

                @Override
                public void notify(StatusChangeEvent statusChangeEvent) {
                    if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                            InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                        // log at warn level if DOWN was involved
                        logger.warn("Saw local status change event {}", statusChangeEvent);
                    } else {
                        logger.info("Saw local status change event {}", statusChangeEvent);
                    }
                    //狀态變化,通過server
                    instanceInfoReplicator.onDemandUpdate();
                }
            };           
public boolean onDemandUpdate() {
        if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
            if (!scheduler.isShutdown()) {
                scheduler.submit(new Runnable() {
                    @Override
                    public void run() {
                        Future latestPeriodic = scheduledPeriodicRef.get();
                        if (latestPeriodic != null && !latestPeriodic.isDone()) {
                            //取消任務
                            latestPeriodic.cancel(false);
                        }
                        //再次拉起
                        InstanceInfoReplicator.this.run();
                    }
                });
                return true;
            //代碼省略。。。
    }           

二、下面我們看下server端是如何接受這個注冊請求的,請求的路徑是:"apps/" + info.getAppName(),也就是apps/${APP_NAME}。

是在com.netflix.eureka.resources.ApplicationResource中

ApplicationResource#addInstance()
    @POST
    @Consumes({"application/json", "application/xml"})
    public Response addInstance(InstanceInfo info,
                                @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
        logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
        // validate that the instanceinfo contains all the necessary required fields
        //省略驗證的代碼。。。
        registry.register(info, "true".equals(isReplication));
        return Response.status(204).build();  // 204 to be backwards compatible
    }           
@Override
    public void register(final InstanceInfo info, final boolean isReplication) {
        int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
        if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
            leaseDuration = info.getLeaseInfo().getDurationInSecs();
        }
        //注冊isReplication用來區分是eureka-server節點同步還是直接注冊的
        super.register(info, leaseDuration, isReplication);
        //同步到對應的節點
        replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
    }           

最終進入了AbstractInstanceRegistry中的registry的Map中,

private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry = new ConcurrentHashMap<>();

不過其中使用了Lease(契約),非常nice(有興趣可以自行了解下~),心跳、下線後續會補充上來。

有什麼不對的地方還請幫忙指正!

繼續閱讀