node/object: Serve new replication service
The NeoFS protocol has recently been extended with a new object
replication RPC `ObjectService.Replicate`, separate from the
general-purpose `ObjectService.Put`. According to the API of the new
RPC, the entire physically stored object is transmitted in a single
message. The replication request and response formats are also much
simpler than those of other operations.

Serve the new RPC in the storage node app. Requests are served
similarly to `ObjectService.Put` ones with TTL=1 (local only).
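
Conceptually the handler is a local-only Put. A rough sketch of the serving
flow (the request/response and server types here are illustrative stand-ins,
not the actual NeoFS API definitions):

// Sketch only: the real message types come from the NeoFS API protos, and
// the actual wiring lives in the changed files below.
func (s *replicationServer) Replicate(ctx context.Context, req *ReplicateRequest) (*ReplicateResponse, error) {
	// the entire stored object arrives in this single message
	obj := req.Object
	// served like ObjectService.Put with TTL=1: no forwarding, just validate
	// the object and save it into the local object storage
	if err := s.node.StoreObject(obj); err != nil {
		return nil, err
	}
	return &ReplicateResponse{}, nil
}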

Refs #2317.

Signed-off-by: Leonard Lyubich <leonard@morphbits.io>
cthulhu-rider committed Mar 28, 2024
1 parent 9ca1fbd commit 5fec85b
Showing 9 changed files with 1,275 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -8,6 +8,7 @@ Changelog for NeoFS Node
- Support of `GT`, `GE`, `LT` and `LE` numeric comparison operators in CLI (#2733)
- SN eACL processing of NULL and numeric operators (#2742)
- CLI now allows creating and printing eACL with numeric filters (#2742)
- Storage nodes serve new `ObjectService.Replicate` RPC (#2674)

### Fixed
- Access to `PUT` objects no longer grants `DELETE` rights (#2261)
41 changes: 41 additions & 0 deletions cmd/neofs-node/grpc.go
@@ -4,21 +4,62 @@ import (
"crypto/tls"
"errors"
"fmt"
"math"
"net"
"time"

grpcconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/grpc"
"github.com/nspcc-dev/neofs-sdk-go/object"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)

func initGRPC(c *cfg) {
if c.cfgMorph.client == nil {
initMorphComponents(c)
}

// limit the max size of a single message received by the gRPC servers up to
// the max object size setting of the NeoFS network: this is needed to serve
// the ObjectService.Replicate RPC that transmits the entire stored object in
// one message
maxObjSize, err := c.nCli.MaxObjectSize()
fatalOnErrDetails("read max object size network setting to determine gRPC recv message limit", err)

maxRecvSize := maxObjSize
// don't forget about meta fields
if maxRecvSize < uint64(math.MaxUint64-object.MaxHeaderLen) { // just in case, always true in practice
maxRecvSize += object.MaxHeaderLen
} else {
maxRecvSize = math.MaxUint64
}

var maxRecvMsgSizeOpt grpc.ServerOption
if maxRecvSize > maxMsgSize { // do not decrease default value
if maxRecvSize > math.MaxInt {
// ~2GB for 32-bit systems, which is currently enough in practice. If at some
// point this is not enough, we'll need to expand the option
fatalOnErr(fmt.Errorf("cannot serve NeoFS API over gRPC: object of max size is bigger than gRPC server is able to support %d>%d",
maxRecvSize, math.MaxInt))
}
maxRecvMsgSizeOpt = grpc.MaxRecvMsgSize(int(maxRecvSize))
c.log.Debug("limit max recv gRPC message size to fit max stored objects",
zap.Uint64("max object size", maxObjSize), zap.Uint64("max recv msg", maxRecvSize))
}

var successCount int
grpcconfig.IterateEndpoints(c.cfgReader, func(sc *grpcconfig.Config) {
serverOpts := []grpc.ServerOption{
grpc.MaxSendMsgSize(maxMsgSize),
}
if maxRecvMsgSizeOpt != nil {
// TODO(@cthulhu-rider): the setting can be server-global only now, support
// per-RPC limits
// TODO(@cthulhu-rider): max object size setting may change in general,
// but server configuration is static now
serverOpts = append(serverOpts, maxRecvMsgSizeOpt)
}

tlsCfg := sc.TLS()

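As a side note on the arithmetic above: the header allowance is added only
when it cannot overflow uint64, and the final value must fit into the platform
int that grpc.MaxRecvMsgSize accepts. A standalone sketch of the same
computation (the function name and example figures are illustrative):

package main

import (
	"fmt"
	"math"
)

// recvLimit mirrors the limit computation in initGRPC: grow the max object
// size by a header allowance without overflowing uint64, then refuse values
// that do not fit into int, which is what grpc.MaxRecvMsgSize accepts.
func recvLimit(maxObjSize, maxHeaderLen uint64) (int, error) {
	limit := maxObjSize
	if limit < math.MaxUint64-maxHeaderLen { // always true in practice
		limit += maxHeaderLen
	} else {
		limit = math.MaxUint64
	}
	if limit > math.MaxInt { // ~2GB ceiling on 32-bit systems
		return 0, fmt.Errorf("object size limit %d exceeds supported %d", limit, math.MaxInt)
	}
	return int(limit), nil
}

func main() {
	limit, err := recvLimit(64<<20, 16<<10) // e.g. 64 MiB objects, 16 KiB header allowance
	fmt.Println(limit, err)
}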
49 changes: 48 additions & 1 deletion cmd/neofs-node/object.go
@@ -359,7 +359,10 @@ func initObjectService(c *cfg) {
firstSvc = objectService.NewMetricCollector(signSvc, c.metricsCollector)
}

server := objectTransportGRPC.New(firstSvc)
objNode, err := newNodeForObjects(c.cfgObject.cnrSource, c.netMapSource, sPut, c.IsLocalKey)
fatalOnErr(err)

server := objectTransportGRPC.New(firstSvc, objNode)

for _, srv := range c.cfgGRPC.servers {
objectGRPC.RegisterObjectServiceServer(srv, server)
@@ -601,3 +604,47 @@ func (h headerSource) Head(address oid.Address) (*objectSDK.Object, error) {

return hw.h, nil
}

// nodeForObjects represents NeoFS storage node for object storage.
type nodeForObjects struct {
putObjectService *putsvc.Service
containerNodes *containerNodes
isLocalPubKey func([]byte) bool
}

func newNodeForObjects(containers containercore.Source, network netmap.Source, putObjectService *putsvc.Service, isLocalPubKey func([]byte) bool) (*nodeForObjects, error) {
cnrNodes, err := newContainerNodes(containers, network)
if err != nil {
return nil, err
}
return &nodeForObjects{
putObjectService: putObjectService,
containerNodes: cnrNodes,
isLocalPubKey: isLocalPubKey,
}, nil
}

// ForEachContainerNodePublicKeyInLastTwoEpochs passes the binary-encoded
// public key of each node matching the referenced container's storage policy
// at the two latest epochs into f. When f returns false, nil is returned
// instantly.
//
// Implements [object.Node] interface.
func (x *nodeForObjects) ForEachContainerNodePublicKeyInLastTwoEpochs(id cid.ID, f func(pubKey []byte) bool) error {
return x.containerNodes.forEachContainerNodePublicKeyInLastTwoEpochs(id, f)
}

// IsOwnPublicKey checks whether the given binary-encoded public key is
// assigned to the local storage node in the network map.
//
// Implements [object.Node] interface.
func (x *nodeForObjects) IsOwnPublicKey(pubKey []byte) bool {
return x.isLocalPubKey(pubKey)
}

// StoreObject checks the given object's format and, if it is correct, saves
// the object in the node's local object storage.
//
// Implements [object.Node] interface.
func (x *nodeForObjects) StoreObject(obj objectSDK.Object) error {
return x.putObjectService.ValidateAndStoreObjectLocally(obj)
}
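
Taken together, these three methods give the replication transport what it
needs to authorize and apply replication requests. For illustration only, a
hypothetical helper (not part of this change, and assuming the bytes package
is imported) showing how a handler could verify the sender against the
container's node set:

// verifySenderIsContainerNode is a hypothetical illustration: it reports
// whether senderPubKey belongs to one of the container's nodes at the two
// latest epochs, via the interface implemented by nodeForObjects above.
func verifySenderIsContainerNode(node *nodeForObjects, cnr cid.ID, senderPubKey []byte) (bool, error) {
	var found bool
	err := node.ForEachContainerNodePublicKeyInLastTwoEpochs(cnr, func(pubKey []byte) bool {
		if bytes.Equal(pubKey, senderPubKey) {
			found = true
			return false // match found, stop the iteration early
		}
		return true // keep iterating
	})
	return found, err
}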
185 changes: 185 additions & 0 deletions cmd/neofs-node/policy.go
@@ -0,0 +1,185 @@
package main

import (
"fmt"
"sync"

"github.com/hashicorp/golang-lru/v2/simplelru"
"github.com/nspcc-dev/neofs-node/pkg/core/container"
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
netmapsdk "github.com/nspcc-dev/neofs-sdk-go/netmap"
)

// storagePolicyRes combines the storage policy application result for a
// particular container and network map into a single unit.
type storagePolicyRes struct {
ns [][]netmapsdk.NodeInfo
err error
}

// containerNodesAtEpoch is a thread-safe LRU cache mapping containers to
// storage policy application results for a particular network map.
type containerNodesAtEpoch struct {
mtx sync.RWMutex
lru simplelru.LRUCache[cid.ID, storagePolicyRes]
}

// containerNodes wraps NeoFS network state to apply container storage policies.
//
// Since policy application results are consistent for a fixed container and
// network map, they can be cached. The containerNodes caches up to 1000
// results for each of the latest and the previous epochs. The previous one is
// required to support data migration after the epoch tick. Older epochs are
// not cached.
type containerNodes struct {
containers container.Source
network netmap.Source

lastCurrentEpochMtx sync.Mutex
lastCurrentEpoch uint64

curEpochCache, prevEpochCache *containerNodesAtEpoch
}

const cachedContainerNodesPerEpochNum = 1000

func newContainerNodes(containers container.Source, network netmap.Source) (*containerNodes, error) {
lru, err := simplelru.NewLRU[cid.ID, storagePolicyRes](cachedContainerNodesPerEpochNum, nil)
if err != nil {
// should never happen
return nil, fmt.Errorf("create LRU cache for container nodes: %w", err)
}
return &containerNodes{
containers: containers,
network: network,
curEpochCache: &containerNodesAtEpoch{lru: lru},
prevEpochCache: new(containerNodesAtEpoch),
}, nil
}

// forEachNodePubKeyInSets passes the binary-encoded public key of each node into f.
// When f returns false, forEachNodePubKeyInSets returns false instantly.
// Otherwise, true is returned.
func forEachNodePubKeyInSets(nodeSets [][]netmapsdk.NodeInfo, f func(pubKey []byte) bool) bool {
for i := range nodeSets {
for j := range nodeSets[i] {
if !f(nodeSets[i][j].PublicKey()) {
return false
}
}
}
return true
}

// forEachContainerNodePublicKeyInLastTwoEpochs passes the binary-encoded
// public key of each node matching the referenced container's storage policy
// at the two latest epochs into f. When f returns false, nil is returned
// instantly.
func (x *containerNodes) forEachContainerNodePublicKeyInLastTwoEpochs(cnrID cid.ID, f func(pubKey []byte) bool) error {
curEpoch, err := x.network.Epoch()
if err != nil {
return fmt.Errorf("read current NeoFS epoch: %w", err)
}

var curEpochCache, prevEpochCache *containerNodesAtEpoch
x.lastCurrentEpochMtx.Lock()
switch {
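// Map the reported epoch onto the two caches: one epoch behind the last
// seen one means the previous-epoch cache already holds its results; a newer
// epoch rotates the caches (resetting both on a multi-epoch jump); an equal
// epoch reuses both as-is. Anything older gets no caching at all.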
case curEpoch == x.lastCurrentEpoch-1:
curEpochCache = x.prevEpochCache
case curEpoch > x.lastCurrentEpoch:
curLRU, err := simplelru.NewLRU[cid.ID, storagePolicyRes](cachedContainerNodesPerEpochNum, nil)
if err != nil {
// should never happen
x.lastCurrentEpochMtx.Unlock()
return fmt.Errorf("create LRU cache for container nodes: %w", err)
}

if curEpoch == x.lastCurrentEpoch+1 {
x.prevEpochCache = x.curEpochCache
} else {
prevLRU, err := simplelru.NewLRU[cid.ID, storagePolicyRes](cachedContainerNodesPerEpochNum, nil)
if err != nil {
// should never happen
x.lastCurrentEpochMtx.Unlock()
return fmt.Errorf("create LRU cache for container nodes: %w", err)
}
x.prevEpochCache = &containerNodesAtEpoch{lru: prevLRU}
}
x.curEpochCache = &containerNodesAtEpoch{lru: curLRU}
x.lastCurrentEpoch = curEpoch
fallthrough
case curEpoch == x.lastCurrentEpoch:
curEpochCache = x.curEpochCache
prevEpochCache = x.prevEpochCache
}
x.lastCurrentEpochMtx.Unlock()

var cnr *container.Container

processEpoch := func(cache *containerNodesAtEpoch, epoch uint64) (storagePolicyRes, error) {
var result storagePolicyRes
var isCached bool
if cache != nil {
cache.mtx.Lock()
defer cache.mtx.Unlock()
result, isCached = cache.lru.Get(cnrID)
}
if !isCached {
if cnr == nil {
var err error
cnr, err = x.containers.Get(cnrID)
if err != nil {
// not persistent error => do not cache
return result, fmt.Errorf("read container by ID: %w", err)
}
}
networkMap, err := x.network.GetNetMapByEpoch(epoch)
if err != nil {
// not persistent error => do not cache
return result, fmt.Errorf("read network map at epoch #%d: %w", epoch, err)
}
result.ns, result.err = networkMap.ContainerNodes(cnr.Value.PlacementPolicy(), cnrID)
if cache != nil {
cache.lru.Add(cnrID, result)
}
}
return result, nil
}

cur, err := processEpoch(curEpochCache, curEpoch)
if err != nil {
return err
}
if cur.err == nil && !forEachNodePubKeyInSets(cur.ns, f) {
return nil
}
if curEpoch == 0 {
if cur.err != nil {
return fmt.Errorf("select container nodes for current epoch #%d: %w", curEpoch, cur.err)
}
return nil
}

prev, err := processEpoch(prevEpochCache, curEpoch-1)
if err != nil {
if cur.err != nil {
return fmt.Errorf("select container nodes for both epochs: (previous=%d) %w, (current=%d) %w",
curEpoch-1, err, curEpoch, cur.err)
}
return err
}
if prev.err == nil && !forEachNodePubKeyInSets(prev.ns, f) {
return nil
}

if cur.err != nil {
if prev.err != nil {
return fmt.Errorf("select container nodes for both epochs: (previous=%d) %w, (current=%d) %w",
curEpoch-1, prev.err, curEpoch, cur.err)
}
return fmt.Errorf("select container nodes for current epoch #%d: %w", curEpoch, cur.err)
}
if prev.err != nil {
return fmt.Errorf("select container nodes for previous epoch #%d: %w", curEpoch-1, prev.err)
}
return nil
}
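
The epoch handling above is the subtle part: a +1 tick demotes the current
cache to the previous-epoch one (still needed for data migration), while a
larger jump discards both. A simplified, self-contained model of that
rotation, with plain maps standing in for the per-epoch LRU caches:

// epochCaches models the cache rotation performed in
// forEachContainerNodePublicKeyInLastTwoEpochs.
type epochCaches struct {
	lastEpoch uint64
	cur, prev map[string][]string // stand-ins for the real LRU caches
}

func (c *epochCaches) advance(epoch uint64) {
	switch {
	case epoch <= c.lastEpoch:
		return // same epoch or an older report: existing caches still apply
	case epoch == c.lastEpoch+1:
		c.prev = c.cur // previous-epoch results remain useful for migration
	default:
		c.prev = make(map[string][]string) // multi-epoch jump: nothing reusable
	}
	c.cur = make(map[string][]string)
	c.lastEpoch = epoch
}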