
Map-Reduce in the scheduler's priority algorithms

While reading the k8s 1.16 scheduler code I came across this snippet:

pkg/scheduler/core/generic_scheduler.go:730

results := make([]schedulerapi.HostPriorityList, len(priorityConfigs), len(priorityConfigs))

	// DEPRECATED: we can remove this when all priorityConfigs implement the
	// Map-Reduce pattern.
	for i := range priorityConfigs {
		if priorityConfigs[i].Function != nil {
			wg.Add(1)
			go func(index int) {
				defer wg.Done()
				var err error
				results[index], err = priorityConfigs[index].Function(pod, nodeNameToInfo, nodes)
				if err != nil {
					appendError(err)
				}
			}(i)
		} else {
			results[i] = make(schedulerapi.HostPriorityList, len(nodes))
		}
	}

	workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
		nodeInfo := nodeNameToInfo[nodes[index].Name]
		for i := range priorityConfigs {
			if priorityConfigs[i].Function != nil {
				continue
			}

			var err error
			results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo)
			if err != nil {
				appendError(err)
				results[i][index].Host = nodes[index].Name
			}
		}
	})
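
The branching on priorityConfigs[i].Function versus the later Map call is easier to follow with the shape of a PriorityConfig in mind. Roughly, and paraphrased from the 1.16-era priorities package rather than quoted verbatim, it looks like this:

type PriorityConfig struct {
	Name   string
	Map    PriorityMapFunction    // scores a single node; run in parallel across nodes
	Reduce PriorityReduceFunction // optional pass over all per-node scores (e.g. normalization)
	// Function is the legacy whole-list scorer, kept only until every
	// priority has been migrated to the Map-Reduce pattern.
	Function PriorityFunction
	Weight   int
}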
           

There are two ways a node gets scored here. One is:

results[index], err = priorityConfigs[index].Function(pod, nodeNameToInfo, nodes)
           

The other path starts by pre-allocating the result list for that priority algorithm:

results[i] = make(schedulerapi.HostPriorityList, len(nodes))
           

The first style is the deprecated one that will eventually be removed; the second is the improved approach: the actual per-node scoring happens afterwards through the Map function inside workqueue.ParallelizeUntil, i.e. the Map-Reduce pattern.
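
The two call shapes correspond to two different function types. As a rough sketch of the 1.16-era definitions (paraphrased; exact parameter types may differ slightly):

// Legacy style: one call receives every node and returns the whole score list.
type PriorityFunction func(pod *v1.Pod, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo,
	nodes []*v1.Node) (schedulerapi.HostPriorityList, error)

// Map-Reduce style: Map scores one node at a time; Reduce optionally
// post-processes (for example normalizes) the collected per-node results.
type PriorityMapFunction func(pod *v1.Pod, meta interface{},
	nodeInfo *schedulernodeinfo.NodeInfo) (schedulerapi.HostPriority, error)

type PriorityReduceFunction func(pod *v1.Pod, meta interface{},
	nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo,
	result schedulerapi.HostPriorityList) error

Because a Map call only needs a single NodeInfo, the scheduler is free to invoke it for many nodes at once.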

Seeing this raised a question: why change the way priority scoring is done? What is the drawback of the first style?

Let's first look at how the NodeAffinity priority was implemented back in the ancient 1.3 release:

// CalculateNodeAffinityPriority prioritizes nodes according to node affinity scheduling preferences
// indicated in PreferredDuringSchedulingIgnoredDuringExecution. Each time a node match a preferredSchedulingTerm,
// it will a get an add of preferredSchedulingTerm.Weight. Thus, the more preferredSchedulingTerms
// the node satisfies and the more the preferredSchedulingTerm that is satisfied weights, the higher
// score the node gets.
func (s *NodeAffinity) CalculateNodeAffinityPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) {
	var maxCount int
	counts := map[string]int{}

	nodes, err := nodeLister.List()
	if err != nil {
		return nil, err
	}

	affinity, err := api.GetAffinityFromPodAnnotations(pod.Annotations)
	if err != nil {
		return nil, err
	}

	// A nil element of PreferredDuringSchedulingIgnoredDuringExecution matches no objects.
	// An element of PreferredDuringSchedulingIgnoredDuringExecution that refers to an
	// empty PreferredSchedulingTerm matches all objects.
	if affinity.NodeAffinity != nil && affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution != nil {
		// Match PreferredDuringSchedulingIgnoredDuringExecution term by term.
		for _, preferredSchedulingTerm := range affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution {
			if preferredSchedulingTerm.Weight == 0 {
				continue
			}

			nodeSelector, err := api.NodeSelectorRequirementsAsSelector(preferredSchedulingTerm.Preference.MatchExpressions)
			if err != nil {
				return nil, err
			}

			for _, node := range nodes.Items {
				if nodeSelector.Matches(labels.Set(node.Labels)) {
					counts[node.Name] += int(preferredSchedulingTerm.Weight)
				}

				if counts[node.Name] > maxCount {
					maxCount = counts[node.Name]
				}
			}
		}
	}

	result := []schedulerapi.HostPriority{}
	for _, node := range nodes.Items {
		fScore := float64(0)
		if maxCount > 0 {
			fScore = 10 * (float64(counts[node.Name]) / float64(maxCount))
		}
		result = append(result, schedulerapi.HostPriority{Host: node.Name, Score: int(fScore)})
		glog.V(10).Infof("%v -> %v: NodeAffinityPriority, Score: (%d)", pod.Name, node.Name, int(fScore))
	}
	return result, nil
}
           

The function signature is exactly that of the first scoring style: the function receives the whole node list, for...ranges over the nodes, and scores them one after another. Within a single priority function the nodes are processed serially, with no per-node concurrency.
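
To make the contrast concrete, here is a toy sketch (my own illustration, not kubernetes code; names such as toyAffinityMap are made up) of how a NodeAffinity-style scorer splits into a per-node Map step and a normalizing Reduce step:

package main

import "fmt"

// Toy stand-ins for the scheduler's result types; illustration only.
type hostPriority struct {
	Host  string
	Score int
}
type hostPriorityList []hostPriority

// "Map": score one node in isolation. In the real scheduler this is the
// PriorityMapFunction and it only sees a single NodeInfo.
func toyAffinityMap(nodeName string, weightMatched int) hostPriority {
	return hostPriority{Host: nodeName, Score: weightMatched}
}

// "Reduce": look at all per-node results at once and normalize to 0..10,
// which is what the old Function-style code did inline at the end.
func toyNormalizeReduce(result hostPriorityList) {
	maxCount := 0
	for _, hp := range result {
		if hp.Score > maxCount {
			maxCount = hp.Score
		}
	}
	if maxCount == 0 {
		return
	}
	for i := range result {
		result[i].Score = result[i].Score * 10 / maxCount
	}
}

func main() {
	// Pretend each node matched preferred terms with these summed weights.
	result := hostPriorityList{
		toyAffinityMap("node-a", 0),
		toyAffinityMap("node-b", 5),
		toyAffinityMap("node-c", 20),
	}
	toyNormalizeReduce(result)
	fmt.Println(result) // [{node-a 0} {node-b 2} {node-c 10}]
}

The point of the split: the Map half never needs to look at other nodes, so it can run for many nodes concurrently; only the cheap normalization pass in Reduce needs the complete list, which is exactly the part the old Function style tangled together with the per-node work.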

Now look at scoring a node through Map:

workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
		nodeInfo := nodeNameToInfo[nodes[index].Name]
		for i := range priorityConfigs {
			if priorityConfigs[i].Function != nil {
				continue
			}

			var err error
			results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo)
			if err != nil {
				appendError(err)
				results[i][index].Host = nodes[index].Name
			}
		}
	})
           

workqueue.ParallelizeUntil spins up 16 goroutines, each of which scores nodes. Since this runs concurrently, data races are a concern; how does the code avoid them? Look at the data structure of the results object:

results := make([]schedulerapi.HostPriorityList, len(priorityConfigs), len(priorityConfigs))
           

It is a slice in which every element is a HostPriorityList. And what is a HostPriorityList?

type HostPriority struct {
    Host string
    Score int
}
type HostPriorityList []HostPriority
           

Because the number of nodes being scored is fixed and the number of priority algorithms is fixed, the code writes by index into what is effectively a two-dimensional array:

results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo)
           

Each write goes to its own distinct element, so there is no data race and no lock is needed.
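
The trick is purely structural: every row of results is allocated with a fixed length up front, and each goroutine writes only the elements whose index it owns. A minimal, self-contained sketch with plain goroutines (standing in for workqueue.ParallelizeUntil, which hands out indices the same way; the score helper is made up) looks like this:

package main

import (
	"fmt"
	"sync"
)

// score is a placeholder for a real Map function.
func score(algorithm, nodeIndex int) int {
	return algorithm*10 + nodeIndex
}

func main() {
	nodes := []string{"node-a", "node-b", "node-c", "node-d"}
	numAlgorithms := 2

	// Pre-allocate the full "2D" result matrix: one row per algorithm,
	// one fixed slot per node. No append, no resizing after this point.
	results := make([][]int, numAlgorithms)
	for i := range results {
		results[i] = make([]int, len(nodes))
	}

	var wg sync.WaitGroup
	for index := range nodes {
		wg.Add(1)
		go func(index int) {
			defer wg.Done()
			for i := 0; i < numAlgorithms; i++ {
				// This goroutine writes only results[i][index]; no two
				// goroutines ever touch the same element, so no lock is needed.
				results[i][index] = score(i, index)
			}
		}(index)
	}
	wg.Wait()
	fmt.Println(results) // [[0 1 2 3] [10 11 12 13]]
}

Drop the pre-allocation and switch to append, and the same code would immediately need a mutex.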

Scoring every node in parallel and lock-free through the Map functions raises the throughput of the scheduler's priority phase, so this change is a big improvement over the original implementation.
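
For completeness: the ParallelizeUntil block is only the Map half. Immediately below it in the same PrioritizeNodes function, 1.16 runs each priority's Reduce over its row of results, roughly like this paraphrase (not a verbatim quote; logging trimmed):

	for i := range priorityConfigs {
		if priorityConfigs[i].Reduce == nil {
			continue
		}
		wg.Add(1)
		go func(index int) {
			defer wg.Done()
			if err := priorityConfigs[index].Reduce(pod, meta, nodeNameToInfo, results[index]); err != nil {
				appendError(err)
			}
		}(i)
	}
	wg.Wait()

Reduce typically just normalizes one algorithm's scores, so the heavy per-node work all stays in the parallel Map phase.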

