Skip to content

Commit

Permalink
node/object: Serve new replication service
Browse files Browse the repository at this point in the history
NeoFS protocol has been recently extended with new object replication
RPC `ObjectService.Replicate` separated from the general-purpose
`ObjectService.Put` one. According to API of the new RPC, all physically
stored objects are transmitted in one message. Also, replication request
and response formats are much simpler than for other operations.

Serve new RPC by the storage node app. Requests are served similar to
`ObjectService.Put` ones with TTL=1 (local only).

Refs #2317.

Signed-off-by: Leonard Lyubich <leonard@morphbits.io>
  • Loading branch information
cthulhu-rider committed Apr 3, 2024
1 parent c6a3201 commit 5b1d6ef
Show file tree
Hide file tree
Showing 9 changed files with 1,173 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Changelog for NeoFS Node
- CLI now allows to create and print eACL with numeric filters (#2742)
- gRPC connection limits per endpoint (#1240)
- `neofs-lens object link` command for the new link object inspection (#2799)
- Storage nodes serve new `ObjectService.Replicate` RPC (#2674)

### Fixed
- Access to `PUT` objects no longer grants `DELETE` rights (#2261)
Expand Down
41 changes: 41 additions & 0 deletions cmd/neofs-node/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,63 @@ import (
"crypto/tls"
"errors"
"fmt"
"math"
"net"
"time"

grpcconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/grpc"
"github.com/nspcc-dev/neofs-sdk-go/object"
"go.uber.org/zap"
"golang.org/x/net/netutil"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)

func initGRPC(c *cfg) {
if c.cfgMorph.client == nil {
initMorphComponents(c)
}

// limit max size of single messages received by the gRPC servers up to max
// object size setting of the NeoFS network: this is needed to serve
// ObjectService.Replicate RPC transmitting the entire stored object in one
// message
maxObjSize, err := c.nCli.MaxObjectSize()
fatalOnErrDetails("read max object size network setting to determine gRPC recv message limit", err)

maxRecvSize := maxObjSize
// don't forget about meta fields
if maxRecvSize < uint64(math.MaxUint64-object.MaxHeaderLen) { // just in case, always true in practice
maxRecvSize += object.MaxHeaderLen
} else {
maxRecvSize = math.MaxUint64
}

var maxRecvMsgSizeOpt grpc.ServerOption
if maxRecvSize > maxMsgSize { // do not decrease default value
if maxRecvSize > math.MaxInt {
// ^2GB for 32-bit systems which is currently enough in practice. If at some
// point this is not enough, we'll need to expand the option
fatalOnErr(fmt.Errorf("cannot serve NeoFS API over gRPC: object of max size is bigger than gRPC server is able to support %d>%d",
maxRecvSize, math.MaxInt))
}
maxRecvMsgSizeOpt = grpc.MaxRecvMsgSize(int(maxRecvSize))
c.log.Debug("limit max recv gRPC message size to fit max stored objects",
zap.Uint64("max object size", maxObjSize), zap.Uint64("max recv msg", maxRecvSize))
}

var successCount int
grpcconfig.IterateEndpoints(c.cfgReader, func(sc *grpcconfig.Config) {
serverOpts := []grpc.ServerOption{
grpc.MaxSendMsgSize(maxMsgSize),
}
if maxRecvMsgSizeOpt != nil {
// TODO(@cthulhu-rider): the setting can be server-global only now, support
// per-RPC limits
// TODO(@cthulhu-rider): max object size setting may change in general,
// but server configuration is static now
serverOpts = append(serverOpts, maxRecvMsgSizeOpt)
}

tlsCfg := sc.TLS()

Expand Down
49 changes: 48 additions & 1 deletion cmd/neofs-node/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,10 @@ func initObjectService(c *cfg) {
firstSvc = objectService.NewMetricCollector(signSvc, c.metricsCollector)
}

server := objectTransportGRPC.New(firstSvc)
objNode, err := newNodeForObjects(c.cfgObject.cnrSource, c.netMapSource, sPut, c.IsLocalKey)
fatalOnErr(err)

server := objectTransportGRPC.New(firstSvc, objNode)

for _, srv := range c.cfgGRPC.servers {
objectGRPC.RegisterObjectServiceServer(srv, server)
Expand Down Expand Up @@ -601,3 +604,47 @@ func (h headerSource) Head(address oid.Address) (*objectSDK.Object, error) {

return hw.h, nil
}

// nodeForObjects represents NeoFS storage node for object storage.
type nodeForObjects struct {
putObjectService *putsvc.Service
containerNodes *containerNodes
isLocalPubKey func([]byte) bool
}

func newNodeForObjects(containers containercore.Source, network netmap.Source, putObjectService *putsvc.Service, isLocalPubKey func([]byte) bool) (*nodeForObjects, error) {
cnrNodes, err := newContainerNodes(containers, network)
if err != nil {
return nil, err
}
return &nodeForObjects{
putObjectService: putObjectService,
containerNodes: cnrNodes,
isLocalPubKey: isLocalPubKey,
}, nil
}

// ForEachContainerNodePublicKeyInLastTwoEpochs passes binary-encoded public key
// of each node match the referenced container's storage policy at two latest
// epochs into f. When f returns false, nil is returned instantly.
//
// Implements [object.Node] interface.
func (x *nodeForObjects) ForEachContainerNodePublicKeyInLastTwoEpochs(id cid.ID, f func(pubKey []byte) bool) error {
return x.containerNodes.forEachContainerNodePublicKeyInLastTwoEpochs(id, f)
}

// IsOwnPublicKey checks whether given binary-encoded public key is assigned to
// local storage node in the network map.
//
// Implements [object.Node] interface.
func (x *nodeForObjects) IsOwnPublicKey(pubKey []byte) bool {
return x.isLocalPubKey(pubKey)
}

// VerifyAndStoreObject checks given object's format and, if it is correct,
// saves the object in the node's local object storage.
//
// Implements [object.Node] interface.
func (x *nodeForObjects) VerifyAndStoreObject(obj objectSDK.Object) error {
return x.putObjectService.ValidateAndStoreObjectLocally(obj)
}
83 changes: 83 additions & 0 deletions cmd/neofs-node/policy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package main

import (
"fmt"

"github.com/nspcc-dev/neofs-node/pkg/core/container"
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
netmapsdk "github.com/nspcc-dev/neofs-sdk-go/netmap"
)

// containerNodes wraps NeoFS network state to apply container storage policies.
type containerNodes struct {
containers container.Source
network netmap.Source
}

func newContainerNodes(containers container.Source, network netmap.Source) (*containerNodes, error) {
return &containerNodes{
containers: containers,
network: network,
}, nil
}

// forEachNodePubKeyInSets passes binary-encoded public key of each node into f.
// When f returns false, forEachNodePubKeyInSets returns false instantly.
// Otherwise, true is returned.
func forEachNodePubKeyInSets(nodeSets [][]netmapsdk.NodeInfo, f func(pubKey []byte) bool) bool {
for i := range nodeSets {
for j := range nodeSets[i] {
if !f(nodeSets[i][j].PublicKey()) {
return false
}
}
}
return true
}

// forEachContainerNodePublicKeyInLastTwoEpochs passes binary-encoded public key
// of each node match the referenced container's storage policy at two latest
// epochs into f. When f returns false, nil is returned instantly.
func (x *containerNodes) forEachContainerNodePublicKeyInLastTwoEpochs(cnrID cid.ID, f func(pubKey []byte) bool) error {
epoch, err := x.network.Epoch()
if err != nil {
return fmt.Errorf("read current NeoFS epoch: %w", err)
}

cnr, err := x.containers.Get(cnrID)
if err != nil {
return fmt.Errorf("read container by ID: %w", err)
}

networkMap, err := x.network.GetNetMapByEpoch(epoch)
if err != nil {
return fmt.Errorf("read network map at epoch #%d: %w", epoch, err)
}
// TODO(#2692): node sets remain unchanged for fixed container and network map,
// so recently calculated results worth caching
ns, err := networkMap.ContainerNodes(cnr.Value.PlacementPolicy(), cnrID)
if err != nil {
return fmt.Errorf("apply container's storage policy to the network map at epoch #%d: %w", epoch, err)
}

if !forEachNodePubKeyInSets(ns, f) || epoch == 0 {
return nil
}

epoch--

networkMap, err = x.network.GetNetMapByEpoch(epoch)
if err != nil {
return fmt.Errorf("read network map at epoch #%d: %w", epoch, err)
}

ns, err = networkMap.ContainerNodes(cnr.Value.PlacementPolicy(), cnrID)
if err != nil {
return fmt.Errorf("apply container's storage policy to the network map at epoch #%d: %w", epoch, err)
}

forEachNodePubKeyInSets(ns, f)

return nil
}
Loading

0 comments on commit 5b1d6ef

Please sign in to comment.