From d24fb33b7a58f1848053a64e725f8d609e5e5d2d Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Thu, 18 Jul 2024 15:39:34 +0400 Subject: [PATCH] node/policy: Cache object policy application results Continues 10d05a45bd72758c661f9727decbde6efa2bf651 by also caching the results of sorting container nodes for particular objects. Since each container may include plenty of objects, the cache size limit is chosen - again heuristically - to be 10 times bigger, i.e. 10K entries. Refs #2692. Signed-off-by: Leonard Lyubich --- CHANGELOG.md | 1 + cmd/neofs-node/policy.go | 156 ++++++++++++++++++++++------------ cmd/neofs-node/policy_test.go | 136 ++++++++++++++++++++++++++--- 3 files changed, 225 insertions(+), 68 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3cbc3a298d..309a46b542 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ Changelog for NeoFS Node - neofs-cli allows several objects deletion at a time (#2774) - `ObjectService.Put` server of in-container node places objects using new `ObjectService.Replicate` RPC (#2802) - `ObjectService`'s `Put` and `Replicate` RPC handlers cache up to 1000 lists of container nodes (#2892) +- `ObjectService`'s `Get`/`Head`/`GetRange` RPC handlers cache up to 10K lists of per-object sorted container nodes (#2896) ### Removed diff --git a/cmd/neofs-node/policy.go b/cmd/neofs-node/policy.go index 8bac383a7f..972fdbe0b5 100644 --- a/cmd/neofs-node/policy.go +++ b/cmd/neofs-node/policy.go @@ -14,18 +14,30 @@ import ( // storagePolicyRes structures persistent storage policy application result for // particular container and network map incl. error. type storagePolicyRes struct { - nodeSets [][]netmapsdk.NodeInfo - err error + nodeSets [][]netmapsdk.NodeInfo + repCounts []uint + err error } -type containerNodesCacheKey struct { - epoch uint64 - cnr cid.ID -} +type ( + containerNodesCacheKey struct { + epoch uint64 + cnr cid.ID + } + objectNodesCacheKey struct { + epoch uint64 + addr oid.Address + } +) -// max number of container storage policy applications results cached by -// containerNodes. -const cachedContainerNodesNum = 1000 +const ( + // max number of container storage policy application results cached by + // containerNodes. + cachedContainerNodesNum = 1000 + // max number of object storage policy application results cached by + // containerNodes. + cachedObjectNodesNum = 10000 +) // containerNodes wraps NeoFS network state to apply container storage policies.
// @@ -36,7 +48,8 @@ type containerNodes struct { containers container.Source network netmap.Source - cache *lru.Cache[containerNodesCacheKey, storagePolicyRes] + cache *lru.Cache[containerNodesCacheKey, storagePolicyRes] + objCache *lru.Cache[objectNodesCacheKey, storagePolicyRes] } func newContainerNodes(containers container.Source, network netmap.Source) (*containerNodes, error) { @@ -44,10 +57,15 @@ func newContainerNodes(containers container.Source, network netmap.Source) (*con if err != nil { return nil, fmt.Errorf("create LRU container node cache for one epoch: %w", err) } + lo, err := lru.New[objectNodesCacheKey, storagePolicyRes](cachedObjectNodesNum) + if err != nil { + return nil, fmt.Errorf("create LRU container node cache for objects: %w", err) + } return &containerNodes{ containers: containers, network: network, cache: l, + objCache: lo, }, nil } @@ -117,6 +135,47 @@ func (x *containerNodes) forEachContainerNode(cnrID cid.ID, withPrevEpoch bool, return nil } +// getNodesForObject reads storage policy of the referenced container from the +// underlying container storage, reads network map at the current epoch from +// the underlying storage, applies the storage policy to it and returns sorted +// lists of selected storage nodes along with the per-list numbers of primary +// object holders. Resulting slices must not be changed. +func (x *containerNodes) getNodesForObject(addr oid.Address) ([][]netmapsdk.NodeInfo, []uint, error) { + curEpoch, err := x.network.Epoch() + if err != nil { + return nil, nil, fmt.Errorf("read current NeoFS epoch: %w", err) + } + cacheKey := objectNodesCacheKey{curEpoch, addr} + res, ok := x.objCache.Get(cacheKey) + if ok { + return res.nodeSets, res.repCounts, res.err + } + cnrRes, networkMap, err := (&containerPolicyContext{ + id: addr.Container(), + containers: x.containers, + network: x.network, + }).applyToNetmap(curEpoch, x.cache) + if err != nil || cnrRes.err != nil { + if err == nil { + err = cnrRes.err // cached in x.cache, no need to store in x.objCache + } + return nil, nil, fmt.Errorf("select container nodes for current epoch #%d: %w", curEpoch, err) + } + if networkMap == nil { + if networkMap, err = x.network.GetNetMapByEpoch(curEpoch); err != nil { + // non-persistent error => do not cache + return nil, nil, fmt.Errorf("read network map by epoch: %w", err) + } + } + res.repCounts = cnrRes.repCounts + res.nodeSets, res.err = networkMap.PlacementVectors(cnrRes.nodeSets, addr.Object()) + if res.err != nil { + res.err = fmt.Errorf("sort container nodes for object: %w", res.err) + } + x.objCache.Add(cacheKey, res) + return res.nodeSets, res.repCounts, res.err +} + // preserves context of storage policy processing for the particular container. type containerPolicyContext struct { // static @@ -131,9 +190,18 @@ type containerPolicyContext struct { // ID to the network map at the specified epoch. applyAtEpoch checks existing // results in the cache and stores new results in it. func (x *containerPolicyContext) applyAtEpoch(epoch uint64, cache *lru.Cache[containerNodesCacheKey, storagePolicyRes]) (storagePolicyRes, error) { + res, _, err := x.applyToNetmap(epoch, cache) + return res, err +} + +// applyToNetmap applies storage policy of container referenced by parameterized +// ID to the network map at the specified epoch. applyToNetmap checks existing +// results in the cache and stores new results in it. Network map is returned if +// it was requested, i.e. on cache miss only.
+func (x *containerPolicyContext) applyToNetmap(epoch uint64, cache *lru.Cache[containerNodesCacheKey, storagePolicyRes]) (storagePolicyRes, *netmapsdk.NetMap, error) { cacheKey := containerNodesCacheKey{epoch, x.id} if result, ok := cache.Get(cacheKey); ok { - return result, nil + return result, nil, nil } var result storagePolicyRes var err error @@ -141,57 +209,33 @@ func (x *containerPolicyContext) applyAtEpoch(epoch uint64, cache *lru.Cache[con x.cnr, err = x.containers.Get(x.id) if err != nil { // non-persistent error => do not cache - return result, fmt.Errorf("read container by ID: %w", err) + return result, nil, fmt.Errorf("read container by ID: %w", err) } } networkMap, err := x.network.GetNetMapByEpoch(epoch) if err != nil { // non-persistent error => do not cache - return result, fmt.Errorf("read network map by epoch: %w", err) + return result, nil, fmt.Errorf("read network map by epoch: %w", err) } - result.nodeSets, result.err = networkMap.ContainerNodes(x.cnr.Value.PlacementPolicy(), x.id) - cache.Add(cacheKey, result) - return result, nil -} - -// getNodesForObject reads storage policy of the referenced container from the -// underlying container storage, reads network map at the specified epoch from -// the underlying storage, applies the storage policy to it and returns sorted -// lists of selected storage nodes along with the per-list numbers of primary -// object holders. Resulting slices must not be changed. -func (x *containerNodes) getNodesForObject(addr oid.Address) ([][]netmapsdk.NodeInfo, []uint, error) { - epoch, err := x.network.Epoch() - if err != nil { - return nil, nil, fmt.Errorf("read current NeoFS epoch: %w", err) - } - cnrID := addr.Container() - cnr, err := x.containers.Get(cnrID) - if err != nil { - return nil, nil, fmt.Errorf("read container by ID: %w", err) - } - networkMap, err := x.network.GetNetMapByEpoch(epoch) - if err != nil { - return nil, nil, fmt.Errorf("read network map at epoch #%d: %w", epoch, err) - } - - policy := cnr.Value.PlacementPolicy() - nodeLists, err := networkMap.ContainerNodes(policy, cnrID) - if err != nil { - return nil, nil, fmt.Errorf("apply container's storage policy to the network map at epoch #%d: %w", epoch, err) - } else if nodeLists, err = networkMap.PlacementVectors(nodeLists, addr.Object()); err != nil { - return nil, nil, fmt.Errorf("sort container nodes from the network map at epoch #%d: %w", epoch, err) - } else if len(nodeLists) != policy.NumberOfReplicas() { - return nil, nil, fmt.Errorf("invalid result of container's storage policy application to the network map at epoch #%d: "+ - "diff number of storage node lists (%d) and required replica descriptors (%d)", epoch, len(nodeLists), policy.NumberOfReplicas()) - } - - primaryCounts := make([]uint, len(nodeLists)) - for i := range nodeLists { - if primaryCounts[i] = uint(policy.ReplicaNumberByIndex(i)); primaryCounts[i] > uint(len(nodeLists[i])) { - return nil, nil, fmt.Errorf("invalid result of container's storage policy application to the network map at epoch #%d: "+ - "invalid storage node list #%d: number of nodes (%d) is less than minimum required by the container policy (%d)", - epoch, i, len(nodeLists), policy.NumberOfReplicas()) + policy := x.cnr.Value.PlacementPolicy() + result.nodeSets, result.err = networkMap.ContainerNodes(policy, x.id) + if result.err == nil { + // ContainerNodes should guarantee the following, but it is still better to double-check + if policyNum := policy.NumberOfReplicas(); len(result.nodeSets) != policyNum { + result.err =
fmt.Errorf("invalid result of container's storage policy application to the network map at epoch #%d: "+ + "diff number of storage node sets (%d) and required replica descriptors (%d)", + epoch, len(result.nodeSets), policyNum) + } + result.repCounts = make([]uint, len(result.nodeSets)) + for i := range result.nodeSets { + if result.repCounts[i] = uint(policy.ReplicaNumberByIndex(i)); result.repCounts[i] > uint(len(result.nodeSets[i])) { + result.err = fmt.Errorf("invalid result of container's storage policy application to the network map at epoch #%d: "+ + "invalid storage node set #%d: number of nodes (%d) is less than minimum required by the container policy (%d)", + epoch, i, len(result.nodeSets), result.repCounts[i]) + break + } } } - return nodeLists, primaryCounts, nil + cache.Add(cacheKey, result) + return result, networkMap, nil } diff --git a/cmd/neofs-node/policy_test.go b/cmd/neofs-node/policy_test.go index 66121e1775..150348e6b1 100644 --- a/cmd/neofs-node/policy_test.go +++ b/cmd/neofs-node/policy_test.go @@ -4,6 +4,8 @@ import ( "crypto/rand" "errors" "fmt" + "strconv" + "strings" "testing" containercore "github.com/nspcc-dev/neofs-node/pkg/core/container" @@ -11,9 +13,12 @@ import ( cid "github.com/nspcc-dev/neofs-sdk-go/container/id" cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" "github.com/nspcc-dev/neofs-sdk-go/netmap" + oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test" "github.com/stretchr/testify/require" ) +const anyEpoch = 42 + type testContainer struct { id cid.ID val container.Container @@ -84,9 +89,8 @@ func (x *testNetwork) Epoch() (uint64, error) { return x.epoch, x.epochErr } -func newNetmapWithContainer(tb testing.TB, nodeNum int, selected []int) ([]netmap.NodeInfo, *netmap.NetMap, container.Container) { +func newNetmapWithContainer(tb testing.TB, nodeNum int, selected ...[]int) ([]netmap.NodeInfo, *netmap.NetMap, container.Container) { nodes := make([]netmap.NodeInfo, nodeNum) -nextNode: for i := range nodes { key := make([]byte, 33) _, err := rand.Read(key) @@ -94,28 +98,36 @@ nextNode: nodes[i].SetPublicKey(key) for j := range selected { - if i == selected[j] { - nodes[i].SetAttribute("attr", "true") - continue nextNode + for k := range selected[j] { + if i == selected[j][k] { + nodes[i].SetAttribute("attr"+strconv.Itoa(j), "true") + break + } } } - - nodes[i].SetAttribute("attr", "false") } var networkMap netmap.NetMap networkMap.SetNodes(nodes) + var sbRpl, sbSlc, sbFlt strings.Builder + for i := range selected { + sbFlt.WriteString(fmt.Sprintf("FILTER attr%d EQ true AS F%d\n", i, i)) + sbSlc.WriteString(fmt.Sprintf("SELECT %d FROM F%d AS S%d\n", len(selected[i]), i, i)) + sbRpl.WriteString(fmt.Sprintf("REP %d IN S%d\n", len(selected[i]), i)) + } var policy netmap.PlacementPolicy - strPolicy := fmt.Sprintf("REP %d CBF 1 SELECT %d FROM F FILTER attr EQ true AS F", len(selected), len(selected)) - require.NoError(tb, policy.DecodeString(strPolicy)) + strPolicy := fmt.Sprintf("%sCBF 1\n%s%s", &sbRpl, &sbSlc, &sbFlt) + require.NoError(tb, policy.DecodeString(strPolicy), strPolicy) nodeSets, err := networkMap.ContainerNodes(policy, cidtest.ID()) require.NoError(tb, err) - require.Len(tb, nodeSets, 1) - require.Len(tb, nodeSets[0], len(selected)) + require.Len(tb, nodeSets, len(selected)) for i := range selected { - require.Contains(tb, nodeSets[0], nodes[selected[i]], i) + require.Len(tb, nodeSets[i], len(selected[i]), i) + for j := range selected[i] { + require.Contains(tb, nodeSets[i], nodes[selected[i][j]], [2]int{i, j}) + } } var 
cnr container.Container @@ -500,3 +512,103 @@ func TestContainerNodes_ForEachContainerNodePublicKeyInLastTwoEpochs(t *testing. } }) } + +func TestContainerNodes_GetNodesForObject(t *testing.T) { + anyAddr := oidtest.Address() + t.Run("read current epoch failure", func(t *testing.T) { + epochErr := errors.New("any epoch error") + network := &testNetwork{epochErr: epochErr} + ns, err := newContainerNodes(new(testContainer), network) + require.NoError(t, err) + + for n := 1; n < 10; n++ { + _, _, err = ns.getNodesForObject(anyAddr) + require.ErrorIs(t, err, epochErr) + require.EqualError(t, err, "read current NeoFS epoch: any epoch error") + // such error must not be cached + network.assertEpochCallCount(t, n) + } + }) + t.Run("read container failure", func(t *testing.T) { + cnrErr := errors.New("any container error") + cnrs := &testContainer{id: anyAddr.Container(), err: cnrErr} + ns, err := newContainerNodes(cnrs, &testNetwork{epoch: anyEpoch}) + require.NoError(t, err) + + for n := 1; n < 10; n++ { + _, _, err = ns.getNodesForObject(anyAddr) + require.ErrorIs(t, err, cnrErr) + require.EqualError(t, err, "select container nodes for current epoch #42: read container by ID: any container error") + // such error must not be cached + cnrs.assertCalledNTimesWith(t, n, anyAddr.Container()) + } + }) + t.Run("read netmap failure", func(t *testing.T) { + curNetmapErr := errors.New("any current netmap error") + network := &testNetwork{epoch: anyEpoch, curNetmapErr: curNetmapErr} + ns, err := newContainerNodes(&testContainer{id: anyAddr.Container()}, network) + require.NoError(t, err) + + for n := 1; n <= 10; n++ { + _, _, err = ns.getNodesForObject(anyAddr) + require.ErrorIs(t, err, curNetmapErr) + require.EqualError(t, err, "select container nodes for current epoch #42: read network map by epoch: any current netmap error") + network.assertEpochCallCount(t, n) + // such error must not be cached + network.assertNetmapCalledNTimes(t, n, network.epoch) + } + }) + t.Run("apply policy failures", func(t *testing.T) { + t.Run("select container nodes", func(t *testing.T) { + _, _, cnr := newNetmapWithContainer(t, 5, []int{1, 3}) + failNetmap := new(netmap.NetMap) + _, policyErr := failNetmap.ContainerNodes(cnr.PlacementPolicy(), anyAddr.Container()) + require.Error(t, policyErr) + + cnrs := &testContainer{id: anyAddr.Container(), val: cnr} + network := &testNetwork{epoch: anyEpoch, curNetmap: failNetmap} + ns, err := newContainerNodes(cnrs, network) + require.NoError(t, err) + + for n := 1; n <= 10; n++ { + _, _, err = ns.getNodesForObject(anyAddr) + require.EqualError(t, err, fmt.Sprintf("select container nodes for current epoch #42: %v", policyErr)) + network.assertEpochCallCount(t, n) + // assert results are cached + cnrs.assertCalledNTimesWith(t, 1, anyAddr.Container()) + require.Len(t, network.callsNetmap, 1) + require.EqualValues(t, network.epoch, network.callsNetmap[0]) + } + }) + }) + t.Run("OK", func(t *testing.T) { + nodes, networkMap, cnr := newNetmapWithContainer(t, 10, [][]int{ + {1, 3}, + {2, 4, 6}, + {5}, + {0, 1, 7, 8, 9}, + }...) 
+ cnrs := &testContainer{id: anyAddr.Container(), val: cnr} + network := &testNetwork{epoch: anyEpoch, curNetmap: networkMap} + ns, err := newContainerNodes(cnrs, network) + require.NoError(t, err) + + for n := 1; n <= 10; n++ { + nodeLists, primCounts, err := ns.getNodesForObject(anyAddr) + require.NoError(t, err) + require.Len(t, primCounts, 4) + require.Len(t, nodeLists, 4) + require.EqualValues(t, 2, primCounts[0]) + require.ElementsMatch(t, []netmap.NodeInfo{nodes[1], nodes[3]}, nodeLists[0]) + require.EqualValues(t, 3, primCounts[1]) + require.ElementsMatch(t, []netmap.NodeInfo{nodes[2], nodes[4], nodes[6]}, nodeLists[1]) + require.EqualValues(t, 1, primCounts[2]) + require.ElementsMatch(t, []netmap.NodeInfo{nodes[5]}, nodeLists[2]) + require.EqualValues(t, 5, primCounts[3]) + require.ElementsMatch(t, []netmap.NodeInfo{nodes[0], nodes[1], nodes[7], nodes[8], nodes[9]}, nodeLists[3]) + cnrs.assertCalledNTimesWith(t, 1, anyAddr.Container()) + require.Len(t, network.callsNetmap, 1) + require.EqualValues(t, network.epoch, network.callsNetmap[0]) + } + }) +}
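
For readers skimming the patch, below is a minimal, self-contained sketch of the caching scheme it introduces: per-object placement results (including persistent policy errors) are kept in a generic LRU keyed by epoch and object address, while transient storage errors are returned without caching. All names here (placementKey, placementRes, placementCache, sortedNodes, the cnrNodes callback) are hypothetical illustrations rather than code from this patch, the container-level cache is folded into a callback for brevity, and the LRU import path is assumed to be hashicorp/golang-lru/v2, i.e. the generic cache the node code already uses.

package placement

import (
	"fmt"

	lru "github.com/hashicorp/golang-lru/v2"
	cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
	netmapsdk "github.com/nspcc-dev/neofs-sdk-go/netmap"
	oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
)

// placementKey mirrors objectNodesCacheKey from the patch: results are only
// valid within one epoch, so the epoch is part of the key.
type placementKey struct {
	epoch uint64
	addr  oid.Address
}

// placementRes mirrors storagePolicyRes: a policy application error is stored
// alongside successful results because it does not change within an epoch.
type placementRes struct {
	nodeSets [][]netmapsdk.NodeInfo
	err      error
}

type placementCache struct {
	lru *lru.Cache[placementKey, placementRes]
}

func newPlacementCache(size int) (*placementCache, error) {
	c, err := lru.New[placementKey, placementRes](size)
	if err != nil {
		return nil, fmt.Errorf("create LRU placement cache: %w", err)
	}
	return &placementCache{lru: c}, nil
}

// sortedNodes returns per-object sorted node sets, computing and caching them
// on a miss. cnrNodes stands in for container-level policy application
// (ContainerNodes plus its own cache in the patch); nm is the network map of
// the given epoch.
func (p *placementCache) sortedNodes(epoch uint64, addr oid.Address, nm *netmapsdk.NetMap,
	cnrNodes func(cid.ID) ([][]netmapsdk.NodeInfo, error)) ([][]netmapsdk.NodeInfo, error) {
	key := placementKey{epoch, addr}
	if res, ok := p.lru.Get(key); ok {
		return res.nodeSets, res.err // hit: cached result or persistent error
	}
	sets, err := cnrNodes(addr.Container())
	if err != nil {
		return nil, err // treated as transient here => not cached
	}
	var res placementRes
	// PlacementVectors sorts each node set by closeness to the object ID.
	res.nodeSets, res.err = nm.PlacementVectors(sets, addr.Object())
	p.lru.Add(key, res) // cache success and sorting error alike
	return res.nodeSets, res.err
}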
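
For reference when reading the "OK" subtest: with the selected index sets {1, 3}, {2, 4, 6}, {5} and {0, 1, 7, 8, 9}, the Sprintf in the updated newNetmapWithContainer helper expands to the placement policy below (derived from the format strings in the helper); its per-set replica numbers (2, 3, 1, 5) are exactly the primCounts the test asserts.

REP 2 IN S0
REP 3 IN S1
REP 1 IN S2
REP 5 IN S3
CBF 1
SELECT 2 FROM F0 AS S0
SELECT 3 FROM F1 AS S1
SELECT 1 FROM F2 AS S2
SELECT 5 FROM F3 AS S3
FILTER attr0 EQ true AS F0
FILTER attr1 EQ true AS F1
FILTER attr2 EQ true AS F2
FILTER attr3 EQ true AS F3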