Skip to content

Commit

Permalink
replicator: Send local objects using new replication API
Browse files Browse the repository at this point in the history
It's more lightweight and supports binary copying without additional
decode-encode round.

Based on the fact that if the object is fixed, the request remains
unchanged. According to this, transport message is encoded once and sent
to all nodes.

Closes #2317. Refs #2316.

Signed-off-by: Leonard Lyubich <leonard@morphbits.io>
  • Loading branch information
cthulhu-rider committed Mar 28, 2024
1 parent 5fec85b commit ecb3a89
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ Changelog for NeoFS Node
- Storage nodes no longer accept objects with header larger than 16KB (#2749)
- IR sends NeoFS chain GAS to netmap nodes every epoch, not per a configurable blocks number (#2777)
- Big objects are split with the new split scheme (#2667)
- Background replicator transfers objects using new `ObjectService.Replicate` RPC (#2317)

### Removed
- Object notifications incl. NATS (#2750)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ require (
github.com/nspcc-dev/neo-go v0.105.1
github.com/nspcc-dev/neofs-api-go/v2 v2.14.1-0.20240228163253-cb87bbd5e4eb
github.com/nspcc-dev/neofs-contract v0.19.1
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.11.0.20240228185329-d1bb0881274a
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.11.0.20240305154614-871e23ecd9bd
github.com/nspcc-dev/tzhash v1.8.0
github.com/olekukonko/tablewriter v0.0.5
github.com/panjf2000/ants/v2 v2.9.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ github.com/nspcc-dev/neofs-contract v0.19.1 h1:U1Uh+MlzfkalO0kRJ2pADZyHrmAOroC6K
github.com/nspcc-dev/neofs-contract v0.19.1/go.mod h1:ZOGouuwuHpgvYkx/LCGufGncIzEUhYEO18LL4cWEbyw=
github.com/nspcc-dev/neofs-crypto v0.4.1 h1:B6S0zXMWrVFlf/GlII6xKRGWU0VE7dHM+QkoKAO7AQA=
github.com/nspcc-dev/neofs-crypto v0.4.1/go.mod h1:0SHn+sSn+lrrIvonLR8MgbOlBhXZKhc4rw/l2htYeA0=
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.11.0.20240228185329-d1bb0881274a h1:YbLj8AtTVGvQ5Mi482dmftKIimqTsI5OXxjIPJefzEo=
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.11.0.20240228185329-d1bb0881274a/go.mod h1:0WwnMTpMvbeKkU57+aLRtpOB7vu0eIpz7bu342ng8Gk=
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.11.0.20240305154614-871e23ecd9bd h1:ncLz0cc1A2qVDq/TURvQghr9PI7tMVve9LUSpi13COs=
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.11.0.20240305154614-871e23ecd9bd/go.mod h1:0WwnMTpMvbeKkU57+aLRtpOB7vu0eIpz7bu342ng8Gk=
github.com/nspcc-dev/rfc6979 v0.2.1 h1:8wWxkamHWFmO790GsewSoKUSJjVnL1fmdRpokU/RgRM=
github.com/nspcc-dev/rfc6979 v0.2.1/go.mod h1:Tk7h5kyUWkhjyO3zUgFFhy1v2vQv3BvQEntakdtqrWc=
github.com/nspcc-dev/tzhash v1.8.0 h1:pJvzME2mZzP/h5rcy/Wb6amT9FJBFeKbJ3HEnWEeUpY=
Expand Down
3 changes: 3 additions & 0 deletions pkg/core/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package client

import (
"context"
"io"

rawclient "github.com/nspcc-dev/neofs-api-go/v2/rpc/client"
"github.com/nspcc-dev/neofs-node/pkg/network"
"github.com/nspcc-dev/neofs-sdk-go/client"
"github.com/nspcc-dev/neofs-sdk-go/container"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
reputationSDK "github.com/nspcc-dev/neofs-sdk-go/reputation"
Expand All @@ -19,6 +21,7 @@ import (
type Client interface {
ContainerAnnounceUsedSpace(ctx context.Context, announcements []container.SizeEstimation, prm client.PrmAnnounceSpace) error
ObjectPutInit(ctx context.Context, header object.Object, signer user.Signer, prm client.PrmObjectPutInit) (client.ObjectWriter, error)
ReplicateObject(ctx context.Context, id oid.ID, src io.ReadSeeker, signer neofscrypto.Signer) error
ObjectDelete(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, prm client.PrmObjectDelete) (oid.ID, error)
ObjectGetInit(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, prm client.PrmObjectGet) (object.Object, *client.PayloadReader, error)
ObjectHead(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, prm client.PrmObjectHead) (*object.Object, error)
Expand Down
20 changes: 20 additions & 0 deletions pkg/network/cache/multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"io"
"sync"
"time"

Expand All @@ -13,6 +14,7 @@ import (
"github.com/nspcc-dev/neofs-sdk-go/client"
"github.com/nspcc-dev/neofs-sdk-go/container"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
reputationSDK "github.com/nspcc-dev/neofs-sdk-go/reputation"
Expand Down Expand Up @@ -231,6 +233,24 @@ func (x *multiClient) ObjectPutInit(ctx context.Context, header objectSDK.Object
return
}

func (x *multiClient) ReplicateObject(ctx context.Context, id oid.ID, src io.ReadSeeker, signer neofscrypto.Signer) error {
var errSeek error
err := x.iterateClients(ctx, func(c clientcore.Client) error {
err := c.ReplicateObject(ctx, id, src, signer)
if err != nil {
_, errSeek = src.Seek(0, io.SeekStart)
if errSeek != nil {
return nil // to break the iterator
}
}
return err
})
if err != nil {
return err
}
return errSeek
}

func (x *multiClient) ContainerAnnounceUsedSpace(ctx context.Context, announcements []container.SizeEstimation, prm client.PrmAnnounceSpace) error {
return x.iterateClients(ctx, func(c clientcore.Client) error {
return c.ContainerAnnounceUsedSpace(ctx, announcements, prm)
Expand Down
31 changes: 31 additions & 0 deletions pkg/services/object/put/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package putsvc
import (
"context"
"fmt"
"io"

clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client"
netmapCore "github.com/nspcc-dev/neofs-node/pkg/core/netmap"
objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object"
internalclient "github.com/nspcc-dev/neofs-node/pkg/services/object/internal/client"
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
neofsecdsa "github.com/nspcc-dev/neofs-sdk-go/crypto/ecdsa"
"github.com/nspcc-dev/neofs-sdk-go/netmap"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
Expand Down Expand Up @@ -134,3 +136,32 @@ func (s *RemoteSender) PutObject(ctx context.Context, p *RemotePutPrm) error {

return nil
}

// ReplicateObjectToNode copies binary-encoded NeoFS object from the given
// [io.ReadSeeker] into local storage of the node described by specified
// [netmap.NodeInfo].
func (s *RemoteSender) ReplicateObjectToNode(ctx context.Context, id oid.ID, src io.ReadSeeker, nodeInfo netmap.NodeInfo) error {
var nodeInfoForCons clientcore.NodeInfo

err := clientcore.NodeInfoFromRawNetmapElement(&nodeInfoForCons, netmapCore.Node(nodeInfo))
if err != nil {
return fmt.Errorf("parse remote node info: %w", err)
}

key, err := s.keyStorage.GetKey(nil)
if err != nil {
return fmt.Errorf("fetch local node's private key: %w", err)
}

c, err := s.clientConstructor.Get(nodeInfoForCons)
if err != nil {
return fmt.Errorf("init NeoFS API client of the remote node: %w", err)
}

err = c.ReplicateObject(ctx, id, src, (*neofsecdsa.Signer)(key))
if err != nil {
return fmt.Errorf("copy object using NeoFS API client of the remote node: %w", err)
}

return nil
}
30 changes: 22 additions & 8 deletions pkg/services/replicator/process.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package replicator

import (
"bytes"
"context"
"io"

"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put"
"github.com/nspcc-dev/neofs-sdk-go/client"
"github.com/nspcc-dev/neofs-sdk-go/netmap"
"go.uber.org/zap"
)
Expand All @@ -25,21 +27,27 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult)
)
}()

if task.obj == nil {
var err error
task.obj, err = engine.Get(p.localStorage, task.addr)
var err error
var prm *putsvc.RemotePutPrm
var stream io.ReadSeeker
binReplication := task.obj == nil
if binReplication {
b, err := p.localStorage.GetBytes(task.addr)
if err != nil {
p.log.Error("could not get object from local storage",
zap.Stringer("object", task.addr),
zap.Error(err))

return
}
stream = bytes.NewReader(b)
if len(task.nodes) > 1 {
stream = client.DemuxReplicatedObject(stream)
}
} else {
prm = new(putsvc.RemotePutPrm).WithObject(task.obj)
}

prm := new(putsvc.RemotePutPrm).
WithObject(task.obj)

for i := 0; task.quantity > 0 && i < len(task.nodes); i++ {
select {
case <-ctx.Done():
Expand All @@ -54,7 +62,13 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult)

callCtx, cancel := context.WithTimeout(ctx, p.putTimeout)

err := p.remoteSender.PutObject(callCtx, prm.WithNodeInfo(task.nodes[i]))
if binReplication {
err = p.remoteSender.ReplicateObjectToNode(callCtx, task.addr.Object(), stream, task.nodes[i])
// note that we don't need to reset stream because it is used exactly once
// according to the client.DemuxReplicatedObject above
} else {
err = p.remoteSender.PutObject(callCtx, prm.WithNodeInfo(task.nodes[i]))
}

cancel()

Expand Down

0 comments on commit ecb3a89

Please sign in to comment.