天天看点

YARN : FairScheduler深入解析(队列维护,demand、fair share计算)

一、概要

首先,YARN FairScheduler主要做的事情:

① 处理NM心跳NodeUpdate,分配container。

② 树状维护队列和任务,定时计算fair share等信息,并进行排序。

本文重点分析②

二、代码

1、初始化FairScheduler

在RM启动时会初始化FairScheduler,

private void initScheduler(Configuration conf) throws IOException {
    synchronized (this) {
      this.conf = new FairSchedulerConfiguration(conf);
      validateConf(this.conf);
      minimumAllocation = this.conf.getMinimumAllocation();
      initMaximumResourceCapability(this.conf.getMaximumAllocation());
      incrAllocation = this.conf.getIncrementAllocation();
      // 持续调度,默认false,一般用于时效性高的实时任务
      continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
      continuousSchedulingSleepMs =
          this.conf.getContinuousSchedulingSleepMs();
      balanceSchedulingEnabled = this.conf.isBalanceSchedulingEnabled();
      ...
      preemptionUtilizationThreshold =
          this.conf.getPreemptionUtilizationThreshold();
      // 一次性分配多个container,加大吞吐
      assignMultiple = this.conf.getAssignMultiple();
      maxAssignDynamic = this.conf.isMaxAssignDynamic();
      maxAssign = this.conf.getMaxAssign();
      sizeBasedWeight = this.conf.getSizeBasedWeight();
      preemptionInterval = this.conf.getPreemptionInterval();
      waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
      usePortForNodeName = this.conf.getUsePortForNodeName();
      reservableNodesRatio = this.conf.getReservableNodes();

      if (this.conf.isCpuSchedulingEnabled()) {
        RESOURCE_CALCULATOR = new CpuResourceCalculator();
      } else {
        RESOURCE_CALCULATOR = new DefaultResourceCalculator();
      }

	  // 重新计算fair share的频率
      updateInterval = this.conf.getUpdateInterval();
      if (updateInterval < 0) {
        updateInterval = FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS;
        LOG.warn(FairSchedulerConfiguration.UPDATE_INTERVAL_MS
            + " is invalid, so using default value " +
            +FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS
            + " ms instead");
      }

	  // 一些性能指标打点采集
      rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf);
      fsOpDurations = FSOpDurations.getInstance(true);

      // This stores per-application scheduling information
      this.applications = new ConcurrentHashMap<
          ApplicationId, SchedulerApplication<FSAppAttempt>>();
      this.eventLog = new FairSchedulerEventLog();
      eventLog.init(this.conf);

      allocConf = new AllocationConfiguration(conf);

      rmNodeLabelsManager = rmContext.getNodeLabelManager();
      try {
        // QueueManager管理队列及挂在队列下的application
        queueMgr.initialize(conf);
      } catch (Exception e) {
        throw new IOException("Failed to start FairScheduler", e);
      }

	  // 启动定时计算demand和fair share线程
      updateThread = new UpdateThread();
      updateThread.setName("FairSchedulerUpdateThread");
      updateThread.setDaemon(true);

	  // 启动持续调度线程
      if (continuousSchedulingEnabled) {
        // start continuous scheduling thread
        schedulingThread = new ContinuousSchedulingThread();
        schedulingThread.setName("FairSchedulerContinuousScheduling");
        schedulingThread.setDaemon(true);
      }
    }

	// 初始化AllocationFileLoaderService
    allocsLoader.init(conf);
    // If we fail to load allocations file on initialize, we want to fail
    // immediately.  After a successful load, exceptions on future reloads
    // will just result in leaving things as they are.
    try {
      allocsLoader.reloadAllocations();
      // 获取 NM label
      rmNodeLabelsManager.reinitializeQueueLabels(getQueueToLabels());
    } catch (Exception e) {
      throw new IOException("Failed to initialize FairScheduler", e);
    }
  }
           
2、updateDemand

更新每个队列、application资源需求

public void updateDemand() {
    // Compute demand by iterating through apps in the queue
    // Limit demand to maxResources
    Resource maxRes = scheduler.getAllocationConfiguration()
        .getMaxResources(getName());
    demand = Resources.createResource(0);
    readLock.lock();
    try {
      for (FSAppAttempt sched : runnableApps) {
        // demand达上限,break
        if (Resources.equals(demand, maxRes)) {
          break;
        }
        // 内部逻辑是把当前已经占用的资源加上额外请求的资源总和
        // 遍历每个额外请求,对所请求的资源求和,如果加起来大于
        // 最大资源限制,则将demand设为mapRes
        updateDemandForApp(sched, maxRes);
      }
      for (FSAppAttempt sched : nonRunnableApps) {
        if (Resources.equals(demand, maxRes)) {
          break;
        }
        updateDemandForApp(sched, maxRes);
      }
    } finally {
      readLock.unlock();
    }
  
    // sort it in advance.
    Comparator<Schedulable> comparator = policy.getComparator();
    writeLock.lock();
    try {
      // 对队列 application进行排序
      Collections.sort(runnableApps, comparator);
    } finally {
      writeLock.unlock();
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("The updated demand for " + getName() + " is " + demand
          + "; the max is " + maxRes);
    }
  }
           

队列排序规则,FairShareComparator:

public int compare(Schedulable s1, Schedulable s2) {
      Priority priority1 = s1.getPriority();
      Priority priority2 = s2.getPriority();
      // 先比较优先级,优先级低排后
      if (!priority1.equals(priority2)) {
        return priority1.compareTo(priority2);
      }

      Resource demand1 = s1.getDemand();
      Resource demand2 = s2.getDemand();
      // 不需要资源的,排后
      if (demand1.equals(Resources.none()) &&
          !demand2.equals(Resources.none())) {
        return 1;
      } else if (demand2.equals(Resources.none()) &&
          !demand1.equals(Resources.none())) {
        return -1;
      }

      double minShareRatio1, minShareRatio2;
      double useToWeightRatio1, useToWeightRatio2;
      double weight1, weight2;
      //Do not repeat the getResourceUsage calculation
      Resource resourceUsage1 = s1.getResourceUsageFaster();
      Resource resourceUsage2 = s2.getResourceUsageFaster();
      // 取min share和资源需求中的最小值
      Resource minShare1 = Resources.min(RESOURCE_CALCULATOR, null,
          s1.getMinShare(), demand1);
      Resource minShare2 = Resources.min(RESOURCE_CALCULATOR, null,
          s2.getMinShare(), demand2);
      // 根据当前使用资源和minShare比较,如果小于则需要资源
      boolean s1Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
          resourceUsage1, minShare1);
      boolean s2Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
          resourceUsage2, minShare2);
      // 内存使用占比
      minShareRatio1 = (double) resourceUsage1.getMemory()
          / Resources.max(RESOURCE_CALCULATOR, null, minShare1, ONE).getMemory();
      minShareRatio2 = (double) resourceUsage2.getMemory()
          / Resources.max(RESOURCE_CALCULATOR, null, minShare2, ONE).getMemory();

      // 比较队列权重
      weight1 = s1.getWeights().getWeight(ResourceType.MEMORY);
      weight2 = s2.getWeights().getWeight(ResourceType.MEMORY);
      if (weight1 > 0.0 && weight2 > 0.0) {
        // 根据队列权重计算比例
        useToWeightRatio1 = resourceUsage1.getMemory() / weight1;
        useToWeightRatio2 = resourceUsage2.getMemory() / weight2;
      } else { // Either weight1 or weight2 equals to 0
        if (weight1 == weight2) {
          // 权重相等则直接比较使用的内存
          useToWeightRatio1 = resourceUsage1.getMemory();
          useToWeightRatio2 = resourceUsage2.getMemory();
        } else {
          // 权重绝对值越接近0,排后
          useToWeightRatio1 = -weight1;
          useToWeightRatio2 = -weight2;
        }
      }

      int res = 0;
      if (s1Needy && !s2Needy)
        res = -1;
      else if (s2Needy && !s1Needy)
        res = 1;
      else if (s1Needy && s2Needy)
        // 比谁share大,用的多 排后
        res = (int) Math.signum(minShareRatio1 - minShareRatio2);
      else
        // 比谁使用权重占比大,大的排后
        res = (int) Math.signum(useToWeightRatio1 - useToWeightRatio2);
      if (res == 0) {
        // Apps are tied in fairness ratio. Break the tie by submit time and job
        // name to get a deterministic ordering, which is useful for unit tests.
        // 仍然相同则比开始时间,后开始的 排后
        res = (int) Math.signum(s1.getStartTime() - s2.getStartTime());
        if (res == 0)
          // 还相同,比name字符串,字典序靠前的先跑
          res = s1.getName().compareTo(s2.getName());
      }
      return res;
    }
           
3、recomputeShares

计算Instantaneous Fair Share和Steady Fair Share逻辑

ComputeFairShares.computeSharesInternal:

private static void computeSharesInternal(
      Collection<? extends Schedulable> allSchedulables,
      Resource totalResources, ResourceType type, boolean isSteadyShare) {

    // 前提:type均为memory,vcore在此不考虑,DominantResourceFairnessPolicy会同时考虑memory和vcore做排序。
    // fair-scheduler.xml中配置的所有队列minShare之和必小于集群NM资源总和

    // 过滤出需要参与计算fair share的队列
    // isSteadyShare=true,过滤掉weight和maxShare不符合规定的队列
    // isSteadyShare=false,过滤掉weight和maxShare不符合规定的队列、没有running application的队列
    Collection<Schedulable> schedulables = new ArrayList<Schedulable>();
    int takenResources = handleFixedFairShares(
        allSchedulables, schedulables, isSteadyShare, type);

    if (schedulables.isEmpty()) {
      return;
    }
    // Find an upper bound on R that we can use in our binary search. We start
    // at R = 1 and double it until we have either used all the resources or we
    // have met all Schedulables' max shares.
    // 获取所有队列的MaxShare总和
    int totalMaxShare = 0;
    for (Schedulable sched : schedulables) {
      int maxShare = getResourceValue(sched.getMaxShare(), type);
      totalMaxShare = (int) Math.min((long)maxShare + (long)totalMaxShare,
          Integer.MAX_VALUE);
      if (totalMaxShare == Integer.MAX_VALUE) {
        break;
      }
    }

    // 剩余参与fair share分配的总资源
    int totalResource = Math.max((getResourceValue(totalResources, type) -
        takenResources), 0);
    totalResource = Math.min(totalMaxShare, totalResource);

    // rMax为第一个能够满足所有队列资源均在min和max之间,且大于集群总资源,从1开始每次扩大一倍
    double rMax = 1.0;
    while (resourceUsedWithWeightToResourceRatio(rMax, schedulables, type)
        < totalResource) {
      rMax *= 2.0;
    }
    // 二分rMax,迭代25次或算出来的资源刚好等于totalResource
    // 目的是得出来的总资源标准尽量接近真实总资源
    double left = 0;
    double right = rMax;
    for (int i = 0; i < COMPUTE_FAIR_SHARES_ITERATIONS; i++) {
      double mid = (left + right) / 2.0;
      int plannedResourceUsed = resourceUsedWithWeightToResourceRatio(
          mid, schedulables, type);
      if (plannedResourceUsed == totalResource) {
        right = mid;
        break;
      } else if (plannedResourceUsed < totalResource) {
        left = mid;
      } else {
        right = mid;
      }
    }

    // 给每个队列设置fair share 或 steady fair share
    for (Schedulable sched : schedulables) {
      if (isSteadyShare) {
        setResourceValue(computeShare(sched, right, type),
            ((FSQueue) sched).getSteadyFairShare(), type);
      } else {
        setResourceValue(
            computeShare(sched, right, type), sched.getFairShare(), type);
      }
    }
  }
           

handleFixedFairShares:

private static int handleFixedFairShares(
      Collection<? extends Schedulable> schedulables,
      Collection<Schedulable> nonFixedSchedulables,
      boolean isSteadyShare, ResourceType type) {
    // 所有队列资源总和
    int totalResource = 0;

    for (Schedulable sched : schedulables) {
      // 若maxShare或者weight配置为0,这个队列在任何时候都不会运行任何app,即固定队列,并且分配给他的fair share或instaneous fair share都为0
      // 若计算instaneous fair share,且队列没有app运行,那么,这个队列的instaneous fair share是0,并且这个队列被判定为fix sheduler,所以这个队列不再参与instaneous fair share的计算
      // 若计算的steady fair share,steady fair share值只和该队列的min max配置有关,和是否有app正在运行无关
      int fixedShare = getFairShareIfFixed(sched, isSteadyShare, type);
      if (fixedShare < 0) {
        nonFixedSchedulables.add(sched);
      } else {
        // 若isSteadyShare=true,即steady fairshares,则将其steady fair share设置为fixedShare
        // 若isSteadyShare=false,即instaneous fair share,则将instaneous fair share设置为fixedShare
        setResourceValue(fixedShare,
            isSteadyShare
                ? ((FSQueue)sched).getSteadyFairShare()
                : sched.getFairShare(),
            type);
        totalResource = (int) Math.min((long)totalResource + (long)fixedShare,
            Integer.MAX_VALUE);
      }
    }
    return totalResource;
  }
           

resourceUsedWithWeightToResourceRatio:

private static int resourceUsedWithWeightToResourceRatio(double w2rRatio,
      Collection<? extends Schedulable> schedulables, ResourceType type) {
    long resourcesTaken = 0;
    // 对每个队列计算share,再求和
    for (Schedulable sched : schedulables) {
      int share = computeShare(sched, w2rRatio, type);
      resourcesTaken += share;
    }
    return (int)Math.min(resourcesTaken, Integer.MAX_VALUE);
  }
           

computeShare:

private static int computeShare(Schedulable sched, double w2rRatio,
      ResourceType type) {
    // 根据总share*权重计算出该队列share值,并控制在minShare和maxShare之间
    double share = sched.getWeights().getWeight(type) * w2rRatio;
    share = Math.max(share, getResourceValue(sched.getMinShare(), type));
    share = Math.min(share, getResourceValue(sched.getMaxShare(), type));
    return (int) share;
  }
           

三、SLS性能测试

根据jmx采集的指标,加上SLS可以针对RM进行性能评估。

具体暂时先不介绍了,以后空了再写。

官方文档 YARN Scheduler Load Simulator

四、Metrics采集

FSOpDuration
{
  "beans" : [ {
    "name" : "Hadoop:service=ResourceManager,name=FSOpDurations",
    "modelerType" : "FSOpDurations",
    "tag.FSOpDurations" : "FSOpDurations",
    "tag.Context" : "fairscheduler-op-durations",
    "tag.Hostname" : "host",
    "ContinuousSchedulingRunNumOps" : 0,
    "ContinuousSchedulingRunAvgTime" : 0.0,
    "ContinuousSchedulingRunStdevTime" : 0.0,
    "ContinuousSchedulingRunIMinTime" : 3.4028234663852886E38,
    "ContinuousSchedulingRunIMaxTime" : 1.401298464324817E-45,
    "ContinuousSchedulingRunMinTime" : 3.4028234663852886E38,
    "ContinuousSchedulingRunMaxTime" : 1.401298464324817E-45,
    // 处理NodeUpdate总次数
    "NodeUpdateCallNumOps" : 19230,
    // 一次采样周期内NodeUpdate平均耗时ms
    "NodeUpdateCallAvgTime" : 14.101851851851853,
    "NodeUpdateCallStdevTime" : 7.589392615725218,
    "NodeUpdateCallIMinTime" : 10.0,
    "NodeUpdateCallIMaxTime" : 113.0,
    "NodeUpdateCallMinTime" : 3.0,
    "NodeUpdateCallMaxTime" : 1403.0,
    // fair share计算总次数
    "UpdateThreadRunNumOps" : 115,
    // 一次采样周期内计算fair share平均耗时ms
    "UpdateThreadRunAvgTime" : 2350.0,
    "UpdateThreadRunStdevTime" : 110.30865786510141,
    "UpdateThreadRunIMinTime" : 2272.0,
    "UpdateThreadRunIMaxTime" : 2428.0,
    "UpdateThreadRunMinTime" : 4.0,
    "UpdateThreadRunMaxTime" : 6177.0,
    // 和上面差不多,这些指标统计更内层的方法,不包括ContinuousScheduling
    "UpdateCallNumOps" : 115,
    "UpdateCallAvgTime" : 2350.0,
    "UpdateCallStdevTime" : 110.30865786510141,
    "UpdateCallIMinTime" : 2272.0,
    "UpdateCallIMaxTime" : 2428.0,
    "UpdateCallMinTime" : 4.0,
    "UpdateCallMaxTime" : 6169.0,
    "PreemptCallNumOps" : 0,
    "PreemptCallAvgTime" : 0.0,
    "PreemptCallStdevTime" : 0.0,
    "PreemptCallIMinTime" : 3.4028234663852886E38,
    "PreemptCallIMaxTime" : 1.401298464324817E-45,
    "PreemptCallMinTime" : 3.4028234663852886E38,
    "PreemptCallMaxTime" : 1.401298464324817E-45,
    // 处理AssignContainer请求
    "AssignContainerCallNumOps" : 2860902,
    "AssignContainerCallAvgTime" : 31.927672432911855,
    "AssignContainerCallStdevTime" : 575.0777254854487,
    "AssignContainerCallIMinTime" : 16.0,
    "AssignContainerCallIMaxTime" : 100921.0,
    "AssignContainerCallMinTime" : 15.0,
    "AssignContainerCallMaxTime" : 1393653.0,
    // 处理CompletedContainer请求
    "CompletedContainerCallNumOps" : 1293897,
    "CompletedContainerCallAvgTime" : 26.40577603228669,
    "CompletedContainerCallStdevTime" : 22.917249699220484,
    "CompletedContainerCallIMinTime" : 14.0,
    "CompletedContainerCallIMaxTime" : 2026.0,
    "CompletedContainerCallMinTime" : 13.0,
    "CompletedContainerCallMaxTime" : 991449.0
  } ]
}
           

继续阅读