一,题目
Instruction:
Please
complete the following task. Use whatever tools you find appropriate
(using the Internet, for example, is assumed). Linux, C++, Python,
CMake, Qt, and OpenCV are preferred for development but you may use
other tools if you are not familiar with these. Please include the
following support material
A
discussion of your approach and all the assumptions and tradeoffs
that you made
Any
build configuration files and instructions for compiling
A
list of any packages/utilities required to build and execute your
solution
Any
test code, or test data sets you generated
References to other
materials as appropriate
Please
do not spend more than 8‐10 hours on this task and feel free to ask
questions. While we would love to see a completed product, we expect
that your final solution may not be completely “finished” in the
available time. We are more interested in seeing how you approach a
problem, how you manage the deadline, how you communicate about your
work, and the quality of your work product.
Task: Build a planner
Engineers
are expected to be able to reason about large systems and the complex
interactions they may have. We need to be able to write software that
will coordinate and manage a variety of different tasks.
Please
write software that will tackle the following problem. Use whatever
tools you find appropriate, Please document you work, discuss your
approach, and enumerate all the assumptions and trade-offs that you
made. Please do not spend more than 8-10 hours on this, We are most
interested in seeing how you approach a problem, how you manage the
deadline, how you communicate your thoughts around the problem, and
what your view of “quality” is. We wish you to submit your
software, instructions on how to run it, an example data set that we
can run, and documentation on any design decisions or assumptions you
made.
Imagine
you are preparing a system to be able to coordinate the execution of
a large number of tasks. Many of these tasks are dependent on each
other and all have different requirements. As resources, you have a
heterogeneous cluster of computers. You wish to exploit parallelism
and complete the entire set of tasks as quickly as possible.
We
will define a list of tasks, and their dependencies, in a YAML file
that looks like so:
task1:
cores_required: 2
execution_time: 100
task2:
cores_required: 1
execution_time: 200
parent_tasks: task1
task3:
cores_required: 4
execution_time: 50
parent_tasks: “task1,
task2”
cores_required
means that the all the cores must be available on the same computing
resource, execution_time is the number of global “ticks” that
pass before the task will be complete and can release all its
computing resources, and the parent_tasks must be completely finished
executing before a particular task can start.
We
will then also specify the computing resources and their number of
available cores as so:
compute1:
2
compute2:
2
compute3:
6
Please
implement a program that reads those two files, and then emits an
ordered scheduling plan that manages all the dependencies (execution
time, number of cores, parent tasks). The goal is to execute the full
lists of tasks as quickly as possible.
The
plan may look something like:
task1:
compute1
task2:
compute1
task3:
compute3
二,解答
1.这个任务的时间规划如下:
(1)30分钟分析这个任务,包括调研一些可能使用到一些技术方案,例如这个里面涉及到YAML文件的解析,通过网络需要找到一个可用的库并简单demo验证;
(2)使用6小时30分钟进行代码编写和测试;
(3)最后一个小时做一些文档整理和软件发布准备;
(4)如果其中有些计划延迟,可以在多出一点时间来修补,这个必须要控制在两个小时以内。
其中第二步的代码编写和测试有最大的不可控,特别是在完成任务调度算法的时候。
2.下面在说说自己完成这个任务的一些思路
(1)解析任务:第一遍完整的看完这个题目,想到的就是一个任务调度功能,由于平时自己接触过一些任务调度,例如操作系统里面的任务调度,还有很多开源的任务调度框架,包括hadoop里面的最新yarn也涉及到业务调度。所以第一个想法就是可不可以找一个比较好的开源任务调度框架来完成这个功能,但是仔细一想,这个题目也自己特殊的要求,例如从配置文件读取任务列表和计算资源池,而且任务还有依赖。那么我的第二个想法是能不能找一个简单的框架进行改进来完成,其实大部分框架都是比较复杂,如果要改造那么需要对框架有深入的掌握,这个可能也不是在8-10个小时内能够完成的。所以经过简单想法验证以后,我还是决定了从0开始自己实现,毕竟是一个功能比较单一的任务。不过下这个决定的时候自己也不清楚在有限的时间内能够完成到什么样的程度,不过我想基本的功能应该实现是没有问题的,因为自己以前了解过大部分的任务调度算法,不过肯定达不到最好的产品和最好的算法(性能)的要求。
(2)分解任务:我首先简单把这个任务分解成两个子任务,一个是解析配置文件,二是实现具体的调度算法。第一个比较简单,也很容易验证测试;第二个刚开始也认为不是很难,不过后面在考虑这个情况和测试的时候会遇到很多问题。
(3)实现任务:因为分解成了两个任务,所以先实现第一个任务。第一个任务涉及到两种不同配置文件解析。第一个是yaml格式的,因为yaml格式以前没有接触过,但是我想既然是一种通用的格式,应该很容易找到开源的实现,经过简单的查找和验证,snakeyaml这个开源库很容易实现了解析yaml格式的需求。至于第二个文件格式,一看就有点像是键值对的实现,简单一查java自带的就有。
第二个任务,也就是实现任务调度算法。从配置文件拿到了任务列表和计算资源池,那么就可以根据这些进行任务调度了。这个算法给人第一映像很简单,不就是给任务分配合适的资源嘛。但是真正开始构思算法的时候才发现有很多情况需要考虑。先说一下异常情况,就是不能完成调度的情况。题目有一个限制就是一个任务必须一次性在一个计算资源上进行分配,那么就存在这样一个情况,如果有一个任务的计算资源很大,没有一个计算资源能够提供,那么肯定就完不成;第二种情况就是如果任务存在这循环依赖,肯定也是完不成的,检查循环依赖就是附加出来的一个算法,这个是前期考虑不足的,所以在计划时间外,那么在有限的时间里面,肯定要重新调整一下。在实现计算资源是否满足需要和验证任务循环依赖,我都单独来完成,而不是和调度算法一起,因为一起做考虑,毕竟很复杂,也许那样性能会好一些(不需要一次一次的遍历所有资源池和任务列表)。在实现算法的时候,需要考虑时间和资源需求的双重搭配,完成一次计算需要重新计算调度,已经计算资源可以重复利用,每次完成一次调度,需要重新更新资源池大小和任务队列,很多情况需要考虑。每一种情况有需要完成一点就验证一点,具体就不详细解释了。
3.最终完成介绍
(1)把实现的功能打成一个jar包,执行如下命令可以运行:
java
-jar taskdispatch-0.0.1-SNAPSHOT.jar
/home/wuyouqiang/workspace/taskdispatch/src/main/resource/task.yaml
/home/wuyouqiang/workspace/taskdispatch/src/main/resource/compute.conf
后面两个参数分别是任务列表的配置文件和计算资源池的配置文件(具体配置文件的路径根据运行时的环境确定,上面的只是我电脑上的演示)。如果没有提供,程序有简单提示,如果多了也只认前两个。
上面两个配置文件分别如下:
/home/wuyouqiang/workspace/taskdispatch/src/main/resource/task.yaml:
task1:
cores_required:
2
execution_time:
100
task2:
cores_required:
1
execution_time:
200
task3:
cores_required:
4
execution_time:
50
parent_tasks:
“task1, task2”
task4:
cores_required:
2
execution_time:
80
parent_tasks:
“task2”
/home/wuyouqiang/workspace/taskdispatch/src/main/resource/compute.conf:
compute1:
2
compute2:
2
compute3:
6
下面是运行的结果(中间有一些记录任务调度过程,方便调试的信息):
开始调度任务 : [task1]到计算资源compute2
开始调度任务 : [task2]到计算资源compute1
删除任务
:task1
compute2恢复资源
: 2
删除任务[task3]的依赖任务 : [task1]
删除任务
:task2
compute1恢复资源
: 1
删除任务[task3]的依赖任务 : [task2]
删除任务[task4]的依赖任务 : [task2]
开始调度任务 : [task3]到计算资源compute3
开始调度任务 : [task4]到计算资源compute1
删除任务
:task3
compute3恢复资源
: 4
删除任务
:task4
compute1恢复资源
: 2
任务调度成功,调度结果如下:
task1:compute2
task2:compute1
task3:compute3
task4:compute1
(2)也提供maven或者eclipse功能,代码里有一些简单的注释,是自己写代码时的思考,不一定对,但对我的思路有一个记录作用,里面有一些简单的测试样例。
4.思考
(1)首先代码肯定实现不够完美,测试用例也不够完善,最终实现的算法也不一定是最优化的,也不一定是完全正确的。只能说在有限的时间里完成了题目的要求,并且我也觉得可用了。
(2)这个只是一个任务,如果真正做成一个完整的产品可能需要更多的考虑。我们目前完成这个最多算一个beta版本。其实产品的考虑更多提供一个通用的方式,例如提供接口直接传递任务和计算资源信息。另外在互联网公司本能的考虑到7*24小时的可靠性,所以怎么做防单点,基本的利用zookeeper等从master和slave的切换等。
(3)想法太多,不过优先需要考虑怎么在规定的时间里把任务完成的最好,因为任何产品都是可以无止境的优化和改进。
(4)技术方案选择上,同一个任务,有太多的方案可以选择,而且没有任何一种方案是绝对的有优势,所以我一般选择能够满足任务要求的并且最容易实现的。这个题目选择了java的技术体系和eclipse+maven做为开发工具,虽然我对他们不是很熟悉,但是我相信通过网络最容易找到他们的资料。所以选择,如果选择其他语言体系来实现,可能8-10小时不容易。
三,代码实现
1.工程结构如下:
2.各个文件代码详细实现,代码中有注释
(1)App.java
package com.fa.taskdispatch;
import java.util.ArrayList;
import java.util.HashMap;
public class App {
public static void main(String[] args) {
if (args.length < 2) {
System.out.println("Usage : 需要两个运行参数,第一个是任务列表的文件路径,第二个是计算资源的文件路径");
System.exit(1);
}
AnalyticalResource ar = new AnalyticalResource(args[0], args[1]);
HashMap<String, Task> taskMap = new HashMap<String, Task>();
if (!ar.resolveTaskListFile(taskMap)) {
System.out.println("解析任务列表的YAML文件失败,系统退出.");
System.exit(1);
}
ArrayList<ComputeResource> computeResourceList = new ArrayList<ComputeResource>();
if (!ar.resolveComputeResourceFile(computeResourceList)) {
System.out.println("解析计算资源文件失败,系统退出.");
System.exit(1);
}
TaskDispatch td = new TaskDispatch(taskMap, computeResourceList);
ArrayList<String> dispatchResult = new ArrayList<String>();
if (td.dispatch(dispatchResult)) {
System.out.println("任务调度成功,调度结果如下: ");
for (String name : dispatchResult) {
System.out.println(name);
}
} else {
System.out.println("任务调度失败.");
}
}
}
复制
(2) ComputeResource.java
package com.fa.taskdispatch;
public class ComputeResource {
private String name;
private int cores;
public ComputeResource(String name, int cores) {
this.name = name;
this.cores = cores;
}
public void subCores(int number) {
cores = cores - number;
}
public void addCores(int number) {
cores = cores + number;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getCores() {
return cores;
}
public void setCores(int cores) {
this.cores = cores;
}
}
复制
(3) Task.java
package com.fa.taskdispatch;
import java.util.ArrayList;
public class Task {
private String name;
private int coresRequired;
private int executionTime;
private ArrayList<String> parentTasks;
public Task(String name, int coresRequired, int executionTime) {
this(name, coresRequired, executionTime, new ArrayList<String>());
}
public Task(String name, int coresRequired, int executionTime, ArrayList<String> parentTasks) {
this.name = name;
this.coresRequired = coresRequired;
this.executionTime = executionTime;
this.parentTasks = parentTasks;
}
public boolean isDependMyself() {
boolean result = false;
for (String p : parentTasks) {
if (p.equals(name)) {
result = true;
}
}
return result;
}
public boolean parentTasksIsEmpty() {
return parentTasks == null || parentTasks.isEmpty();
}
//删除依赖任务中的一个
public void deleteOneParentTasks(String name) {
parentTasks.remove(name);
}
public void subExecutionTime(int number) {
executionTime = executionTime - number;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getCoresRequired() {
return coresRequired;
}
public void setCoresRequired(int coresRequired) {
this.coresRequired = coresRequired;
}
public int getExecutionTime() {
return executionTime;
}
public void setExecutionTime(int executionTime) {
this.executionTime = executionTime;
}
public ArrayList<String> getParentTasks() {
return parentTasks;
}
public void setParentTasks(ArrayList<String> parentTasks) {
this.parentTasks = parentTasks;
}
}
复制
(4) AnalyticalResource.java
package com.fa.taskdispatch;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.yaml.snakeyaml.Yaml;
public class AnalyticalResource {
private String taskListFile = "";
private String computeResource = "";
public AnalyticalResource(String taskListFile, String computeResource) {
this.taskListFile = taskListFile;
this.computeResource = computeResource;
}
public boolean resolveTaskListFile(HashMap<String, Task> tasks) {
boolean result = true;
if (tasks == null) {
tasks = new HashMap<String, Task>();
}
Yaml y = new Yaml();
File f = new File(taskListFile);
try {
HashMap tasksMap = (HashMap) y.load(new FileInputStream(f));
for (Object taskName : tasksMap.keySet()) {
HashMap taskMap = (HashMap)tasksMap.get(taskName);
int coresRequired = (Integer)taskMap.get("cores_required");
int executionTime = (Integer)taskMap.get("execution_time");
String parentTasks = (String)taskMap.get("parent_tasks");
String name = (String)taskName;
if (parentTasks != null && !parentTasks.isEmpty()) {
String parents[] = parentTasks.split(",");
ArrayList<String> parentList = new ArrayList<String>();
for (String parent : parents) {
parentList.add(parent.trim());
}
Task t = new Task(name.trim(), coresRequired, executionTime, parentList);
tasks.put(name.trim(), t);
} else {
Task t = new Task(name.trim(), coresRequired, executionTime);
tasks.put(name.trim(), t);
}
}
} catch (FileNotFoundException e) {
// TODO Auto-generated catch block
result = false;
e.printStackTrace();
}
return result;
}
public boolean resolveComputeResourceFile(ArrayList<ComputeResource> computeResourceList){
boolean result = true;
if (computeResourceList == null) {
computeResourceList = new ArrayList<ComputeResource>();
}
try {
Properties pt = new Properties();
pt.load(new FileInputStream(computeResource));
if (pt.isEmpty()) {
result = false;
System.out.println("没有计算资源,没有办法进行调度,也算失败,需要退出.");
}
for (Map.Entry<Object, Object> entry : pt.entrySet()) {
String name = (String)entry.getKey();
int cores = Integer.parseInt(entry.getValue().toString().trim());
ComputeResource cr = new ComputeResource(name.trim(), cores);
computeResourceList.add(cr);
}
} catch (FileNotFoundException e) {
// TODO Auto-generated catch block
result = false;
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
result = false;
e.printStackTrace();
}
return result;
}
}
复制
(5)最重要的类TaskDispatch.java
package com.fa.taskdispatch;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
public class TaskDispatch {
private HashMap<String, Task> taskMap;
private ArrayList<ComputeResource> computeResourceList;
public TaskDispatch(HashMap<String, Task> taskMap, ArrayList<ComputeResource> computeResourceList) {
this.taskMap = taskMap;
this.computeResourceList = computeResourceList;
}
//检查从某一个任务开始是否存在循环依赖
public boolean checkOneTaskCycleTaskDepend(Task t) {
//检查循环依赖就是循环的检查parent_tasks是否会到达任务本身
//这个是需要改变的,所有单独copy一份出来
ArrayList<String> parentList = new ArrayList<String>(t.getParentTasks());
String checkTaskName = t.getName();
while (parentList != null && !parentList.isEmpty()) {
ArrayList<String> newParentList = new ArrayList<String>();
for (String parent : parentList) {
if (parent.equals(checkTaskName)) {
System.out.println("任务["+ checkTaskName + "]存在了循环依赖,不能完成调度.");
return true;
}
Task dependTask = taskMap.get(parent);
if (dependTask.isDependMyself()) {
System.out.println("任务["+ dependTask.getName() + "]存在自己依赖自己的循环依赖,不能完成调度.");
return true;
}
if (dependTask!= null && !dependTask.parentTasksIsEmpty()) {
newParentList.addAll(dependTask.getParentTasks());
}
}
parentList = newParentList;
}
return false;
}
//检查任务是否存在循环依赖,如果存在也是不能调度
public boolean checkCycleTaskDepend() {
boolean result = false;
for (String taskName : taskMap.keySet()) {
Task t = taskMap.get(taskName);
if (t.parentTasksIsEmpty()) {
continue;
}
if (checkOneTaskCycleTaskDepend(t)) {
result = true;
}
if (result) {
break;
}
}
return result;
}
public boolean dispatch(ArrayList<String> dispatchResults) {
//1.验证可调度,如果有一个任务需要的cores_required超过了所有计算机的资源就不能调度
//(1)找出所有任务中需要最大cores_required
boolean result = true;
int maxCoresRequired = 0;
for (String taskName : taskMap.keySet()) {
Task t = taskMap.get(taskName);
if (t.getCoresRequired() > maxCoresRequired) {
maxCoresRequired = t.getCoresRequired();
}
}
//(2)找出计算资源的最大值
int maxComputeResource = 0;
for (ComputeResource cr : computeResourceList) {
if (cr.getCores() > maxComputeResource) {
maxComputeResource = cr.getCores();
}
}
//(3)比较
if (maxCoresRequired > maxComputeResource) {
System.out.println("单个任务需要的计算资源大于了任何一台计算机能够提供的资源,所以不能完成这个任务");
return false;
}
//2.验证是否有循环依赖
if (checkCycleTaskDepend()) {
System.out.println("任务列表里面存在这个循环依赖,不能完成任务调度");
return false;
}
//3.调度任务:首先需要做任务的编排(因为任务之间有依赖)
//(1)首先找出本次可以调度的所有任务(对于第一次当然就是没有任何依赖的任务);
//(2)然后有一个以上的任务完成以后在查找依赖这些以完成任务的任务是否可以进行调度了;
//(3)每次调度都需要有资源的情况下进行;
//(4)最终形成一个可调用的任务和可使用的计算资源进行匹配;
if (dispatchResults == null) {
dispatchResults = new ArrayList<String>();
}
ArrayList<Task> readyTask = new ArrayList<Task>();
HashMap<String, Task> runningTask = new HashMap<String, Task>();
while (!taskMap.isEmpty()) {//直到所有任务都分配计算资源以后结束调度
for (String taskName : taskMap.keySet()) {
Task t = taskMap.get(taskName);
if (t.parentTasksIsEmpty() && !readyTask.contains(t)) {
readyTask.add(t);
}
}
//根据可调度的任务和可以使用的资源进行一次调度,调度之前先对任务和资源进行排序
//1.把可以调度的任务按照时间从短到长排序
Comparator<Task> taskComparator = new Comparator<Task>(){
public int compare(Task t1, Task t2) {
return t1.getExecutionTime() - t2.getExecutionTime();
}
};
Collections.sort(readyTask, taskComparator);
//2.把可以利用的计算资源排序
Comparator<ComputeResource> computeComparator = new Comparator<ComputeResource>(){
public int compare(ComputeResource c1, ComputeResource c2) {
return c1.getCores() - c2.getCores();
}
};
Collections.sort(computeResourceList, computeComparator);
oneDispatchTask(dispatchResults, runningTask, readyTask, computeResourceList);
//调度完成一次,需要把时间最短(可能有多个)的任务结束掉,从可调度的任务里面删除掉,并且重新计算可调度的任务,并且恢复资源
int minTime = readyTask.get(0).getExecutionTime();
ArrayList<Task> deleteTaskName = new ArrayList<Task>();
for (Task t : readyTask) {
if (minTime >= t.getExecutionTime()) {//删除掉已经完成的任务,就是时间最小的
System.out.println("删除任务 :" + t.getName());
deleteTaskName.add(t);//记住删除了那些任务,后面用于更新readyTask
runningTask.remove(t.getName());
taskMap.remove(t.getName());
//恢复资源
for (String name : dispatchResults) {
if (name.contains(t.getName())) {
String arr[] = name.split(":");
String computeName = arr[1];
for (ComputeResource cr : computeResourceList) {
if (cr.getName().equals(computeName)) {
System.out.println(computeName + "恢复资源 : " + t.getCoresRequired());
cr.addCores(t.getCoresRequired());
}
}
}
}
//继续删除其他任务依赖这个任务的依赖任务
for (String taskName : taskMap.keySet()) {
Task task = taskMap.get(taskName);
if (task.getParentTasks().contains(t.getName())) {
task.deleteOneParentTasks(t.getName());
System.out.println("删除任务[" + task.getName() + "]的依赖任务 : [" + t.getName() + "]" );
}
}
}
}
//其他已经在调度的任务需要减去这个时间
for (String taskName : runningTask.keySet()) {
Task t = runningTask.get(taskName);
t.subExecutionTime(minTime);
}
//删除可调度任务里面的已经完成的任务
for (Task t : deleteTaskName) {
readyTask.remove(t);
}
}
return result;
}
//调度算法:每次调度的时候选择按照从时间从短到长的排序,优先调度时间短的,并且找一个资源能够满足调度任务的最小的计算资源分配
public void oneDispatchTask(ArrayList<String> dispatchResult, HashMap<String, Task> runningTask,
ArrayList<Task> readyTask, ArrayList<ComputeResource> availableResources) {
for (Task t : readyTask) {
//如果任务已经在运行就不需要在继续分配资源调度了
if (runningTask.containsKey(t.getName())) {
continue;
}
for (ComputeResource cr : availableResources) {
//如果满足资源需求,就分配这个任务到这一个计算资源了
if (t.getCoresRequired() <= cr.getCores()) {
dispatchResult.add(t.getName() + ":" + cr.getName());
runningTask.put(t.getName(), t);
System.out.println("开始调度任务 : [" + t.getName() + "]到计算资源" + cr.getName());
//减去这个计算资源已经使用的cores
cr.subCores(t.getCoresRequired());
break;
}
}
}
}
}
复制