Skip to content

Commit

Permalink
replicator: Send local objects using new replication API
Browse files Browse the repository at this point in the history
It's more lightweight and supports binary copying without additional
decode-encode round.

Closes #2317.

Signed-off-by: Leonard Lyubich <leonard@morphbits.io>
  • Loading branch information
cthulhu-rider committed Jan 29, 2024
1 parent 803701f commit 34f23c7
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 4 deletions.
3 changes: 3 additions & 0 deletions pkg/core/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package client

import (
"context"
"io"

rawclient "github.com/nspcc-dev/neofs-api-go/v2/rpc/client"
"github.com/nspcc-dev/neofs-node/pkg/network"
"github.com/nspcc-dev/neofs-sdk-go/client"
"github.com/nspcc-dev/neofs-sdk-go/container"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
reputationSDK "github.com/nspcc-dev/neofs-sdk-go/reputation"
Expand All @@ -19,6 +21,7 @@ import (
type Client interface {
ContainerAnnounceUsedSpace(ctx context.Context, announcements []container.SizeEstimation, prm client.PrmAnnounceSpace) error
ObjectPutInit(ctx context.Context, header object.Object, signer user.Signer, prm client.PrmObjectPutInit) (client.ObjectWriter, error)
ReplicateObject(ctx context.Context, src io.ReadSeeker, signer neofscrypto.Signer) error
ObjectDelete(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, prm client.PrmObjectDelete) (oid.ID, error)
ObjectGetInit(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, prm client.PrmObjectGet) (object.Object, *client.PayloadReader, error)
ObjectHead(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, prm client.PrmObjectHead) (*object.Object, error)
Expand Down
21 changes: 21 additions & 0 deletions pkg/network/cache/multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"io"
"sync"
"time"

Expand All @@ -13,6 +14,7 @@ import (
"github.com/nspcc-dev/neofs-sdk-go/client"
"github.com/nspcc-dev/neofs-sdk-go/container"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
reputationSDK "github.com/nspcc-dev/neofs-sdk-go/reputation"
Expand Down Expand Up @@ -231,6 +233,25 @@ func (x *multiClient) ObjectPutInit(ctx context.Context, header objectSDK.Object
return
}

func (x *multiClient) ReplicateObject(ctx context.Context, src io.ReadSeeker, signer neofscrypto.Signer) error {
var errSeek error
err := x.iterateClients(ctx, func(c clientcore.Client) error {
err := c.ReplicateObject(ctx, src, signer)
if err != nil {
_, errSeek = src.Seek(0, io.SeekStart)
if errSeek != nil {
return nil
}
}
return err
})
if err != nil {
return err
}

return errSeek
}

func (x *multiClient) ContainerAnnounceUsedSpace(ctx context.Context, announcements []container.SizeEstimation, prm client.PrmAnnounceSpace) error {
return x.iterateClients(ctx, func(c clientcore.Client) error {
return c.ContainerAnnounceUsedSpace(ctx, announcements, prm)
Expand Down
31 changes: 31 additions & 0 deletions pkg/services/object/put/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package putsvc
import (
"context"
"fmt"
"io"

clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client"
netmapCore "github.com/nspcc-dev/neofs-node/pkg/core/netmap"
objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object"
internalclient "github.com/nspcc-dev/neofs-node/pkg/services/object/internal/client"
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
neofsecdsa "github.com/nspcc-dev/neofs-sdk-go/crypto/ecdsa"
"github.com/nspcc-dev/neofs-sdk-go/netmap"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
Expand Down Expand Up @@ -134,3 +136,32 @@ func (s *RemoteSender) PutObject(ctx context.Context, p *RemotePutPrm) error {

return nil
}

// ReplicateObjectToNode copies binary-encoded NeoFS object from the given
// [io.ReadSeeker] into local storage of the node described by specified
// [netmap.NodeInfo].
func (s *RemoteSender) ReplicateObjectToNode(ctx context.Context, nodeInfo netmap.NodeInfo, src io.ReadSeeker) error {
var nodeInfoForCons clientcore.NodeInfo

err := clientcore.NodeInfoFromRawNetmapElement(&nodeInfoForCons, netmapCore.Node(nodeInfo))
if err != nil {
return fmt.Errorf("parse remote node info: %w", err)
}

key, err := s.keyStorage.GetKey(nil)
if err != nil {
return fmt.Errorf("fetch local node's private key: %w", err)
}

c, err := s.clientConstructor.Get(nodeInfoForCons)
if err != nil {
return fmt.Errorf("init NeoFS API client of the remote node: %w", err)
}

err = c.ReplicateObject(ctx, src, (*neofsecdsa.Signer)(key))
if err != nil {
return fmt.Errorf("copy object using NeoFS API client of the remote node: %w", err)
}

return nil
}
38 changes: 34 additions & 4 deletions pkg/services/replicator/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package replicator

import (
"context"
"io"

"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put"
"github.com/nspcc-dev/neofs-sdk-go/netmap"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// TaskResult is a replication result interface.
Expand All @@ -25,16 +27,25 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult)
)
}()

var binObjStream io.ReadSeekCloser // set it task.obj is unset only
var err error

if task.obj == nil {
var err error
task.obj, err = engine.Get(p.localStorage, task.addr)
binObjStream, err = p.localStorage.OpenObjectStream(task.addr)
if err != nil {
p.log.Error("could not get object from local storage",
zap.Stringer("object", task.addr),
zap.Error(err))

return
}

defer func() {
if err := binObjStream.Close(); err != nil {
p.log.Debug("failed to close replicated object's binary stream from the local storage",
zap.Stringer("object", task.addr), zap.Error(err))
}
}()
}

prm := new(putsvc.RemotePutPrm).
Expand All @@ -47,14 +58,33 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult)
default:
}

if i > 0 && binObjStream != nil {
_, err = binObjStream.Seek(0, io.SeekStart)
if err != nil {
p.log.Error("failed to seek start of the replicated object's binary stream from the local storage",
zap.Stringer("object", task.addr), zap.Error(err))
return
}
}

log := p.log.With(
zap.String("node", netmap.StringifyPublicKey(task.nodes[i])),
zap.Stringer("object", task.addr),
)

callCtx, cancel := context.WithTimeout(ctx, p.putTimeout)

err := p.remoteSender.PutObject(callCtx, prm.WithNodeInfo(task.nodes[i]))
if binObjStream != nil {
err = p.remoteSender.ReplicateObjectToNode(ctx, task.nodes[i], binObjStream)
// FIXME: temporary workaround, see also
// https://github.com/nspcc-dev/neofs-api/issues/201#issuecomment-1891383454
if st, ok := status.FromError(err); ok && st.Code() == codes.Unimplemented {
log.Debug("node does not support 'Replicate' RPC, fallback to 'Put'")
err = p.remoteSender.PutObject(callCtx, prm.WithNodeInfo(task.nodes[i]))
}
} else {
err = p.remoteSender.PutObject(callCtx, prm.WithNodeInfo(task.nodes[i]))
}

cancel()

Expand Down

0 comments on commit 34f23c7

Please sign in to comment.