Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to flush data from writecache #1806

Merged
merged 8 commits into from
Sep 28, 2022
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Changelog for NeoFS Node
- Changelog updates CI step (#1808)
- Validate storage node configuration before node startup (#1805)
- `neofs-node -check` command to check the configuration file (#1805)
- `flush-cache` control service command to flush write-cache (#1806)

### Changed

Expand Down
1 change: 0 additions & 1 deletion cmd/neofs-cli/modules/control/evacuate_shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,4 @@ func initControlEvacuateShardCmd() {
flags.Bool(dumpIgnoreErrorsFlag, false, "Skip invalid/unreadable objects")

_ = evacuateShardCmd.MarkFlagRequired(shardIDFlag)
_ = evacuateShardCmd.MarkFlagRequired(controlRPC)
}
50 changes: 50 additions & 0 deletions cmd/neofs-cli/modules/control/flush_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package control

import (
"github.com/nspcc-dev/neofs-api-go/v2/rpc/client"
"github.com/nspcc-dev/neofs-node/cmd/neofs-cli/internal/common"
"github.com/nspcc-dev/neofs-node/cmd/neofs-cli/internal/commonflags"
"github.com/nspcc-dev/neofs-node/cmd/neofs-cli/internal/key"
"github.com/nspcc-dev/neofs-node/pkg/services/control"
"github.com/spf13/cobra"
)

var flushCacheCmd = &cobra.Command{
Use: "flush-cache",
Short: "Flush objects from the write-cache to the main storage",
Long: "Flush objects from the write-cache to the main storage",
Run: flushCache,
}

func flushCache(cmd *cobra.Command, _ []string) {
pk := key.Get(cmd)

req := &control.FlushCacheRequest{Body: new(control.FlushCacheRequest_Body)}
req.Body.Shard_ID = getShardID(cmd)

signRequest(cmd, pk, req)

cli := getClient(cmd, pk)

var resp *control.FlushCacheResponse
var err error
err = cli.ExecRaw(func(client *client.Client) error {
resp, err = control.FlushCache(client, req)
return err
})
common.ExitOnErr(cmd, "rpc error: %w", err)

verifyResponse(cmd, resp.GetSignature(), resp.GetBody())

cmd.Println("Write-cache has been flushed.")
}

func initControlFlushCacheCmd() {
commonflags.InitWithoutRPC(flushCacheCmd)

ff := flushCacheCmd.Flags()
ff.String(controlRPC, controlRPCDefault, controlRPCUsage)
ff.String(shardIDFlag, "", "Shard ID in base58 encoding")

_ = flushCacheCmd.MarkFlagRequired(shardIDFlag)
}
2 changes: 2 additions & 0 deletions cmd/neofs-cli/modules/control/shards.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ func initControlShardsCmd() {
shardsCmd.AddCommand(dumpShardCmd)
shardsCmd.AddCommand(restoreShardCmd)
shardsCmd.AddCommand(evacuateShardCmd)
shardsCmd.AddCommand(flushCacheCmd)

initControlShardsListCmd()
initControlSetShardModeCmd()
initControlDumpShardCmd()
initControlRestoreShardCmd()
initControlEvacuateShardCmd()
initControlFlushCacheCmd()
}
42 changes: 42 additions & 0 deletions pkg/local_object_storage/engine/writecache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package engine

import (
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
)

// FlushWriteCachePrm groups the parameters of FlushWriteCache operation.
type FlushWriteCachePrm struct {
shardID *shard.ID
ignoreErrors bool
}

// SetShardID is an option to set shard ID.
//
// Option is required.
func (p *FlushWriteCachePrm) SetShardID(id *shard.ID) {
p.shardID = id
}

// SetIgnoreErrors sets errors ignore flag..
func (p *FlushWriteCachePrm) SetIgnoreErrors(ignore bool) {
p.ignoreErrors = ignore
}

// FlushWriteCacheRes groups the resulting values of FlushWriteCache operation.
type FlushWriteCacheRes struct{}

// FlushWriteCache flushes write-cache on a single shard.
func (e *StorageEngine) FlushWriteCache(p FlushWriteCachePrm) (FlushWriteCacheRes, error) {
e.mtx.RLock()
sh, ok := e.shards[p.shardID.String()]
e.mtx.RUnlock()

if !ok {
return FlushWriteCacheRes{}, errShardNotFound
}

var prm shard.FlushWriteCachePrm
prm.SetIgnoreErrors(p.ignoreErrors)

return FlushWriteCacheRes{}, sh.FlushWriteCache(prm)
}
4 changes: 0 additions & 4 deletions pkg/local_object_storage/shard/mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,6 @@ func (s *Shard) SetMode(m mode.Mode) error {
s.m.Lock()
defer s.m.Unlock()

if s.info.Mode == m {
return nil
}

components := []interface{ SetMode(mode.Mode) error }{
s.metaBase, s.blobStor,
}
Expand Down
46 changes: 46 additions & 0 deletions pkg/local_object_storage/shard/writecache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package shard

import (
"errors"

"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode"
)

// FlushWriteCachePrm represents parameters of a `FlushWriteCache` operation.
type FlushWriteCachePrm struct {
ignoreErrors bool
}

// SetIgnoreErrors sets the flag to ignore read-errors during flush.
func (p *FlushWriteCachePrm) SetIgnoreErrors(ignore bool) {
p.ignoreErrors = ignore
}

// errWriteCacheDisabled is returned when an operation on write-cache is performed,
// but write-cache is disabled.
var errWriteCacheDisabled = errors.New("write-cache is disabled")

// FlushWriteCache moves writecache in read-only mode and flushes all data from it.
// After the operation writecache will remain read-only mode.
func (s *Shard) FlushWriteCache(p FlushWriteCachePrm) error {
if !s.hasWriteCache() {
return errWriteCacheDisabled
}

s.m.RLock()
defer s.m.RUnlock()

// To write data to the blobstor we need to write to the blobstor and the metabase.
if s.info.Mode.ReadOnly() {
return ErrReadOnlyMode
}
if s.info.Mode.NoMetabase() {
return ErrDegradedMode
}

if err := s.writeCache.SetMode(mode.ReadOnly); err != nil {
return err
}

return s.writeCache.Flush(p.ignoreErrors)
}
83 changes: 83 additions & 0 deletions pkg/local_object_storage/writecache/flush.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package writecache

import (
"errors"
"time"

"github.com/mr-tron/base58"
Expand All @@ -25,6 +26,10 @@ const (
defaultFlushInterval = time.Second
)

// errMustBeReadOnly is returned when write-cache must be
// in read-only mode to perform an operation.
var errMustBeReadOnly = errors.New("write-cache must be in read-only mode")

// runFlushLoop starts background workers which periodically flush objects to the blobstor.
func (c *cache) runFlushLoop() {
for i := 0; i < c.workersCount; i++ {
Expand Down Expand Up @@ -224,3 +229,81 @@ func (c *cache) flushObject(obj *object.Object) error {
_, err = c.metabase.Put(pPrm)
return err
}

// Flush flushes all objects from the write-cache to the main storage.
// Write-cache must be in readonly mode to ensure correctness of an operation and
// to prevent interference with background flush workers.
func (c *cache) Flush(ignoreErrors bool) error {
c.modeMtx.RLock()
defer c.modeMtx.RUnlock()

if !c.mode.ReadOnly() {
return errMustBeReadOnly
}

var prm common.IteratePrm
prm.IgnoreErrors = ignoreErrors
prm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error {
_, ok := c.flushed.Peek(addr.EncodeToString())
if ok {
return nil
}

data, err := f()
if err != nil {
if ignoreErrors {
return nil
}
return err
}

var obj object.Object
err = obj.Unmarshal(data)
if err != nil {
if ignoreErrors {
return nil
}
return err
}

return c.flushObject(&obj)
}

_, err := c.fsTree.Iterate(prm)
if err != nil {
return err
}

return c.db.View(func(tx *bbolt.Tx) error {
var addr oid.Address

b := tx.Bucket(defaultBucket)
cs := b.Cursor()
for k, data := cs.Seek(nil); k != nil; k, data = cs.Next() {
sa := string(k)
if _, ok := c.flushed.Peek(sa); ok {
continue
}

if err := addr.DecodeString(sa); err != nil {
if ignoreErrors {
continue
}
return err
}

var obj object.Object
if err := obj.Unmarshal(data); err != nil {
if ignoreErrors {
continue
}
return err
}

if err := c.flushObject(&obj); err != nil {
return err
}
}
return nil
})
}
Loading