Skip to content

Commit

Permalink
feat: change binder to use public functions from pkg/plugins and to …
Browse files Browse the repository at this point in the history
…take NMNodes into account when checking
  • Loading branch information
slipegg committed Sep 28, 2024
1 parent 402679c commit e0e8153
Show file tree
Hide file tree
Showing 4 changed files with 218 additions and 177 deletions.
60 changes: 30 additions & 30 deletions pkg/binder/framework/plugins/interpodaffinity/interpodaffinity.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

"github.com/kubewharf/godel-scheduler/pkg/binder/framework/handle"
framework "github.com/kubewharf/godel-scheduler/pkg/framework/api"
interpodScheduler "github.com/kubewharf/godel-scheduler/pkg/scheduler/framework/plugins/interpodaffinity"
utils "github.com/kubewharf/godel-scheduler/pkg/plugins/interpodaffinity"
podutil "github.com/kubewharf/godel-scheduler/pkg/util/pod"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -52,33 +52,33 @@ func (pl *InterPodAffinity) CheckConflicts(_ context.Context, cycleState *framew
return framework.NewStatus(framework.Error, err.Error())
}
topologyLabels := nodeInfo.GetNodeLabels(podLauncher)
matchedNodeInfos, err := pl.getNodesWithSameTopologyLabels(topologyLabels)
matchedNodeInfos, err := pl.getNodesWithSameTopologyLabels(topologyLabels, podLauncher)
if err != nil {
return framework.NewStatus(framework.Unschedulable, ErrorReasonWhenFilterNodeWithSameTopology)
}

existingPodAntiAffinityMap := interpodScheduler.GetTPMapMatchingExistingAntiAffinity(pod, matchedNodeInfos, podLauncher)
existingPodAntiAffinityMap := utils.GetTPMapMatchingExistingAntiAffinity(pod, matchedNodeInfos, podLauncher)

podInfo := framework.NewPodInfo(pod)
incomingPodAffinityMap, incomingPodAntiAffinityMap := interpodScheduler.GetTPMapMatchingIncomingAffinityAntiAffinity(podInfo, matchedNodeInfos, podLauncher)
incomingPodAffinityMap, incomingPodAntiAffinityMap := utils.GetTPMapMatchingIncomingAffinityAntiAffinity(podInfo, matchedNodeInfos, podLauncher)

state := &interpodScheduler.PreFilterState{
state := &utils.PreFilterState{
TopologyToMatchedExistingAntiAffinityTerms: existingPodAntiAffinityMap,
TopologyToMatchedAffinityTerms: incomingPodAffinityMap,
TopologyToMatchedAntiAffinityTerms: incomingPodAntiAffinityMap,
PodInfo: podInfo,
}

if !interpodScheduler.SatisfyPodAffinity(state, nodeInfo, podLauncher) {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, interpodScheduler.ErrReasonAffinityNotMatch, interpodScheduler.ErrReasonAffinityRulesNotMatch)
if !utils.SatisfyPodAffinity(state, nodeInfo, podLauncher) {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, utils.ErrReasonAffinityNotMatch, utils.ErrReasonAffinityRulesNotMatch)
}

if !interpodScheduler.SatisfyPodAntiAffinity(state, nodeInfo, podLauncher) {
return framework.NewStatus(framework.Unschedulable, interpodScheduler.ErrReasonAffinityNotMatch, interpodScheduler.ErrReasonAntiAffinityRulesNotMatch)
if !utils.SatisfyPodAntiAffinity(state, nodeInfo, podLauncher) {
return framework.NewStatus(framework.Unschedulable, utils.ErrReasonAffinityNotMatch, utils.ErrReasonAntiAffinityRulesNotMatch)
}

if !interpodScheduler.SatisfyExistingPodsAntiAffinity(state, nodeInfo, podLauncher) {
return framework.NewStatus(framework.Unschedulable, interpodScheduler.ErrReasonAffinityNotMatch, interpodScheduler.ErrReasonExistingAntiAffinityRulesNotMatch)
if !utils.SatisfyExistingPodsAntiAffinity(state, nodeInfo, podLauncher) {
return framework.NewStatus(framework.Unschedulable, utils.ErrReasonAffinityNotMatch, utils.ErrReasonExistingAntiAffinityRulesNotMatch)
}

return nil
Expand All @@ -90,35 +90,35 @@ func New(_ runtime.Object, handle handle.BinderFrameworkHandle) (framework.Plugi
}, nil
}

func (pl *InterPodAffinity) getNodesWithSameTopologyLabels(topologyLabels map[string]string) ([]framework.NodeInfo, error) {
nodeLister := pl.frameworkHandle.SharedInformerFactory().Core().V1().Nodes().Lister()

func (pl *InterPodAffinity) getNodesWithSameTopologyLabels(topologyLabels map[string]string, podLauncher podutil.PodLauncher) ([]framework.NodeInfo, error) {
var matchedNodeInfos []framework.NodeInfo
nodeSet := make(map[string]*v1.Node) // Used to remove duplicates
nodeInfoSet := make(map[string]framework.NodeInfo) // Used to remove duplicates

// 针对每个 label key-value 进行筛选,并合并结果
for key, value := range topologyLabels {
selector := labels.NewSelector()

// 为每个 label key-value 创建一个筛选条件
requirement, _ := labels.NewRequirement(key, selection.Equals, []string{value})
selector = selector.Add(*requirement)

// 获取符合条件的节点
nodes, err := nodeLister.List(selector)
if err != nil {
return nil, fmt.Errorf("failed to list nodes for selector %s: %v", selector.String(), err)
}

// 将筛选结果加入到 nodeSet 中,确保不重复添加节点
for _, node := range nodes {
nodeSet[node.Name] = node
if podLauncher == podutil.Kubelet {
nodes, err := pl.frameworkHandle.SharedInformerFactory().Core().V1().Nodes().Lister().List(selector)
if err != nil {
return nil, fmt.Errorf("failed to list nodes for selector %s: %v", selector.String(), err)
}
for _, node := range nodes {
nodeInfoSet[node.Name] = pl.frameworkHandle.GetNodeInfo(node.Name)
}
} else {
nodes, err := pl.frameworkHandle.CRDSharedInformerFactory().Node().V1alpha1().NMNodes().Lister().List(selector)
if err != nil {
return nil, fmt.Errorf("failed to list nodes for selector %s: %v", selector.String(), err)
}
for _, node := range nodes {
nodeInfoSet[node.Name] = pl.frameworkHandle.GetNodeInfo(node.Name)
}
}
}

// 将去重后的节点列表转为切片
for _, node := range nodeSet {
nodeInfo := pl.frameworkHandle.GetNodeInfo(node.Name)
for _, nodeInfo := range nodeInfoSet {
matchedNodeInfos = append(matchedNodeInfos, nodeInfo)
}

Expand Down
Loading

0 comments on commit e0e8153

Please sign in to comment.