Skip to content
This repository was archived by the owner on Oct 22, 2024. It is now read-only.

Commit ececede

Browse files
committed
pmem-csi-driver: ListVolumes pagination should reflect intermediate changes
ListVolumes pagination should handle the concurrency cases where volumes added/deleted while serving. We keep a array copy of the current volumes at initial ListVolumes call, and keep update(append) the list with newly added volumes. While preparing the ListVolumes entry, we check the next array element in the copy is valid, else we skip that element. At we serve all the elements in the copy then we clear the copy.
1 parent f6bf5a4 commit ececede

File tree

1 file changed

+30
-17
lines changed

1 file changed

+30
-17
lines changed

pkg/pmem-csi-driver/controllerserver-master.go

Lines changed: 30 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,8 @@ type pmemVolume struct {
6060
type masterController struct {
6161
*DefaultControllerServer
6262
rs *registryserver.RegistryServer
63-
pmemVolumes map[string]*pmemVolume //map of reqID:pmemVolume
63+
pmemVolumes map[string]*pmemVolume // map of reqID:pmemVolume
64+
volListCopy []*pmemVolume // Copy of pmemVolumes being server by ListVolumes
6465
mutex sync.Mutex // mutex for pmemVolumes
6566
}
6667

@@ -259,6 +260,10 @@ func (cs *masterController) CreateVolume(ctx context.Context, req *csi.CreateVol
259260
cs.mutex.Lock()
260261
defer cs.mutex.Unlock()
261262
cs.pmemVolumes[volumeID] = vol
263+
// append to ListVolumes copy
264+
if cs.volListCopy != nil {
265+
cs.volListCopy = append(cs.volListCopy, vol)
266+
}
262267
klog.V(3).Infof("Controller CreateVolume: Record new volume as %v", *vol)
263268
}
264269

@@ -373,16 +378,18 @@ func (cs *masterController) ListVolumes(ctx context.Context, req *csi.ListVolume
373378
cs.mutex.Lock()
374379
defer cs.mutex.Unlock()
375380

376-
// Copy from map into array for pagination.
377-
vols := make([]*pmemVolume, 0, len(cs.pmemVolumes))
378-
for _, vol := range cs.pmemVolumes {
379-
vols = append(vols, vol)
381+
if cs.volListCopy == nil {
382+
// Copy from map into array for pagination.
383+
cs.volListCopy = make([]*pmemVolume, 0, len(cs.pmemVolumes))
384+
for _, vol := range cs.pmemVolumes {
385+
cs.volListCopy = append(cs.volListCopy, vol)
386+
}
380387
}
381388

382389
// Code originally copied from https://github.com/kubernetes-csi/csi-test/blob/f14e3d32125274e0c3a3a5df380e1f89ff7c132b/mock/service/controller.go#L309-L365
383390

384391
var (
385-
ulenVols = int32(len(vols))
392+
ulenVols = int32(len(cs.volListCopy))
386393
maxEntries = req.MaxEntries
387394
startingToken int32
388395
)
@@ -415,27 +422,33 @@ func (cs *masterController) ListVolumes(ctx context.Context, req *csi.ListVolume
415422
}
416423

417424
var (
418-
i int
425+
i int32
419426
j = startingToken
420-
entries = make(
421-
[]*csi.ListVolumesResponse_Entry,
422-
maxEntries)
427+
entries = make([]*csi.ListVolumesResponse_Entry, 0, maxEntries)
423428
)
424429

425-
for i = 0; i < len(entries); i++ {
426-
vol := vols[j]
427-
entries[i] = &csi.ListVolumesResponse_Entry{
430+
for i < maxEntries && j < ulenVols {
431+
vol := cs.volListCopy[j]
432+
j++
433+
// validate if the volume still exists
434+
if cs.pmemVolumes[vol.id] == nil {
435+
// volume might have deleted
436+
continue
437+
}
438+
entries = append(entries, &csi.ListVolumesResponse_Entry{
428439
Volume: &csi.Volume{
429440
VolumeId: vol.id,
430441
CapacityBytes: vol.size,
431442
},
432-
}
433-
j++
443+
})
444+
i++
434445
}
435446

436447
var nextToken string
437-
if n := startingToken + int32(i); n < ulenVols {
438-
nextToken = fmt.Sprintf("%d", n)
448+
if j < ulenVols {
449+
nextToken = fmt.Sprintf("%d", j)
450+
} else {
451+
cs.volListCopy = nil
439452
}
440453

441454
return &csi.ListVolumesResponse{

0 commit comments

Comments
 (0)