Commit ef9d420

DRA: integrate BasicClusterSnapshot with the DRA snapshot
1 parent 0a11e9c commit ef9d420

3 files changed (+184 additions, -24 deletions)

cluster-autoscaler/dynamicresources/resource_claim_utils.go

Lines changed: 15 additions & 0 deletions
@@ -24,6 +24,7 @@ import (
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/types"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
+    "k8s.io/component-helpers/scheduling/corev1"
     "k8s.io/utils/ptr"
 )
 
@@ -60,6 +61,20 @@ func ClaimReservedForPod(claim *resourceapi.ResourceClaim, pod *apiv1.Pod) bool
     return false
 }
 
+// ClaimAvailableOnNode returns whether the provided claim is allocated and available on the provided Node.
+func ClaimAvailableOnNode(claim *resourceapi.ResourceClaim, node *apiv1.Node) (bool, error) {
+    if !ClaimAllocated(claim) {
+        // Not allocated so not available anywhere.
+        return false, nil
+    }
+    selector := claim.Status.Allocation.NodeSelector
+    if selector == nil {
+        // nil means available everywhere.
+        return true, nil
+    }
+    return corev1.MatchNodeSelectorTerms(node, claim.Status.Allocation.NodeSelector)
+}
+
 // DeallocateClaimInPlace clears the allocation of the provided claim.
 func DeallocateClaimInPlace(claim *resourceapi.ResourceClaim) {
     claim.Status.Allocation = nil

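For reference, a minimal usage sketch of the new helper (not part of the commit). It assumes ClaimAllocated() treats a non-nil Status.Allocation as allocated; the node name, label key, and claim below are hypothetical.

package dynamicresources_test

import (
    "fmt"

    apiv1 "k8s.io/api/core/v1"
    resourceapi "k8s.io/api/resource/v1alpha3"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

    "k8s.io/autoscaler/cluster-autoscaler/dynamicresources"
)

func ExampleClaimAvailableOnNode() {
    // A node labeled gpu-zone=a.
    node := &apiv1.Node{ObjectMeta: metav1.ObjectMeta{
        Name:   "node-1",
        Labels: map[string]string{"gpu-zone": "a"},
    }}
    // A claim that is allocated, but whose allocation is restricted to nodes labeled gpu-zone=a.
    claim := &resourceapi.ResourceClaim{
        ObjectMeta: metav1.ObjectMeta{Name: "claim-1", Namespace: "default"},
        Status: resourceapi.ResourceClaimStatus{
            Allocation: &resourceapi.AllocationResult{
                NodeSelector: &apiv1.NodeSelector{
                    NodeSelectorTerms: []apiv1.NodeSelectorTerm{{
                        MatchExpressions: []apiv1.NodeSelectorRequirement{{
                            Key:      "gpu-zone",
                            Operator: apiv1.NodeSelectorOpIn,
                            Values:   []string{"a"},
                        }},
                    }},
                },
            },
        },
    }
    available, err := dynamicresources.ClaimAvailableOnNode(claim, node)
    fmt.Println(available, err)
    // Output: true <nil>
}
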
cluster-autoscaler/dynamicresources/snapshot.go

Lines changed: 37 additions & 0 deletions
@@ -21,6 +21,7 @@ import (
 
     apiv1 "k8s.io/api/core/v1"
     resourceapi "k8s.io/api/resource/v1alpha3"
+    "k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
     schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
 )
 
@@ -57,6 +58,42 @@ func (s Snapshot) DeviceClasses() schedulerframework.DeviceClassLister {
     return snapshotClassLister(s)
 }
 
+// WrapSchedulerNodeInfo wraps the provided *schedulerframework.NodeInfo into an internal *framework.NodeInfo, adding
+// DRA information. Node-local ResourceSlices are added to the NodeInfo, and all ResourceClaims referenced by each Pod
+// are added to each PodInfo. Returns an error if any of the Pods is missing a ResourceClaim.
+func (s Snapshot) WrapSchedulerNodeInfo(schedNodeInfo *schedulerframework.NodeInfo) (*framework.NodeInfo, error) {
+    var pods []*framework.PodInfo
+    for _, podInfo := range schedNodeInfo.Pods {
+        podClaims, err := s.PodClaims(podInfo.Pod)
+        if err != nil {
+            return nil, err
+        }
+        pods = append(pods, &framework.PodInfo{
+            Pod:                  podInfo.Pod,
+            NeededResourceClaims: podClaims,
+        })
+    }
+    nodeSlices, _ := s.NodeResourceSlices(schedNodeInfo.Node())
+    return &framework.NodeInfo{
+        NodeInfo:            schedNodeInfo,
+        LocalResourceSlices: nodeSlices,
+        Pods:                pods,
+    }, nil
+}
+
+// WrapSchedulerNodeInfos calls WrapSchedulerNodeInfo() on a list of *schedulerframework.NodeInfos.
+func (s Snapshot) WrapSchedulerNodeInfos(schedNodeInfos []*schedulerframework.NodeInfo) ([]*framework.NodeInfo, error) {
+    var result []*framework.NodeInfo
+    for _, schedNodeInfo := range schedNodeInfos {
+        nodeInfo, err := s.WrapSchedulerNodeInfo(schedNodeInfo)
+        if err != nil {
+            return nil, err
+        }
+        result = append(result, nodeInfo)
+    }
+    return result, nil
+}
+
 // Clone returns a copy of this Snapshot that can be independently modified without affecting this Snapshot.
 // The only mutable objects in the Snapshot are ResourceClaims, so they are deep-copied. The rest is only a
 // shallow copy.

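As a usage sketch (not part of the commit): given a dynamicresources.Snapshot that already tracks the cluster's ResourceSlices and ResourceClaims, the new wrappers attach that DRA state to the autoscaler's internal NodeInfo/PodInfo types. The helper below and its logging are illustrative only.

package example

import (
    "k8s.io/autoscaler/cluster-autoscaler/dynamicresources"
    "k8s.io/klog/v2"
    schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

// describeNodeDRA is a hypothetical helper showing what WrapSchedulerNodeInfo attaches.
func describeNodeDRA(draSnapshot dynamicresources.Snapshot, schedNodeInfo *schedulerframework.NodeInfo) error {
    nodeInfo, err := draSnapshot.WrapSchedulerNodeInfo(schedNodeInfo)
    if err != nil {
        // Some pod on the node references a ResourceClaim that the snapshot doesn't track.
        return err
    }
    klog.Infof("node %s has %d node-local ResourceSlices", nodeInfo.Node().Name, len(nodeInfo.LocalResourceSlices))
    for _, podInfo := range nodeInfo.Pods {
        klog.Infof("pod %s/%s references %d ResourceClaims", podInfo.Pod.Namespace, podInfo.Pod.Name, len(podInfo.NeededResourceClaims))
    }
    return nil
}
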
cluster-autoscaler/simulator/clustersnapshot/basic.go

Lines changed: 132 additions & 24 deletions
@@ -17,9 +17,11 @@ limitations under the License.
 package clustersnapshot
 
 import (
+    "context"
     "fmt"
 
     apiv1 "k8s.io/api/core/v1"
+    resourceapi "k8s.io/api/resource/v1alpha3"
     "k8s.io/autoscaler/cluster-autoscaler/dynamicresources"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
     "k8s.io/klog/v2"
@@ -35,6 +37,7 @@ type BasicClusterSnapshot struct {
 }
 
 type internalBasicSnapshotData struct {
+    owningSnapshot     *BasicClusterSnapshot
     nodeInfoMap        map[string]*schedulerframework.NodeInfo
     pvcNamespacePodMap map[string]map[string]bool
     draSnapshot        dynamicresources.Snapshot
@@ -122,8 +125,9 @@ func (data *internalBasicSnapshotData) removePvcUsedByPod(pod *apiv1.Pod) {
     }
 }
 
-func newInternalBasicSnapshotData() *internalBasicSnapshotData {
+func newInternalBasicSnapshotData(owningSnapshot *BasicClusterSnapshot) *internalBasicSnapshotData {
     return &internalBasicSnapshotData{
+        owningSnapshot:     owningSnapshot,
         nodeInfoMap:        make(map[string]*schedulerframework.NodeInfo),
         pvcNamespacePodMap: make(map[string]map[string]bool),
         draSnapshot:        dynamicresources.Snapshot{},
@@ -143,60 +147,145 @@ func (data *internalBasicSnapshotData) clone() *internalBasicSnapshotData {
         }
     }
     return &internalBasicSnapshotData{
+        owningSnapshot:     data.owningSnapshot,
         nodeInfoMap:        clonedNodeInfoMap,
         pvcNamespacePodMap: clonedPvcNamespaceNodeMap,
         draSnapshot:        data.draSnapshot.Clone(),
     }
 }
 
-func (data *internalBasicSnapshotData) addNode(node *apiv1.Node) error {
+func (data *internalBasicSnapshotData) addNode(node *apiv1.Node, extraSlices []*resourceapi.ResourceSlice) error {
     if _, found := data.nodeInfoMap[node.Name]; found {
         return fmt.Errorf("node %s already in snapshot", node.Name)
     }
+
+    if data.owningSnapshot.draEnabled && len(extraSlices) > 0 {
+        // We need to add extra ResourceSlices to the DRA snapshot. The DRA snapshot should contain all real slices after Initialize(),
+        // so these should be fake node-local slices for a fake duplicated Node.
+        err := data.draSnapshot.AddNodeResourceSlices(node.Name, extraSlices)
+        if err != nil {
+            return fmt.Errorf("couldn't add ResourceSlices to DRA snapshot: %v", err)
+        }
+    }
+
     nodeInfo := schedulerframework.NewNodeInfo()
     nodeInfo.SetNode(node)
     data.nodeInfoMap[node.Name] = nodeInfo
     return nil
 }
 
 func (data *internalBasicSnapshotData) removeNodeInfo(nodeName string) error {
-    if _, found := data.nodeInfoMap[nodeName]; !found {
+    nodeInfo, found := data.nodeInfoMap[nodeName]
+    if !found {
         return ErrNodeNotFound
     }
-    for _, pod := range data.nodeInfoMap[nodeName].Pods {
+    for _, pod := range nodeInfo.Pods {
         data.removePvcUsedByPod(pod.Pod)
+        if data.owningSnapshot.draEnabled {
+            data.draSnapshot.RemovePodClaims(pod.Pod)
+        }
     }
     delete(data.nodeInfoMap, nodeName)
+    if data.owningSnapshot.draEnabled {
+        data.draSnapshot.RemoveNodeResourceSlices(nodeName)
+    }
     return nil
 }
 
-func (data *internalBasicSnapshotData) schedulePod(pod *apiv1.Pod, nodeName string) error {
-    if _, found := data.nodeInfoMap[nodeName]; !found {
+func (data *internalBasicSnapshotData) schedulePod(pod *apiv1.Pod, nodeName string, reserveState *schedulerframework.CycleState, extraClaims []*resourceapi.ResourceClaim) error {
+    nodeInfo, found := data.nodeInfoMap[nodeName]
+    if !found {
         return ErrNodeNotFound
     }
-    data.nodeInfoMap[nodeName].AddPod(pod)
+
+    if needAllocatedClaims := len(pod.Spec.ResourceClaims); needAllocatedClaims > 0 && data.owningSnapshot.draEnabled {
+        err := data.handlePodClaimsScheduling(pod, nodeInfo.Node(), reserveState, extraClaims)
+        if err != nil {
+            return err
+        }
+    }
+
+    nodeInfo.AddPod(pod)
     data.addPvcUsedByPod(pod)
     return nil
 }
 
+func (data *internalBasicSnapshotData) handlePodClaimsScheduling(pod *apiv1.Pod, node *apiv1.Node, reserveState *schedulerframework.CycleState, extraClaims []*resourceapi.ResourceClaim) error {
+    if len(extraClaims) > 0 {
+        // We need to add some extra ResourceClaims to the DRA snapshot. The DRA snapshot should contain all real claims after Initialize(),
+        // so these should be fake pod-owned claims for a fake duplicated pod.
+        err := data.draSnapshot.AddClaims(extraClaims)
+        if err != nil {
+            return fmt.Errorf("error while adding allocated ResourceClaims for pod %s/%s: %v", pod.Namespace, pod.Name, err)
+        }
+    }
+    if reserveState != nil {
+        // We need to run the scheduler Reserve phase to allocate the appropriate ResourceClaims in the DRA snapshot. The allocations are
+        // actually computed and cached in the Filter phase, and Reserve only grabs them from the cycle state. So this should be quick, but
+        // it needs the cycle state from after running the Filter phase.
+        err := data.owningSnapshot.runReserveNodeForPod(pod, reserveState, node.Name)
+        if err != nil {
+            return fmt.Errorf("error while trying to run Reserve node %s for pod %s/%s: %v", node.Name, pod.Namespace, pod.Name, err)
+        }
+    }
+
+    // The pod isn't added to the ReservedFor field of the claim during the Reserve phase (it happens later, in PreBind). We can just do it
+    // manually here. It shouldn't fail, it only fails if ReservedFor is at max length already, but that is checked during the Filter phase.
+    err := data.draSnapshot.ReservePodClaims(pod)
+    if err != nil {
+        return fmt.Errorf("couldn't add pod reservations to claims, this shouldn't happen: %v", err)
+    }
+
+    // Verify that all needed claims are tracked in the DRA snapshot, allocated, and available on the Node.
+    claims, err := data.draSnapshot.PodClaims(pod)
+    if err != nil {
+        return fmt.Errorf("couldn't obtain pod %s/%s claims: %v", pod.Namespace, pod.Name, err)
+    }
+    for _, claim := range claims {
+        if available, err := dynamicresources.ClaimAvailableOnNode(claim, node); err != nil || !available {
+            return fmt.Errorf("pod %s/%s needs claim %s to schedule, but it isn't available on node %s (allocated: %v, available: %v, err: %v)", pod.Namespace, pod.Name, claim.Name, node.Name, dynamicresources.ClaimAllocated(claim), available, err)
+        }
+    }
+    return nil
+}
+
 func (data *internalBasicSnapshotData) unschedulePod(namespace, podName, nodeName string) error {
     nodeInfo, found := data.nodeInfoMap[nodeName]
     if !found {
         return ErrNodeNotFound
     }
-    logger := klog.Background()
+    var foundPod *apiv1.Pod
     for _, podInfo := range nodeInfo.Pods {
         if podInfo.Pod.Namespace == namespace && podInfo.Pod.Name == podName {
-            data.removePvcUsedByPod(podInfo.Pod)
-            err := nodeInfo.RemovePod(logger, podInfo.Pod)
-            if err != nil {
-                data.addPvcUsedByPod(podInfo.Pod)
-                return fmt.Errorf("cannot remove pod; %v", err)
-            }
-            return nil
+            foundPod = podInfo.Pod
+            break
         }
     }
-    return fmt.Errorf("pod %s/%s not in snapshot", namespace, podName)
+
+    if foundPod == nil {
+        return fmt.Errorf("pod %s/%s not in snapshot", namespace, podName)
+    }
+
+    data.removePvcUsedByPod(foundPod)
+
+    logger := klog.Background()
+    err := nodeInfo.RemovePod(logger, foundPod)
+    if err != nil {
+        data.addPvcUsedByPod(foundPod)
+        return fmt.Errorf("cannot remove pod; %v", err)
+    }
+
+    if len(foundPod.Spec.ResourceClaims) == 0 || !data.owningSnapshot.draEnabled {
+        return nil
+    }
+
+    err = data.draSnapshot.UnreservePodClaims(foundPod)
+    if err != nil {
+        nodeInfo.AddPod(foundPod)
+        data.addPvcUsedByPod(foundPod)
+        return fmt.Errorf("cannot unreserve pod's dynamic requests: %v", err)
+    }
+    return nil
 }
 
 // NewBasicClusterSnapshot creates instances of BasicClusterSnapshot.
@@ -210,25 +299,44 @@ func (snapshot *BasicClusterSnapshot) getInternalData() *internalBasicSnapshotDa
     return snapshot.data[len(snapshot.data)-1]
 }
 
+func (snapshot *BasicClusterSnapshot) runReserveNodeForPod(pod *apiv1.Pod, preReserveCycleState *schedulerframework.CycleState, nodeName string) error {
+    snapshot.fwHandle.DelegatingLister.UpdateDelegate(snapshot)
+    defer snapshot.fwHandle.DelegatingLister.ResetDelegate()
+
+    status := snapshot.fwHandle.Framework.RunReservePluginsReserve(context.Background(), preReserveCycleState, pod, nodeName)
+    if !status.IsSuccess() {
+        return fmt.Errorf("couldn't reserve node %s for pod %s/%s: %v", nodeName, pod.Namespace, pod.Name, status.Message())
+    }
+    return nil
+}
+
 func (snapshot *BasicClusterSnapshot) GetNodeInfo(nodeName string) (*framework.NodeInfo, error) {
-    schedNodeInfo, err := snapshot.getInternalData().getNodeInfo(nodeName)
+    data := snapshot.getInternalData()
+    schedNodeInfo, err := data.getNodeInfo(nodeName)
     if err != nil {
         return nil, err
     }
+    if snapshot.draEnabled {
+        return data.draSnapshot.WrapSchedulerNodeInfo(schedNodeInfo)
+    }
     return framework.WrapSchedulerNodeInfo(schedNodeInfo), nil
 }
 
 func (snapshot *BasicClusterSnapshot) ListNodeInfos() ([]*framework.NodeInfo, error) {
-    schedNodeInfos := snapshot.getInternalData().listNodeInfos()
+    data := snapshot.getInternalData()
+    schedNodeInfos := data.listNodeInfos()
+    if snapshot.draEnabled {
+        return data.draSnapshot.WrapSchedulerNodeInfos(schedNodeInfos)
+    }
     return framework.WrapSchedulerNodeInfos(schedNodeInfos), nil
 }
 
 func (snapshot *BasicClusterSnapshot) AddNodeInfo(nodeInfo *framework.NodeInfo) error {
-    if err := snapshot.getInternalData().addNode(nodeInfo.Node()); err != nil {
+    if err := snapshot.getInternalData().addNode(nodeInfo.Node(), nodeInfo.LocalResourceSlices); err != nil {
         return err
     }
     for _, podInfo := range nodeInfo.Pods {
-        if err := snapshot.getInternalData().schedulePod(podInfo.Pod, nodeInfo.Node().Name); err != nil {
+        if err := snapshot.getInternalData().schedulePod(podInfo.Pod, nodeInfo.Node().Name, nil, podInfo.NeededResourceClaims); err != nil {
            return err
        }
    }
@@ -245,14 +353,14 @@ func (snapshot *BasicClusterSnapshot) Initialize(nodes []*apiv1.Node, scheduledP
 
     knownNodes := make(map[string]bool)
     for _, node := range nodes {
-        if err := baseData.addNode(node); err != nil {
+        if err := baseData.addNode(node, nil); err != nil {
             return err
         }
         knownNodes[node.Name] = true
     }
     for _, pod := range scheduledPods {
         if knownNodes[pod.Spec.NodeName] {
-            if err := baseData.schedulePod(pod, pod.Spec.NodeName); err != nil {
+            if err := baseData.schedulePod(pod, pod.Spec.NodeName, nil, nil); err != nil {
                 return err
             }
         }
@@ -267,7 +375,7 @@ func (snapshot *BasicClusterSnapshot) RemoveNodeInfo(nodeName string) error {
 
 // SchedulePod adds pod to the snapshot and schedules it to given node.
 func (snapshot *BasicClusterSnapshot) SchedulePod(pod *apiv1.Pod, nodeName string, reserveState *schedulerframework.CycleState) error {
-    return snapshot.getInternalData().schedulePod(pod, nodeName)
+    return snapshot.getInternalData().schedulePod(pod, nodeName, reserveState, nil)
 }
 
 // UnschedulePod removes pod from the snapshot.
@@ -306,7 +414,7 @@ func (snapshot *BasicClusterSnapshot) Commit() error {
 
 // Clear reset cluster snapshot to empty, unforked state
 func (snapshot *BasicClusterSnapshot) Clear() {
-    baseData := newInternalBasicSnapshotData()
+    baseData := newInternalBasicSnapshotData(snapshot)
     snapshot.data = []*internalBasicSnapshotData{baseData}
 }

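Taken together, here is a hedged sketch of the simulation flow this change enables (not part of the commit). It assumes the snapshot has already been initialized so that its DRA snapshot tracks all real ResourceSlices and ResourceClaims, that filterCycleState is the scheduler CycleState captured after the Filter plugins ran for this pod/node pair, and that the public UnschedulePod mirrors the internal unschedulePod(namespace, podName, nodeName) signature.

package example

import (
    apiv1 "k8s.io/api/core/v1"
    "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
    schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

// tryPodOnNode is a hypothetical helper that simulates scheduling a pod with ResourceClaims
// onto a node and then rolls the simulation back.
func tryPodOnNode(snapshot *clustersnapshot.BasicClusterSnapshot, pod *apiv1.Pod, nodeName string, filterCycleState *schedulerframework.CycleState) error {
    // With DRA enabled, SchedulePod runs the Reserve phase using the post-Filter cycle state,
    // reserves the pod's ResourceClaims in the DRA snapshot, and verifies that every claim is
    // allocated and available on the node before the pod is added to the NodeInfo.
    if err := snapshot.SchedulePod(pod, nodeName, filterCycleState); err != nil {
        return err
    }
    // UnschedulePod rolls the simulation back, removing the pod and unreserving its claims.
    return snapshot.UnschedulePod(pod.Namespace, pod.Name, nodeName)
}
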