Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rbd: protect against concurrent gRPC calls #92

Merged
merged 1 commit into from
Oct 26, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions pkg/rbd/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
return nil, status.Error(codes.InvalidArgument, "Volume Capabilities cannot be empty")
}

volumeNameMutex.LockKey(req.GetName())
defer volumeNameMutex.UnlockKey(req.GetName())

// Need to check for already existing volume name, and if found
// check for the requested capacity and already allocated capacity
if exVol, err := getRBDVolumeByName(req.GetName()); err == nil {
Expand Down Expand Up @@ -156,6 +159,8 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
}
// For now the image get unconditionally deleted, but here retention policy can be checked
volumeID := req.GetVolumeId()
volumeIDMutex.LockKey(volumeID)
defer volumeIDMutex.UnlockKey(volumeID)
rbdVol := &rbdVolume{}
if err := loadVolInfo(volumeID, path.Join(PluginFolder, "controller"), rbdVol); err != nil {
if os.IsNotExist(errors.Cause(err)) {
Expand All @@ -174,8 +179,6 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
}
// Removing persistent storage file for the unmapped volume
if err := deleteVolInfo(volumeID, path.Join(PluginFolder, "controller")); err != nil {
// TODO: we can theoretically end up here when two DeleteVolume calls
// get invoked concurrently. Serialize?
return nil, err
}

Expand Down Expand Up @@ -214,6 +217,9 @@ func (cs *controllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS
return nil, status.Error(codes.InvalidArgument, "Source Volume ID cannot be empty")
}

snapshotNameMutex.LockKey(req.GetName())
defer snapshotNameMutex.UnlockKey(req.GetName())

// Need to check for already existing snapshot name, and if found
// check for the requested source volume id and already allocated source volume id
if exSnap, err := getRBDSnapshotByName(req.GetName()); err == nil {
Expand Down Expand Up @@ -332,6 +338,9 @@ func (cs *controllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteS
if len(snapshotID) == 0 {
return nil, status.Error(codes.InvalidArgument, "Snapshot ID cannot be empty")
}
snapshotIDMutex.LockKey(snapshotID)
defer snapshotIDMutex.UnlockKey(snapshotID)

rbdSnap := &rbdSnapshot{}
if err := loadSnapInfo(snapshotID, path.Join(PluginFolder, "controller-snap"), rbdSnap); err != nil {
return nil, err
Expand Down Expand Up @@ -368,6 +377,7 @@ func (cs *controllerServer) ListSnapshots(ctx context.Context, req *csi.ListSnap
sourceVolumeId := req.GetSourceVolumeId()

// TODO (sngchlko) list with token
// TODO (#94) protect concurrent access to global data structures

// list only a specific snapshot which has snapshot ID
if snapshotID := req.GetSnapshotId(); len(snapshotID) != 0 {
Expand Down
5 changes: 5 additions & 0 deletions pkg/rbd/nodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
s := strings.Split(strings.TrimSuffix(targetPath, "/mount"), "/")
volName := s[len(s)-1]

targetPathMutex.LockKey(targetPath)
defer targetPathMutex.UnlockKey(targetPath)

notMnt, err := ns.mounter.IsLikelyNotMountPoint(targetPath)
if err != nil {
if os.IsNotExist(err) {
Expand Down Expand Up @@ -97,6 +100,8 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis

func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
targetPath := req.GetTargetPath()
targetPathMutex.LockKey(targetPath)
defer targetPathMutex.UnlockKey(targetPath)

notMnt, err := ns.mounter.IsLikelyNotMountPoint(targetPath)
if err != nil {
Expand Down
12 changes: 12 additions & 0 deletions pkg/rbd/rbd_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,19 @@ type rbdSnapshot struct {
}

var (
// serializes operations based on "<rbd pool>/<rbd image>" as key
attachdetachMutex = keymutex.NewKeyMutex()
// serializes operations based on "volume name" as key
volumeNameMutex = keymutex.NewKeyMutex()
// serializes operations based on "volume id" as key
volumeIDMutex = keymutex.NewKeyMutex()
// serializes operations based on "snapshot name" as key
snapshotNameMutex = keymutex.NewKeyMutex()
// serializes operations based on "snapshot id" as key
snapshotIDMutex = keymutex.NewKeyMutex()
// serializes operations based on "mount target path" as key
targetPathMutex = keymutex.NewKeyMutex()

supportedFeatures = sets.NewString("layering")
)

Expand Down