Overview
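Looking back over my earlier blog posts on Heron, I found that most of them are about using and experimenting with it, with little analysis of how it works. Heron is a very recent generation of stream processing platform, yet amid the current Flink craze in China it still gets little attention.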
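Stream processing has moved from Spark Streaming's micro-batch emulation of streams, to Storm and Flink implementing true stream processing, to Twitter reinventing its own stack with the open-source Heron. I was one of the people in China who got into Heron fairly early. Heron was unveiled in 2015 and open-sourced in 2016. I started setting up an experimental environment and researching it in 2017. At that time I could not find a single Chinese article, not even one on environment setup, and apart from my own I still have not seen one (if you have, please let me know). Since then I have noticed that, beyond Heron's official paper and articles, it has started to attract some attention. I went from finding no material in China to seeing articles appear on CSDN, so I have become something of a witness. I therefore plan to write a series of articles on Heron at the theory and source-code level, to do what little I can for this open-source system. Let's get started.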
The Heron CLI (command-line interface) provides the following topology management commands. The corresponding enum can be found in the scheduler-related source code.
/***
* This enum defines commands invoked from heron client
*/
public enum Command {
// TODO(mfu): Move ACTIVATE & DEACTIVATE out? They are non-related to Scheduling
SUBMIT,
KILL,
ACTIVATE,
DEACTIVATE,
UPDATE,
RESTART;
public static Command makeCommand(String commandString) {
return Command.valueOf(commandString.toUpperCase());
}
}
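The following sketch shows how the verb typed on the CLI becomes one of these constants. It is a stand-alone copy of the enum with two hypothetical tweaks that are not in Heron's code. It upper-cases with Locale.ROOT, and when a verb is unknown it reports the valid commands.

```java
import java.util.Arrays;
import java.util.Locale;

// Stand-alone copy of the Command enum above, with a hypothetical addition:
// unknown verbs produce an error listing the valid commands.
enum Command {
  SUBMIT, KILL, ACTIVATE, DEACTIVATE, UPDATE, RESTART;

  static Command makeCommand(String commandString) {
    try {
      // Enum.valueOf() is case-sensitive, so the input is normalized first.
      // Locale.ROOT avoids locale-specific rules such as the Turkish dotted/dotless i.
      return Command.valueOf(commandString.toUpperCase(Locale.ROOT));
    } catch (IllegalArgumentException e) {
      throw new IllegalArgumentException("Unknown command '" + commandString
          + "', expected one of " + Arrays.toString(values()), e);
    }
  }
}
```

For example, both `makeCommand("update")` and `makeCommand("Update")` yield `UPDATE`. An unrecognized verb such as `"scale"` raises an `IllegalArgumentException`.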
The Heron CLI update command lets users change the parallelism of components dynamically while a topology is running. The command format is:
heron update local/ads/PROD my-topology \
--component-parallelism=my-spout:2 \
--component-parallelism=my-bolt:4
See the Heron documentation for details on using update. Here I take apart the implementation behind the update command, to see whether it offers us some other ideas.
ApiServer
In Heron, the ApiServer handles all the commands users submit from the Heron CLI, including update, so we start from its source code.
package com.twitter.heron.apiserver.actions;
public class TopologyRuntimeAction implements Action {
private final Config config;
private final Command command;
TopologyRuntimeAction(Config config, Command command) {
this.config = config;
this.command = command;
}
@Override
public void execute() {
final RuntimeManagerMain runtimeManagerMain = new RuntimeManagerMain(config, command);
runtimeManagerMain.manageTopology();
}
}
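In the ApiServer, the constructor receives the command object built from the user's input. The command parsing and the code that creates and invokes this action are omitted here. The execute() method then builds a RuntimeManagerMain object from two inputs, and calls its manageTopology() method. The first input is the Config object: the configuration the user specified when submitting the topology, which encapsulates all static configuration at submission time. The second is the command object. RuntimeManagerMain is the core class for managing a topology at runtime: every management operation and dynamic adjustment on a running topology is handled by its methods. Let's look at the manageTopology() method it calls.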
RuntimeManagerMain
When the action class builds this object, it calls the constructor. The constructor only initializes config and command and does nothing else:
public RuntimeManagerMain(Config config, Command command) {
// initialize the options
this.config = config;
this.command = command;
}
Next, the manageTopology method:
/**
* Manage a topology
* 1. Instantiate necessary resources
* 2. Validate whether the runtime management is legal
* 3. Complete the runtime management for a specific command
*/
public void manageTopology()
throws TopologyRuntimeManagementException, TMasterException, PackingException {
String topologyName = Context.topologyName(config);
// 1. Do prepare work
// create an instance of state manager
String statemgrClass = Context.stateManagerClass(config);
IStateManager statemgr;
try {
statemgr = ReflectionUtils.newInstance(statemgrClass);
} catch (IllegalAccessException | InstantiationException | ClassNotFoundException e) {
throw new TopologyRuntimeManagementException(String.format(
"Failed to instantiate state manager class '%s'",
statemgrClass), e);
}
// Put it in a try block so that we can always clean resources
try {
// initialize the statemgr
statemgr.initialize(config);
// TODO(mfu): timeout should read from config
SchedulerStateManagerAdaptor adaptor = new SchedulerStateManagerAdaptor(statemgr, 5000);
validateRuntimeManage(adaptor, topologyName);
// 2. Try to manage topology if valid
// invoke the appropriate command to manage the topology
LOG.log(Level.FINE, "Topology: {0} to be {1}ed", new Object[]{topologyName, command});
// build the runtime config
Config runtime = Config.newBuilder()
.put(Key.TOPOLOGY_NAME, Context.topologyName(config))
.put(Key.SCHEDULER_STATE_MANAGER_ADAPTOR, adaptor)
.build();
// Create a ISchedulerClient basing on the config
ISchedulerClient schedulerClient = getSchedulerClient(runtime);
callRuntimeManagerRunner(runtime, schedulerClient);
} finally {
// 3. Do post work basing on the result
// Currently nothing to do here
// 4. Close the resources
SysUtils.closeIgnoringExceptions(statemgr);
}
}
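The comments show the main steps:
1. Preparation. The method gets the topology name and reads the state manager class name (statemgrClass) from config. It then instantiates the state manager (statemgr) via reflection. This object stores and maintains Heron's state metadata, and in this setup it is backed by ZooKeeper. statemgr is initialized with config and wrapped in a SchedulerStateManagerAdaptor with a 5000 ms timeout. The timeout is hard-coded for now, per the TODO. Finally, validateRuntimeManage() uses the topology name to check that the topology is in a state that allows runtime management such as update.
2. Manage the topology according to the command. First, a Config object named runtime is built. It is the runtime configuration: unlike config, its entries can change. It holds the topology name and the state manager adaptor. As we will see, runtime and config are passed along from here through the rest of the process.
Once runtime is built, the method calls getSchedulerClient(), shown next, to obtain a schedulerClient. Why is a scheduler needed? If you have read Heron's scheduler code, you know that each scheduler carries out a topology update differently. The update must therefore be delegated to the specific scheduler's implementation, which is why a scheduler client is built.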
protected ISchedulerClient getSchedulerClient(Config runtime)
throws SchedulerException {
return new SchedulerClientFactory(config, runtime).getSchedulerClient();
}
A factory is used to build the scheduler client. The factory's getSchedulerClient method is:
/**
* Implementation of getSchedulerClient - Used to create objects
* Currently it creates either HttpServiceSchedulerClient or LibrarySchedulerClient
*
* @return getSchedulerClient created. return null if failed to create ISchedulerClient instance
*/
public ISchedulerClient getSchedulerClient() throws SchedulerException {
LOG.fine("Creating scheduler client");
ISchedulerClient schedulerClient;
if (Context.schedulerService(config)) {
// get the instance of the state manager
SchedulerStateManagerAdaptor statemgr = Runtime.schedulerStateManagerAdaptor(runtime);
Scheduler.SchedulerLocation schedulerLocation =
statemgr.getSchedulerLocation(Runtime.topologyName(runtime));
if (schedulerLocation == null) {
throw new SchedulerException("Failed to get scheduler location from state manager");
}
LOG.log(Level.FINE, "Scheduler is listening on location: {0} ", schedulerLocation.toString());
schedulerClient =
new HttpServiceSchedulerClient(config, runtime, schedulerLocation.getHttpEndpoint());
} else {
// create an instance of scheduler
final IScheduler scheduler = LauncherUtils.getInstance()
.getSchedulerInstance(config, runtime);
LOG.fine("Invoke scheduler as a library");
schedulerClient = new LibrarySchedulerClient(config, runtime, scheduler);
}
return schedulerClient;
}
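The branch above can be reduced to a small sketch. The types below, SchedulerClientSketch and its two implementations, are hypothetical stand-ins, and configuration is a plain Map. In Heron the scheduler's HTTP endpoint is read from the state manager; here it is passed in directly.

```java
import java.util.Map;

// Hypothetical stand-ins for ISchedulerClient, HttpServiceSchedulerClient and
// LibrarySchedulerClient; the real classes take Config objects, not strings.
interface SchedulerClientSketch {
  String describe();
}

final class HttpServiceClientSketch implements SchedulerClientSketch {
  private final String httpEndpoint;
  HttpServiceClientSketch(String httpEndpoint) { this.httpEndpoint = httpEndpoint; }
  @Override public String describe() { return "http:" + httpEndpoint; }
}

final class LibraryClientStub implements SchedulerClientSketch {
  private final String schedulerClass;
  LibraryClientStub(String schedulerClass) { this.schedulerClass = schedulerClass; }
  @Override public String describe() { return "library:" + schedulerClass; }
}

final class SchedulerClientFactorySketch {
  private SchedulerClientFactorySketch() { }

  // Mirrors the branch in getSchedulerClient(): heron.scheduler.is.service decides
  // whether to call a long-running scheduler over HTTP or build one in-process.
  static SchedulerClientSketch create(Map<String, String> config,
                                      String schedulerHttpEndpoint,
                                      String schedulerClass) {
    boolean isService = Boolean.parseBoolean(
        config.getOrDefault("heron.scheduler.is.service", "false"));
    if (isService) {
      // In Heron the endpoint comes from the scheduler location in the state manager.
      if (schedulerHttpEndpoint == null) {
        throw new IllegalStateException("Failed to get scheduler location from state manager");
      }
      return new HttpServiceClientSketch(schedulerHttpEndpoint);
    }
    // Default (false): no scheduler is running, so a new instance is created locally.
    return new LibraryClientStub(schedulerClass);
  }
}
```

In this sketch, as in the excerpt, a missing flag means library mode.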
The process splits into two branches on the if condition. Which config entry does the if statement check? It is the following entry in scheduler.yaml:
heron.scheduler.is.service: false
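The default is false, meaning the scheduler is not run as a service. If it is set to true, Heron keeps the scheduler running as an HTTP service. Whenever the scheduler is needed, it is called through its HTTP endpoint, whose location is read from the state manager; that is what the if branch does. We keep the default false, so the scheduler instance is released once the topology has been submitted. A new scheduler object therefore has to be created from the configuration so that its update method can be called later. LauncherUtils.getInstance().getSchedulerInstance(config, runtime) does exactly that: it instantiates the configured scheduler class via reflection and initializes it. It is straightforward: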
/**
* Creates and initializes scheduler instance
*
* @return initialized scheduler instances
*/
public IScheduler getSchedulerInstance(Config config, Config runtime)
throws SchedulerException {
String schedulerClass = Context.schedulerClass(config);
IScheduler scheduler;
try {
// create an instance of scheduler
scheduler = ReflectionUtils.newInstance(schedulerClass);
} catch (IllegalAccessException | InstantiationException | ClassNotFoundException e) {
throw new SchedulerException(String.format("Failed to instantiate scheduler using class '%s'",
schedulerClass));
}
scheduler.initialize(config, runtime);
return scheduler;
}
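A comparable reflective factory in plain Java might look like the following. ReflectiveFactory is a hypothetical name, and Heron's ReflectionUtils may be implemented differently. The excerpt above builds its SchedulerException from the message only. This sketch instead lets the original ReflectiveOperationException propagate, so the cause is not lost.

```java
// Sketch of reflective instantiation by class name, in the spirit of
// ReflectionUtils.newInstance(); not Heron's implementation.
final class ReflectiveFactory {
  private ReflectiveFactory() { }

  static <T> T newInstance(String className, Class<T> expectedType)
      throws ReflectiveOperationException {
    Class<?> clazz = Class.forName(className);
    // Needs an accessible no-arg constructor, which pluggable schedulers provide.
    // getDeclaredConstructor().newInstance() replaces the deprecated Class.newInstance().
    Object instance = clazz.getDeclaredConstructor().newInstance();
    // Fails fast with ClassCastException if the class has the wrong type.
    return expectedType.cast(instance);
  }
}
```

For example, `newInstance("java.util.ArrayList", java.util.List.class)` returns an empty list, and an unknown class name raises `ClassNotFoundException`.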
The last step of this branch uses the scheduler object to create a LibrarySchedulerClient, which is where scheduler.onUpdate will later be called. The class wraps the corresponding scheduler calls:
@Override
public boolean restartTopology(Scheduler.RestartTopologyRequest restartTopologyRequest) {
// ...
}
@Override
public boolean killTopology(Scheduler.KillTopologyRequest killTopologyRequest) {
// ...
}
@Override
public boolean updateTopology(Scheduler.UpdateTopologyRequest updateTopologyRequest) {
boolean ret = false;
try {
scheduler.initialize(config, runtime);
ret = scheduler.onUpdate(updateTopologyRequest);
} finally {
SysUtils.closeIgnoringExceptions(scheduler);
}
return ret;
}
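The lifecycle in LibrarySchedulerClient.updateTopology() is: initialize, delegate, always close. The sketch below exercises it with a hypothetical minimal interface and a recording test double. Note that in the excerpts above the library-mode scheduler is initialized twice: once in getSchedulerInstance() and again in updateTopology().

```java
// Hypothetical, minimal scheduler interface; the real IScheduler also has
// onSchedule/onKill/onRestart and takes Config and protobuf request types.
interface SimpleScheduler extends AutoCloseable {
  void initialize();

  boolean onUpdate(String request);

  @Override
  void close(); // narrowed to throw no checked exception
}

final class LibraryClientSketch {
  private final SimpleScheduler scheduler;

  LibraryClientSketch(SimpleScheduler scheduler) {
    this.scheduler = scheduler;
  }

  // Same shape as LibrarySchedulerClient.updateTopology(): initialize, delegate,
  // and close the scheduler in finally so it is released even if onUpdate throws.
  boolean updateTopology(String request) {
    boolean ret = false;
    try {
      scheduler.initialize();
      ret = scheduler.onUpdate(request);
    } finally {
      scheduler.close();
    }
    return ret;
  }
}

// Test double that records the order of lifecycle calls.
final class RecordingScheduler implements SimpleScheduler {
  final StringBuilder log = new StringBuilder();
  private final boolean fail;

  RecordingScheduler(boolean fail) {
    this.fail = fail;
  }

  @Override public void initialize() { log.append("init;"); }

  @Override public boolean onUpdate(String request) {
    log.append("update:").append(request).append(';');
    if (fail) {
      throw new IllegalStateException("update failed");
    }
    return true;
  }

  @Override public void close() { log.append("close;"); }
}
```

Even when onUpdate throws, the recorded log ends with `close;`, which is the point of the try/finally.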
Next, callRuntimeManagerRunner(runtime, schedulerClient) is invoked:
protected void callRuntimeManagerRunner(Config runtime, ISchedulerClient schedulerClient)
throws TopologyRuntimeManagementException, TMasterException, PackingException {
// create an instance of the runner class
RuntimeManagerRunner runtimeManagerRunner =
new RuntimeManagerRunner(config, runtime, command, schedulerClient);
// invoke the appropriate handlers based on command
runtimeManagerRunner.call();
}
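This method creates a RuntimeManagerRunner object that carries out the specific command. Earlier, the ApiServer only built the command object, and we had not yet seen how that leads to the update method being called. This is where it happens.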
public void call()
throws TMasterException, TopologyRuntimeManagementException,
PackingException, UpdateDryRunResponse {
// execute the appropriate command
String topologyName = Context.topologyName(config);
switch (command) {
case ACTIVATE: ...
case DEACTIVATE: ...
case RESTART: ...
case KILL: ...
case UPDATE: updateTopologyHandler(topologyName, config.getStringValue(NEW_COMPONENT_PARALLELISM_KEY));
break;
default:
LOG.severe("Unknown command for topology: " + command);
}
}
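Stripped of Heron's types, the dispatch in call() reduces to the sketch below. RunnerSketch and its handlers are hypothetical and only record what they would do. The parallelism string assumes a comma-separated name:count format.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch of RuntimeManagerRunner.call(): a switch on the command picks
// the handler. The nested enum mirrors Command; handlers only record an action.
final class RunnerSketch {
  enum Cmd { SUBMIT, KILL, ACTIVATE, DEACTIVATE, UPDATE, RESTART }

  final List<String> actions = new ArrayList<>();

  void call(Cmd command, String topologyName, String newParallelism) {
    switch (command) {
      case ACTIVATE:
        actions.add("activate " + topologyName);
        break;
      case DEACTIVATE:
        actions.add("deactivate " + topologyName);
        break;
      case RESTART:
        actions.add("restart " + topologyName);
        break;
      case KILL:
        actions.add("kill " + topologyName);
        break;
      case UPDATE:
        // Corresponds to updateTopologyHandler(topologyName, newParallelism).
        actions.add("update " + topologyName + " " + newParallelism);
        break;
      default:
        // SUBMIT has no case in the excerpt, so it lands here (Heron logs it as unknown).
        actions.add("unknown " + command);
    }
  }
}
```

The default branch is what catches SUBMIT: submission is not handled by the runtime manager in the excerpt.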
Here we focus on the update path:
@VisibleForTesting
void updateTopologyHandler(String topologyName, String newParallelism)
throws TopologyRuntimeManagementException, PackingException, UpdateDryRunResponse {
// 1. Get the basic topology information
LOG.fine(String.format("updateTopologyHandler called for %s with %s",
topologyName, newParallelism));
SchedulerStateManagerAdaptor manager = Runtime.schedulerStateManagerAdaptor(runtime);
TopologyAPI.Topology topology = manager.getTopology(topologyName);
Map<String, Integer> changeRequests = parseNewParallelismParam(newParallelism);
PackingPlans.PackingPlan currentPlan = manager.getPackingPlan(topologyName);
// 2. Check for changes and build the new packing plan
if (!changeDetected(currentPlan, changeRequests)) {
throw new TopologyRuntimeManagementException(
String.format("The component parallelism request (%s) is the same as the "
+ "current topology parallelism. Not taking action.", newParallelism));
}
PackingPlans.PackingPlan proposedPlan = buildNewPackingPlan(currentPlan, changeRequests,
topology);
// 3. Dry-run mode
if (Context.dryRun(config)) {
PackingPlanProtoDeserializer deserializer = new PackingPlanProtoDeserializer();
PackingPlan oldPlan = deserializer.fromProto(currentPlan);
PackingPlan newPlan = deserializer.fromProto(proposedPlan);
throw new UpdateDryRunResponse(topology, config, newPlan, oldPlan, changeRequests);
}
// 4. Build the update request and update the topology
Scheduler.UpdateTopologyRequest updateTopologyRequest =
Scheduler.UpdateTopologyRequest.newBuilder()
.setCurrentPackingPlan(currentPlan)
.setProposedPackingPlan(proposedPlan)
.build();
LOG.fine("Sending Updating topology request: " + updateTopologyRequest);
if (!schedulerClient.updateTopology(updateTopologyRequest)) {
throw new TopologyRuntimeManagementException(String.format(
"Failed to update topology with Scheduler, updateTopologyRequest=%s",
updateTopologyRequest));
}
// Clean the connection when we are done.
LOG.fine("Scheduler updated topology successfully.");
}
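The comments above divide the handler into steps. The main ones are:
- Get the basic topology information. First, retrieve the state manager adaptor from runtime. It wraps the ZooKeeper-backed state manager and is stored in runtime, so it can be reused while the connection is valid instead of reconnecting, which keeps the operation responsive. Then fetch the full topology by topologyName, parse the parallelism changes specified in the update command, and fetch the current packing plan.
- Check whether the requested parallelism actually differs from the current packing plan. If it does not, an exception is thrown and no action is taken.
- Build a new packing plan (proposedPackingPlan) from the requested parallelism. In dry-run mode, the handler stops here and reports the old and new plans by throwing UpdateDryRunResponse.
- Use the current and new packing plans to build the updateTopologyRequest object.
- Finally, call schedulerClient.updateTopology() with updateTopologyRequest to carry out the actual update.
This process is important (for me, at least), and I will explain it in a later article.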
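Steps 1 and 2 of the handler can be sketched as follows. This is a hypothetical reimplementation, not Heron's parseNewParallelismParam() or changeDetected(). It assumes the repeated --component-parallelism flags reach the Java side as one comma-separated component:parallelism string. It also models the current packing plan as a map from component name to instance count.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of parsing the parallelism parameter and detecting a change.
final class ParallelismRequest {
  private ParallelismRequest() { }

  // "my-spout:2,my-bolt:4" -> {my-spout=2, my-bolt=4}
  static Map<String, Integer> parse(String newParallelism) {
    Map<String, Integer> changes = new HashMap<>();
    for (String entry : newParallelism.split(",")) {
      String[] parts = entry.trim().split(":");
      if (parts.length != 2) {
        throw new IllegalArgumentException("Invalid parallelism entry: " + entry);
      }
      // Integer.parseInt throws NumberFormatException for a non-numeric count.
      changes.put(parts[0], Integer.parseInt(parts[1]));
    }
    return changes;
  }

  // True if any requested component's parallelism differs from the current one.
  // An unknown component name counts as a change here; the real code may reject it.
  static boolean changeDetected(Map<String, Integer> current, Map<String, Integer> requested) {
    for (Map.Entry<String, Integer> e : requested.entrySet()) {
      if (!e.getValue().equals(current.get(e.getKey()))) {
        return true;
      }
    }
    return false;
  }
}
```

With a current plan of my-spout=2, my-bolt=3, the request from the CLI example counts as a change. If my-bolt already has 4 instances, the request is a no-op, which is the case the handler rejects with TopologyRuntimeManagementException.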
At this point the code is back in the updateTopology method of LibrarySchedulerClient analyzed above, which initializes the scheduler, calls its onUpdate method, and closes it. In my setup the scheduler is AuroraScheduler, so this means AuroraScheduler's onUpdate method. Let's set that aside for now and come back to it later.
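Remember that all of this is still step 2 of the overall process? [facepalm] First, let's look at the two remaining wrap-up steps.
3. Post-processing based on the result. According to the code, nothing is done here yet.
4. Cleanup: close resources, which here means statemgr and its ZooKeeper connection.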
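Step 4 relies on a close-quietly helper, so that cleanup in finally never masks an exception already thrown from the try block. Below is a sketch of such a helper in the spirit of SysUtils.closeIgnoringExceptions(); Closer is a hypothetical name, and Heron's version may log differently.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Sketch of a close-quietly helper; not Heron's SysUtils implementation.
final class Closer {
  private static final Logger LOG = Logger.getLogger(Closer.class.getName());

  private Closer() { }

  static void closeIgnoringExceptions(AutoCloseable resource) {
    if (resource == null) {
      return;
    }
    try {
      resource.close();
    } catch (Exception e) {
      // Log and swallow: a failing close() must not replace the original exception.
      LOG.log(Level.WARNING, "Failed to close " + resource, e);
    }
  }
}
```

Plain try-with-resources is the usual alternative, but it behaves differently. If the body throws, an exception from close() is attached as a suppressed exception. If the body succeeds, the close() exception is thrown. Neither case ignores it.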
AuroraScheduler
We have now gone through the overall flow, but not the onUpdate method of a specific scheduler. Let's do that now. Here is AuroraScheduler's onUpdate method:
@Override
public boolean onUpdate(Scheduler.UpdateTopologyRequest request) {
try {
updateTopologyManager.updateTopology(
request.getCurrentPackingPlan(), request.getProposedPackingPlan());
} catch (ExecutionException | InterruptedException e) {
LOG.log(Level.SEVERE, "Could not update topology for request: " + request, e);
return false;
}
return true;
}
It simply calls updateTopologyManager.updateTopology(), passing in the current packing plan (currentPackingPlan) and the new one (proposedPackingPlan). updateTopologyManager is created in the scheduler's initialize method:
@Override
public void initialize(Config mConfig, Config mRuntime) {
this.config = Config.toClusterMode(mConfig);
this.runtime = mRuntime;
try {
this.controller = getController();
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
LOG.severe("AuroraController initialization failed " + e.getMessage());
}
this.updateTopologyManager =
new UpdateTopologyManager(config, runtime, Optional.<IScalable>of(this));
}
Now let's look at what updateTopologyManager.updateTopology(currentPackingPlan, proposedPackingPlan) does:
/**
* Scales the topology out or in based on the proposedPackingPlan
*
* @param existingProtoPackingPlan the current plan. If this isn't what's found in the state
* manager, the update will fail
* @param proposedProtoPackingPlan packing plan to change the topology to
*/
public void updateTopology(final PackingPlans.PackingPlan existingProtoPackingPlan,
final PackingPlans.PackingPlan proposedProtoPackingPlan)
throws ExecutionException, InterruptedException, ConcurrentModificationException {
String topologyName = Runtime.topologyName(runtime);
SchedulerStateManagerAdaptor stateManager = Runtime.schedulerStateManagerAdaptor(runtime);
Lock lock = stateManager.getLock(topologyName, IStateManager.LockName.UPDATE_TOPOLOGY);
if (lock.tryLock(5, TimeUnit.SECONDS)) {
try {
PackingPlans.PackingPlan foundPackingPlan = getPackingPlan(stateManager, topologyName);
if (!deserializer.fromProto(existingProtoPackingPlan)
.equals(deserializer.fromProto(foundPackingPlan))) {
throw new ConcurrentModificationException(String.format(
"The packing plan in state manager is not the same as the submitted existing "
+ "packing plan for topology %s. Another actor has changed it and has likely "
+ "performed an update on it. Failing this request, try again once other "
+ "update is complete", topologyName));
}
updateTopology(existingProtoPackingPlan, proposedProtoPackingPlan, stateManager);
} finally {
lock.unlock();
}
} else {
throw new ConcurrentModificationException(String.format(
"The update lock can not be obtained for topology %s. Another actor is performing an "
+ "update on it. Failing this request, try again once current update is complete",
topologyName));
}
}
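Here the distributed lock is plain to see: Heron uses a ZooKeeper-based distributed lock to keep this update consistent across the distributed system. Breaking the process down:
1. Get the basic topology information: again the topologyName and the stateManager object.
2. Acquire the distributed lock and perform the update. The lock is keyed by topologyName, and the lock name is the constant UPDATE_TOPOLOGY. tryLock waits up to 5 s. If the lock cannot be obtained in that time, a ConcurrentModificationException is thrown. While holding the lock, the method:
- (1) Uses stateManager to fetch the packing plan currently stored in ZooKeeper for topologyName. It deserializes that plan and compares it with the existing packing plan passed in. This validation step prevents an update based on a packing plan that no longer matches the one maintained in ZooKeeper, for example because another actor updated the topology in the meantime.
- (2) Calls the overloaded method of the same name to perform the actual update.
- (3) Releases the lock in the finally block.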
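The control flow of this guard can be tried in a single JVM: wait a bounded time for the lock, compare the stored plan with the expected one, write, and unlock in finally. In this sketch a java.util.concurrent ReentrantLock stands in for the ZooKeeper-backed distributed lock, and a String stands in for the packing plan. GuardedPlanStore is a hypothetical name. The sketch says nothing about how Heron implements the lock on ZooKeeper.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// In-process sketch of the lock-then-compare-then-write guard; single JVM only.
final class GuardedPlanStore {
  private final Lock lock = new ReentrantLock();
  private volatile String storedPlan;

  GuardedPlanStore(String initialPlan) {
    this.storedPlan = initialPlan;
  }

  String currentPlan() {
    return storedPlan;
  }

  void update(String expectedPlan, String proposedPlan) throws InterruptedException {
    // Bounded wait, like lock.tryLock(5, TimeUnit.SECONDS) in the excerpt.
    if (!lock.tryLock(5, TimeUnit.SECONDS)) {
      throw new ConcurrentModificationException("update lock not obtained; retry later");
    }
    try {
      // Reject the update if the stored plan changed after the caller read it.
      if (!storedPlan.equals(expectedPlan)) {
        throw new ConcurrentModificationException("plan changed by another actor");
      }
      storedPlan = proposedPlan;
    } finally {
      lock.unlock();
    }
  }
}
```

A second update that still expects the old plan is rejected, and the stored plan stays as the first update left it.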
Here is that overloaded method:
private void updateTopology(final PackingPlans.PackingPlan existingProtoPackingPlan,
final PackingPlans.PackingPlan proposedProtoPackingPlan,
SchedulerStateManagerAdaptor stateManager)
throws ExecutionException, InterruptedException {
// 1. Get the deserialized basic topology information
String topologyName = Runtime.topologyName(runtime);
PackingPlan existingPackingPlan = deserializer.fromProto(existingProtoPackingPlan);
PackingPlan proposedPackingPlan = deserializer.fromProto(proposedProtoPackingPlan);
Preconditions.checkArgument(proposedPackingPlan.getContainers().size() > 0, String.format(
"proposed packing plan must have at least 1 container %s", proposedPackingPlan));
// 2. Compute the containers to add and remove, and validate the change
ContainerDelta containerDelta = new ContainerDelta(
existingPackingPlan.getContainers(), proposedPackingPlan.getContainers());
int newContainerCount = containerDelta.getContainersToAdd().size();
int removableContainerCount = containerDelta.getContainersToRemove().size();
String message = String.format("Topology change requires %s new containers and removing %s "
+ "existing containers, but the scheduler does not support scaling, aborting. "
+ "Existing packing plan: %s, proposed packing plan: %s",
newContainerCount, removableContainerCount, existingPackingPlan, proposedPackingPlan);
Preconditions.checkState(newContainerCount + removableContainerCount == 0
|| scalableScheduler.isPresent(), message);
// 3. Check the topology's current state (is it RUNNING?)
TopologyAPI.Topology topology = getTopology(stateManager, topologyName);
boolean initiallyRunning = topology.getState() == TopologyAPI.TopologyState.RUNNING;
// 4. Start the update
// (1) Pause (deactivate) the topology
// deactivate and sleep
if (initiallyRunning) {
// Update the topology since the state should have changed from RUNNING to PAUSED
// Will throw exceptions internally if tmaster fails to deactivate
deactivateTopology(stateManager, topology, proposedPackingPlan);
}
// (2) If new containers are needed, request them via the scheduler's addContainers()
Set<PackingPlan.ContainerPlan> updatedContainers =
new HashSet<>(proposedPackingPlan.getContainers());
// request new resources if necessary. Once containers are allocated we should make the changes
// to state manager quickly, otherwise the scheduler might penalize for thrashing on start-up
if (newContainerCount > 0 && scalableScheduler.isPresent()) {
Set<PackingPlan.ContainerPlan> containersToAdd = containerDelta.getContainersToAdd();
Set<PackingPlan.ContainerPlan> containersAdded =
scalableScheduler.get().addContainers(containersToAdd);
// Update the PackingPlan with new container-ids
if (containersAdded != null) {
updatedContainers.removeAll(containersToAdd);
updatedContainers.addAll(containersAdded);
}
}
// (3) Build a new packing plan (updatedPackingPlan) from the updated container set
PackingPlan updatedPackingPlan =
new PackingPlan(proposedPackingPlan.getId(), updatedContainers);
PackingPlanProtoSerializer serializer = new PackingPlanProtoSerializer();
PackingPlans.PackingPlan updatedProtoPackingPlan = serializer.toProto(updatedPackingPlan);
LOG.fine("The updated Packing Plan: " + updatedProtoPackingPlan);
// (4) Serialize it and update the packing plan in stateManager
// update packing plan to trigger the scaling event
logInfo("Update new PackingPlan: %s",
stateManager.updatePackingPlan(updatedProtoPackingPlan, topologyName));
// 5. Reactivate the topology
// reactivate topology
if (initiallyRunning) {
// wait before reactivating to give the tmaster a chance to receive the packing update and
// delete the packing plan. Instead we could message tmaster to invalidate the physical plan
// and/or possibly even update the packing plan directly
SysUtils.sleep(Duration.ofSeconds(10));
// Will throw exceptions internally if tmaster fails to reactivate
reactivateTopology(stateManager, topology, removableContainerCount);
}
if (removableContainerCount > 0 && scalableScheduler.isPresent()) {
scalableScheduler.get().removeContainers(containerDelta.getContainersToRemove());
}
}
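The steps are annotated in the code. Note step 5: after the new packing plan is written, the method sleeps for 10 s before reactivating the topology. This gives the TMaster time to update its packing plan and physical plan data. Containers that are no longer needed are removed only after the topology has been reactivated.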
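The container arithmetic in this method can be illustrated with plain sets. The sketch assumes that ContainerDelta compares containers by id, and it uses Integer ids instead of ContainerPlan objects. ContainerDeltaSketch and applyAllocation are hypothetical names.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified sketch of the container delta and of swapping in allocated containers.
final class ContainerDeltaSketch {
  final Set<Integer> toAdd;
  final Set<Integer> toRemove;

  ContainerDeltaSketch(Set<Integer> existing, Set<Integer> proposed) {
    toAdd = new HashSet<>(proposed);
    toAdd.removeAll(existing);      // in the proposed plan but not the existing one
    toRemove = new HashSet<>(existing);
    toRemove.removeAll(proposed);   // in the existing plan but not the proposed one
  }

  // Mirrors updatedContainers.removeAll(containersToAdd) / addAll(containersAdded):
  // placeholders from the proposed plan are replaced by what the scheduler allocated.
  static Set<Integer> applyAllocation(Set<Integer> proposed, Set<Integer> toAdd,
                                      Set<Integer> allocated) {
    Set<Integer> updated = new HashSet<>(proposed);
    if (allocated != null) {
      updated.removeAll(toAdd);
      updated.addAll(allocated);
    }
    return updated;
  }
}
```

Suppose the existing containers are {1, 2, 3} and the proposed ones are {1, 2, 4, 5}. Then containers 4 and 5 must be added and 3 removed. If the scheduler allocates them as 7 and 8, the updated plan holds {1, 2, 7, 8}.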
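Conclusion
This completes our analysis of the implementation behind the Heron CLI update command. Some details are not covered here; please refer to the source code, and I will add to this later. Thanks for reading, and corrections are welcome.
The first level of mastering a technology is using it fluently. Looking at how a feature or component is actually implemented, from the ground up, gives a much clearer understanding. So what does "the ground up" mean? In my view, source code and architecture are the two angles for learning. The source of open-source projects also has a lot worth studying in depth. For example, the ZooKeeper-based distributed lock above is a good learning opportunity, and I may break it down in a later post. This analysis of the update command also lays the groundwork for what comes next.
Writing these posts takes effort; please credit the source if you repost.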