Skip to content

Commit

Permalink
node/object: Use replication API to distribute ready object to container
Browse files Browse the repository at this point in the history
Previously, storage nodes always sent "ready" objects after client data
slicing via the NeoFS API `ObjectService.Put` RPC. Recently, the NeoFS protocol
was extended with the `ObjectService.Replicate` RPC, which allows replicating
an object from one container node to another. The new RPC is more efficient
and is now used by the Policer+Replicator tandem to replicate objects on
shortage. In fact, when the `ObjectService.Put` server makes the initial object
save, it does exactly the same thing.

In summary, when the `Put` server is itself a storage node from the container,
it can use the `Replicate` RPC for better performance. An additional
advantage is the one-time encoding of the protocol message, which is
reused for sending to different nodes. This reduces the memory cost of
processing each data stream.

Closes #2317.

Signed-off-by: Leonard Lyubich <leonard@morphbits.io>
  • Loading branch information
cthulhu-rider committed Feb 21, 2024
1 parent 8385ee4 commit 35c8edf
Show file tree
Hide file tree
Showing 16 changed files with 385 additions and 57 deletions.
9 changes: 8 additions & 1 deletion cmd/neofs-node/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ type api struct {
clients *coreClientConstructor
}

type apiAdapter api

// TODO: docs.
func (x *api) ReplicateToNode(ctx context.Context, req []byte, node netmap.NodeInfo) error {
var n coreclient.NodeInfo
Expand All @@ -28,7 +30,12 @@ func (x *api) ReplicateToNode(ctx context.Context, req []byte, node netmap.NodeI
return fmt.Errorf("parse info about storage node from the network map: %w", err)
}

mc, err := x.clients.Get(n)
return (*apiAdapter)(x).ReplicateToNode(ctx, req, n)
}

// TODO: docs.
func (x *apiAdapter) ReplicateToNode(ctx context.Context, req []byte, node coreclient.NodeInfo) error {
mc, err := x.clients.Get(node)
if err != nil {
return fmt.Errorf("connect to node: %w", err)
}
Expand Down
14 changes: 9 additions & 5 deletions cmd/neofs-node/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func initObjectService(c *cfg) {
}
}

sPut := putsvc.NewService(
sPut := putsvc.NewService(&apiAdapter{clients: putConstructor},
putsvc.WithKeyStorage(keyStorage),
putsvc.WithClientConstructor(putConstructor),
putsvc.WithMaxSizeSource(newCachedMaxObjectSizeSource(c)),
Expand Down Expand Up @@ -558,8 +558,8 @@ func (e engineWithNotifications) Lock(locker oid.Address, toLock []oid.ID) error
return e.base.Lock(locker, toLock)
}

func (e engineWithNotifications) Put(o *objectSDK.Object) error {
if err := e.base.Put(o); err != nil {
func (e engineWithNotifications) Put(o *objectSDK.Object, objBin []byte, hdrLen int) error {
if err := e.base.Put(o, objBin, hdrLen); err != nil {
return err
}

Expand Down Expand Up @@ -606,6 +606,10 @@ func (e engineWithoutNotifications) Lock(locker oid.Address, toLock []oid.ID) er
return e.engine.Lock(locker.Container(), locker.Object(), toLock)
}

func (e engineWithoutNotifications) Put(o *objectSDK.Object) error {
return engine.Put(e.engine, o)
// Put saves the object to the wrapped storage engine.
//
// The object is passed via engine.PutPrm.WithObject; objBin and hdrLen are
// forwarded unchanged via engine.PutPrm.SetObjectBinary. The engine's
// operation result is discarded and only the error is returned.
func (e engineWithoutNotifications) Put(o *objectSDK.Object, objBin []byte, hdrLen int) error {
var putPrm engine.PutPrm
putPrm.WithObject(o)
putPrm.SetObjectBinary(objBin, hdrLen)
_, err := e.engine.Put(putPrm)
return err
}
2 changes: 1 addition & 1 deletion pkg/local_object_storage/engine/evacuate.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ mainLoop:
if _, ok := shardMap[shards[j].ID().String()]; ok {
continue
}
putDone, exists := e.putToShard(shards[j].hashedShard, j, shards[j].pool, addr, getRes.Object())
putDone, exists := e.putToShard(shards[j].hashedShard, j, shards[j].pool, addr, getRes.Object(), nil, 0)
if putDone || exists {
if putDone {
e.log.Debug("object is moved to another shard",
Expand Down
15 changes: 12 additions & 3 deletions pkg/local_object_storage/engine/put.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ import (

// PutPrm groups the parameters of Put operation.
type PutPrm struct {
obj *objectSDK.Object
obj *objectSDK.Object
objBin []byte
hdrLen int
}

// PutRes groups the resulting values of Put operation.
Expand All @@ -30,6 +32,11 @@ func (p *PutPrm) WithObject(obj *objectSDK.Object) {
p.obj = obj
}

// SetObjectBinary sets the already encoded binary form of the object to be
// put, together with hdrLen, the length of the header part within objBin.
//
// Both values are optional: the engine only forwards them to the selected
// shard's Put parameters. A nil objBin means no pre-encoded form is
// provided.
//
// NOTE(review): presumably objBin must be the encoding of the same object
// that is set via WithObject, and objBin[:hdrLen] the encoding of that
// object without payload — confirm against callers.
func (p *PutPrm) SetObjectBinary(objBin []byte, hdrLen int) {
p.objBin, p.hdrLen = objBin, hdrLen
}

// Put saves the object to local storage.
//
// Returns any error encountered that
Expand Down Expand Up @@ -72,7 +79,7 @@ func (e *StorageEngine) put(prm PutPrm) (PutRes, error) {
return false
}

putDone, exists := e.putToShard(sh, ind, pool, addr, prm.obj)
putDone, exists := e.putToShard(sh, ind, pool, addr, prm.obj, prm.objBin, prm.hdrLen)
finished = putDone || exists
return finished
})
Expand All @@ -87,7 +94,8 @@ func (e *StorageEngine) put(prm PutPrm) (PutRes, error) {
// putToShard puts object to sh.
// First return value is true iff put has been successfully done.
// Second return value is true iff object already exists.
func (e *StorageEngine) putToShard(sh hashedShard, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object) (bool, bool) {
func (e *StorageEngine) putToShard(sh hashedShard, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object,
objBin []byte, hdrLen int) (bool, bool) {
var putSuccess, alreadyExists bool

exitCh := make(chan struct{})
Expand Down Expand Up @@ -129,6 +137,7 @@ func (e *StorageEngine) putToShard(sh hashedShard, ind int, pool util.WorkerPool

var putPrm shard.PutPrm
putPrm.SetObject(obj)
putPrm.SetObjectBinary(objBin, hdrLen)

_, err = sh.Put(putPrm)
if err != nil {
Expand Down
27 changes: 19 additions & 8 deletions pkg/local_object_storage/metabase/put.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ type PutPrm struct {
obj *objectSDK.Object

id []byte

hdrBin []byte
}

// PutRes groups the resulting values of Put operation.
Expand All @@ -41,6 +43,11 @@ func (p *PutPrm) SetStorageID(id []byte) {
p.id = id
}

// SetHeaderBinary sets the already encoded binary form of the object header
// (the object with its payload cut) to be stored for the object being put.
//
// The value is optional: when it is nil, the header is marshaled from the
// object via CutPayload().Marshal() while writing the unique indexes. The
// value applies to the object itself only; its parent, if any, is put with
// a nil header binary.
//
// NOTE(review): the binary appears to be stored as-is, so the caller is
// presumably responsible for its consistency with the object set via
// SetObject — confirm.
func (p *PutPrm) SetHeaderBinary(hdrBin []byte) {
p.hdrBin = hdrBin
}

var (
ErrUnknownObjectType = errors.New("unknown object type")
ErrIncorrectSplitInfoUpdate = errors.New("updating split info on object without it")
Expand All @@ -64,7 +71,7 @@ func (db *DB) Put(prm PutPrm) (res PutRes, err error) {
currEpoch := db.epochState.CurrentEpoch()

err = db.boltDB.Batch(func(tx *bbolt.Tx) error {
return db.put(tx, prm.obj, prm.id, nil, currEpoch)
return db.put(tx, prm.obj, prm.id, nil, currEpoch, prm.hdrBin)
})
if err == nil {
storagelog.Write(db.log,
Expand All @@ -77,7 +84,7 @@ func (db *DB) Put(prm PutPrm) (res PutRes, err error) {

func (db *DB) put(
tx *bbolt.Tx, obj *objectSDK.Object, id []byte,
si *objectSDK.SplitInfo, currEpoch uint64) error {
si *objectSDK.SplitInfo, currEpoch uint64, hdrBin []byte) error {
cnr, ok := obj.ContainerID()
if !ok {
return errors.New("missing container in object")
Expand Down Expand Up @@ -118,13 +125,13 @@ func (db *DB) put(
return err
}

err = db.put(tx, par, id, parentSI, currEpoch)
err = db.put(tx, par, id, parentSI, currEpoch, nil)
if err != nil {
return err
}
}

err = putUniqueIndexes(tx, obj, si, id)
err = putUniqueIndexes(tx, obj, si, id, hdrBin)
if err != nil {
return fmt.Errorf("can't put unique indexes: %w", err)
}
Expand Down Expand Up @@ -169,6 +176,7 @@ func putUniqueIndexes(
obj *objectSDK.Object,
si *objectSDK.SplitInfo,
id []byte,
hdrBin []byte,
) error {
isParent := si != nil
addr := objectCore.AddressOf(obj)
Expand All @@ -191,15 +199,18 @@ func putUniqueIndexes(
return ErrUnknownObjectType
}

rawObject, err := obj.CutPayload().Marshal()
if err != nil {
return fmt.Errorf("can't marshal object header: %w", err)
var err error
if hdrBin == nil {
hdrBin, err = obj.CutPayload().Marshal()
if err != nil {
return fmt.Errorf("can't marshal object header: %w", err)
}
}

err = putUniqueIndexItem(tx, namedBucketItem{
name: bucketName,
key: objKey,
val: rawObject,
val: hdrBin,
})
if err != nil {
return err
Expand Down
22 changes: 18 additions & 4 deletions pkg/local_object_storage/shard/put.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ import (

// PutPrm groups the parameters of Put operation.
type PutPrm struct {
obj *object.Object
obj *object.Object
objBin []byte
hdrLen int
}

// PutRes groups the resulting values of Put operation.
Expand All @@ -23,6 +25,11 @@ func (p *PutPrm) SetObject(obj *object.Object) {
p.obj = obj
}

// SetObjectBinary sets the already encoded binary form of the object to be
// put, together with hdrLen, the length of the header part within objBin.
//
// The value is optional. When objBin is non-nil, Put uses it as the object
// data instead of marshaling the object and passes objBin[:hdrLen] to the
// metabase as the header binary. When objBin is nil, Put marshals the
// object itself and hdrLen is not used.
//
// Since Put slices objBin[:hdrLen], hdrLen must be a valid slice bound for
// a non-nil objBin.
//
// NOTE(review): presumably objBin must be the encoding of the same object
// that is set via SetObject — confirm against callers.
func (p *PutPrm) SetObjectBinary(objBin []byte, hdrLen int) {
p.objBin, p.hdrLen = objBin, hdrLen
}

// Put saves the object in shard.
//
// Returns any error encountered that
Expand All @@ -38,9 +45,13 @@ func (s *Shard) Put(prm PutPrm) (PutRes, error) {
return PutRes{}, ErrReadOnlyMode
}

data, err := prm.obj.Marshal()
if err != nil {
return PutRes{}, fmt.Errorf("cannot marshal object: %w", err)
var err error
data := prm.objBin
if data == nil {
data, err = prm.obj.Marshal()
if err != nil {
return PutRes{}, fmt.Errorf("cannot marshal object: %w", err)
}
}

var putPrm common.PutPrm // form Put parameters
Expand Down Expand Up @@ -71,6 +82,9 @@ func (s *Shard) Put(prm PutPrm) (PutRes, error) {
if !m.NoMetabase() {
var pPrm meta.PutPrm
pPrm.SetObject(prm.obj)
if prm.objBin != nil {
pPrm.SetHeaderBinary(prm.objBin[:prm.hdrLen])
}
pPrm.SetStorageID(res.StorageID)
if _, err := s.metaBase.Put(pPrm); err != nil {
// may we need to handle this case in a special way
Expand Down
48 changes: 36 additions & 12 deletions pkg/services/object/put/distributed.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,22 @@ package putsvc

import (
"fmt"
"math"
"sync"
"sync/atomic"

"github.com/nspcc-dev/neofs-node/pkg/core/object"
svcutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
"github.com/nspcc-dev/neofs-node/pkg/util"
neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"go.uber.org/zap"
)

type preparedObjectTarget interface {
WriteObject(*objectSDK.Object, object.ContentMeta) error
WriteObject(*objectSDK.Object, object.ContentMeta, encodedObject) error
Close() (oid.ID, error)
}

Expand All @@ -27,8 +29,6 @@ type distributedTarget struct {
obj *objectSDK.Object
objMeta object.ContentMeta

payload []byte

nodeTargetInitializer func(nodeDesc) preparedObjectTarget

isLocalKey func([]byte) bool
Expand All @@ -38,6 +38,14 @@ type distributedTarget struct {
fmt *object.FormatValidator

log *zap.Logger

localOnly bool
localNodeInContainer bool
localNodeSigner neofscrypto.Signer
// - object if localOnly
// - replicate request if localNodeInContainer
// - payload otherwise
encodedObject encodedObject
}

// parameters and state of container traversal.
Expand Down Expand Up @@ -114,25 +122,41 @@ func (x errIncompletePut) Error() string {
return commonMsg
}

func (t *distributedTarget) WriteHeader(obj *objectSDK.Object) error {
t.obj = obj
func (t *distributedTarget) WriteHeader(hdr *objectSDK.Object) error {
payloadLen := hdr.PayloadSize()
if payloadLen > math.MaxInt {
return fmt.Errorf("too big payload of physically stored for this server %d > %d", payloadLen, math.MaxInt)
}

if t.localNodeInContainer {
var err error
if t.localOnly {
t.encodedObject, err = encodeObjectWithoutPayload(*hdr, int(payloadLen))
} else {
t.encodedObject, err = encodeReplicateRequestWithoutPayload(t.localNodeSigner, *hdr, int(payloadLen))
}
if err != nil {
return err
}
} else if payloadLen > 0 {
t.encodedObject = encodedObject{b: getBuffer(int(payloadLen))}
}

t.obj = hdr

return nil
}

func (t *distributedTarget) Write(p []byte) (n int, err error) {
t.payload = append(t.payload, p...)
t.encodedObject.b = append(t.encodedObject.b, p...)

return len(p), nil
}

func (t *distributedTarget) Close() (oid.ID, error) {
defer func() {
putPayload(t.payload)
t.payload = nil
}()
defer putBuffer(t.encodedObject.b)

t.obj.SetPayload(t.payload)
t.obj.SetPayload(t.encodedObject.b[t.encodedObject.pldOff:])

var err error

Expand All @@ -155,7 +179,7 @@ func (t *distributedTarget) sendObject(node nodeDesc) error {

target := t.nodeTargetInitializer(node)

if err := target.WriteObject(t.obj, t.objMeta); err != nil {
if err := target.WriteObject(t.obj, t.objMeta, t.encodedObject); err != nil {
return fmt.Errorf("could not write header: %w", err)
} else if _, err := target.Close(); err != nil {
return fmt.Errorf("could not close object stream: %w", err)
Expand Down
Loading

0 comments on commit 35c8edf

Please sign in to comment.