Skip to content

Commit

Permalink
rbd: add volume locks for reclaimspace operations
Browse files Browse the repository at this point in the history
This commit adds locks on reclaimspace operations to
prevent multiple process executing rbd sparsify/fstrim
on same volume.

Signed-off-by: Praveen M <m.praveen@ibm.com>
  • Loading branch information
iPraveenParihar committed May 27, 2024
1 parent 822794c commit 993a7c2
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 18 deletions.
26 changes: 22 additions & 4 deletions internal/csi-addons/rbd/reclaimspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,14 @@ import (
// of CSI-addons reclaimspace controller service spec.
type ReclaimSpaceControllerServer struct {
*rs.UnimplementedReclaimSpaceControllerServer
// Embed ControllerServer as it implements helper functions
*rbdutil.ControllerServer
}

// NewReclaimSpaceControllerServer creates a new ReclaimSpaceControllerServer which handles
// the ReclaimSpace Service requests from the CSI-Addons specification.
func NewReclaimSpaceControllerServer() *ReclaimSpaceControllerServer {
return &ReclaimSpaceControllerServer{}
func NewReclaimSpaceControllerServer(c *rbdutil.ControllerServer) *ReclaimSpaceControllerServer {
return &ReclaimSpaceControllerServer{ControllerServer: c}
}

func (rscs *ReclaimSpaceControllerServer) RegisterService(server grpc.ServiceRegistrar) {
Expand All @@ -64,6 +66,13 @@ func (rscs *ReclaimSpaceControllerServer) ControllerReclaimSpace(
}
defer cr.DeleteCredentials()

if acquired := rscs.VolumeLocks.TryAcquire(volumeID); !acquired {
log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID)

return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
}
defer rscs.VolumeLocks.Release(volumeID)

rbdVol, err := rbdutil.GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets())
if err != nil {
return nil, status.Errorf(codes.Aborted, "failed to find volume with ID %q: %s", volumeID, err.Error())
Expand All @@ -90,12 +99,14 @@ func (rscs *ReclaimSpaceControllerServer) ControllerReclaimSpace(
// of CSI-addons reclaimspace controller service spec.
type ReclaimSpaceNodeServer struct {
*rs.UnimplementedReclaimSpaceNodeServer
// Embed NodeServer as it implements helper functions
*rbdutil.NodeServer
}

// NewReclaimSpaceNodeServer creates a new IdentityServer which handles the
// Identity Service requests from the CSI-Addons specification.
func NewReclaimSpaceNodeServer() *ReclaimSpaceNodeServer {
return &ReclaimSpaceNodeServer{}
func NewReclaimSpaceNodeServer(n *rbdutil.NodeServer) *ReclaimSpaceNodeServer {
return &ReclaimSpaceNodeServer{NodeServer: n}
}

func (rsns *ReclaimSpaceNodeServer) RegisterService(server grpc.ServiceRegistrar) {
Expand All @@ -116,6 +127,13 @@ func (rsns *ReclaimSpaceNodeServer) NodeReclaimSpace(
return nil, status.Error(codes.InvalidArgument, "empty volume ID in request")
}

if acquired := rsns.VolumeLocks.TryAcquire(volumeID); !acquired {
log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID)

return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
}
defer rsns.VolumeLocks.Release(volumeID)

// path can either be the staging path on the node, or the volume path
// inside an application container
path := req.GetStagingTargetPath()
Expand Down
6 changes: 4 additions & 2 deletions internal/csi-addons/rbd/reclaimspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (

rs "github.com/csi-addons/spec/lib/go/reclaimspace"
"github.com/stretchr/testify/require"

rbdutil "github.com/ceph/ceph-csi/internal/rbd"
)

// TestControllerReclaimSpace is a minimal test for the
Expand All @@ -30,7 +32,7 @@ import (
func TestControllerReclaimSpace(t *testing.T) {
t.Parallel()

controller := NewReclaimSpaceControllerServer()
controller := NewReclaimSpaceControllerServer(&rbdutil.ControllerServer{})

req := &rs.ControllerReclaimSpaceRequest{
VolumeId: "",
Expand All @@ -47,7 +49,7 @@ func TestControllerReclaimSpace(t *testing.T) {
func TestNodeReclaimSpace(t *testing.T) {
t.Parallel()

node := NewReclaimSpaceNodeServer()
node := NewReclaimSpaceNodeServer(&rbdutil.NodeServer{})

req := &rs.NodeReclaimSpaceRequest{
VolumeId: "",
Expand Down
23 changes: 11 additions & 12 deletions internal/rbd/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ type Driver struct {
cas *csiaddons.CSIAddonsServer
}

var nodeLabels, topology, crushLocationMap map[string]string

// NewDriver returns new rbd driver.
func NewDriver() *Driver {
return &Driver{}
Expand Down Expand Up @@ -70,14 +72,14 @@ func NewNodeServer(
d *csicommon.CSIDriver,
t string,
nodeLabels, topology, crushLocationMap map[string]string,
) (*rbd.NodeServer, error) {
) *rbd.NodeServer {
cliReadAffinityMapOptions := util.ConstructReadAffinityMapOption(crushLocationMap)
ns := rbd.NodeServer{
DefaultNodeServer: csicommon.NewDefaultNodeServer(d, t, cliReadAffinityMapOptions, topology, nodeLabels),
VolumeLocks: util.NewVolumeLocks(),
}

return &ns, nil
return &ns
}

// Run start a non-blocking grpc controller,node and identityserver for
Expand All @@ -86,10 +88,7 @@ func NewNodeServer(
// This also configures and starts a new CSI-Addons service, by calling
// setupCSIAddonsServer().
func (r *Driver) Run(conf *util.Config) {
var (
err error
nodeLabels, topology, crushLocationMap map[string]string
)
var err error
// update clone soft and hard limit
rbd.SetGlobalInt("rbdHardMaxCloneDepth", conf.RbdHardMaxCloneDepth)
rbd.SetGlobalInt("rbdSoftMaxCloneDepth", conf.RbdSoftMaxCloneDepth)
Expand Down Expand Up @@ -144,10 +143,8 @@ func (r *Driver) Run(conf *util.Config) {
if err != nil {
log.FatalLogMsg(err.Error())
}
r.ns, err = NewNodeServer(r.cd, conf.Vtype, nodeLabels, topology, crushLocationMap)
if err != nil {
log.FatalLogMsg("failed to start node server, err %v\n", err)
}
r.ns = NewNodeServer(r.cd, conf.Vtype, nodeLabels, topology, crushLocationMap)

var attr string
attr, err = rbd.GetKrbdSupportedFeatures()
if err != nil && !errors.Is(err, os.ErrNotExist) {
Expand Down Expand Up @@ -213,7 +210,7 @@ func (r *Driver) setupCSIAddonsServer(conf *util.Config) error {
r.cas.RegisterService(is)

if conf.IsControllerServer {
rs := casrbd.NewReclaimSpaceControllerServer()
rs := casrbd.NewReclaimSpaceControllerServer(NewControllerServer(r.cd))
r.cas.RegisterService(rs)

fcs := casrbd.NewFenceControllerServer()
Expand All @@ -224,7 +221,9 @@ func (r *Driver) setupCSIAddonsServer(conf *util.Config) error {
}

if conf.IsNodeServer {
rs := casrbd.NewReclaimSpaceNodeServer()
rs := casrbd.NewReclaimSpaceNodeServer(
NewNodeServer(r.cd, conf.Vtype, nodeLabels, topology, crushLocationMap),
)
r.cas.RegisterService(rs)
}

Expand Down

0 comments on commit 993a7c2

Please sign in to comment.