Skip to content

Commit

Permalink
[#2057] meta: Fix concurrent mode changes
Browse files Browse the repository at this point in the history
Includes:
1. mode change read lock operation in every exported method that r/w the
underlying database;
2. returning `ErrDegradedMode` logical error if any exported method is
called in degraded (without a metabase) mode.

Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
  • Loading branch information
carpawell committed Nov 15, 2022
1 parent e8d401e commit 02676f0
Show file tree
Hide file tree
Showing 16 changed files with 142 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ Changelog for NeoFS Node
- Assembly process triggered by a request with a bearer token (#2040)
- Losing locking context after metabase resync (#1502)
- Removing all trees by container ID if tree ID is empty in `pilorama.Forest.TreeDrop` (#1940)
- Concurrent mode changes in the metabase and blobstor (#2057)

### Removed
### Updated
Expand Down
7 changes: 7 additions & 0 deletions pkg/local_object_storage/metabase/containers.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ func (db *DB) containers(tx *bbolt.Tx) ([]cid.ID, error) {
}

func (db *DB) ContainerSize(id cid.ID) (size uint64, err error) {
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()

if db.mode.NoMetabase() {
return 0, ErrDegradedMode
}

err = db.boltDB.View(func(tx *bbolt.Tx) error {
size, err = db.containerSize(tx, id)

Expand Down
14 changes: 14 additions & 0 deletions pkg/local_object_storage/metabase/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@ func (db *DB) Init() error {
// Reset resets metabase. Works similar to Init but cleans up all static buckets and
// removes all dynamic (CID-dependent) ones in non-blank BoltDB instances.
func (db *DB) Reset() error {
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()

if db.mode.NoMetabase() {
return ErrDegradedMode
}

return db.init(true)
}

Expand Down Expand Up @@ -147,6 +154,13 @@ func (db *DB) init(reset bool) error {

// SyncCounters forces to synchronize the object counters.
func (db *DB) SyncCounters() error {
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()

if db.mode.NoMetabase() {
return ErrDegradedMode
}

return db.boltDB.Update(func(tx *bbolt.Tx) error {
return syncCounter(tx, true)
})
Expand Down
7 changes: 7 additions & 0 deletions pkg/local_object_storage/metabase/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,13 @@ func (o ObjectCounters) Phy() uint64 {
// Returns only the errors that do not allow reading counter
// in Bolt database.
func (db *DB) ObjectCounters() (cc ObjectCounters, err error) {
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()

if db.mode.NoMetabase() {
return ObjectCounters{}, ErrDegradedMode
}

err = db.boltDB.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(shardInfoBucket)
if b != nil {
Expand Down
4 changes: 4 additions & 0 deletions pkg/local_object_storage/metabase/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ func (db *DB) Delete(prm DeletePrm) (DeleteRes, error) {
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()

if db.mode.NoMetabase() {
return DeleteRes{}, ErrDegradedMode
}

var rawRemoved uint64
var availableRemoved uint64
var err error
Expand Down
4 changes: 4 additions & 0 deletions pkg/local_object_storage/metabase/exists.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ func (db *DB) Exists(prm ExistsPrm) (res ExistsRes, err error) {
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()

if db.mode.NoMetabase() {
return res, ErrDegradedMode
}

currEpoch := db.epochState.CurrentEpoch()

err = db.boltDB.View(func(tx *bbolt.Tx) error {
Expand Down
21 changes: 21 additions & 0 deletions pkg/local_object_storage/metabase/graveyard.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@ func (g *GarbageIterationPrm) SetOffset(offset oid.Address) {
// If h returns ErrInterruptIterator, nil returns immediately.
// Returns other errors of h directly.
func (db *DB) IterateOverGarbage(p GarbageIterationPrm) error {
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()

if db.mode.NoMetabase() {
return ErrDegradedMode
}

return db.boltDB.View(func(tx *bbolt.Tx) error {
return db.iterateDeletedObj(tx, gcHandler{p.h}, p.offset)
})
Expand Down Expand Up @@ -118,6 +125,13 @@ func (g *GraveyardIterationPrm) SetOffset(offset oid.Address) {
// If h returns ErrInterruptIterator, nil returns immediately.
// Returns other errors of h directly.
func (db *DB) IterateOverGraveyard(p GraveyardIterationPrm) error {
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()

if db.mode.NoMetabase() {
return ErrDegradedMode
}

return db.boltDB.View(func(tx *bbolt.Tx) error {
return db.iterateDeletedObj(tx, graveyardHandler{p.h}, p.offset)
})
Expand Down Expand Up @@ -218,6 +232,13 @@ func graveFromKV(k, v []byte) (res TombstonedObject, err error) {
//
// Returns any error appeared during deletion process.
func (db *DB) DropGraves(tss []TombstonedObject) error {
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()

if db.mode.NoMetabase() {
return ErrDegradedMode
}

buf := make([]byte, addressKeySize)

return db.boltDB.Update(func(tx *bbolt.Tx) error {
Expand Down
3 changes: 3 additions & 0 deletions pkg/local_object_storage/metabase/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,8 @@ type Info struct {

// DumpInfo returns information about the DB.
func (db *DB) DumpInfo() Info {
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()

return db.info
}
14 changes: 14 additions & 0 deletions pkg/local_object_storage/metabase/iterators.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ var ErrInterruptIterator = logicerr.New("iterator is interrupted")
// If h returns ErrInterruptIterator, nil returns immediately.
// Returns other errors of h directly.
func (db *DB) IterateExpired(epoch uint64, h ExpiredObjectHandler) error {
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()

if db.mode.NoMetabase() {
return ErrDegradedMode
}

return db.boltDB.View(func(tx *bbolt.Tx) error {
return db.iterateExpired(tx, epoch, h)
})
Expand Down Expand Up @@ -119,6 +126,13 @@ func (db *DB) iterateExpired(tx *bbolt.Tx, epoch uint64, h ExpiredObjectHandler)
//
// Does not modify tss.
func (db *DB) IterateCoveredByTombstones(tss map[string]oid.Address, h func(oid.Address) error) error {
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()

if db.mode.NoMetabase() {
return ErrDegradedMode
}

return db.boltDB.View(func(tx *bbolt.Tx) error {
return db.iterateCoveredByTombstones(tx, tss, h)
})
Expand Down
4 changes: 4 additions & 0 deletions pkg/local_object_storage/metabase/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ func (db *DB) ListWithCursor(prm ListPrm) (res ListRes, err error) {
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()

if db.mode.NoMetabase() {
return res, ErrDegradedMode
}

result := make([]objectcore.AddressWithType, 0, prm.count)

err = db.boltDB.View(func(tx *bbolt.Tx) error {
Expand Down
18 changes: 18 additions & 0 deletions pkg/local_object_storage/metabase/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ func (db *DB) Lock(cnr cid.ID, locker oid.ID, locked []oid.ID) error {
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()

if db.mode.NoMetabase() {
return ErrDegradedMode
}

if len(locked) == 0 {
panic("empty locked list")
}
Expand Down Expand Up @@ -91,6 +95,13 @@ func (db *DB) Lock(cnr cid.ID, locker oid.ID, locked []oid.ID) error {

// FreeLockedBy unlocks all objects in DB which are locked by lockers.
func (db *DB) FreeLockedBy(lockers []oid.Address) error {
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()

if db.mode.NoMetabase() {
return ErrDegradedMode
}

return db.boltDB.Update(func(tx *bbolt.Tx) error {
var err error

Expand Down Expand Up @@ -202,6 +213,13 @@ func (i IsLockedRes) Locked() bool {
//
// Returns only non-logical errors related to underlying database.
func (db *DB) IsLocked(prm IsLockedPrm) (res IsLockedRes, err error) {
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()

if db.mode.NoMetabase() {
return res, ErrDegradedMode
}

return res, db.boltDB.View(func(tx *bbolt.Tx) error {
res.locked = objectLocked(tx, prm.addr.Container(), prm.addr.Object())
return nil
Expand Down
12 changes: 12 additions & 0 deletions pkg/local_object_storage/metabase/movable.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ func (db *DB) ToMoveIt(prm ToMoveItPrm) (res ToMoveItRes, err error) {
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()

if db.mode.NoMetabase() {
return res, ErrDegradedMode
}

key := make([]byte, addressKeySize)
key = addressKey(prm.addr, key)

Expand All @@ -68,6 +72,10 @@ func (db *DB) DoNotMove(prm DoNotMovePrm) (res DoNotMoveRes, err error) {
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()

if db.mode.NoMetabase() {
return res, ErrDegradedMode
}

key := make([]byte, addressKeySize)
key = addressKey(prm.addr, key)

Expand All @@ -84,6 +92,10 @@ func (db *DB) Movable(_ MovablePrm) (MovableRes, error) {
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()

if db.mode.NoMetabase() {
return MovableRes{}, ErrDegradedMode
}

var strAddrs []string

err := db.boltDB.View(func(tx *bbolt.Tx) error {
Expand Down
4 changes: 4 additions & 0 deletions pkg/local_object_storage/metabase/put.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ func (db *DB) Put(prm PutPrm) (res PutRes, err error) {
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()

if db.mode.NoMetabase() {
return res, ErrDegradedMode
}

currEpoch := db.epochState.CurrentEpoch()

err = db.boltDB.Batch(func(tx *bbolt.Tx) error {
Expand Down
4 changes: 4 additions & 0 deletions pkg/local_object_storage/metabase/select.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ func (db *DB) Select(prm SelectPrm) (res SelectRes, err error) {
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()

if db.mode.NoMetabase() {
return res, ErrDegradedMode
}

if blindlyProcess(prm.filters) {
return res, nil
}
Expand Down
14 changes: 14 additions & 0 deletions pkg/local_object_storage/metabase/shard_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@ var (
// ReadShardID reads shard id from db.
// If id is missing, returns nil, nil.
func (db *DB) ReadShardID() ([]byte, error) {
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()

if db.mode.NoMetabase() {
return nil, ErrDegradedMode
}

var id []byte
err := db.boltDB.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(shardInfoBucket)
Expand All @@ -26,6 +33,13 @@ func (db *DB) ReadShardID() ([]byte, error) {

// WriteShardID writes shard it to db.
func (db *DB) WriteShardID(id []byte) error {
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()

if db.mode.NoMetabase() {
return ErrDegradedMode
}

return db.boltDB.Update(func(tx *bbolt.Tx) error {
b, err := tx.CreateBucketIfNotExists(shardInfoBucket)
if err != nil {
Expand Down
11 changes: 11 additions & 0 deletions pkg/local_object_storage/metabase/storage_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ func (r StorageIDRes) StorageID() []byte {
// StorageID returns storage descriptor for objects from the blobstor.
// It is put together with the object can makes get/delete operation faster.
func (db *DB) StorageID(prm StorageIDPrm) (res StorageIDRes, err error) {
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()

if db.mode.NoMetabase() {
return res, ErrDegradedMode
}

err = db.boltDB.View(func(tx *bbolt.Tx) error {
res.id, err = db.storageID(tx, prm.addr)

Expand Down Expand Up @@ -77,6 +84,10 @@ func (db *DB) UpdateStorageID(prm UpdateStorageIDPrm) (res UpdateStorageIDRes, e
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()

if db.mode.NoMetabase() {
return res, ErrDegradedMode
}

currEpoch := db.epochState.CurrentEpoch()

err = db.boltDB.Batch(func(tx *bbolt.Tx) error {
Expand Down

0 comments on commit 02676f0

Please sign in to comment.