Skip to content

Commit

Permalink
[nspcc-dev#1770] engine: Support configuration reload
Browse files Browse the repository at this point in the history
Currently, it only supports changing the compound of the shards.

Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
  • Loading branch information
carpawell authored and aprasolova committed Oct 19, 2022
1 parent 3cb248b commit 416ed4a
Show file tree
Hide file tree
Showing 5 changed files with 287 additions and 1 deletion.
12 changes: 12 additions & 0 deletions cmd/neofs-node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ const notificationHandlerPoolSize = 10
// structs).
// It must not be used concurrently.
type applicationConfiguration struct {
// _read indicated whether a config
// has already been read
_read bool

EngineCfg struct {
errorThreshold uint32
shardPoolSize uint32
Expand Down Expand Up @@ -144,6 +148,14 @@ type subStorageCfg struct {
// readConfig fills applicationConfiguration with raw configuration values
// not modifying them.
func (a *applicationConfiguration) readConfig(c *config.Config) error {
if a._read {
// clear if it is rereading
*a = applicationConfiguration{}
} else {
// update the status
a._read = true
}

a.EngineCfg.errorThreshold = engineconfig.ShardErrorThreshold(c)
a.EngineCfg.shardPoolSize = engineconfig.ShardPoolSize(c)

Expand Down
99 changes: 99 additions & 0 deletions pkg/local_object_storage/engine/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,3 +198,102 @@ func (e *StorageEngine) BlockExecution(err error) error {
func (e *StorageEngine) ResumeExecution() error {
return e.setBlockExecErr(nil)
}

type ReConfiguration struct {
errorsThreshold uint32
shardPoolSize uint32

shards map[string][]shard.Option // meta path -> shard opts
}

// SetErrorsThreshold sets a size amount of errors after which
// shard is moved to read-only mode.
func (rCfg *ReConfiguration) SetErrorsThreshold(errorsThreshold uint32) {
rCfg.errorsThreshold = errorsThreshold
}

// SetShardPoolSize sets a size of worker pool for each shard
func (rCfg *ReConfiguration) SetShardPoolSize(shardPoolSize uint32) {
rCfg.shardPoolSize = shardPoolSize
}

// AddShard adds a shard for the reconfiguration. Path to a metabase is used as
// an identifier of the shard in configuration.
func (rCfg *ReConfiguration) AddShard(metaPath string, opts []shard.Option) {
if rCfg.shards == nil {
rCfg.shards = make(map[string][]shard.Option)
}

if _, found := rCfg.shards[metaPath]; found {
return
}

rCfg.shards[metaPath] = opts
}

// Reload reloads StorageEngine's configuration in runtime.
func (e *StorageEngine) Reload(rcfg ReConfiguration) error {
e.mtx.RLock()

var shardsToRemove []string // shards IDs
var shardsToAdd []string // meta paths

// mark removed shards for removal
for id, sh := range e.shards {
_, ok := rcfg.shards[sh.Shard.DumpInfo().MetaBaseInfo.Path]
if !ok {
shardsToRemove = append(shardsToRemove, id)
}
}

// mark new shards for addition
for newPath := range rcfg.shards {
addShard := true
for _, sh := range e.shards {
if newPath == sh.Shard.DumpInfo().MetaBaseInfo.Path {
addShard = false
break
}
}

if addShard {
shardsToAdd = append(shardsToAdd, newPath)
}
}

e.mtx.RUnlock()

err := e.removeShards(shardsToRemove...)
if err != nil {
return fmt.Errorf("could not remove shards: %w", err)
}

e.mtx.Lock()
defer e.mtx.Unlock()

for _, newPath := range shardsToAdd {
id, err := e.addShard(rcfg.shards[newPath]...)
if err != nil {
return fmt.Errorf("could not add new shard: %w", err)
}

idStr := id.String()
sh := e.shards[idStr]

err = sh.Open()
if err == nil {
err = sh.Init()
}
if err != nil {
delete(e.shards, idStr)
e.shardPools[idStr].Release()
delete(e.shardPools, idStr)

return fmt.Errorf("could not init %s shard: %w", idStr, err)
}

e.log.Info("added new shard", zap.String("id", idStr))
}

return nil
}
92 changes: 92 additions & 0 deletions pkg/local_object_storage/engine/control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@ package engine

import (
"errors"
"fmt"
"os"
"path/filepath"
"strconv"
"testing"

"github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode"
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -78,3 +83,90 @@ func TestPersistentShardID(t *testing.T) {
require.NoError(t, e.Close())

}

func TestReload(t *testing.T) {
path := t.TempDir()

t.Run("add shards", func(t *testing.T) {
const shardNum = 4
addPath := filepath.Join(path, "add")

e, currShards := engineWithShards(t, addPath, shardNum)

var rcfg ReConfiguration
for _, p := range currShards {
rcfg.AddShard(p, nil)
}

rcfg.AddShard(currShards[0], nil) // same path
require.NoError(t, e.Reload(rcfg))

// no new paths => no new shards
require.Equal(t, shardNum, len(e.shards))
require.Equal(t, shardNum, len(e.shardPools))

newMeta := filepath.Join(addPath, fmt.Sprintf("%d.metabase", shardNum))

// add new shard
rcfg.AddShard(newMeta, []shard.Option{shard.WithMetaBaseOptions(
meta.WithPath(newMeta),
meta.WithEpochState(epochState{}),
)})
require.NoError(t, e.Reload(rcfg))

require.Equal(t, shardNum+1, len(e.shards))
require.Equal(t, shardNum+1, len(e.shardPools))
})

t.Run("remove shards", func(t *testing.T) {
const shardNum = 4
removePath := filepath.Join(path, "remove")

e, currShards := engineWithShards(t, removePath, shardNum)

var rcfg ReConfiguration
for i := 0; i < len(currShards)-1; i++ { // without one of the shards
rcfg.AddShard(currShards[i], nil)
}

require.NoError(t, e.Reload(rcfg))

// removed one
require.Equal(t, shardNum-1, len(e.shards))
require.Equal(t, shardNum-1, len(e.shardPools))
})
}

// engineWithShards creates engine with specified number of shards. Returns
// slice of paths to their metabase and the engine.
// TODO: #1776 unify engine construction in tests
func engineWithShards(t *testing.T, path string, num int) (*StorageEngine, []string) {
addPath := filepath.Join(path, "add")

currShards := make([]string, 0, num)

e := New()
for i := 0; i < num; i++ {
metaPath := filepath.Join(addPath, fmt.Sprintf("%d.metabase", i))
currShards = append(currShards, metaPath)

_, err := e.AddShard(
shard.WithBlobStorOptions(
blobstor.WithStorages(newStorages(filepath.Join(addPath, strconv.Itoa(i)), errSmallSize))),
shard.WithMetaBaseOptions(
meta.WithPath(metaPath),
meta.WithPermissions(0700),
meta.WithEpochState(epochState{}),
),
)
require.NoError(t, err)
}

require.Equal(t, num, len(e.shards))
require.Equal(t, num, len(e.shardPools))

require.NoError(t, e.Open())
require.NoError(t, e.Init())

return e, currShards
}
39 changes: 38 additions & 1 deletion pkg/local_object_storage/engine/shards.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/panjf2000/ants/v2"
"go.uber.org/atomic"
"go.uber.org/zap"
)

var errShardNotFound = errors.New("shard not found")
Expand Down Expand Up @@ -46,6 +47,10 @@ func (e *StorageEngine) AddShard(opts ...shard.Option) (*shard.ID, error) {
e.mtx.Lock()
defer e.mtx.Unlock()

return e.addShard(opts...)
}

func (e *StorageEngine) addShard(opts ...shard.Option) (*shard.ID, error) {
pool, err := ants.NewPool(int(e.shardPoolSize), ants.WithNonblocking(true))
if err != nil {
return nil, err
Expand Down Expand Up @@ -73,7 +78,7 @@ func (e *StorageEngine) AddShard(opts ...shard.Option) (*shard.ID, error) {
)...)

if err := sh.UpdateID(); err != nil {
return nil, fmt.Errorf("could not open shard: %w", err)
return nil, fmt.Errorf("could not update shard ID: %w", err)
}

strID := sh.ID().String()
Expand All @@ -91,6 +96,38 @@ func (e *StorageEngine) AddShard(opts ...shard.Option) (*shard.ID, error) {
return sh.ID(), nil
}

// removeShards removes specified shards. Skips non-existent shards.
// Returns any error encountered that did not allow remove the shards.
func (e *StorageEngine) removeShards(ids ...string) error {
e.mtx.Lock()
defer e.mtx.Unlock()

for _, id := range ids {
sh, found := e.shards[id]
if !found {
continue
}

err := sh.Close()
if err != nil {
return fmt.Errorf("could not close removed shard: %w", err)
}

delete(e.shards, id)

pool, ok := e.shardPools[id]
if ok {
pool.Release()
delete(e.shardPools, id)
}

e.log.Info("shard has been removed",
zap.String("id", id))
}

return nil
}

func generateShardID() (*shard.ID, error) {
uid, err := uuid.NewRandom()
if err != nil {
Expand Down
46 changes: 46 additions & 0 deletions pkg/local_object_storage/engine/shards_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package engine

import (
"os"
"testing"

"github.com/stretchr/testify/require"
)

func TestRemoveShard(t *testing.T) {
const numOfShards = 6

e := testNewEngineWithShardNum(t, numOfShards)
t.Cleanup(func() {
e.Close()
os.RemoveAll(t.Name())
})

require.Equal(t, numOfShards, len(e.shardPools))
require.Equal(t, numOfShards, len(e.shards))

removedNum := numOfShards / 2

mSh := make(map[string]bool, numOfShards)
for i, sh := range e.DumpInfo().Shards {
if i == removedNum {
break
}

mSh[sh.ID.String()] = true
}

for id, remove := range mSh {
if remove {
require.NoError(t, e.removeShards(id))
}
}

require.Equal(t, numOfShards-removedNum, len(e.shardPools))
require.Equal(t, numOfShards-removedNum, len(e.shards))

for id, removed := range mSh {
_, ok := e.shards[id]
require.True(t, ok != removed)
}
}

0 comments on commit 416ed4a

Please sign in to comment.