diff --git a/cmd/neofs-node/api.go b/cmd/neofs-node/api.go index 5c1877dc08e..5fd508f00ea 100644 --- a/cmd/neofs-node/api.go +++ b/cmd/neofs-node/api.go @@ -20,6 +20,8 @@ type api struct { clients *coreClientConstructor } +type apiAdapter api + // TODO: docs. func (x *api) ReplicateToNode(ctx context.Context, req []byte, node netmap.NodeInfo) error { var n coreclient.NodeInfo @@ -28,7 +30,12 @@ func (x *api) ReplicateToNode(ctx context.Context, req []byte, node netmap.NodeI return fmt.Errorf("parse info about storage node from the network map: %w", err) } - mc, err := x.clients.Get(n) + return (*apiAdapter)(x).ReplicateToNode(ctx, req, n) +} + +// TODO: docs. +func (x *apiAdapter) ReplicateToNode(ctx context.Context, req []byte, node coreclient.NodeInfo) error { + mc, err := x.clients.Get(node) if err != nil { return fmt.Errorf("connect to node: %w", err) } diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index 87d68d7a161..3f5e1d52bc5 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -243,7 +243,7 @@ func initObjectService(c *cfg) { } } - sPut := putsvc.NewService( + sPut := putsvc.NewService(&apiAdapter{clients: putConstructor}, putsvc.WithKeyStorage(keyStorage), putsvc.WithClientConstructor(putConstructor), putsvc.WithMaxSizeSource(newCachedMaxObjectSizeSource(c)), @@ -558,8 +558,8 @@ func (e engineWithNotifications) Lock(locker oid.Address, toLock []oid.ID) error return e.base.Lock(locker, toLock) } -func (e engineWithNotifications) Put(o *objectSDK.Object) error { - if err := e.base.Put(o); err != nil { +func (e engineWithNotifications) Put(o *objectSDK.Object, objBin []byte, hdrLen int) error { + if err := e.base.Put(o, objBin, hdrLen); err != nil { return err } @@ -606,6 +606,10 @@ func (e engineWithoutNotifications) Lock(locker oid.Address, toLock []oid.ID) er return e.engine.Lock(locker.Container(), locker.Object(), toLock) } -func (e engineWithoutNotifications) Put(o *objectSDK.Object) error { - return engine.Put(e.engine, o) +func (e engineWithoutNotifications) Put(o *objectSDK.Object, objBin []byte, hdrLen int) error { + var putPrm engine.PutPrm + putPrm.WithObject(o) + putPrm.SetObjectBinary(objBin, hdrLen) + _, err := e.engine.Put(putPrm) + return err } diff --git a/pkg/local_object_storage/engine/evacuate.go b/pkg/local_object_storage/engine/evacuate.go index 9be104dba1f..e8cb94315c2 100644 --- a/pkg/local_object_storage/engine/evacuate.go +++ b/pkg/local_object_storage/engine/evacuate.go @@ -152,7 +152,7 @@ mainLoop: if _, ok := shardMap[shards[j].ID().String()]; ok { continue } - putDone, exists := e.putToShard(shards[j].hashedShard, j, shards[j].pool, addr, getRes.Object()) + putDone, exists := e.putToShard(shards[j].hashedShard, j, shards[j].pool, addr, getRes.Object(), nil, 0) if putDone || exists { if putDone { e.log.Debug("object is moved to another shard", diff --git a/pkg/local_object_storage/engine/put.go b/pkg/local_object_storage/engine/put.go index 67f80664bf9..ad06d78b6ef 100644 --- a/pkg/local_object_storage/engine/put.go +++ b/pkg/local_object_storage/engine/put.go @@ -15,7 +15,9 @@ import ( // PutPrm groups the parameters of Put operation. type PutPrm struct { - obj *objectSDK.Object + obj *objectSDK.Object + objBin []byte + hdrLen int } // PutRes groups the resulting values of Put operation. @@ -30,6 +32,11 @@ func (p *PutPrm) WithObject(obj *objectSDK.Object) { p.obj = obj } +// TODO: docs +func (p *PutPrm) SetObjectBinary(objBin []byte, hdrLen int) { + p.objBin, p.hdrLen = objBin, hdrLen +} + // Put saves the object to local storage. // // Returns any error encountered that @@ -72,7 +79,7 @@ func (e *StorageEngine) put(prm PutPrm) (PutRes, error) { return false } - putDone, exists := e.putToShard(sh, ind, pool, addr, prm.obj) + putDone, exists := e.putToShard(sh, ind, pool, addr, prm.obj, prm.objBin, prm.hdrLen) finished = putDone || exists return finished }) @@ -87,7 +94,8 @@ func (e *StorageEngine) put(prm PutPrm) (PutRes, error) { // putToShard puts object to sh. // First return value is true iff put has been successfully done. // Second return value is true iff object already exists. -func (e *StorageEngine) putToShard(sh hashedShard, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object) (bool, bool) { +func (e *StorageEngine) putToShard(sh hashedShard, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object, + objBin []byte, hdrLen int) (bool, bool) { var putSuccess, alreadyExists bool exitCh := make(chan struct{}) @@ -129,6 +137,7 @@ func (e *StorageEngine) putToShard(sh hashedShard, ind int, pool util.WorkerPool var putPrm shard.PutPrm putPrm.SetObject(obj) + putPrm.SetObjectBinary(objBin, hdrLen) _, err = sh.Put(putPrm) if err != nil { diff --git a/pkg/local_object_storage/metabase/put.go b/pkg/local_object_storage/metabase/put.go index ad8999e31e5..9831648e783 100644 --- a/pkg/local_object_storage/metabase/put.go +++ b/pkg/local_object_storage/metabase/put.go @@ -26,6 +26,8 @@ type PutPrm struct { obj *objectSDK.Object id []byte + + hdrBin []byte } // PutRes groups the resulting values of Put operation. @@ -41,6 +43,11 @@ func (p *PutPrm) SetStorageID(id []byte) { p.id = id } +// TODO: docs +func (p *PutPrm) SetHeaderBinary(hdrBin []byte) { + p.hdrBin = hdrBin +} + var ( ErrUnknownObjectType = errors.New("unknown object type") ErrIncorrectSplitInfoUpdate = errors.New("updating split info on object without it") @@ -64,7 +71,7 @@ func (db *DB) Put(prm PutPrm) (res PutRes, err error) { currEpoch := db.epochState.CurrentEpoch() err = db.boltDB.Batch(func(tx *bbolt.Tx) error { - return db.put(tx, prm.obj, prm.id, nil, currEpoch) + return db.put(tx, prm.obj, prm.id, nil, currEpoch, prm.hdrBin) }) if err == nil { storagelog.Write(db.log, @@ -77,7 +84,7 @@ func (db *DB) Put(prm PutPrm) (res PutRes, err error) { func (db *DB) put( tx *bbolt.Tx, obj *objectSDK.Object, id []byte, - si *objectSDK.SplitInfo, currEpoch uint64) error { + si *objectSDK.SplitInfo, currEpoch uint64, hdrBin []byte) error { cnr, ok := obj.ContainerID() if !ok { return errors.New("missing container in object") @@ -118,13 +125,13 @@ func (db *DB) put( return err } - err = db.put(tx, par, id, parentSI, currEpoch) + err = db.put(tx, par, id, parentSI, currEpoch, nil) if err != nil { return err } } - err = putUniqueIndexes(tx, obj, si, id) + err = putUniqueIndexes(tx, obj, si, id, hdrBin) if err != nil { return fmt.Errorf("can't put unique indexes: %w", err) } @@ -169,6 +176,7 @@ func putUniqueIndexes( obj *objectSDK.Object, si *objectSDK.SplitInfo, id []byte, + hdrBin []byte, ) error { isParent := si != nil addr := objectCore.AddressOf(obj) @@ -191,15 +199,18 @@ func putUniqueIndexes( return ErrUnknownObjectType } - rawObject, err := obj.CutPayload().Marshal() - if err != nil { - return fmt.Errorf("can't marshal object header: %w", err) + var err error + if hdrBin == nil { + hdrBin, err = obj.CutPayload().Marshal() + if err != nil { + return fmt.Errorf("can't marshal object header: %w", err) + } } err = putUniqueIndexItem(tx, namedBucketItem{ name: bucketName, key: objKey, - val: rawObject, + val: hdrBin, }) if err != nil { return err diff --git a/pkg/local_object_storage/shard/put.go b/pkg/local_object_storage/shard/put.go index dfbd4de97e3..9bc22df5b7a 100644 --- a/pkg/local_object_storage/shard/put.go +++ b/pkg/local_object_storage/shard/put.go @@ -12,7 +12,9 @@ import ( // PutPrm groups the parameters of Put operation. type PutPrm struct { - obj *object.Object + obj *object.Object + objBin []byte + hdrLen int } // PutRes groups the resulting values of Put operation. @@ -23,6 +25,11 @@ func (p *PutPrm) SetObject(obj *object.Object) { p.obj = obj } +// TODO: docs +func (p *PutPrm) SetObjectBinary(objBin []byte, hdrLen int) { + p.objBin, p.hdrLen = objBin, hdrLen +} + // Put saves the object in shard. // // Returns any error encountered that @@ -38,9 +45,13 @@ func (s *Shard) Put(prm PutPrm) (PutRes, error) { return PutRes{}, ErrReadOnlyMode } - data, err := prm.obj.Marshal() - if err != nil { - return PutRes{}, fmt.Errorf("cannot marshal object: %w", err) + var err error + data := prm.objBin + if data == nil { + data, err = prm.obj.Marshal() + if err != nil { + return PutRes{}, fmt.Errorf("cannot marshal object: %w", err) + } } var putPrm common.PutPrm // form Put parameters @@ -71,6 +82,9 @@ func (s *Shard) Put(prm PutPrm) (PutRes, error) { if !m.NoMetabase() { var pPrm meta.PutPrm pPrm.SetObject(prm.obj) + if prm.objBin != nil { + pPrm.SetHeaderBinary(prm.objBin[:prm.hdrLen]) + } pPrm.SetStorageID(res.StorageID) if _, err := s.metaBase.Put(pPrm); err != nil { // may we need to handle this case in a special way diff --git a/pkg/services/object/put/distributed.go b/pkg/services/object/put/distributed.go index 2d1814e74ed..4dfc1fe007d 100644 --- a/pkg/services/object/put/distributed.go +++ b/pkg/services/object/put/distributed.go @@ -2,6 +2,7 @@ package putsvc import ( "fmt" + "math" "sync" "sync/atomic" @@ -9,13 +10,14 @@ import ( svcutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util" "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" "github.com/nspcc-dev/neofs-node/pkg/util" + neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "go.uber.org/zap" ) type preparedObjectTarget interface { - WriteObject(*objectSDK.Object, object.ContentMeta) error + WriteObject(*objectSDK.Object, object.ContentMeta, encodedObject) error Close() (oid.ID, error) } @@ -27,8 +29,6 @@ type distributedTarget struct { obj *objectSDK.Object objMeta object.ContentMeta - payload []byte - nodeTargetInitializer func(nodeDesc) preparedObjectTarget isLocalKey func([]byte) bool @@ -38,6 +38,14 @@ type distributedTarget struct { fmt *object.FormatValidator log *zap.Logger + + localOnly bool + localNodeInContainer bool + localNodeSigner neofscrypto.Signer + // - object if localOnly + // - replicate request if localNodeInContainer + // - payload otherwise + encodedObject encodedObject } // parameters and state of container traversal. @@ -114,25 +122,41 @@ func (x errIncompletePut) Error() string { return commonMsg } -func (t *distributedTarget) WriteHeader(obj *objectSDK.Object) error { - t.obj = obj +func (t *distributedTarget) WriteHeader(hdr *objectSDK.Object) error { + payloadLen := hdr.PayloadSize() + if payloadLen > math.MaxInt { + return fmt.Errorf("too big payload of physically stored for this server %d > %d", payloadLen, math.MaxInt) + } + + if t.localNodeInContainer { + var err error + if t.localOnly { + t.encodedObject, err = encodeObjectWithoutPayload(*hdr, int(payloadLen)) + } else { + t.encodedObject, err = encodeReplicateRequestWithoutPayload(t.localNodeSigner, *hdr, int(payloadLen)) + } + if err != nil { + return err + } + } else if payloadLen > 0 { + t.encodedObject = encodedObject{b: getBuffer(int(payloadLen))} + } + + t.obj = hdr return nil } func (t *distributedTarget) Write(p []byte) (n int, err error) { - t.payload = append(t.payload, p...) + t.encodedObject.b = append(t.encodedObject.b, p...) return len(p), nil } func (t *distributedTarget) Close() (oid.ID, error) { - defer func() { - putPayload(t.payload) - t.payload = nil - }() + defer putBuffer(t.encodedObject.b) - t.obj.SetPayload(t.payload) + t.obj.SetPayload(t.encodedObject.b[t.encodedObject.pldOff:]) var err error @@ -155,7 +179,7 @@ func (t *distributedTarget) sendObject(node nodeDesc) error { target := t.nodeTargetInitializer(node) - if err := target.WriteObject(t.obj, t.objMeta); err != nil { + if err := target.WriteObject(t.obj, t.objMeta, t.encodedObject); err != nil { return fmt.Errorf("could not write header: %w", err) } else if _, err := target.Close(); err != nil { return fmt.Errorf("could not close object stream: %w", err) diff --git a/pkg/services/object/put/local.go b/pkg/services/object/put/local.go index 69feecbc4e0..06f9e1cb15b 100644 --- a/pkg/services/object/put/local.go +++ b/pkg/services/object/put/local.go @@ -18,7 +18,8 @@ import ( type ObjectStorage interface { // Put must save passed object // and return any appeared error. - Put(*object.Object) error + // Optional objBin represents obj in a canonical NeoFS format. + Put(obj *object.Object, objBin []byte, hdrLen int) error // Delete must delete passed objects // and return any appeared error. Delete(tombstone oid.Address, toDelete []oid.ID) error @@ -34,17 +35,19 @@ type localTarget struct { obj *object.Object meta objectCore.ContentMeta + enc encodedObject } -func (t *localTarget) WriteObject(obj *object.Object, meta objectCore.ContentMeta) error { +func (t *localTarget) WriteObject(obj *object.Object, meta objectCore.ContentMeta, enc encodedObject) error { t.obj = obj t.meta = meta + t.enc = enc return nil } func (t *localTarget) Close() (oid.ID, error) { - err := putObjectLocally(t.storage, t.obj, t.meta) + err := putObjectLocally(t.storage, t.obj, t.meta, &t.enc) if err != nil { return oid.ID{}, err } @@ -54,7 +57,7 @@ func (t *localTarget) Close() (oid.ID, error) { return id, nil } -func putObjectLocally(storage ObjectStorage, obj *object.Object, meta objectCore.ContentMeta) error { +func putObjectLocally(storage ObjectStorage, obj *object.Object, meta objectCore.ContentMeta, enc *encodedObject) error { switch meta.Type() { case object.TypeTombstone: err := storage.Delete(objectCore.AddressOf(obj), meta.Objects()) @@ -70,7 +73,14 @@ func putObjectLocally(storage ObjectStorage, obj *object.Object, meta objectCore // objects that do not change meta storage } - if err := storage.Put(obj); err != nil { + var objBin []byte + var hdrLen int + if enc != nil && enc.pldOff > 0 { + objBin = enc.b[enc.hdrOff:] + hdrLen = enc.pldFldOff - enc.hdrOff + } + + if err := storage.Put(obj, objBin, hdrLen); err != nil { return fmt.Errorf("could not put object to local storage: %w", err) } @@ -150,5 +160,5 @@ func (p *Service) ValidateAndStoreObjectLocally(cnrID cid.ID, obj object.Object) } } - return putObjectLocally(p.localStore, &obj, objMeta) + return putObjectLocally(p.localStore, &obj, objMeta, nil) } diff --git a/pkg/services/object/put/pool.go b/pkg/services/object/put/pool.go index 705273227d6..6dabcced527 100644 --- a/pkg/services/object/put/pool.go +++ b/pkg/services/object/put/pool.go @@ -4,17 +4,20 @@ import ( "sync" ) -const defaultAllocSize = 1024 +var buffers sync.Pool -var putBytesPool = &sync.Pool{ - New: func() any { return make([]byte, 0, defaultAllocSize) }, +func getBuffer(cp int) []byte { + b, ok := buffers.Get().([]byte) + if ok { + if cap(b) >= cp { + return b + } + buffers.Put(b) + } + return make([]byte, 0, cp) } -func getPayload() []byte { - return putBytesPool.Get().([]byte) -} - -func putPayload(p []byte) { +func putBuffer(p []byte) { //nolint:staticcheck - putBytesPool.Put(p[:0]) + buffers.Put(p[:0]) } diff --git a/pkg/services/object/put/pool_test.go b/pkg/services/object/put/pool_test.go new file mode 100644 index 00000000000..aba79985cd1 --- /dev/null +++ b/pkg/services/object/put/pool_test.go @@ -0,0 +1,19 @@ +package putsvc + +import ( + "math/rand" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestGetPayload(t *testing.T) { + for i := 0; i < 100; i++ { + cp := rand.Int() % 1024 + + b := getBuffer(cp) + require.GreaterOrEqual(t, cap(b), cp) + + putBuffer(b) + } +} diff --git a/pkg/services/object/put/prm.go b/pkg/services/object/put/prm.go index d7dbada2d42..229f894e0c9 100644 --- a/pkg/services/object/put/prm.go +++ b/pkg/services/object/put/prm.go @@ -5,6 +5,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/services/object/util" "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" containerSDK "github.com/nspcc-dev/neofs-sdk-go/container" + neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" "github.com/nspcc-dev/neofs-sdk-go/object" ) @@ -20,6 +21,9 @@ type PutInitPrm struct { copiesNumber uint32 relay func(client.NodeInfo, client.MultiAddressClient) error + + localNodeInContainer bool + localNodeSigner neofscrypto.Signer } type PutChunkPrm struct { diff --git a/pkg/services/object/put/proto.go b/pkg/services/object/put/proto.go new file mode 100644 index 00000000000..62858de41d1 --- /dev/null +++ b/pkg/services/object/put/proto.go @@ -0,0 +1,118 @@ +package putsvc + +import ( + "errors" + "fmt" + "math" + + neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" + "github.com/nspcc-dev/neofs-sdk-go/object" + "google.golang.org/protobuf/encoding/protowire" +) + +// TODO: share common code with Replicator + +const ( + _ = iota + fieldNumSigPubKey + fieldNumSigVal + fieldNumSigScheme +) + +const ( + fieldNumObjectPayload = 4 +) + +const ( + _ = iota + fieldNumReplicateObject + fieldNumReplicateSignature +) + +type encodedObject struct { + b []byte + + hdrOff int + pldFldOff int + pldOff int +} + +func encodeObjectWithoutPayload(hdr object.Object, pldLen int) (encodedObject, error) { + var res encodedObject + + hdrv2 := hdr.ToV2() + hdrLen := hdrv2.StableSize() + pldFldLen := protowire.SizeTag(fieldNumObjectPayload) + protowire.SizeBytes(pldLen) + if pldFldLen > math.MaxInt-hdrLen { + return res, fmt.Errorf("binary object is too big for this server: %d+%d>%d", hdrLen, pldLen, math.MaxInt) + } + + res.b = getBuffer(hdrLen + pldFldLen) + + res.b = res.b[:hdrLen] + hdrv2.StableMarshal(res.b) + res.pldFldOff = len(res.b) + res.b = protowire.AppendTag(res.b, fieldNumObjectPayload, protowire.BytesType) + res.b = protowire.AppendVarint(res.b, uint64(pldLen)) + res.pldOff = len(res.b) + + return res, nil +} + +func encodeReplicateRequestWithoutPayload(signer neofscrypto.Signer, hdr object.Object, pldLen int) (encodedObject, error) { + var res encodedObject + id, ok := hdr.ID() + if !ok { + return res, errors.New("missing object ID") + } + + sig, err := signer.Sign(id[:]) + if err != nil { + return res, fmt.Errorf("sign object ID: %w", err) + } + + hdrv2 := hdr.ToV2() + hdrLen := hdrv2.StableSize() + pldFldLen := protowire.SizeTag(fieldNumObjectPayload) + protowire.SizeBytes(pldLen) + if pldFldLen > math.MaxInt-hdrLen { + return res, fmt.Errorf("binary object is too big for this server: %d+%d>%d", hdrLen, pldFldLen, math.MaxInt) + } + + pubKey := neofscrypto.PublicKeyBytes(signer.Public()) + sigScheme := signer.Scheme() + + sigFldLen := protowire.SizeTag(fieldNumSigPubKey) + protowire.SizeBytes(len(pubKey)) + + protowire.SizeTag(fieldNumSigVal) + protowire.SizeBytes(len(sig)) + + protowire.SizeTag(fieldNumSigScheme) + protowire.SizeVarint(uint64(sigScheme)) + fullLen := protowire.SizeTag(fieldNumReplicateSignature) + protowire.SizeBytes(sigFldLen) + + protowire.SizeTag(fieldNumReplicateObject) + objFldLen := hdrLen + pldFldLen + if protowire.SizeBytes(objFldLen) > math.MaxInt-fullLen { + return res, fmt.Errorf("replicate request exceeds server limit %d", math.MaxInt) + } + fullLen += protowire.SizeBytes(objFldLen) + + res.b = getBuffer(fullLen) + + // signature + res.b = protowire.AppendTag(res.b, fieldNumReplicateSignature, protowire.BytesType) + res.b = protowire.AppendVarint(res.b, uint64(sigFldLen)) + res.b = protowire.AppendTag(res.b, fieldNumSigPubKey, protowire.BytesType) + res.b = protowire.AppendBytes(res.b, pubKey) + res.b = protowire.AppendTag(res.b, fieldNumSigVal, protowire.BytesType) + res.b = protowire.AppendBytes(res.b, sig) + res.b = protowire.AppendTag(res.b, fieldNumSigScheme, protowire.VarintType) + res.b = protowire.AppendVarint(res.b, uint64(sigScheme)) + // object + res.b = protowire.AppendTag(res.b, fieldNumReplicateObject, protowire.BytesType) + res.b = protowire.AppendVarint(res.b, uint64(objFldLen)) + res.hdrOff = len(res.b) + res.b = res.b[:len(res.b)+hdrLen] + hdrv2.StableMarshal(res.b[res.hdrOff:]) + res.pldFldOff = len(res.b) + res.b = protowire.AppendTag(res.b, fieldNumObjectPayload, protowire.BytesType) + res.b = protowire.AppendVarint(res.b, uint64(pldLen)) + res.pldOff = len(res.b) + + return res, nil +} diff --git a/pkg/services/object/put/proto_test.go b/pkg/services/object/put/proto_test.go new file mode 100644 index 00000000000..03593c5f7ba --- /dev/null +++ b/pkg/services/object/put/proto_test.go @@ -0,0 +1,58 @@ +package putsvc + +import ( + "math/rand" + "testing" + + objectv2 "github.com/nspcc-dev/neofs-api-go/v2/object" + objectgrpc "github.com/nspcc-dev/neofs-api-go/v2/object/grpc" + "github.com/nspcc-dev/neofs-api-go/v2/refs" + neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" + "github.com/nspcc-dev/neofs-sdk-go/crypto/test" + "github.com/nspcc-dev/neofs-sdk-go/object" + oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test" + objecttest "github.com/nspcc-dev/neofs-sdk-go/object/test" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" +) + +func TestUnaryReplicateRequest(t *testing.T) { + // prepare replicated object + payload := make([]byte, 1024) + rand.Read(payload) + obj := objecttest.Object(t) + obj.SetPayload(payload) + id := oidtest.ID() + obj.SetID(id) + hdr := *obj.CutPayload() + obj.SetPayloadSize(uint64(len(payload))) + signer := test.RandomSigner(t) + + // prepare request + r, err := encodeReplicateRequestWithoutPayload(signer, hdr, len(payload)) + require.NoError(t, err) + require.Equal(t, len(payload), cap(r.b)-r.pldOff) + require.Equal(t, len(payload), cap(r.b)-len(r.b)) + + r.b = append(r.b, payload...) + + // decode request + var req objectgrpc.ReplicateRequest + require.NoError(t, proto.Unmarshal(r.b, &req)) + + // check signature + require.Equal(t, neofscrypto.PublicKeyBytes(signer.Public()), req.Signature.Key) + require.EqualValues(t, signer.Scheme(), req.Signature.Scheme) + + var sigv2 refs.Signature + require.NoError(t, sigv2.FromGRPCMessage(req.Signature)) + var sig neofscrypto.Signature + require.NoError(t, sig.ReadFromV2(sigv2)) + require.True(t, sig.Verify(id[:])) + + // check object + var objv2 objectv2.Object + require.NoError(t, objv2.FromGRPCMessage(req.Object)) + obj2 := *object.NewFromV2(&objv2) + require.Equal(t, obj, obj2) +} diff --git a/pkg/services/object/put/remote.go b/pkg/services/object/put/remote.go index ff414b3e1fa..b4956b37f7e 100644 --- a/pkg/services/object/put/remote.go +++ b/pkg/services/object/put/remote.go @@ -23,8 +23,10 @@ type remoteTarget struct { nodeInfo clientcore.NodeInfo obj *object.Object + enc encodedObject clientConstructor ClientConstructor + transport Transport } // RemotePutPrm groups remote put operation parameters. @@ -34,13 +36,22 @@ type RemotePutPrm struct { obj *object.Object } -func (t *remoteTarget) WriteObject(obj *object.Object, _ objectcore.ContentMeta) error { +func (t *remoteTarget) WriteObject(obj *object.Object, _ objectcore.ContentMeta, enc encodedObject) error { t.obj = obj - + t.enc = enc return nil } func (t *remoteTarget) Close() (oid.ID, error) { + if t.enc.hdrOff > 0 { + err := t.transport.ReplicateToNode(t.ctx, t.enc.b, t.nodeInfo) + if err != nil { + return oid.ID{}, fmt.Errorf("replicate object: %w", err) + } + id, _ := t.obj.ID() + return id, nil + } + var sessionInfo *util.SessionInfo if tok := t.commonPrm.SessionToken(); tok != nil { diff --git a/pkg/services/object/put/service.go b/pkg/services/object/put/service.go index 48067f6524d..134ae075732 100644 --- a/pkg/services/object/put/service.go +++ b/pkg/services/object/put/service.go @@ -22,10 +22,16 @@ type MaxSizeSource interface { type Service struct { *cfg + transport Transport } type Option func(*cfg) +// TODO: docs. +type Transport interface { + ReplicateToNode(ctx context.Context, req []byte, node client.NodeInfo) error +} + type ClientConstructor interface { Get(client.NodeInfo) (client.MultiAddressClient, error) } @@ -64,7 +70,7 @@ func defaultCfg() *cfg { } } -func NewService(opts ...Option) *Service { +func NewService(transport Transport, opts ...Option) *Service { c := defaultCfg() for i := range opts { @@ -74,14 +80,16 @@ func NewService(opts ...Option) *Service { c.fmtValidator = object.NewFormatValidator(c.fmtValidatorOpts...) return &Service{ - cfg: c, + cfg: c, + transport: transport, } } func (p *Service) Put(ctx context.Context) (*Streamer, error) { return &Streamer{ - cfg: p.cfg, - ctx: ctx, + cfg: p.cfg, + ctx: ctx, + transport: p.transport, }, nil } diff --git a/pkg/services/object/put/streamer.go b/pkg/services/object/put/streamer.go index ef67be51094..9d7f7e2a793 100644 --- a/pkg/services/object/put/streamer.go +++ b/pkg/services/object/put/streamer.go @@ -25,6 +25,8 @@ type Streamer struct { relay func(client.NodeInfo, client.MultiAddressClient) error maxPayloadSz uint64 // network config + + transport Transport } var errNotInit = errors.New("stream not initialized") @@ -141,6 +143,11 @@ func (p *Streamer) initTarget(prm *PutInitPrm) error { func (p *Streamer) preparePrm(prm *PutInitPrm) error { var err error + localNodeKey, err := p.keyStorage.GetKey(nil) + if err != nil { + return fmt.Errorf("get local node's private key: %w", err) + } + // get latest network map nm, err := netmap.GetLatestNetworkMap(p.netMapSrc) if err != nil { @@ -186,9 +193,25 @@ func (p *Streamer) preparePrm(prm *PutInitPrm) error { builder = util.NewLocalPlacement(builder, p.netmapKeys) } + nodeSets, err := builder.BuildPlacement(idCnr, nil, cnrInfo.Value.PlacementPolicy()) + if err != nil { + return fmt.Errorf("apply container's storage policy to current network map: %w", err) + } +nextSet: + for i := range nodeSets { + for j := range nodeSets[i] { + prm.localNodeInContainer = p.netmapKeys.IsLocalKey(nodeSets[i][j].PublicKey()) + if prm.localNodeInContainer { + break nextSet + } + } + } + // set placement builder prm.traverseOpts = append(prm.traverseOpts, placement.UseBuilder(builder)) + prm.localNodeSigner = (*neofsecdsa.Signer)(localNodeKey) + return nil } @@ -212,7 +235,8 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) internal.Target { // enable additional container broadcast on non-local operation // if object has TOMBSTONE or LOCK type. typ := prm.hdr.Type() - withBroadcast := !prm.common.LocalOnly() && (typ == object.TypeTombstone || typ == object.TypeLock) + localOnly := prm.common.LocalOnly() + withBroadcast := !localOnly && (typ == object.TypeTombstone || typ == object.TypeLock) return &distributedTarget{ traversal: traversal{ @@ -220,7 +244,6 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) internal.Target { extraBroadcastEnabled: withBroadcast, }, - payload: getPayload(), remotePool: p.remotePool, localPool: p.localPool, nodeTargetInitializer: func(node nodeDesc) preparedObjectTarget { @@ -235,6 +258,7 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) internal.Target { keyStorage: p.keyStorage, commonPrm: prm.common, clientConstructor: p.clientConstructor, + transport: p.transport, } client.NodeInfoFromNetmapElement(&rt.nodeInfo, node.info) @@ -246,6 +270,10 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) internal.Target { log: p.log, isLocalKey: p.netmapKeys.IsLocalKey, + + localOnly: localOnly, + localNodeInContainer: prm.localNodeInContainer, + localNodeSigner: prm.localNodeSigner, } }