diff --git a/pkg/core/client/client.go b/pkg/core/client/client.go index 12ab3c868be..bdfdaba635f 100644 --- a/pkg/core/client/client.go +++ b/pkg/core/client/client.go @@ -2,12 +2,14 @@ package client import ( "context" + "io" rawclient "github.com/nspcc-dev/neofs-api-go/v2/rpc/client" "github.com/nspcc-dev/neofs-node/pkg/network" "github.com/nspcc-dev/neofs-sdk-go/client" "github.com/nspcc-dev/neofs-sdk-go/container" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" reputationSDK "github.com/nspcc-dev/neofs-sdk-go/reputation" @@ -19,6 +21,7 @@ import ( type Client interface { ContainerAnnounceUsedSpace(ctx context.Context, announcements []container.SizeEstimation, prm client.PrmAnnounceSpace) error ObjectPutInit(ctx context.Context, header object.Object, signer user.Signer, prm client.PrmObjectPutInit) (client.ObjectWriter, error) + ReplicateObject(ctx context.Context, src io.ReadSeeker, signer neofscrypto.Signer) error ObjectDelete(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, prm client.PrmObjectDelete) (oid.ID, error) ObjectGetInit(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, prm client.PrmObjectGet) (object.Object, *client.PayloadReader, error) ObjectHead(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, prm client.PrmObjectHead) (*object.Object, error) diff --git a/pkg/network/cache/multi.go b/pkg/network/cache/multi.go index 3bec55aa683..3cc77b72d06 100644 --- a/pkg/network/cache/multi.go +++ b/pkg/network/cache/multi.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "io" "sync" "time" @@ -13,6 +14,7 @@ import ( "github.com/nspcc-dev/neofs-sdk-go/client" "github.com/nspcc-dev/neofs-sdk-go/container" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" reputationSDK "github.com/nspcc-dev/neofs-sdk-go/reputation" @@ -231,6 +233,25 @@ func (x *multiClient) ObjectPutInit(ctx context.Context, header objectSDK.Object return } +func (x *multiClient) ReplicateObject(ctx context.Context, src io.ReadSeeker, signer neofscrypto.Signer) error { + var errSeek error + err := x.iterateClients(ctx, func(c clientcore.Client) error { + err := c.ReplicateObject(ctx, src, signer) + if err != nil { + _, errSeek = src.Seek(0, io.SeekStart) + if errSeek != nil { + return nil + } + } + return err + }) + if err != nil { + return err + } + + return errSeek +} + func (x *multiClient) ContainerAnnounceUsedSpace(ctx context.Context, announcements []container.SizeEstimation, prm client.PrmAnnounceSpace) error { return x.iterateClients(ctx, func(c clientcore.Client) error { return c.ContainerAnnounceUsedSpace(ctx, announcements, prm) diff --git a/pkg/services/object/put/remote.go b/pkg/services/object/put/remote.go index 75d64ed09c1..76d169b4897 100644 --- a/pkg/services/object/put/remote.go +++ b/pkg/services/object/put/remote.go @@ -3,12 +3,14 @@ package putsvc import ( "context" "fmt" + "io" clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client" netmapCore "github.com/nspcc-dev/neofs-node/pkg/core/netmap" objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object" internalclient "github.com/nspcc-dev/neofs-node/pkg/services/object/internal/client" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" + neofsecdsa "github.com/nspcc-dev/neofs-sdk-go/crypto/ecdsa" "github.com/nspcc-dev/neofs-sdk-go/netmap" "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" @@ -134,3 +136,32 @@ func (s *RemoteSender) PutObject(ctx context.Context, p *RemotePutPrm) error { return nil } + +// ReplicateObjectToNode copies binary-encoded NeoFS object from the given +// [io.ReadSeeker] into local storage of the node described by specified +// [netmap.NodeInfo]. +func (s *RemoteSender) ReplicateObjectToNode(ctx context.Context, nodeInfo netmap.NodeInfo, src io.ReadSeeker) error { + var nodeInfoForCons clientcore.NodeInfo + + err := clientcore.NodeInfoFromRawNetmapElement(&nodeInfoForCons, netmapCore.Node(nodeInfo)) + if err != nil { + return fmt.Errorf("parse remote node info: %w", err) + } + + key, err := s.keyStorage.GetKey(nil) + if err != nil { + return fmt.Errorf("fetch local node's private key: %w", err) + } + + c, err := s.clientConstructor.Get(nodeInfoForCons) + if err != nil { + return fmt.Errorf("init NeoFS API client of the remote node: %w", err) + } + + err = c.ReplicateObject(ctx, src, (*neofsecdsa.Signer)(key)) + if err != nil { + return fmt.Errorf("copy object using NeoFS API client of the remote node: %w", err) + } + + return nil +} diff --git a/pkg/services/replicator/process.go b/pkg/services/replicator/process.go index 96606070dd4..facc91882d5 100644 --- a/pkg/services/replicator/process.go +++ b/pkg/services/replicator/process.go @@ -2,11 +2,13 @@ package replicator import ( "context" + "io" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put" "github.com/nspcc-dev/neofs-sdk-go/netmap" "go.uber.org/zap" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) // TaskResult is a replication result interface. @@ -25,9 +27,11 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult) ) }() + var binObjStream io.ReadSeekCloser // set it task.obj is unset only + var err error + if task.obj == nil { - var err error - task.obj, err = engine.Get(p.localStorage, task.addr) + binObjStream, err = p.localStorage.OpenObjectStream(task.addr) if err != nil { p.log.Error("could not get object from local storage", zap.Stringer("object", task.addr), @@ -35,6 +39,13 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult) return } + + defer func() { + if err := binObjStream.Close(); err != nil { + p.log.Debug("failed to close replicated object's binary stream from the local storage", + zap.Stringer("object", task.addr), zap.Error(err)) + } + }() } prm := new(putsvc.RemotePutPrm). @@ -47,6 +58,15 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult) default: } + if i > 0 && binObjStream != nil { + _, err = binObjStream.Seek(0, io.SeekStart) + if err != nil { + p.log.Error("failed to seek start of the replicated object's binary stream from the local storage", + zap.Stringer("object", task.addr), zap.Error(err)) + return + } + } + log := p.log.With( zap.String("node", netmap.StringifyPublicKey(task.nodes[i])), zap.Stringer("object", task.addr), @@ -54,7 +74,17 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult) callCtx, cancel := context.WithTimeout(ctx, p.putTimeout) - err := p.remoteSender.PutObject(callCtx, prm.WithNodeInfo(task.nodes[i])) + if binObjStream != nil { + err = p.remoteSender.ReplicateObjectToNode(ctx, task.nodes[i], binObjStream) + // FIXME: temporary workaround, see also + // https://github.com/nspcc-dev/neofs-api/issues/201#issuecomment-1891383454 + if st, ok := status.FromError(err); ok && st.Code() == codes.Unimplemented { + log.Debug("node does not support 'Replicate' RPC, fallback to 'Put'") + err = p.remoteSender.PutObject(callCtx, prm.WithNodeInfo(task.nodes[i])) + } + } else { + err = p.remoteSender.PutObject(callCtx, prm.WithNodeInfo(task.nodes[i])) + } cancel()