Skip to content

Commit

Permalink
node/object: Serve new replication service
Browse files Browse the repository at this point in the history
NeoFS protocol has been recently extended with new object replication
RPC `ObjectService.Replicate` separated from the general-purpose
`ObjectService.Put` one. According to the API of the new RPC, all physically
stored objects are transmitted in one message. Also, replication request
and response formats are much simpler than for other operations.

Serve the new RPC in the storage node app. Requests are served similarly to
`ObjectService.Put` ones with TTL=1 (local only).

Refs #2317.

Signed-off-by: Leonard Lyubich <leonard@morphbits.io>
  • Loading branch information
cthulhu-rider committed Feb 21, 2024
1 parent 2b069f6 commit 2fa7832
Show file tree
Hide file tree
Showing 10 changed files with 1,274 additions and 16 deletions.
42 changes: 42 additions & 0 deletions cmd/neofs-node/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"crypto/tls"
"errors"
"fmt"
"math"
"net"
"time"

Expand All @@ -14,12 +15,53 @@ import (
)

func initGRPC(c *cfg) {
if c.cfgMorph.client == nil {
initMorphComponents(c)
}

// limit max size of single messages received by the gRPC servers up to max
// object size setting of the NeoFS network: this is needed to serve
// ObjectService.Replicate RPC transmitting the entire stored object in one
// message
maxObjSize, err := c.maxObjectSize()
fatalOnErrDetails("read max object size network setting to determine gRPC recv message limit", err)

maxRecvSize := maxObjSize
// don't forget about meta fields
const approxMaxMsgMetaSize = 10 << 10 // ^10K is definitely enough
if maxRecvSize < uint64(math.MaxUint64-approxMaxMsgMetaSize) { // just in case, always true in practice
maxRecvSize += approxMaxMsgMetaSize
}

var maxRecvMsgSizeOpt grpc.ServerOption
if maxRecvSize > maxMsgSize { // do not decrease default value
if maxRecvSize > math.MaxInt {
// ^2GB for 32-bit systems which is currently enough in practice. If at some
// point this is not enough, we'll need to expand the option
fatalOnErr(fmt.Errorf("cannot serve NeoFS API over gRPC: "+
"object of max size is bigger than gRPC server is able to support %d > %d",
maxRecvSize, math.MaxInt))
}

maxRecvMsgSizeOpt = grpc.MaxRecvMsgSize(int(maxRecvSize))
c.log.Debug("limit max recv gRPC message size to fit max stored objects",
zap.Uint64("max object size", maxObjSize), zap.Uint64("max recv msg", maxRecvSize))
}

var successCount int
grpcconfig.IterateEndpoints(c.cfgReader, func(sc *grpcconfig.Config) {
serverOpts := []grpc.ServerOption{
grpc.MaxSendMsgSize(maxMsgSize),
}

if maxRecvMsgSizeOpt != nil {
// TODO(@cthulhu-rider): the setting can be server-global only now, support
// per-RPC limits
// TODO(@cthulhu-rider): max object size setting may change in general,
// but server configuration is static now
serverOpts = append(serverOpts, maxRecvMsgSizeOpt)
}

tlsCfg := sc.TLS()

if tlsCfg != nil {
Expand Down
23 changes: 21 additions & 2 deletions cmd/neofs-node/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/nspcc-dev/neofs-sdk-go/client"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
eaclSDK "github.com/nspcc-dev/neofs-sdk-go/eacl"
apiNetmap "github.com/nspcc-dev/neofs-sdk-go/netmap"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
apireputation "github.com/nspcc-dev/neofs-sdk-go/reputation"
Expand All @@ -54,8 +55,12 @@ type objectSvc struct {
delete *deletesvcV2.Service
}

// maxObjectSize reads the max object size setting of the NeoFS network via
// the netmap contract wrapper. Unlike the exported MaxObjectSize method, the
// read error is returned to the caller instead of being logged.
func (c *cfg) maxObjectSize() (uint64, error) {
	return c.cfgNetmap.wrapper.MaxObjectSize()
}

func (c *cfg) MaxObjectSize() uint64 {
sz, err := c.cfgNetmap.wrapper.MaxObjectSize()
sz, err := c.maxObjectSize()
if err != nil {
c.log.Error("could not get max object size value",
zap.String("error", err.Error()),
Expand Down Expand Up @@ -362,7 +367,21 @@ func initObjectService(c *cfg) {
firstSvc = objectService.NewMetricCollector(signSvc, c.metricsCollector)
}

server := objectTransportGRPC.New(firstSvc)
replNode := newReplicationNode(c.log, sPut, c.PublicKey, func(id cid.ID) (apiNetmap.PlacementPolicy, error) {
cnr, err := c.cfgObject.cnrSource.Get(id)
if err != nil {
return apiNetmap.PlacementPolicy{}, nil
}
return cnr.Value.PlacementPolicy(), nil
}, c.cfgNetmap.state.CurrentEpoch, func(epoch uint64) (apiNetmap.NetMap, error) {
nm, err := c.netMapSource.GetNetMapByEpoch(epoch)
if err != nil {
return apiNetmap.NetMap{}, err
}
return *nm, nil
})

server := objectTransportGRPC.New(firstSvc, replNode)

for _, srv := range c.cfgGRPC.servers {
objectGRPC.RegisterObjectServiceServer(srv, server)
Expand Down
127 changes: 127 additions & 0 deletions cmd/neofs-node/replication.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package main

import (
"bytes"
"fmt"

putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
netmapsdk "github.com/nspcc-dev/neofs-sdk-go/netmap"
"github.com/nspcc-dev/neofs-sdk-go/object"
"go.uber.org/zap"
)

// replicationNode checks object storage policy compliance against NeoFS network
// maps.
type replicationNode struct {
	log *zap.Logger

	// service used to validate and save replicated objects locally (see StoreObject).
	putObjectService *putsvc.Service

	// getPubKey returns the local node's public key in binary form.
	getPubKey func() []byte
	// getContainerStoragePolicy resolves a container ID into the container's
	// storage (placement) policy.
	getContainerStoragePolicy func(cid.ID) (netmapsdk.PlacementPolicy, error)
	// getCurrentEpoch returns the current NeoFS epoch number.
	getCurrentEpoch func() uint64
	// getNetmap returns the network map of the given epoch.
	getNetmap func(epoch uint64) (netmapsdk.NetMap, error)
}

// newReplicationNode constructs a replicationNode from the given logger,
// object put service and network state accessors.
func newReplicationNode(
	log *zap.Logger,
	putObjectService *putsvc.Service,
	getLocalNodePubKey func() []byte,
	getContainerStoragePolicy func(cid.ID) (netmapsdk.PlacementPolicy, error),
	getCurrentEpoch func() uint64,
	getNetmap func(epoch uint64) (netmapsdk.NetMap, error),
) *replicationNode {
	n := new(replicationNode)
	n.log = log
	n.putObjectService = putObjectService
	n.getPubKey = getLocalNodePubKey
	n.getContainerStoragePolicy = getContainerStoragePolicy
	n.getCurrentEpoch = getCurrentEpoch
	n.getNetmap = getNetmap
	return n
}

// compliesStoragePolicyInPastNetmap reports whether the node with the given
// binary public key is present in the network map of the specified epoch and
// appears in the container node list built from the given storage policy.
func (x *replicationNode) compliesStoragePolicyInPastNetmap(bPubKey []byte, cnrID cid.ID, policy netmapsdk.PlacementPolicy, epoch uint64) (bool, error) {
	nm, err := x.getNetmap(epoch)
	if err != nil {
		return false, fmt.Errorf("read network map: %w", err)
	}

	// the key must belong to some node of that epoch's network map at all
	var found bool
	for _, node := range nm.Nodes() {
		if bytes.Equal(node.PublicKey(), bPubKey) {
			found = true
			break
		}
	}
	if !found {
		return false, nil
	}

	vectors, err := nm.ContainerNodes(policy, cnrID)
	if err != nil {
		return false, fmt.Errorf("build list of container nodes from network map, storage policy and container ID: %w", err)
	}

	// check whether the key matches any node selected for the container
	for _, vector := range vectors {
		for _, node := range vector {
			if bytes.Equal(node.PublicKey(), bPubKey) {
				return true, nil
			}
		}
	}

	return false, nil
}

// CompliesContainerStoragePolicy checks whether local node's public key is
// presented in network map of the latest NeoFS epoch and matches storage policy
// of the referenced container.
func (x *replicationNode) CompliesContainerStoragePolicy(cnrID cid.ID) (bool, error) {
	policy, err := x.getContainerStoragePolicy(cnrID)
	if err != nil {
		return false, fmt.Errorf("read container storage policy by container ID: %w", err)
	}

	ok, err := x.compliesStoragePolicyInPastNetmap(x.getPubKey(), cnrID, policy, x.getCurrentEpoch())
	if err != nil {
		err = fmt.Errorf("check with the latest network map: %w", err)
	}
	return ok, err
}

// ClientCompliesContainerStoragePolicy checks whether given public key belongs
// to any storage node present in network map of the latest or previous NeoFS
// epoch and matching storage policy of the referenced container.
func (x *replicationNode) ClientCompliesContainerStoragePolicy(bClientPubKey []byte, cnrID cid.ID) (bool, error) {
	policy, err := x.getContainerStoragePolicy(cnrID)
	if err != nil {
		return false, fmt.Errorf("read container storage policy by container ID: %w", err)
	}

	epoch := x.getCurrentEpoch()

	ok, err := x.compliesStoragePolicyInPastNetmap(bClientPubKey, cnrID, policy, epoch)
	switch {
	case err != nil:
		return false, fmt.Errorf("check with the latest network map: %w", err)
	case ok:
		return true, nil
	case epoch == 0:
		// no previous epoch to fall back to
		return false, nil
	}

	// retry against the previous epoch's network map
	ok, err = x.compliesStoragePolicyInPastNetmap(bClientPubKey, cnrID, policy, epoch-1)
	if err != nil {
		return false, fmt.Errorf("check with previous network map: %w", err)
	}
	return ok, nil
}

// StoreObject processes object same way as `ObjectService.Put` RPC with TTL=1:
// it delegates to the put service's ValidateAndStoreObjectLocally, so the
// object is validated and, on success, stored on this node only.
func (x *replicationNode) StoreObject(cnr cid.ID, obj object.Object) error {
	return x.putObjectService.ValidateAndStoreObjectLocally(cnr, obj)
}
Loading

0 comments on commit 2fa7832

Please sign in to comment.