Skip to content

Commit

Permalink
replicator: Demux one message with the replicated object between nodes
Browse files Browse the repository at this point in the history
Previously, replication service of storage nodes encoded single object
into protocol message N times, where N is a number of nodes to accept
new object replica. Based on the fact that if the object is fixed, the
request remains unchanged, it makes sense to prepare the message once
and send it to all nodes.

Closes #2316. Refs #2317.

Signed-off-by: Leonard Lyubich <leonard@morphbits.io>
  • Loading branch information
cthulhu-rider committed Jan 29, 2024
1 parent 34f23c7 commit 36ca672
Showing 1 changed file with 43 additions and 0 deletions.
43 changes: 43 additions & 0 deletions pkg/services/replicator/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package replicator

import (
"context"
"errors"
"io"

putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put"
"github.com/nspcc-dev/neofs-sdk-go/client"
"github.com/nspcc-dev/neofs-sdk-go/netmap"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
Expand All @@ -18,6 +20,34 @@ type TaskResult interface {
SubmitSuccessfulReplication(netmap.NodeInfo)
}

type readSeekerClosedOnEOF struct {
closed bool

rs io.ReadSeeker
c io.Closer
}

func (x *readSeekerClosedOnEOF) Read(p []byte) (int, error) {
n, err := x.rs.Read(p)
if errors.Is(err, io.EOF) {
x.closed = true
_ = x.c.Close()
}
return n, err
}

func (x *readSeekerClosedOnEOF) Seek(offset int64, whence int) (int64, error) {
return x.rs.Seek(offset, whence)
}

func (x *readSeekerClosedOnEOF) Close() error {
if !x.closed {
x.closed = true
return x.c.Close()
}
return nil
}

// HandleTask executes replication task inside invoking goroutine.
// Passes all the nodes that accepted the replication to the TaskResult.
func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult) {
Expand All @@ -40,6 +70,16 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult)
return
}

if len(task.nodes) > 1 {
rs := client.DemuxReplicatedObject(binObjStream)
// since in this case we read object once it's worth to close the stream insta
// after reading finish so that no longer used resources do not hang up
binObjStream = &readSeekerClosedOnEOF{
rs: rs,
c: binObjStream,
}
}

defer func() {
if err := binObjStream.Close(); err != nil {
p.log.Debug("failed to close replicated object's binary stream from the local storage",
Expand Down Expand Up @@ -76,6 +116,9 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult)

if binObjStream != nil {
err = p.remoteSender.ReplicateObjectToNode(ctx, task.nodes[i], binObjStream)
// note that we don't need to reset binObjStream because it always read once:
// - if len(task.nodes) == 1, we won't come here again
// - otherwise, we use client.DemuxReplicatedObject (see above)
// FIXME: temporary workaround, see also
// https://github.com/nspcc-dev/neofs-api/issues/201#issuecomment-1891383454
if st, ok := status.FromError(err); ok && st.Code() == codes.Unimplemented {
Expand Down

0 comments on commit 36ca672

Please sign in to comment.