From ecb3a89fd59450236f4c709dcbbf822c5de69da3 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Tue, 5 Mar 2024 13:51:01 +0400 Subject: [PATCH] replicator: Send local objects using new replication API It's more lightweight and supports binary copying without additional decode-encode round. Based on the fact that if the object is fixed, the request remains unchanged. According to this, transport message is encoded once and sent to all nodes. Closes #2317. Refs #2316. Signed-off-by: Leonard Lyubich --- CHANGELOG.md | 1 + go.mod | 2 +- go.sum | 4 ++-- pkg/core/client/client.go | 3 +++ pkg/network/cache/multi.go | 20 +++++++++++++++++++ pkg/services/object/put/remote.go | 31 ++++++++++++++++++++++++++++++ pkg/services/replicator/process.go | 30 +++++++++++++++++++++-------- 7 files changed, 80 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fd631157a7..f23773aa55 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ Changelog for NeoFS Node - Storage nodes no longer accept objects with header larger than 16KB (#2749) - IR sends NeoFS chain GAS to netmap nodes every epoch, not per a configurable blocks number (#2777) - Big objects are split with the new split scheme (#2667) +- Background replicator transfers objects using new `ObjectService.Replicate` RPC (#2317) ### Removed - Object notifications incl. NATS (#2750) diff --git a/go.mod b/go.mod index d988f15bd7..b5e6248abc 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/nspcc-dev/neo-go v0.105.1 github.com/nspcc-dev/neofs-api-go/v2 v2.14.1-0.20240228163253-cb87bbd5e4eb github.com/nspcc-dev/neofs-contract v0.19.1 - github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.11.0.20240228185329-d1bb0881274a + github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.11.0.20240305154614-871e23ecd9bd github.com/nspcc-dev/tzhash v1.8.0 github.com/olekukonko/tablewriter v0.0.5 github.com/panjf2000/ants/v2 v2.9.0 diff --git a/go.sum b/go.sum index 139303b84c..0185c31e11 100644 --- a/go.sum +++ b/go.sum @@ -140,8 +140,8 @@ github.com/nspcc-dev/neofs-contract v0.19.1 h1:U1Uh+MlzfkalO0kRJ2pADZyHrmAOroC6K github.com/nspcc-dev/neofs-contract v0.19.1/go.mod h1:ZOGouuwuHpgvYkx/LCGufGncIzEUhYEO18LL4cWEbyw= github.com/nspcc-dev/neofs-crypto v0.4.1 h1:B6S0zXMWrVFlf/GlII6xKRGWU0VE7dHM+QkoKAO7AQA= github.com/nspcc-dev/neofs-crypto v0.4.1/go.mod h1:0SHn+sSn+lrrIvonLR8MgbOlBhXZKhc4rw/l2htYeA0= -github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.11.0.20240228185329-d1bb0881274a h1:YbLj8AtTVGvQ5Mi482dmftKIimqTsI5OXxjIPJefzEo= -github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.11.0.20240228185329-d1bb0881274a/go.mod h1:0WwnMTpMvbeKkU57+aLRtpOB7vu0eIpz7bu342ng8Gk= +github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.11.0.20240305154614-871e23ecd9bd h1:ncLz0cc1A2qVDq/TURvQghr9PI7tMVve9LUSpi13COs= +github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.11.0.20240305154614-871e23ecd9bd/go.mod h1:0WwnMTpMvbeKkU57+aLRtpOB7vu0eIpz7bu342ng8Gk= github.com/nspcc-dev/rfc6979 v0.2.1 h1:8wWxkamHWFmO790GsewSoKUSJjVnL1fmdRpokU/RgRM= github.com/nspcc-dev/rfc6979 v0.2.1/go.mod h1:Tk7h5kyUWkhjyO3zUgFFhy1v2vQv3BvQEntakdtqrWc= github.com/nspcc-dev/tzhash v1.8.0 h1:pJvzME2mZzP/h5rcy/Wb6amT9FJBFeKbJ3HEnWEeUpY= diff --git a/pkg/core/client/client.go b/pkg/core/client/client.go index 12ab3c868b..7e42f31006 100644 --- a/pkg/core/client/client.go +++ b/pkg/core/client/client.go @@ -2,12 +2,14 @@ package client import ( "context" + "io" rawclient "github.com/nspcc-dev/neofs-api-go/v2/rpc/client" "github.com/nspcc-dev/neofs-node/pkg/network" "github.com/nspcc-dev/neofs-sdk-go/client" "github.com/nspcc-dev/neofs-sdk-go/container" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" reputationSDK "github.com/nspcc-dev/neofs-sdk-go/reputation" @@ -19,6 +21,7 @@ import ( type Client interface { ContainerAnnounceUsedSpace(ctx context.Context, announcements []container.SizeEstimation, prm client.PrmAnnounceSpace) error ObjectPutInit(ctx context.Context, header object.Object, signer user.Signer, prm client.PrmObjectPutInit) (client.ObjectWriter, error) + ReplicateObject(ctx context.Context, id oid.ID, src io.ReadSeeker, signer neofscrypto.Signer) error ObjectDelete(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, prm client.PrmObjectDelete) (oid.ID, error) ObjectGetInit(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, prm client.PrmObjectGet) (object.Object, *client.PayloadReader, error) ObjectHead(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, prm client.PrmObjectHead) (*object.Object, error) diff --git a/pkg/network/cache/multi.go b/pkg/network/cache/multi.go index 3bec55aa68..9418418cff 100644 --- a/pkg/network/cache/multi.go +++ b/pkg/network/cache/multi.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "io" "sync" "time" @@ -13,6 +14,7 @@ import ( "github.com/nspcc-dev/neofs-sdk-go/client" "github.com/nspcc-dev/neofs-sdk-go/container" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" reputationSDK "github.com/nspcc-dev/neofs-sdk-go/reputation" @@ -231,6 +233,24 @@ func (x *multiClient) ObjectPutInit(ctx context.Context, header objectSDK.Object return } +func (x *multiClient) ReplicateObject(ctx context.Context, id oid.ID, src io.ReadSeeker, signer neofscrypto.Signer) error { + var errSeek error + err := x.iterateClients(ctx, func(c clientcore.Client) error { + err := c.ReplicateObject(ctx, id, src, signer) + if err != nil { + _, errSeek = src.Seek(0, io.SeekStart) + if errSeek != nil { + return nil // to break the iterator + } + } + return err + }) + if err != nil { + return err + } + return errSeek +} + func (x *multiClient) ContainerAnnounceUsedSpace(ctx context.Context, announcements []container.SizeEstimation, prm client.PrmAnnounceSpace) error { return x.iterateClients(ctx, func(c clientcore.Client) error { return c.ContainerAnnounceUsedSpace(ctx, announcements, prm) diff --git a/pkg/services/object/put/remote.go b/pkg/services/object/put/remote.go index 75d64ed09c..acbc95e000 100644 --- a/pkg/services/object/put/remote.go +++ b/pkg/services/object/put/remote.go @@ -3,12 +3,14 @@ package putsvc import ( "context" "fmt" + "io" clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client" netmapCore "github.com/nspcc-dev/neofs-node/pkg/core/netmap" objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object" internalclient "github.com/nspcc-dev/neofs-node/pkg/services/object/internal/client" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" + neofsecdsa "github.com/nspcc-dev/neofs-sdk-go/crypto/ecdsa" "github.com/nspcc-dev/neofs-sdk-go/netmap" "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" @@ -134,3 +136,32 @@ func (s *RemoteSender) PutObject(ctx context.Context, p *RemotePutPrm) error { return nil } + +// ReplicateObjectToNode copies binary-encoded NeoFS object from the given +// [io.ReadSeeker] into local storage of the node described by specified +// [netmap.NodeInfo]. +func (s *RemoteSender) ReplicateObjectToNode(ctx context.Context, id oid.ID, src io.ReadSeeker, nodeInfo netmap.NodeInfo) error { + var nodeInfoForCons clientcore.NodeInfo + + err := clientcore.NodeInfoFromRawNetmapElement(&nodeInfoForCons, netmapCore.Node(nodeInfo)) + if err != nil { + return fmt.Errorf("parse remote node info: %w", err) + } + + key, err := s.keyStorage.GetKey(nil) + if err != nil { + return fmt.Errorf("fetch local node's private key: %w", err) + } + + c, err := s.clientConstructor.Get(nodeInfoForCons) + if err != nil { + return fmt.Errorf("init NeoFS API client of the remote node: %w", err) + } + + err = c.ReplicateObject(ctx, id, src, (*neofsecdsa.Signer)(key)) + if err != nil { + return fmt.Errorf("copy object using NeoFS API client of the remote node: %w", err) + } + + return nil +} diff --git a/pkg/services/replicator/process.go b/pkg/services/replicator/process.go index 96606070dd..661ba5c4c8 100644 --- a/pkg/services/replicator/process.go +++ b/pkg/services/replicator/process.go @@ -1,10 +1,12 @@ package replicator import ( + "bytes" "context" + "io" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put" + "github.com/nspcc-dev/neofs-sdk-go/client" "github.com/nspcc-dev/neofs-sdk-go/netmap" "go.uber.org/zap" ) @@ -25,9 +27,12 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult) ) }() - if task.obj == nil { - var err error - task.obj, err = engine.Get(p.localStorage, task.addr) + var err error + var prm *putsvc.RemotePutPrm + var stream io.ReadSeeker + binReplication := task.obj == nil + if binReplication { + b, err := p.localStorage.GetBytes(task.addr) if err != nil { p.log.Error("could not get object from local storage", zap.Stringer("object", task.addr), @@ -35,11 +40,14 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult) return } + stream = bytes.NewReader(b) + if len(task.nodes) > 1 { + stream = client.DemuxReplicatedObject(stream) + } + } else { + prm = new(putsvc.RemotePutPrm).WithObject(task.obj) } - prm := new(putsvc.RemotePutPrm). - WithObject(task.obj) - for i := 0; task.quantity > 0 && i < len(task.nodes); i++ { select { case <-ctx.Done(): @@ -54,7 +62,13 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult) callCtx, cancel := context.WithTimeout(ctx, p.putTimeout) - err := p.remoteSender.PutObject(callCtx, prm.WithNodeInfo(task.nodes[i])) + if binReplication { + err = p.remoteSender.ReplicateObjectToNode(callCtx, task.addr.Object(), stream, task.nodes[i]) + // note that we don't need to reset stream because it is used exactly once + // according to the client.DemuxReplicatedObject above + } else { + err = p.remoteSender.PutObject(callCtx, prm.WithNodeInfo(task.nodes[i])) + } cancel()