diff --git a/internal/cephfs/core/metadata.go b/internal/cephfs/core/metadata.go index 9e2b90d5f468..fd99838417d0 100644 --- a/internal/cephfs/core/metadata.go +++ b/internal/cephfs/core/metadata.go @@ -34,23 +34,63 @@ var ErrSubVolMetadataNotSupported = errors.New("subvolume metadata operations ar func (s *subVolumeClient) supportsSubVolMetadata() bool { newLocalClusterState(s.clusterID) + clusterAdditionalInfo[s.clusterID].subVolMetadataState.RLock() + defer clusterAdditionalInfo[s.clusterID].subVolMetadataState.RUnlock() - return clusterAdditionalInfo[s.clusterID].subVolMetadataState != unsupported + return clusterAdditionalInfo[s.clusterID].subVolMetadataState.operationState != unsupported } func (s *subVolumeClient) isUnsupportedSubVolMetadata(err error) bool { + clusterAdditionalInfo[s.clusterID].subVolMetadataState.Lock() + defer clusterAdditionalInfo[s.clusterID].subVolMetadataState.Unlock() + var invalid fsAdmin.NotImplementedError if err != nil && errors.As(err, &invalid) { // In case the error is other than invalid command return error to the caller. - clusterAdditionalInfo[s.clusterID].subVolMetadataState = unsupported + clusterAdditionalInfo[s.clusterID].subVolMetadataState.operationState = unsupported return false } - clusterAdditionalInfo[s.clusterID].subVolMetadataState = supported + clusterAdditionalInfo[s.clusterID].subVolMetadataState.operationState = supported return true } +// isNotSupportedResize returns true if resize is not supported. +func (s *subVolumeClient) isNotSupportedResize() bool { + newLocalClusterState(s.clusterID) + clusterAdditionalInfo[s.clusterID].resizeState.RLock() + defer clusterAdditionalInfo[s.clusterID].resizeState.RUnlock() + + return clusterAdditionalInfo[s.clusterID].resizeState.operationState == unknown || + clusterAdditionalInfo[s.clusterID].resizeState.operationState == unsupported +} + +// updateResizeState updates resize state. +func (s *subVolumeClient) updateResizeState(state operationState) { + clusterAdditionalInfo[s.clusterID].resizeState.Lock() + defer clusterAdditionalInfo[s.clusterID].resizeState.Unlock() + + clusterAdditionalInfo[s.clusterID].resizeState.operationState = state +} + +// isSubVolumeGroupCreated returns true if subvolume group is created. +func (s *subVolumeClient) isSubVolumeGroupCreated(group string) bool { + newLocalClusterState(s.clusterID) + clusterAdditionalInfo[s.clusterID].subVolumeGroupsRWMutex.RLock() + defer clusterAdditionalInfo[s.clusterID].subVolumeGroupsRWMutex.RUnlock() + + return clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated[group] +} + +// updateSubVolumeGroupCreated updates subvolume group created state. +func (s *subVolumeClient) updateSubVolumeGroupCreated(group string, state bool) { + clusterAdditionalInfo[s.clusterID].subVolumeGroupsRWMutex.Lock() + defer clusterAdditionalInfo[s.clusterID].subVolumeGroupsRWMutex.Unlock() + + clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated[group] = state +} + // setMetadata sets custom metadata on the subvolume in a volume as a // key-value pair. func (s *subVolumeClient) setMetadata(key, value string) error { diff --git a/internal/cephfs/core/snapshot_metadata.go b/internal/cephfs/core/snapshot_metadata.go index f168fbf8cd41..123a4eac892c 100644 --- a/internal/cephfs/core/snapshot_metadata.go +++ b/internal/cephfs/core/snapshot_metadata.go @@ -30,20 +30,25 @@ var ErrSubVolSnapMetadataNotSupported = errors.New("subvolume snapshot metadata func (s *snapshotClient) supportsSubVolSnapMetadata() bool { newLocalClusterState(s.clusterID) + clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState.RLock() + defer clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState.RUnlock() - return clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState != unsupported + return clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState.operationState != unsupported } func (s *snapshotClient) isUnsupportedSubVolSnapMetadata(err error) bool { + clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState.Lock() + defer clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState.Unlock() + var invalid fsAdmin.NotImplementedError if err != nil && errors.As(err, &invalid) { // In case the error is other than invalid command return error to // the caller. - clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState = unsupported + clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState.operationState = unsupported return false } - clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState = supported + clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState.operationState = supported return true } diff --git a/internal/cephfs/core/volume.go b/internal/cephfs/core/volume.go index c95a7cc8639e..314337f9d6b0 100644 --- a/internal/cephfs/core/volume.go +++ b/internal/cephfs/core/volume.go @@ -22,6 +22,7 @@ import ( "fmt" "path" "strings" + "sync" cerrors "github.com/ceph/ceph-csi/internal/cephfs/errors" fsutil "github.com/ceph/ceph-csi/internal/cephfs/util" @@ -32,12 +33,17 @@ import ( "github.com/ceph/go-ceph/rados" ) -// clusterAdditionalInfo contains information regarding if resize is -// supported in the particular cluster and subvolumegroup is -// created or not. -// Subvolumegroup creation and volume resize decisions are -// taken through this additional cluster information. -var clusterAdditionalInfo = make(map[string]*localClusterState) +var ( + // clusterAdditionalInfo contains information regarding if resize is + // supported in the particular cluster and subvolumegroup is + // created or not. + // Subvolumegroup creation and volume resize decisions are + // taken through this additional cluster information. + clusterAdditionalInfo = make(map[string]*localClusterState) + // clusterAdditionalInfoMutex is used to protect against + // concurrent writes. + clusterAdditionalInfoMutex = sync.Mutex{} +) // Subvolume holds subvolume information. This includes only the needed members // from fsAdmin.SubVolumeInfo. @@ -190,7 +196,12 @@ func (s *subVolumeClient) GetSubVolumeInfo(ctx context.Context) (*Subvolume, err return &subvol, nil } -type operationState int64 +type operationState int32 + +type operationStateMutex struct { + sync.RWMutex + operationState operationState +} const ( unknown operationState = iota @@ -201,19 +212,22 @@ const ( type localClusterState struct { // set the enum value i.e., unknown, supported, // unsupported as per the state of the cluster. - resizeState operationState - subVolMetadataState operationState - subVolSnapshotMetadataState operationState + resizeState operationStateMutex + subVolMetadataState operationStateMutex + subVolSnapshotMetadataState operationStateMutex // A cluster can have multiple filesystem for that we need to have a map of // subvolumegroups to check filesystem is created nor not. // set true once a subvolumegroup is created // for corresponding filesystem in a cluster. subVolumeGroupsCreated map[string]bool + subVolumeGroupsRWMutex sync.RWMutex } func newLocalClusterState(clusterID string) { // verify if corresponding clusterID key is present in the map, // and if not, initialize with default values(false). + clusterAdditionalInfoMutex.Lock() + defer clusterAdditionalInfoMutex.Unlock() if _, keyPresent := clusterAdditionalInfo[clusterID]; !keyPresent { clusterAdditionalInfo[clusterID] = &localClusterState{} clusterAdditionalInfo[clusterID].subVolumeGroupsCreated = make(map[string]bool) @@ -232,7 +246,7 @@ func (s *subVolumeClient) CreateVolume(ctx context.Context) error { } // create subvolumegroup if not already created for the cluster. - if !clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated[s.FsName] { + if !s.isSubVolumeGroupCreated(s.SubvolumeGroup) { opts := fsAdmin.SubVolumeGroupOptions{} err = ca.CreateSubVolumeGroup(s.FsName, s.SubvolumeGroup, &opts) if err != nil { @@ -246,7 +260,7 @@ func (s *subVolumeClient) CreateVolume(ctx context.Context) error { return err } log.DebugLog(ctx, "cephfs: created subvolume group %s", s.SubvolumeGroup) - clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated[s.FsName] = true + s.updateSubVolumeGroupCreated(s.SubvolumeGroup, true) } opts := fsAdmin.SubVolumeOptions{ @@ -264,7 +278,7 @@ func (s *subVolumeClient) CreateVolume(ctx context.Context) error { if errors.Is(err, rados.ErrNotFound) { // Reset the subVolumeGroupsCreated so that we can try again to create the // subvolumegroup in next request if the error is Not Found. - clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated[s.FsName] = false + s.updateSubVolumeGroupCreated(s.SubvolumeGroup, false) } return err @@ -295,10 +309,10 @@ func (s *subVolumeClient) ExpandVolume(ctx context.Context, bytesQuota int64) er // CreateVolume to resize the subvolume. func (s *subVolumeClient) ResizeVolume(ctx context.Context, bytesQuota int64) error { newLocalClusterState(s.clusterID) + // resize subvolume when either it's supported, or when corresponding // clusterID key was not present. - if clusterAdditionalInfo[s.clusterID].resizeState == unknown || - clusterAdditionalInfo[s.clusterID].resizeState == supported { + if s.isNotSupportedResize() { fsa, err := s.conn.GetFSAdmin() if err != nil { log.ErrorLog(ctx, "could not get FSAdmin, can not resize volume %s:", s.FsName, err) @@ -307,7 +321,7 @@ func (s *subVolumeClient) ResizeVolume(ctx context.Context, bytesQuota int64) er } _, err = fsa.ResizeSubVolume(s.FsName, s.SubvolumeGroup, s.VolID, fsAdmin.ByteCount(bytesQuota), true) if err == nil { - clusterAdditionalInfo[s.clusterID].resizeState = supported + s.updateResizeState(supported) return nil } @@ -319,7 +333,7 @@ func (s *subVolumeClient) ResizeVolume(ctx context.Context, bytesQuota int64) er return err } } - clusterAdditionalInfo[s.clusterID].resizeState = unsupported + s.updateResizeState(unsupported) s.Size = bytesQuota return s.CreateVolume(ctx)