天天看點

大衆點評排程系統Kepler的設計與實作

.

5.2.1.1排程系統Kepler整體設計

該企業排程系統的整體實作思路:

  1. 用SpringQuartz來負責定時任務這一塊,用定時任務來周期性更新執行個體和容器的狀态,即用定時任務做排程系統的引擎。
  2. 用狀态管理器管理執行個體,資料總管控制資源,容器管理器管理容器。它們都用單例模式實作,并持有執行個體,容器,資源在記憶體中的隊列。
  3. 在記憶體和資料庫同時儲存執行個體資訊,記憶體中采用多級狀态隊列的形式。在記憶體中有資料是為了排程的速度和及時性,在資料庫中存副本是為了容災恢複現場。
  4. 執行機上容器的管理用遠端DockerClient API進行。定時任務也會使用這些API中,來定期更新記憶體容器隊列。
  5. Docker容器和執行個體關系的映射存儲到資料庫中來建立兩者的聯系。
  6. 針對不同任務,制作不同Docker鏡像,把鏡像放在本地倉庫中供執行機拉取。

5.2.1.2排程系統Kepler具體設計與實作

首先分析Init定時任務和Ready定時任務。下面(圖5.1)是兩者的流程設計。

大衆點評排程系統Kepler的設計與實作

圖ETL資料傳輸平台設計與實作.19 Init和Ready定時任務流程設計

在Quartz的使用方式上[20],我們把Quartz和Spring內建在一起。在Spring配置檔案中配置InitExecutor以及ReadyExecutor的jobDetail(就是定時任務運作的類和方法)和Trigger(就是Cron表達式)。在容器啟動過程中,Quartz會在配置檔案中查找配置為jobDetail的類,找到它對應的Trigger,建立定時任務并按Cron表達式對應的時間觸發[21]。這樣spring就會幫我們把這兩個定時任務每十分鐘排程起來。

InitExecutor的定時方法是doExecutor()方法,它會調用InitInstances()方法,首先通過TaskService類擷取所有生效任務,再調用InitInstance(Datebegin,Dateend)逐一檢測在這個時間段内觸發的任務,并生成這些任務的執行個體。如果這些執行個體的任務存在工作流上的前驅,則設定執行個體的前驅關系。然後調用狀态管理器(StatusManager)的createInitInstance(InstanceDOinstanceDO)方法,把執行個體狀态為Init的任務加入狀态管理器的InitQueue中,并用InstanceService類的saveInstance(InstanceDOinstance)方法插入資料庫的執行個體表中。

ReadyExecutor配置的定時方法同樣是doExecutor()方法,這是因為他們都繼承自同一個抽象類BaseExecutor,實作了它的抽象方法doExecutor(),在Spring中進行了統一配置。doExecutor()方法中又進一步調用了InitReady()方法,首先周遊InitQueue中的所有執行個體,判斷執行個體的前驅執行個體是否準備好,如果準備好,再判斷目前執行個體的并發數是否超過允許值(一般執行個體不允許并發是以并發數預設為1),如果這兩項都滿足,則把執行個體先暫存到一個List局部變量中。剩下則是用StatusManager的acquireResource(InstanceDOinstanceDO)方法先嘗試擷取任務的一些公共資源,如hive,Mysql的并發數,如果有一項擷取不到,則放棄配置設定(這樣做是為了防止死鎖),否則一次性擷取所有資源,相應的,在StatusManager中增加資源的用量。如果這一切都沒問題的話,就把執行個體的狀态改為Ready,用InstanceService的updateInstance(InstanceDOinstanceDO)更新資料庫中任務狀态,并把這些執行個體從StatusManager的InitQueue移動到ReadyQueue。

上面有涉及到兩個資料庫服務,InstanceService和TaskService,這分别是任務類的資料庫服務和執行個體類的資料庫服務,在接下來的介紹中,我們還會涉及。這兩個服務并不是排程系統本身提供的服務,而是通過服務調用中間件內建到本地spring中的遠端RPC調用。

其次,我們介紹比較複雜的Running程序(時序圖為圖5.2)。定時任務方法execute()方法調用doExecute()方法,後者調用ContainerExecuter類的doExecuteHeartBeat()方法從Zookeeper上擷取所有存活的執行機機器,然後更新ContainerExecuter類中的存活的主機清單。然後調用ContainerService的getContainerByHost(Stringhost)方法擷取此主機上所有的容器。然後調用DockerClient的inspect指令檢查Docker容器的狀态,然後用InstanceService的updateInstance(InstanceDOinstanceDO)方法更新容器對應的任務執行個體的狀态,如果此容器是運作狀态,則把它加入到RunningContainer清單。如果不在運作,則檢查運作結果是成功還是失敗。并相應更新RunningContainer清單。最後還要還要調用DockerClient的remove指令删除掉這個容器。同時還要更新對應的執行個體中的資料庫狀态,此外還要把運作完成或異常退出的執行個體從StatusManager的RunningQueue中去掉。這樣,執行機狀态和容器狀态就得以實時更新。

大衆點評排程系統Kepler的設計與實作

圖ETL資料傳輸平台設計與實作.20 Running定時任務流程設計一

接下來讨論RunningExecutor對容器的管理(見圖5.3)。同樣是繼承自父類的execute()方法調用RunningExecutor裡的doExecute()方法,在這個方法裡會具體調用getReadyInstances()方法,傳回StatusManager裡的readyQueue,這是所有狀态為Ready的執行個體。然後周遊所有執行個體,為每個執行個體調用acquireContainerForTask(InstanceDOinstanceDO)方法配置設定執行機,具體的配置設定的流程是,若某執行機沒有運作的容器,則優先配置設定到此執行機上,否則,選擇還有槽位(對應一定的cpu、記憶體和帶寬資源,就是一個容器所需标準資源)且運作容器最少的機器作為目标執行機。然後再周遊執行機上所有用于Docker容器的端口,去掉其中被占用的端口選擇序号最小的那一個端口。最後把這些參數都包

裝起來成ContainerDO傳回。然後調用ContainerExecutor的runInstance(…)方法,

後者調用Dockerservice(這個類是為了管理Docker容器的持久化工作)的runInstance(…)方法,這個方法中會調用ImageService方法查詢任務對應的image(這個在建立任務的時候就已經插入了相關資訊)。查詢完之後,調用DockerInstanceService(這個是封裝了DockerClientAPI本地服務)的createContainer(…)的方法在執行host上建立容器,然後再調用startContainer(…)

大衆點評排程系統Kepler的設計與實作

圖ETL資料傳輸平台設計與實作.21 Running定時任務流程設計二

方法啟動容器,同時還要調用ContainerService的saveContainer()方法将容器儲存

在資料庫中,同時還要把這個已啟動的container加入記憶體的RunningContainers隊列中。這樣之後,傳回到RunningExecutor中,再調用StatusManager的createRunningInstance(InstanceDOinstanceDO)方法,用InstanceService更新執行個體在資料庫中的狀态,并且把執行個體加入記憶體中的RunningQueue中,這樣排程系統的整個架構系統就已經介紹完畢。

下面介紹排程系統的三個管理類:StatusManager(狀态管理器),ResourceManager(資料總管),ContainerScheduler(容器排程器)。

大衆點評排程系統Kepler的設計與實作

圖ETL資料傳輸平台設計與實作.22 StatusMananger類的設計

StatusManager管理排程系統所有的執行個體,所有記憶體中的執行個體都是資料庫中資料的副本(如圖5.4)。StatusManager成員變量如下:

  1. ContainnerService類執行個體containnerService。這個成員變量主要是把容器相關資訊,特别是運作的容器和執行個體的映射關系。
  2. InstanceService類執行個體instanceService。這個成員變量主要是負責執行個體資訊資料持久化。
  3. ResourceManager類執行個體resourceManager。這個成員變量是排程系統的資料總管的單例執行個體,這個類下文會詳細介紹,主要負責内部共享資源和外部共享資源的管理。
  4. Map<String,instanceDO>類的執行個體initQueue。這個成員變量主要是負責存儲到達觸發時間的任務被排程系統初始化的執行個體。Map類的鍵值就是instanceDO執行個體的id。
  5. Map<String,instanceDO>類的執行個體calcReadyQueue。這個成員變量主要是存儲狀态為Ready的計算任務執行個體,各個狀态隊列之間經常出現執行個體的轉移。Map類的鍵值就是instanceDO執行個體的id。
  6. Map<String,instanceDO>類的執行個體transferReadyQueue。這個成員變量主要是存儲狀态為Ready的傳輸任務執行個體,之是以區分計算任務執行個體和傳輸任務執行個體主要是因為兩者各運作在專用的兩組執行機上,内部運作機制也有不同,對應的Docker容器的image也有所不同。是以申請資源等操作有所不同,必須差別對待。
  7. Map<String,instanceDO>類的執行個體runningQueue。這個成員變量主要是存儲狀态為Running的計算任務執行個體。與上面相同,Map類的鍵值就是instanceDO執行個體的id。兩種不同類型的readyQueue執行個體會轉移到同一個runningQueue中。
  8. Map<String,instanceDO>類的執行個體timeoutQueue。這個成員變量主要是存儲狀态為Timeout的計算任務執行個體。與上面相同,Map類的鍵值就是instanceDO執行個體的ID。執行個體之是以會出現在Timeout隊列中是因為它的運作時間超過了它對應的任務設定的逾時時間進而從Running隊列中移出的執行個體,運作時間大大超過正常時間的執行個體往往是一種異常現象,需要任務的開發人員或ETL平台的管理者檢視日志進行排錯。

StatusMananger的成員變量已經講述完畢。下面深入分析StatusManger的成員方法。

  1. loadInstancesInfo():void方法。該方法前有@PostConstruct注釋,這說明本方法是statusManger對象的生命周期方法,在statusManager被容器建立後,将調用此方法,主要是為了恢複現場,即從資料庫中讀取狀态為Init,Ready,Running和Timeout的執行個體,分别加入到各狀态對應的隊列中。這方法一般是在排程系統新的版本上線後,系統重新開機中,利用資料庫中的資料快速建構記憶體中的四級隊列。
  2. addReadyQueue(InstanceDOinstanceDO):void方法。該方法是把參數中的執行個體從InitQueue中移出并根據執行個體對應的任務類型插入到計算任務Ready隊列或者傳輸任務Ready隊列中。本方法是個工具方法,主要是内部調用,内部調用它的方法是loadInstancesInfo方法和createReadyInstance方法。
  3. calcResourceUsage(InstanceDOinstanceDO):void方法。本方法主要是把目前執行個體依賴的資源加入到ResourceManager的資源池中進行管控。本方法也是一個内部工具方法,調用時機是loadInstancesInfo方法重建記憶體執行個體狀态四級隊列時。
  4. isInstanceInited(StringinstanceId):boolean方法。本方法功能比較簡單,就是判斷指定的instanceId的執行個體是否在InitQueue中。本方法也是一個内部工具方法,調用它的方法是createInitInstance方法。
  5. createReadyInstance(InstanceDOinstanceDO):boolean方法。此方法就是調用addReadyQueue方法完成執行個體在Init隊列和Ready隊列之間的移動并更新資料庫中執行個體的狀态。在調用addReadyQueue方法之前,必須檢查這個執行個體是否在已經在ReadyQueue中了,在則傳回false,無需往下進行。不在才調用addReadyQueue方法。
  6. createRunningInstance(InstanceDOinstanceDO):boolean方法。此方法主要是調用addRunningQueue方法完成執行個體在readyQueue和runningQueue之間的移動。在此之前判斷此執行個體是否已在ReadyQueue中以免重複移動,最後持久化執行個體的狀态為Running。
  7. addRunningQueue(InstanceDOinstanceDO): void方法。此方法功能在上一個方法中已經介紹。
  8. createInitInstance(InstanceDOinstanceDO):void方法。此方法把執行個體加入Init隊列,并把建立執行個體的插入資料庫中。

ResourceManger是排程系統的内部和外部資料總管,這裡所有資源都以Slot為機關。對于不同資源來說,一個Slot代表的意義并不同。對于一台執行機來說,一個Slot可能代表2g記憶體,2個Vcpu,100Mbps的帶寬。對于Hive公共資源來說,一個Slot代表可以并發通路的一個并發數。内部資源就是任務執行機資源,外部資源就是Hive,Mysql等公共共享資源。

ResourceManager類(如圖5.5)比較簡單,它的成員變量有:

  1. KeplerLionInfo類的keplerLionInfo執行個體。該成員變量是為了持有存在一個叫Lion的,搭建在Zookeeper叢集之上的配置中心的各項有關排程系統的配置。顯而易見,它存儲一定資訊,在static代碼中加載Lion中關于Kepler排程系統的配置。
大衆點評排程系統Kepler的設計與實作

圖ETL資料傳輸平台設計與實作.23 ResourceMananger類的設計

  1. ConcurrentHashMap<String,Integer>類的currentResourceUsage執行個體。這個是用來存儲各種内外部資源的已使用的Slot個數的,每種資源的最大資源使用量放在Lion資源配置中心。這個Map類的鍵是資源的名字,而值是資源的Slot個數。而之是以用ConcurrentHashMap是因為資源種類較多,可以用此類的分段鎖來提高并發性能。

下面介紹ResourceManager的成員函數。

  1. addResourceUsage(int,String):boolean方法。這個方法的用途是添加指定資源名字的Slot數到currentResourceUsage中。
  2. acquireResource(InstanceDOinstanceDO):boolean方法。這個方法主要内容是調用tryAcquireResource和doAcquireResource來擷取資源,前者是為了避免死鎖檢視所有資源是否能夠一次性擷取到,後者是真正的擷取資源。
  3. tryAcquireResource(InstanceDOinstanceDO):boolean方法。這個方法是檢查是否一次性擷取執行個體的資源會死鎖。首先擷取執行個體依賴的資源清單。然後逐一檢查目前已經使用的資源數加上欲配置設定的資源數是否超過資源的最大限度。如果超過,則傳回false,提示可能造成死鎖。
  4. doAcquireResource(InstanceDOinstanceDO):boolean方法。這個方法與tryAcquireResource方法有類似的流程,但不同的是,此方法會真正的配置設定資源,并調用addResourceUsage方法把資源加入到concurrentUsageMap中。
  5. releaseResource(InstanceDOinstanceDO):boolean方法。這方法和前面的acquireResource方法相反。是逐個釋放執行個體所擁有的資源,即減少concurrentUsageMap該資源的用量。
大衆點評排程系統Kepler的設計與實作

圖ETL資料傳輸平台設計與實作.24 ContainerScheduler類的設計

ContainerScheduler是執行機上Docker容器的管理類(如圖5.6),裡面很多方法都可遠端改變容器狀态。是Docker容器和執行個體的橋梁。ContainerScheduler的成員變量有:

  1. ConcurrentHashMap<String,Set<ContainerDO>>類執行個體runningContainers。這個成員變量的鍵值是執行機的host,值是這個執行機host上運作的容器集合。
  2. Set<String>類執行個體aliveTransferHost。這個成員變量就是存儲存活的傳輸任務執行機的集合。
  3. Set<String>類執行個體aliveCalcHost。這個成員變量就是存儲存活的計算任務執行機的集合。
  4. Set<String>類執行個體deadHost。這個成員變量就是存儲下線或當機的執行機。
  5. ConcurrentHashMap<String,Set<ContainerDO>>類的執行個體heartbeats。這個成員變量就是存儲定時心跳擷取的各個執行機的心跳情況,比如上一次正常心跳的時間等,排程系統通過距上一次正常心跳的時間差來判斷此執行機是否仍然正常執行。
  6. KeplerLionConfs類的執行個體keplerLionConfs。正如上面介紹,這個執行個體隻是儲存存在Lion配置中心的配置。
  7. DockerInstanceService類的執行個體dockerInstanceService。這個執行個體是封裝Docker遠端API。封裝的指令包括create、start、stop、remove,inspect等。
  8. ContainerService類的執行個體containerService。這個是用來儲存Docker容器相關資訊到資料庫中的持久化操作。最主要是存儲容器和執行個體的映射。
  9. InstanceService類的執行個體instanceService。這個成員變量主要是管理執行個體持久化操作。

ContainerSheduler類的成員方法很多,這裡我們隻介紹主要的幾個方法:

  1. LoadContainerInfo():void方法。此方法主要是為了恢複記憶體中的runningContainers隊列,這裡是從資料庫中載入資料,具體容器的狀态還需要調用checkInstanceStatus方法來檢查容器狀态。
  2. checkContainerStatus(Containercontainer):void方法。此方法是為了檢查每一個容器的狀态,方法的核心是Docker的inspect指令(通過DockerInstanceService封裝的方法)去檢查ContainerId對應的容器狀态。
  3. executeContainerHeartbeat():void方法。此方法另外啟動一個線程去zookeeper叢集裡擷取所有執行機存活的資訊,并把擷取的每一個執行機的心跳資訊儲存在HeartBeatDO中,更新aliveTransferHost、aliveCalcHost和deadHost隊列,維護記憶體中存活機器的清單。
  4. acquireContainerForTask(InstanceDOinstanceDO):Container方法。此方法主要是為執行個體的DockerContainer選擇執行機,選擇的邏輯的是周遊所有的執行機,擷取指定執行機上的所有運作的Container清單,如果該Host上沒有Container運作,則優先選擇此Host。否則把所有還有空的槽位(一個槽位是一個容器運作所需資源)的執行機加入一個隊列中,選擇運作Container最少的機器。此外,還要調用getContainerPort方法來擷取執行機上一個未占用的端口用來映射Docker容器的SSH服務。最後把所有的容器資訊封裝成一個ContainerDO類傳回。
  5. runInstance(ContainerDOcontainerDO,InstanceDOinstanceDO)方法,此方法調用containerService的同名方法,最後是調用DockerInstanceService的createContainer和startContainer方法(這兩個方法封裝的就是DockerClient的create和start方法)。當然在建立之前要用imageService的queryImage方法來擷取此執行個體對應的容器鏡像。
  6. getContainerPort(Set<Container>containers)方法。此方法是為了要啟動的容器擷取一個要映射到主機的端口,這就要檢查該主機上所有容器已經映射的端口,再從容器專用端口中把它們去掉,選擇其中一個端口最小的傳回。