Commit 2ecfad9

CA: extend SchedulerPluginRunner with RunReserveOnNode
RunReserveOnNode runs the Reserve phase of the scheduler framework, which is necessary to obtain the ResourceClaim allocations computed by the DRA scheduler plugin. RunReserveOnNode isn't used anywhere yet, so this change should be a no-op.
1 parent 0dd0f88 commit 2ecfad9
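
Since the new method isn't called anywhere yet, here is a minimal sketch of how the pieces are meant to compose (the helper name scheduleWithReserve is hypothetical and not part of this commit; standard cluster-autoscaler import paths are assumed). The Filter phases now return the chosen Node together with the scheduler framework CycleState, and that same CycleState has to be handed to RunReserveOnNode so plugins such as DRA can record the ResourceClaim allocations they computed earlier in the cycle:

```go
package predicate

import (
	apiv1 "k8s.io/api/core/v1"

	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
)

// scheduleWithReserve is a hypothetical helper (not added by this commit).
// It runs the PreFilter/Filter phases on a single Node and, if they pass,
// feeds the resulting CycleState into the Reserve phase so that plugins
// like DRA can persist the ResourceClaim allocations they computed.
func scheduleWithReserve(runner *SchedulerPluginRunner, pod *apiv1.Pod, nodeName string) clustersnapshot.SchedulingError {
	node, state, schedErr := runner.RunFiltersOnNode(pod, nodeName)
	if schedErr != nil {
		return schedErr
	}
	// Reserve mutates per-pod plugin state for the chosen Node; any failure
	// is surfaced as an internal scheduling error.
	if err := runner.RunReserveOnNode(pod, node.Name, state); err != nil {
		return clustersnapshot.NewSchedulingInternalError(pod, err.Error())
	}
	return nil
}
```

This is also why RunFiltersOnNode and RunFiltersUntilPassingNode were changed to return the CycleState alongside the Node rather than just the node name.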

3 files changed: +40 −20 lines changed

cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner.go

Lines changed: 22 additions & 10 deletions
@@ -44,10 +44,10 @@ func NewSchedulerPluginRunner(fwHandle *framework.Handle, snapshot clustersnapsh
 // function - until a Node where the Filters pass is found. Filters are only run for matching Nodes. If no matching Node with passing Filters is found, an error is returned.
 //
 // The node iteration always starts from the next Node from the last Node that was found by this method. TODO: Extract the iteration strategy out of SchedulerPluginRunner.
-func (p *SchedulerPluginRunner) RunFiltersUntilPassingNode(pod *apiv1.Pod, nodeMatches func(*framework.NodeInfo) bool) (string, clustersnapshot.SchedulingError) {
+func (p *SchedulerPluginRunner) RunFiltersUntilPassingNode(pod *apiv1.Pod, nodeMatches func(*framework.NodeInfo) bool) (*apiv1.Node, *schedulerframework.CycleState, clustersnapshot.SchedulingError) {
 	nodeInfosList, err := p.snapshot.ListNodeInfos()
 	if err != nil {
-		return "", clustersnapshot.NewSchedulingInternalError(pod, fmt.Sprintf("error listing NodeInfos: %v", err))
+		return nil, nil, clustersnapshot.NewSchedulingInternalError(pod, fmt.Sprintf("error listing NodeInfos: %v", err))
 	}
 
 	p.fwHandle.DelegatingLister.UpdateDelegate(p.snapshot)
@@ -61,7 +61,7 @@ func (p *SchedulerPluginRunner) RunFiltersUntilPassingNode(pod *apiv1.Pod, nodeM
 	if !preFilterStatus.IsSuccess() {
 		// If any of the plugin PreFilter methods isn't successful, the corresponding Filter method can't be run, so the whole scheduling cycle is aborted.
 		// Match that behavior here.
-		return "", clustersnapshot.NewFailingPredicateError(pod, preFilterStatus.Plugin(), preFilterStatus.Reasons(), "PreFilter failed", "")
+		return nil, nil, clustersnapshot.NewFailingPredicateError(pod, preFilterStatus.Plugin(), preFilterStatus.Reasons(), "PreFilter failed", "")
 	}
 
 	for i := range nodeInfosList {
@@ -92,18 +92,18 @@ func (p *SchedulerPluginRunner) RunFiltersUntilPassingNode(pod *apiv1.Pod, nodeM
 		if filterStatus.IsSuccess() {
 			// Filter passed for all plugins, so this pod can be scheduled on this Node.
 			p.lastIndex = (p.lastIndex + i + 1) % len(nodeInfosList)
-			return nodeInfo.Node().Name, nil
+			return nodeInfo.Node(), state, nil
 		}
 		// Filter didn't pass for some plugin, so this Node won't work - move on to the next one.
 	}
-	return "", clustersnapshot.NewNoNodesPassingPredicatesFoundError(pod)
+	return nil, nil, clustersnapshot.NewNoNodesPassingPredicatesFoundError(pod)
 }
 
 // RunFiltersOnNode runs the scheduler framework PreFilter and Filter phases to check if the given pod can be scheduled on the given node.
-func (p *SchedulerPluginRunner) RunFiltersOnNode(pod *apiv1.Pod, nodeName string) clustersnapshot.SchedulingError {
+func (p *SchedulerPluginRunner) RunFiltersOnNode(pod *apiv1.Pod, nodeName string) (*apiv1.Node, *schedulerframework.CycleState, clustersnapshot.SchedulingError) {
 	nodeInfo, err := p.snapshot.GetNodeInfo(nodeName)
 	if err != nil {
-		return clustersnapshot.NewSchedulingInternalError(pod, fmt.Sprintf("error obtaining NodeInfo for name %q: %v", nodeName, err))
+		return nil, nil, clustersnapshot.NewSchedulingInternalError(pod, fmt.Sprintf("error obtaining NodeInfo for name %q: %v", nodeName, err))
 	}
 
 	p.fwHandle.DelegatingLister.UpdateDelegate(p.snapshot)
@@ -113,10 +113,10 @@ func (p *SchedulerPluginRunner) RunFiltersOnNode(pod *apiv1.Pod, nodeName string
 	// Run the PreFilter phase of the framework for the Pod and check the results. See the corresponding comments in RunFiltersUntilPassingNode() for more info.
 	preFilterResult, preFilterStatus, _ := p.fwHandle.Framework.RunPreFilterPlugins(context.TODO(), state, pod)
 	if !preFilterStatus.IsSuccess() {
-		return clustersnapshot.NewFailingPredicateError(pod, preFilterStatus.Plugin(), preFilterStatus.Reasons(), "PreFilter failed", "")
+		return nil, nil, clustersnapshot.NewFailingPredicateError(pod, preFilterStatus.Plugin(), preFilterStatus.Reasons(), "PreFilter failed", "")
 	}
 	if !preFilterResult.AllNodes() && !preFilterResult.NodeNames.Has(nodeInfo.Node().Name) {
-		return clustersnapshot.NewFailingPredicateError(pod, preFilterStatus.Plugin(), preFilterStatus.Reasons(), "PreFilter filtered the Node out", "")
+		return nil, nil, clustersnapshot.NewFailingPredicateError(pod, preFilterStatus.Plugin(), preFilterStatus.Reasons(), "PreFilter filtered the Node out", "")
 	}
 
 	// Run the Filter phase of the framework for the Pod and the Node and check the results. See the corresponding comments in RunFiltersUntilPassingNode() for more info.
@@ -128,10 +128,22 @@ func (p *SchedulerPluginRunner) RunFiltersOnNode(pod *apiv1.Pod, nodeName string
 		if !filterStatus.IsRejected() {
 			unexpectedErrMsg = fmt.Sprintf("unexpected filter status %q", filterStatus.Code().String())
 		}
-		return clustersnapshot.NewFailingPredicateError(pod, filterName, filterReasons, unexpectedErrMsg, p.failingFilterDebugInfo(filterName, nodeInfo))
+		return nil, nil, clustersnapshot.NewFailingPredicateError(pod, filterName, filterReasons, unexpectedErrMsg, p.failingFilterDebugInfo(filterName, nodeInfo))
 	}
 
 	// PreFilter and Filter phases checked, this Pod can be scheduled on this Node.
+	return nodeInfo.Node(), state, nil
+}
+
+// RunReserveOnNode runs the scheduler framework Reserve phase to update the scheduler plugins state to reflect the Pod being scheduled on the Node.
+func (p *SchedulerPluginRunner) RunReserveOnNode(pod *apiv1.Pod, nodeName string, postFilterState *schedulerframework.CycleState) error {
+	p.fwHandle.DelegatingLister.UpdateDelegate(p.snapshot)
+	defer p.fwHandle.DelegatingLister.ResetDelegate()
+
+	status := p.fwHandle.Framework.RunReservePluginsReserve(context.Background(), postFilterState, pod, nodeName)
+	if !status.IsSuccess() {
+		return fmt.Errorf("couldn't reserve node %s for pod %s/%s: %v", nodeName, pod.Namespace, pod.Name, status.Message())
+	}
 	return nil
 }

cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner_test.go

Lines changed: 12 additions & 5 deletions
@@ -144,8 +144,10 @@ func TestRunFiltersOnNode(t *testing.T) {
 			err = snapshot.AddNodeInfo(framework.NewTestNodeInfo(tt.node, tt.scheduledPods...))
 			assert.NoError(t, err)
 
-			predicateError := pluginRunner.RunFiltersOnNode(tt.testPod, tt.node.Name)
+			node, state, predicateError := pluginRunner.RunFiltersOnNode(tt.testPod, tt.node.Name)
 			if tt.expectError {
+				assert.Nil(t, node)
+				assert.Nil(t, state)
 				assert.NotNil(t, predicateError)
 				assert.Equal(t, clustersnapshot.FailingPredicateError, predicateError.Type())
 				assert.Equal(t, "NodeResourcesFit", predicateError.FailingPredicateName())
@@ -154,6 +156,8 @@ func TestRunFiltersOnNode(t *testing.T) {
 				assert.Contains(t, predicateError.Error(), "Insufficient cpu")
 			} else {
 				assert.Nil(t, predicateError)
+				assert.NotNil(t, state)
+				assert.Equal(t, tt.node, node)
 			}
 		})
 	}
@@ -243,12 +247,15 @@ func TestRunFilterUntilPassingNode(t *testing.T) {
 			err = snapshot.AddNodeInfo(framework.NewTestNodeInfo(n2000))
 			assert.NoError(t, err)
 
-			nodeName, err := pluginRunner.RunFiltersUntilPassingNode(tc.pod, func(info *framework.NodeInfo) bool { return true })
+			node, state, err := pluginRunner.RunFiltersUntilPassingNode(tc.pod, func(info *framework.NodeInfo) bool { return true })
 			if tc.expectError {
+				assert.Nil(t, node)
+				assert.Nil(t, state)
 				assert.Error(t, err)
 			} else {
 				assert.NoError(t, err)
-				assert.Contains(t, tc.expectedNodes, nodeName)
+				assert.NotNil(t, state)
+				assert.Contains(t, tc.expectedNodes, node.Name)
 			}
 		})
 	}
@@ -278,7 +285,7 @@ func TestDebugInfo(t *testing.T) {
 	err = clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node1))
 	assert.NoError(t, err)
 
-	predicateErr := defaultPluginnRunner.RunFiltersOnNode(p1, "n1")
+	_, _, predicateErr := defaultPluginnRunner.RunFiltersOnNode(p1, "n1")
 	assert.NotNil(t, predicateErr)
 	assert.Contains(t, predicateErr.FailingPredicateReasons(), "node(s) had untolerated taint {SomeTaint: WhyNot?}")
 	assert.Contains(t, predicateErr.Error(), "node(s) had untolerated taint {SomeTaint: WhyNot?}")
@@ -308,7 +315,7 @@ func TestDebugInfo(t *testing.T) {
 	err = clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node1))
 	assert.NoError(t, err)
 
-	predicateErr = customPluginnRunner.RunFiltersOnNode(p1, "n1")
+	_, _, predicateErr = customPluginnRunner.RunFiltersOnNode(p1, "n1")
 	assert.Nil(t, predicateErr)
 }

cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot.go

Lines changed: 6 additions & 5 deletions
@@ -74,7 +74,7 @@ func (s *PredicateSnapshot) RemoveNodeInfo(nodeName string) error {
 
 // SchedulePod adds pod to the snapshot and schedules it to given node.
 func (s *PredicateSnapshot) SchedulePod(pod *apiv1.Pod, nodeName string) clustersnapshot.SchedulingError {
-	if schedErr := s.pluginRunner.RunFiltersOnNode(pod, nodeName); schedErr != nil {
+	if _, _, schedErr := s.pluginRunner.RunFiltersOnNode(pod, nodeName); schedErr != nil {
 		return schedErr
 	}
 	if err := s.ClusterSnapshotStore.ForceAddPod(pod, nodeName); err != nil {
@@ -85,14 +85,14 @@ func (s *PredicateSnapshot) SchedulePod(pod *apiv1.Pod, nodeName string) cluster
 
 // SchedulePodOnAnyNodeMatching adds pod to the snapshot and schedules it to any node matching the provided function.
 func (s *PredicateSnapshot) SchedulePodOnAnyNodeMatching(pod *apiv1.Pod, anyNodeMatching func(*framework.NodeInfo) bool) (string, clustersnapshot.SchedulingError) {
-	nodeName, schedErr := s.pluginRunner.RunFiltersUntilPassingNode(pod, anyNodeMatching)
+	node, _, schedErr := s.pluginRunner.RunFiltersUntilPassingNode(pod, anyNodeMatching)
 	if schedErr != nil {
 		return "", schedErr
 	}
-	if err := s.ClusterSnapshotStore.ForceAddPod(pod, nodeName); err != nil {
+	if err := s.ClusterSnapshotStore.ForceAddPod(pod, node.Name); err != nil {
 		return "", clustersnapshot.NewSchedulingInternalError(pod, err.Error())
 	}
-	return nodeName, nil
+	return node.Name, nil
 }
 
 // UnschedulePod removes the given Pod from the given Node inside the snapshot.
@@ -102,5 +102,6 @@ func (s *PredicateSnapshot) UnschedulePod(namespace string, podName string, node
 
 // CheckPredicates checks whether scheduler predicates pass for the given pod on the given node.
 func (s *PredicateSnapshot) CheckPredicates(pod *apiv1.Pod, nodeName string) clustersnapshot.SchedulingError {
-	return s.pluginRunner.RunFiltersOnNode(pod, nodeName)
+	_, _, err := s.pluginRunner.RunFiltersOnNode(pod, nodeName)
+	return err
 }
