diff --git a/CHANGELOG.md b/CHANGELOG.md index 5c03a40ef9..9220259666 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,7 +8,8 @@ Changelog for NeoFS Node - Support of `GT`, `GE`, `LT` and `LE` numeric comparison operators in CLI (#2733) - SN eACL processing of NULL and numeric operators (#2742) - CLI now allows to create and print eACL with numeric filters (#2742) -- gRPC connection limits per endpoint (#1240) +- gRPC connection limits per endpoint (#1240) +- Storage nodes serve new `ObjectService.Replicate` RPC (#2674) ### Fixed - Access to `PUT` objects no longer grants `DELETE` rights (#2261) diff --git a/cmd/neofs-node/grpc.go b/cmd/neofs-node/grpc.go index 8fdc36af01..c40cdf7938 100644 --- a/cmd/neofs-node/grpc.go +++ b/cmd/neofs-node/grpc.go @@ -4,10 +4,12 @@ import ( "crypto/tls" "errors" "fmt" + "math" "net" "time" grpcconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/grpc" + "github.com/nspcc-dev/neofs-sdk-go/object" "go.uber.org/zap" "golang.org/x/net/netutil" "google.golang.org/grpc" @@ -15,11 +17,50 @@ import ( ) func initGRPC(c *cfg) { + if c.cfgMorph.client == nil { + initMorphComponents(c) + } + + // limit max size of single messages received by the gRPC servers up to max + // object size setting of the NeoFS network: this is needed to serve + // ObjectService.Replicate RPC transmitting the entire stored object in one + // message + maxObjSize, err := c.nCli.MaxObjectSize() + fatalOnErrDetails("read max object size network setting to determine gRPC recv message limit", err) + + maxRecvSize := maxObjSize + // don't forget about meta fields + if maxRecvSize < uint64(math.MaxUint64-object.MaxHeaderLen) { // just in case, always true in practice + maxRecvSize += object.MaxHeaderLen + } else { + maxRecvSize = math.MaxUint64 + } + + var maxRecvMsgSizeOpt grpc.ServerOption + if maxRecvSize > maxMsgSize { // do not decrease default value + if maxRecvSize > math.MaxInt { + // ^2GB for 32-bit systems which is currently enough in practice. 
If at some
+			// point this is not enough, we'll need to expand the option
+			fatalOnErr(fmt.Errorf("cannot serve NeoFS API over gRPC: object of max size is bigger than gRPC server is able to support %d>%d",
+				maxRecvSize, math.MaxInt))
+		}
+		maxRecvMsgSizeOpt = grpc.MaxRecvMsgSize(int(maxRecvSize))
+		c.log.Debug("limit max recv gRPC message size to fit max stored objects",
+			zap.Uint64("max object size", maxObjSize), zap.Uint64("max recv msg", maxRecvSize))
+	}
+
 	var successCount int
 	grpcconfig.IterateEndpoints(c.cfgReader, func(sc *grpcconfig.Config) {
 		serverOpts := []grpc.ServerOption{
 			grpc.MaxSendMsgSize(maxMsgSize),
 		}
+		if maxRecvMsgSizeOpt != nil {
+			// TODO(@cthulhu-rider): the setting can be server-global only now, support
+			// per-RPC limits
+			// TODO(@cthulhu-rider): max object size setting may change in general,
+			// but server configuration is static now
+			serverOpts = append(serverOpts, maxRecvMsgSizeOpt)
+		}

 		tlsCfg := sc.TLS()
diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go
index 990f86b95b..c76fe84b46 100644
--- a/cmd/neofs-node/object.go
+++ b/cmd/neofs-node/object.go
@@ -359,7 +359,10 @@ func initObjectService(c *cfg) {
 		firstSvc = objectService.NewMetricCollector(signSvc, c.metricsCollector)
 	}

-	server := objectTransportGRPC.New(firstSvc)
+	objNode, err := newNodeForObjects(c.cfgObject.cnrSource, c.netMapSource, sPut, c.IsLocalKey)
+	fatalOnErr(err)
+
+	server := objectTransportGRPC.New(firstSvc, objNode)

 	for _, srv := range c.cfgGRPC.servers {
 		objectGRPC.RegisterObjectServiceServer(srv, server)
@@ -601,3 +604,47 @@ func (h headerSource) Head(address oid.Address) (*objectSDK.Object, error) {

 	return hw.h, nil
 }
+
+// nodeForObjects represents NeoFS storage node for object storage.
+type nodeForObjects struct {
+	putObjectService *putsvc.Service
+	containerNodes   *containerNodes
+	isLocalPubKey    func([]byte) bool
+}
+
+func newNodeForObjects(containers containercore.Source, network netmap.Source, putObjectService *putsvc.Service, isLocalPubKey func([]byte) bool) (*nodeForObjects, error) {
+	cnrNodes, err := newContainerNodes(containers, network)
+	if err != nil {
+		return nil, err
+	}
+	return &nodeForObjects{
+		putObjectService: putObjectService,
+		containerNodes:   cnrNodes,
+		isLocalPubKey:    isLocalPubKey,
+	}, nil
+}
+
+// ForEachContainerNodePublicKeyInLastTwoEpochs passes the binary-encoded public
+// key of each node matching the referenced container's storage policy at the
+// two latest epochs into f. When f returns false, nil is returned immediately.
+//
+// Implements [object.Node] interface.
+func (x *nodeForObjects) ForEachContainerNodePublicKeyInLastTwoEpochs(id cid.ID, f func(pubKey []byte) bool) error {
+	return x.containerNodes.forEachContainerNodePublicKeyInLastTwoEpochs(id, f)
+}
+
+// IsOwnPublicKey checks whether the given binary-encoded public key is assigned
+// to the local storage node in the network map.
+//
+// Implements [object.Node] interface.
+func (x *nodeForObjects) IsOwnPublicKey(pubKey []byte) bool {
+	return x.isLocalPubKey(pubKey)
+}
+
+// StoreObject checks the given object's format and, if it is correct, saves the
+// object in the node's local object storage.
+//
+// Implements [object.Node] interface.
+func (x *nodeForObjects) StoreObject(obj objectSDK.Object) error {
+	return x.putObjectService.ValidateAndStoreObjectLocally(obj)
+}
diff --git a/cmd/neofs-node/policy.go b/cmd/neofs-node/policy.go
new file mode 100644
index 0000000000..342bddd04e
--- /dev/null
+++ b/cmd/neofs-node/policy.go
@@ -0,0 +1,185 @@
+package main
+
+import (
+	"fmt"
+	"sync"
+
+	"github.com/hashicorp/golang-lru/v2/simplelru"
+	"github.com/nspcc-dev/neofs-node/pkg/core/container"
+	"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
+	cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
+	netmapsdk "github.com/nspcc-dev/neofs-sdk-go/netmap"
+)
+
+// storagePolicyRes combines the storage policy application result for a
+// particular container and network map in a single unit.
+type storagePolicyRes struct {
+	ns  [][]netmapsdk.NodeInfo
+	err error
+}
+
+// containerNodesAtEpoch is a thread-safe LRU cache mapping containers into
+// storage policy application results for a particular network map.
+type containerNodesAtEpoch struct {
+	mtx sync.RWMutex
+	lru simplelru.LRUCache[cid.ID, storagePolicyRes]
+}
+
+// containerNodes wraps NeoFS network state to apply container storage policies.
+//
+// Since policy application results are consistent for a fixed container and
+// network map, they can be cached. The containerNodes caches up to 1000
+// results for the latest and the previous epochs. The previous one is required
+// to support data migration after the epoch tick. Older epochs are not cached.
+type containerNodes struct {
+	containers container.Source
+	network    netmap.Source
+
+	lastCurrentEpochMtx sync.Mutex
+	lastCurrentEpoch    uint64
+
+	curEpochCache, prevEpochCache *containerNodesAtEpoch
+}
+
+const cachedContainerNodesPerEpochNum = 1000
+
+func newContainerNodes(containers container.Source, network netmap.Source) (*containerNodes, error) {
+	lru, err := simplelru.NewLRU[cid.ID, storagePolicyRes](cachedContainerNodesPerEpochNum, nil)
+	if err != nil {
+		// should never happen
+		return nil, fmt.Errorf("create LRU cache for container nodes: %w", err)
+	}
+	return &containerNodes{
+		containers:     containers,
+		network:        network,
+		curEpochCache:  &containerNodesAtEpoch{lru: lru},
+		prevEpochCache: new(containerNodesAtEpoch),
+	}, nil
+}
+
+// forEachNodePubKeyInSets passes the binary-encoded public key of each node into f.
+// When f returns false, forEachNodePubKeyInSets returns false immediately.
+// Otherwise, true is returned.
+func forEachNodePubKeyInSets(nodeSets [][]netmapsdk.NodeInfo, f func(pubKey []byte) bool) bool {
+	for i := range nodeSets {
+		for j := range nodeSets[i] {
+			if !f(nodeSets[i][j].PublicKey()) {
+				return false
+			}
+		}
+	}
+	return true
+}
+
+// forEachContainerNodePublicKeyInLastTwoEpochs passes the binary-encoded public
+// key of each node matching the referenced container's storage policy at the
+// two latest epochs into f. When f returns false, nil is returned immediately.
+func (x *containerNodes) forEachContainerNodePublicKeyInLastTwoEpochs(cnrID cid.ID, f func(pubKey []byte) bool) error { + curEpoch, err := x.network.Epoch() + if err != nil { + return fmt.Errorf("read current NeoFS epoch: %w", err) + } + + var curEpochCache, prevEpochCache *containerNodesAtEpoch + x.lastCurrentEpochMtx.Lock() + switch { + case curEpoch == x.lastCurrentEpoch-1: + curEpochCache = x.prevEpochCache + case curEpoch > x.lastCurrentEpoch: + curLRU, err := simplelru.NewLRU[cid.ID, storagePolicyRes](cachedContainerNodesPerEpochNum, nil) + if err != nil { + // should never happen + x.lastCurrentEpochMtx.Unlock() + return fmt.Errorf("create LRU cache for container nodes: %w", err) + } + + if curEpoch == x.lastCurrentEpoch+1 { + x.prevEpochCache = x.curEpochCache + } else { + prevLRU, err := simplelru.NewLRU[cid.ID, storagePolicyRes](cachedContainerNodesPerEpochNum, nil) + if err != nil { + // should never happen + x.lastCurrentEpochMtx.Unlock() + return fmt.Errorf("create LRU cache for container nodes: %w", err) + } + x.prevEpochCache = &containerNodesAtEpoch{lru: prevLRU} + } + x.curEpochCache = &containerNodesAtEpoch{lru: curLRU} + x.lastCurrentEpoch = curEpoch + fallthrough + case curEpoch == x.lastCurrentEpoch: + curEpochCache = x.curEpochCache + prevEpochCache = x.prevEpochCache + } + x.lastCurrentEpochMtx.Unlock() + + var cnr *container.Container + + processEpoch := func(cache *containerNodesAtEpoch, epoch uint64) (storagePolicyRes, error) { + var result storagePolicyRes + var isCached bool + if cache != nil { + cache.mtx.Lock() + defer cache.mtx.Unlock() + result, isCached = cache.lru.Get(cnrID) + } + if !isCached { + if cnr == nil { + var err error + cnr, err = x.containers.Get(cnrID) + if err != nil { + // not persistent error => do not cache + return result, fmt.Errorf("read container by ID: %w", err) + } + } + networkMap, err := x.network.GetNetMapByEpoch(epoch) + if err != nil { + // not persistent error => do not cache + return result, fmt.Errorf("read network map at epoch #%d: %w", epoch, err) + } + result.ns, result.err = networkMap.ContainerNodes(cnr.Value.PlacementPolicy(), cnrID) + if cache != nil { + cache.lru.Add(cnrID, result) + } + } + return result, nil + } + + cur, err := processEpoch(curEpochCache, curEpoch) + if err != nil { + return err + } + if cur.err == nil && !forEachNodePubKeyInSets(cur.ns, f) { + return nil + } + if curEpoch == 0 { + if cur.err != nil { + return fmt.Errorf("select container nodes for current epoch #%d: %w", curEpoch, cur.err) + } + return nil + } + + prev, err := processEpoch(prevEpochCache, curEpoch-1) + if err != nil { + if cur.err != nil { + return fmt.Errorf("select container nodes for both epochs: (previous=%d) %w, (current=%d) %w", + curEpoch-1, err, curEpoch, cur.err) + } + return err + } + if prev.err == nil && !forEachNodePubKeyInSets(prev.ns, f) { + return nil + } + + if cur.err != nil { + if prev.err != nil { + return fmt.Errorf("select container nodes for both epochs: (previous=%d) %w, (current=%d) %w", + curEpoch-1, prev.err, curEpoch, cur.err) + } + return fmt.Errorf("select container nodes for current epoch #%d: %w", curEpoch, cur.err) + } + if prev.err != nil { + return fmt.Errorf("select container nodes for previous epoch #%d: %w", curEpoch-1, prev.err) + } + return nil +} diff --git a/cmd/neofs-node/policy_test.go b/cmd/neofs-node/policy_test.go new file mode 100644 index 0000000000..93eba1bece --- /dev/null +++ b/cmd/neofs-node/policy_test.go @@ -0,0 +1,229 @@ +package main + +import ( + "crypto/rand" + "errors" + 
"fmt" + "testing" + + containercore "github.com/nspcc-dev/neofs-node/pkg/core/container" + "github.com/nspcc-dev/neofs-sdk-go/container" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" + "github.com/nspcc-dev/neofs-sdk-go/netmap" + "github.com/stretchr/testify/require" +) + +type testContainer struct { + id cid.ID + val container.Container + err error +} + +func (x *testContainer) Get(id cid.ID) (*containercore.Container, error) { + if !id.Equals(x.id) { + return nil, fmt.Errorf("unexpected container requested %s!=%s", id, x.id) + } + if x.err != nil { + return nil, x.err + } + return &containercore.Container{Value: x.val}, nil +} + +type testNetwork struct { + epoch uint64 + epochErr error + + curNetmap *netmap.NetMap + curNetmapErr error + prevNetmap *netmap.NetMap + prevNetmapErr error +} + +func (x *testNetwork) GetNetMap(diff uint64) (*netmap.NetMap, error) { + panic("unexpected call") +} + +func (x *testNetwork) GetNetMapByEpoch(epoch uint64) (*netmap.NetMap, error) { + if epoch == x.epoch { + return x.curNetmap, x.curNetmapErr + } + if x.epoch > 0 && epoch == x.epoch-1 { + return x.prevNetmap, x.prevNetmapErr + } + return nil, fmt.Errorf("unexpected epoch #%d requested", epoch) +} + +func (x *testNetwork) Epoch() (uint64, error) { + return x.epoch, x.epochErr +} + +func newNetmapWithContainer(tb testing.TB, nodeNum int, selected []int) ([]netmap.NodeInfo, *netmap.NetMap, container.Container) { + nodes := make([]netmap.NodeInfo, nodeNum) +nextNode: + for i := range nodes { + key := make([]byte, 33) + _, err := rand.Read(key) + require.NoError(tb, err) + nodes[i].SetPublicKey(key) + + for j := range selected { + if i == selected[j] { + nodes[i].SetAttribute("attr", "true") + continue nextNode + } + } + + nodes[i].SetAttribute("attr", "false") + } + + var networkMap netmap.NetMap + networkMap.SetNodes(nodes) + + var policy netmap.PlacementPolicy + strPolicy := fmt.Sprintf("REP %d CBF 1 SELECT %d FROM F FILTER attr EQ true AS F", len(selected), len(selected)) + require.NoError(tb, policy.DecodeString(strPolicy)) + + nodeSets, err := networkMap.ContainerNodes(policy, cidtest.ID()) + require.NoError(tb, err) + require.Len(tb, nodeSets, 1) + require.Len(tb, nodeSets[0], len(selected)) + for i := range selected { + require.Contains(tb, nodeSets[0], nodes[selected[i]], i) + } + + var cnr container.Container + cnr.SetPlacementPolicy(policy) + + return nodes, &networkMap, cnr +} + +func TestContainerNodes_ForEachContainerNodePublicKeyInLastTwoEpochs(t *testing.T) { + const anyEpoch = 42 + anyCnr := cidtest.ID() + failOnCall := func(tb testing.TB) func([]byte) bool { + return func([]byte) bool { + tb.Fatal("must not be called") + return false + } + } + + t.Run("read current epoch", func(t *testing.T) { + epochErr := errors.New("any epoch error") + ns, err := newContainerNodes(new(testContainer), &testNetwork{epochErr: epochErr}) + require.NoError(t, err) + + err = ns.forEachContainerNodePublicKeyInLastTwoEpochs(anyCnr, failOnCall(t)) + require.ErrorIs(t, err, epochErr) + }) + + t.Run("read container failure", func(t *testing.T) { + cnrErr := errors.New("any container error") + ns, err := newContainerNodes(&testContainer{ + id: anyCnr, + err: cnrErr, + }, &testNetwork{epoch: anyEpoch}) + require.NoError(t, err) + + err = ns.forEachContainerNodePublicKeyInLastTwoEpochs(anyCnr, failOnCall(t)) + require.ErrorIs(t, err, cnrErr) + }) + + t.Run("read current netmap failure", func(t *testing.T) { + curNetmapErr := errors.New("any 
current netmap error")
+		ns, err := newContainerNodes(&testContainer{id: anyCnr}, &testNetwork{
+			epoch:        anyEpoch,
+			curNetmapErr: curNetmapErr,
+		})
+		require.NoError(t, err)
+
+		err = ns.forEachContainerNodePublicKeyInLastTwoEpochs(anyCnr, failOnCall(t))
+		require.ErrorIs(t, err, curNetmapErr)
+	})
+
+	t.Run("zero current epoch", func(t *testing.T) {
+		nodes, curNetmap, cnr := newNetmapWithContainer(t, 5, []int{1, 3})
+
+		ns, err := newContainerNodes(&testContainer{id: anyCnr, val: cnr}, &testNetwork{
+			epoch:     0,
+			curNetmap: curNetmap,
+		})
+		require.NoError(t, err)
+
+		var calledKeys [][]byte
+		err = ns.forEachContainerNodePublicKeyInLastTwoEpochs(anyCnr, func(pubKey []byte) bool {
+			calledKeys = append(calledKeys, pubKey)
+			return true
+		})
+		require.NoError(t, err)
+		require.Len(t, calledKeys, 2)
+		require.Contains(t, calledKeys, nodes[1].PublicKey())
+		require.Contains(t, calledKeys, nodes[3].PublicKey())
+	})
+
+	t.Run("read previous network map failure", func(t *testing.T) {
+		nodes, curNetmap, cnr := newNetmapWithContainer(t, 5, []int{1, 3})
+		prevNetmapErr := errors.New("any previous netmap error")
+
+		ns, err := newContainerNodes(&testContainer{id: anyCnr, val: cnr}, &testNetwork{
+			epoch:         anyEpoch,
+			curNetmap:     curNetmap,
+			prevNetmapErr: prevNetmapErr,
+		})
+		require.NoError(t, err)
+
+		var calledKeys [][]byte
+		err = ns.forEachContainerNodePublicKeyInLastTwoEpochs(anyCnr, func(pubKey []byte) bool {
+			calledKeys = append(calledKeys, pubKey)
+			return true
+		})
+		require.ErrorIs(t, err, prevNetmapErr)
+		require.Len(t, calledKeys, 2)
+		require.Contains(t, calledKeys, nodes[1].PublicKey())
+		require.Contains(t, calledKeys, nodes[3].PublicKey())
+	})
+
+	t.Run("both epochs OK", func(t *testing.T) {
+		curNodes, curNetmap, cnr := newNetmapWithContainer(t, 5, []int{1, 3})
+		prevNodes, prevNetmap, _ := newNetmapWithContainer(t, 5, []int{0, 4})
+
+		ns, err := newContainerNodes(&testContainer{id: anyCnr, val: cnr}, &testNetwork{
+			epoch:      anyEpoch,
+			curNetmap:  curNetmap,
+			prevNetmap: prevNetmap,
+		})
+		require.NoError(t, err)
+
+		var calledKeys [][]byte
+		err = ns.forEachContainerNodePublicKeyInLastTwoEpochs(anyCnr, func(pubKey []byte) bool {
+			calledKeys = append(calledKeys, pubKey)
+			return true
+		})
+		require.NoError(t, err)
+		require.Len(t, calledKeys, 4)
+		require.Contains(t, calledKeys, curNodes[1].PublicKey())
+		require.Contains(t, calledKeys, curNodes[3].PublicKey())
+		require.Contains(t, calledKeys, prevNodes[0].PublicKey())
+		require.Contains(t, calledKeys, prevNodes[4].PublicKey())
+	})
+}
diff --git a/pkg/network/transport/object/grpc/replication.go b/pkg/network/transport/object/grpc/replication.go
new file mode 100644
index 0000000000..358a1aa5e2
--- /dev/null
+++ b/pkg/network/transport/object/grpc/replication.go
@@ -0,0 +1,192 @@
+package object
+
+import (
+	"bytes"
+	"context"
+	"errors"
+	"fmt"
+
+	objectv2 
"github.com/nspcc-dev/neofs-api-go/v2/object" + objectGRPC "github.com/nspcc-dev/neofs-api-go/v2/object/grpc" + refsv2 "github.com/nspcc-dev/neofs-api-go/v2/refs" + refs "github.com/nspcc-dev/neofs-api-go/v2/refs/grpc" + status "github.com/nspcc-dev/neofs-api-go/v2/status/grpc" + apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" + neofsecdsa "github.com/nspcc-dev/neofs-sdk-go/crypto/ecdsa" + "github.com/nspcc-dev/neofs-sdk-go/object" +) + +// Replicate serves neo.fs.v2.object.ObjectService/Replicate RPC. +func (s *Server) Replicate(_ context.Context, req *objectGRPC.ReplicateRequest) (*objectGRPC.ReplicateResponse, error) { + if req.Object == nil { + return &objectGRPC.ReplicateResponse{Status: &status.Status{ + Code: codeInternal, Message: "binary object field is missing/empty", + }}, nil + } + + if req.Object.ObjectId == nil || len(req.Object.ObjectId.Value) == 0 { + return &objectGRPC.ReplicateResponse{Status: &status.Status{ + Code: codeInternal, Message: "ID field is missing/empty in the object field", + }}, nil + } + + if req.Signature == nil { + return &objectGRPC.ReplicateResponse{Status: &status.Status{ + Code: codeInternal, Message: "missing object signature field", + }}, nil + } + + if len(req.Signature.Key) == 0 { + return &objectGRPC.ReplicateResponse{Status: &status.Status{ + Code: codeInternal, Message: "public key field is missing/empty in the object signature field", + }}, nil + } + + if len(req.Signature.Sign) == 0 { + return &objectGRPC.ReplicateResponse{Status: &status.Status{ + Code: codeInternal, Message: "signature value is missing/empty in the object signature field", + }}, nil + } + + switch scheme := req.Signature.Scheme; scheme { + default: + return &objectGRPC.ReplicateResponse{Status: &status.Status{ + Code: codeInternal, + Message: "unsupported scheme in the object signature field", + }}, nil + case + refs.SignatureScheme_ECDSA_SHA512, + refs.SignatureScheme_ECDSA_RFC6979_SHA256, + refs.SignatureScheme_ECDSA_RFC6979_SHA256_WALLET_CONNECT: + } + + hdr := req.Object.GetHeader() + if hdr == nil { + return &objectGRPC.ReplicateResponse{Status: &status.Status{ + Code: codeInternal, + Message: "missing header field in the object field", + }}, nil + } + + gCnrMsg := hdr.GetContainerId() + if gCnrMsg == nil { + return &objectGRPC.ReplicateResponse{Status: &status.Status{ + Code: codeInternal, + Message: "missing container ID field in the object header field", + }}, nil + } + + var cnr cid.ID + var cnrMsg refsv2.ContainerID + err := cnrMsg.FromGRPCMessage(gCnrMsg) + if err == nil { + err = cnr.ReadFromV2(cnrMsg) + } + if err != nil { + return &objectGRPC.ReplicateResponse{Status: &status.Status{ + Code: codeInternal, + Message: fmt.Sprintf("invalid container ID in the object header field: %v", err), + }}, nil + } + + var pubKey neofscrypto.PublicKey + switch req.Signature.Scheme { + // other cases already checked above + case refs.SignatureScheme_ECDSA_SHA512: + pubKey = new(neofsecdsa.PublicKey) + err = pubKey.Decode(req.Signature.Key) + if err != nil { + return &objectGRPC.ReplicateResponse{Status: &status.Status{ + Code: codeInternal, + Message: "invalid ECDSA public key in the object signature field", + }}, nil + } + case refs.SignatureScheme_ECDSA_RFC6979_SHA256: + pubKey = new(neofsecdsa.PublicKeyRFC6979) + err = pubKey.Decode(req.Signature.Key) + if err != nil { + return &objectGRPC.ReplicateResponse{Status: &status.Status{ + Code: 
codeInternal, + Message: "invalid ECDSA public key in the object signature field", + }}, nil + } + case refs.SignatureScheme_ECDSA_RFC6979_SHA256_WALLET_CONNECT: + pubKey = new(neofsecdsa.PublicKeyWalletConnect) + err = pubKey.Decode(req.Signature.Key) + if err != nil { + return &objectGRPC.ReplicateResponse{Status: &status.Status{ + Code: codeInternal, + Message: "invalid ECDSA public key in the object signature field", + }}, nil + } + } + if !pubKey.Verify(req.Object.ObjectId.Value, req.Signature.Sign) { + return &objectGRPC.ReplicateResponse{Status: &status.Status{ + Code: codeInternal, + Message: "signature mismatch in the object signature field", + }}, nil + } + + var clientInCnr, serverInCnr bool + err = s.node.ForEachContainerNodePublicKeyInLastTwoEpochs(cnr, func(pubKey []byte) bool { + if !serverInCnr { + serverInCnr = s.node.IsOwnPublicKey(pubKey) + } + if !clientInCnr { + clientInCnr = bytes.Equal(pubKey, req.Signature.Key) + } + return !clientInCnr || !serverInCnr + }) + if err != nil { + if errors.Is(err, apistatus.ErrContainerNotFound) { + return &objectGRPC.ReplicateResponse{Status: &status.Status{ + Code: codeContainerNotFound, + Message: "failed to check server's compliance to object's storage policy: object's container not found", + }}, nil + } + + return &objectGRPC.ReplicateResponse{Status: &status.Status{ + Code: codeInternal, + Message: fmt.Sprintf("failed to apply object's storage policy: %v", err), + }}, nil + } else if !serverInCnr { + return &objectGRPC.ReplicateResponse{Status: &status.Status{ + Code: codeAccessDenied, Message: "server does not match the object's storage policy", + }}, nil + } else if !clientInCnr { + return &objectGRPC.ReplicateResponse{Status: &status.Status{ + Code: codeAccessDenied, Message: "client does not match the object's storage policy", + }}, nil + } + + // TODO(@cthulhu-rider): avoid decoding the object completely + obj, err := objectFromMessage(req.Object) + if err != nil { + return &objectGRPC.ReplicateResponse{Status: &status.Status{ + Code: codeInternal, + Message: fmt.Sprintf("invalid object field: %v", err), + }}, nil + } + + err = s.node.StoreObject(*obj) + if err != nil { + return &objectGRPC.ReplicateResponse{Status: &status.Status{ + Code: codeInternal, + Message: fmt.Sprintf("failed to store object locally: %v", err), + }}, nil + } + + return new(objectGRPC.ReplicateResponse), nil +} + +func objectFromMessage(gMsg *objectGRPC.Object) (*object.Object, error) { + var msg objectv2.Object + err := msg.FromGRPCMessage(gMsg) + if err != nil { + return nil, err + } + + return object.NewFromV2(&msg), nil +} diff --git a/pkg/network/transport/object/grpc/replication_test.go b/pkg/network/transport/object/grpc/replication_test.go new file mode 100644 index 0000000000..3a3be75dbb --- /dev/null +++ b/pkg/network/transport/object/grpc/replication_test.go @@ -0,0 +1,437 @@ +package object_test + +import ( + "bytes" + "context" + "crypto/ecdsa" + "crypto/elliptic" + "crypto/rand" + "errors" + "testing" + + objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" + objectgrpc "github.com/nspcc-dev/neofs-api-go/v2/object/grpc" + refs "github.com/nspcc-dev/neofs-api-go/v2/refs/grpc" + . 
"github.com/nspcc-dev/neofs-node/pkg/network/transport/object/grpc" + objectSvc "github.com/nspcc-dev/neofs-node/pkg/services/object" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" + neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" + neofsecdsa "github.com/nspcc-dev/neofs-sdk-go/crypto/ecdsa" + "github.com/nspcc-dev/neofs-sdk-go/crypto/test" + "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test" + objecttest "github.com/nspcc-dev/neofs-sdk-go/object/test" + "github.com/stretchr/testify/require" +) + +func randECDSAPrivateKey(tb testing.TB) *ecdsa.PrivateKey { + k, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader) + require.NoError(tb, err) + return k +} + +type noCallObjectService struct{} + +func (x noCallObjectService) Get(*objectV2.GetRequest, objectSvc.GetObjectStream) error { + panic("must not be called") +} + +func (x noCallObjectService) Put(context.Context) (objectSvc.PutObjectStream, error) { + panic("must not be called") +} + +func (x noCallObjectService) Head(context.Context, *objectV2.HeadRequest) (*objectV2.HeadResponse, error) { + panic("must not be called") +} + +func (x noCallObjectService) Search(*objectV2.SearchRequest, objectSvc.SearchStream) error { + panic("must not be called") +} + +func (x noCallObjectService) Delete(context.Context, *objectV2.DeleteRequest) (*objectV2.DeleteResponse, error) { + panic("must not be called") +} + +func (x noCallObjectService) GetRange(*objectV2.GetRangeRequest, objectSvc.GetObjectRangeStream) error { + panic("must not be called") +} + +func (x noCallObjectService) GetRangeHash(context.Context, *objectV2.GetRangeHashRequest) (*objectV2.GetRangeHashResponse, error) { + panic("must not be called") +} + +type noCallTestNode struct{} + +func (x *noCallTestNode) ForEachContainerNodePublicKeyInLastTwoEpochs(cid.ID, func([]byte) bool) error { + panic("must not be called") +} + +func (x *noCallTestNode) IsOwnPublicKey([]byte) bool { + panic("must not be called") +} + +func (x *noCallTestNode) StoreObject(object.Object) error { + panic("must not be called") +} + +type testNode struct { + tb testing.TB + + // server state + serverPubKey []byte + + // request data + clientPubKey []byte + cnr cid.ID + obj *objectgrpc.Object + + // return + cnrErr error + clientOutsideCnr bool + serverOutsideCnr bool + + storeErr error +} + +func newTestNode(tb testing.TB, serverPubKey, clientPubKey []byte, cnr cid.ID, obj *objectgrpc.Object) *testNode { + return &testNode{ + tb: tb, + serverPubKey: serverPubKey, + clientPubKey: clientPubKey, + cnr: cnr, + obj: obj, + } +} + +func (x *testNode) ForEachContainerNodePublicKeyInLastTwoEpochs(cnr cid.ID, f func(pubKey []byte) bool) error { + require.True(x.tb, cnr.Equals(x.cnr)) + require.NotNil(x.tb, f) + if x.cnrErr != nil { + return x.cnrErr + } + if !x.clientOutsideCnr && !f(x.clientPubKey) { + return nil + } + if !x.serverOutsideCnr && !f(x.serverPubKey) { + return nil + } + return nil +} + +func (x *testNode) IsOwnPublicKey(pubKey []byte) bool { return bytes.Equal(x.serverPubKey, pubKey) } + +func (x *testNode) StoreObject(obj object.Object) error { + require.Equal(x.tb, x.obj, obj.ToV2().ToGRPCMessage().(*objectgrpc.Object)) + return x.storeErr +} + +func anyValidRequest(tb testing.TB, signer neofscrypto.Signer, cnr cid.ID, objID oid.ID) *objectgrpc.ReplicateRequest { + obj := objecttest.Object(tb) + obj.SetContainerID(cnr) + 
obj.SetID(objID) + + sig, err := signer.Sign(objID[:]) + require.NoError(tb, err) + + req := &objectgrpc.ReplicateRequest{ + Object: obj.ToV2().ToGRPCMessage().(*objectgrpc.Object), + Signature: &refs.Signature{ + Key: neofscrypto.PublicKeyBytes(signer.Public()), + Sign: sig, + }, + } + + switch signer.Scheme() { + default: + tb.Fatalf("unsupported scheme %v", signer.Scheme()) + case neofscrypto.ECDSA_SHA512: + req.Signature.Scheme = refs.SignatureScheme_ECDSA_SHA512 + case neofscrypto.ECDSA_DETERMINISTIC_SHA256: + req.Signature.Scheme = refs.SignatureScheme_ECDSA_RFC6979_SHA256 + case neofscrypto.ECDSA_WALLETCONNECT: + req.Signature.Scheme = refs.SignatureScheme_ECDSA_RFC6979_SHA256_WALLET_CONNECT + } + + return req +} + +func TestServer_Replicate(t *testing.T) { + var noCallNode noCallTestNode + var noCallObjSvc noCallObjectService + noCallSrv := New(noCallObjSvc, &noCallNode) + clientSigner := test.RandomSigner(t) + clientPubKey := neofscrypto.PublicKeyBytes(clientSigner.Public()) + serverPubKey := neofscrypto.PublicKeyBytes(test.RandomSigner(t).Public()) + cnr := cidtest.ID() + objID := oidtest.ID() + req := anyValidRequest(t, clientSigner, cnr, objID) + + t.Run("invalid/unsupported signature format", func(t *testing.T) { + // note: verification is tested separately + for _, tc := range []struct { + name string + fSig func() *refs.Signature + expectedCode uint32 + expectedMsg string + }{ + { + name: "missing object signature field", + fSig: func() *refs.Signature { return nil }, + expectedCode: 1024, + expectedMsg: "missing object signature field", + }, + { + name: "missing public key field in the signature field", + fSig: func() *refs.Signature { + return &refs.Signature{ + Key: nil, + Sign: []byte("any non-empty"), + Scheme: refs.SignatureScheme_ECDSA_SHA512, // any supported + } + }, + expectedCode: 1024, + expectedMsg: "public key field is missing/empty in the object signature field", + }, + { + name: "missing value field in the signature field", + fSig: func() *refs.Signature { + return &refs.Signature{ + Key: []byte("any non-empty"), + Sign: []byte{}, + Scheme: refs.SignatureScheme_ECDSA_SHA512, // any supported + } + }, + expectedCode: 1024, + expectedMsg: "signature value is missing/empty in the object signature field", + }, + { + name: "unsupported scheme in the signature field", + fSig: func() *refs.Signature { + return &refs.Signature{ + Key: []byte("any non-empty"), + Sign: []byte("any non-empty"), + Scheme: 3, + } + }, + expectedCode: 1024, + expectedMsg: "unsupported scheme in the object signature field", + }, + } { + req := anyValidRequest(t, test.RandomSigner(t), cidtest.ID(), oidtest.ID()) + req.Signature = tc.fSig() + resp, err := noCallSrv.Replicate(context.Background(), req) + require.NoError(t, err, tc.name) + require.EqualValues(t, tc.expectedCode, resp.GetStatus().GetCode(), tc.name) + require.Equal(t, tc.expectedMsg, resp.GetStatus().GetMessage(), tc.name) + } + }) + + t.Run("signature verification failure", func(t *testing.T) { + // note: common format is tested separately + for _, tc := range []struct { + name string + fSig func(bObj []byte) *refs.Signature + expectedCode uint32 + expectedMsg string + }{ + { + name: "ECDSA SHA-512: invalid public key", + fSig: func(_ []byte) *refs.Signature { + return &refs.Signature{ + Key: []byte("not ECDSA key"), + Sign: []byte("any non-empty"), + Scheme: refs.SignatureScheme_ECDSA_SHA512, + } + }, + expectedCode: 1024, + expectedMsg: "invalid ECDSA public key in the object signature field", + }, + { + name: "ECDSA SHA-512: 
signature mismatch", + fSig: func(bObj []byte) *refs.Signature { + return &refs.Signature{ + Key: neofscrypto.PublicKeyBytes((*neofsecdsa.Signer)(randECDSAPrivateKey(t)).Public()), + Sign: []byte("definitely invalid"), + Scheme: refs.SignatureScheme_ECDSA_SHA512, + } + }, + expectedCode: 1024, + expectedMsg: "signature mismatch in the object signature field", + }, + { + name: "ECDSA SHA-256 deterministic: invalid public key", + fSig: func(_ []byte) *refs.Signature { + return &refs.Signature{ + Key: []byte("not ECDSA key"), + Sign: []byte("any non-empty"), + Scheme: refs.SignatureScheme_ECDSA_RFC6979_SHA256, + } + }, + expectedCode: 1024, + expectedMsg: "invalid ECDSA public key in the object signature field", + }, + { + name: "ECDSA SHA-256 deterministic: signature mismatch", + fSig: func(bObj []byte) *refs.Signature { + return &refs.Signature{ + Key: neofscrypto.PublicKeyBytes((*neofsecdsa.SignerRFC6979)(randECDSAPrivateKey(t)).Public()), + Sign: []byte("definitely invalid"), + Scheme: refs.SignatureScheme_ECDSA_RFC6979_SHA256, + } + }, + expectedCode: 1024, + expectedMsg: "signature mismatch in the object signature field", + }, + { + name: "ECDSA SHA-256 WalletConnect: invalid public key", + fSig: func(_ []byte) *refs.Signature { + return &refs.Signature{ + Key: []byte("not ECDSA key"), + Sign: []byte("any non-empty"), + Scheme: refs.SignatureScheme_ECDSA_RFC6979_SHA256_WALLET_CONNECT, + } + }, + expectedCode: 1024, + expectedMsg: "invalid ECDSA public key in the object signature field", + }, + { + name: "ECDSA SHA-256 WalletConnect: signature mismatch", + fSig: func(bObj []byte) *refs.Signature { + return &refs.Signature{ + Key: neofscrypto.PublicKeyBytes((*neofsecdsa.SignerWalletConnect)(randECDSAPrivateKey(t)).Public()), + Sign: []byte("definitely invalid"), + Scheme: refs.SignatureScheme_ECDSA_RFC6979_SHA256_WALLET_CONNECT, + } + }, + expectedCode: 1024, + expectedMsg: "signature mismatch in the object signature field", + }, + } { + obj := objecttest.Object(t) + bObj, err := obj.Marshal() + require.NoError(t, err) + + resp, err := noCallSrv.Replicate(context.Background(), &objectgrpc.ReplicateRequest{ + Object: obj.ToV2().ToGRPCMessage().(*objectgrpc.Object), + Signature: tc.fSig(bObj), + }) + require.NoError(t, err, tc.name) + require.EqualValues(t, tc.expectedCode, resp.GetStatus().GetCode(), tc.name) + require.Equal(t, tc.expectedMsg, resp.GetStatus().GetMessage(), tc.name) + } + }) + + t.Run("apply storage policy failure", func(t *testing.T) { + node := newTestNode(t, serverPubKey, clientPubKey, cnr, req.Object) + srv := New(noCallObjSvc, node) + + node.cnrErr = errors.New("any error") + + resp, err := srv.Replicate(context.Background(), req) + require.NoError(t, err) + require.EqualValues(t, 1024, resp.GetStatus().GetCode()) + require.Equal(t, "failed to apply object's storage policy: any error", resp.GetStatus().GetMessage()) + }) + + t.Run("client or server mismatches object's storage policy", func(t *testing.T) { + node := newTestNode(t, serverPubKey, clientPubKey, cnr, req.Object) + srv := New(noCallObjSvc, node) + + node.serverOutsideCnr = true + node.clientOutsideCnr = true + + resp, err := srv.Replicate(context.Background(), req) + require.NoError(t, err) + require.EqualValues(t, 2048, resp.GetStatus().GetCode()) + require.Equal(t, "server does not match the object's storage policy", resp.GetStatus().GetMessage()) + + node.serverOutsideCnr = false + + resp, err = srv.Replicate(context.Background(), req) + require.NoError(t, err) + require.EqualValues(t, 2048, 
resp.GetStatus().GetCode()) + require.Equal(t, "client does not match the object's storage policy", resp.GetStatus().GetMessage()) + }) + + t.Run("local storage failure", func(t *testing.T) { + node := newTestNode(t, serverPubKey, clientPubKey, cnr, req.Object) + srv := New(noCallObjSvc, node) + + node.storeErr = errors.New("any error") + + resp, err := srv.Replicate(context.Background(), req) + require.NoError(t, err) + require.EqualValues(t, 1024, resp.GetStatus().GetCode()) + require.Equal(t, "failed to store object locally: any error", resp.GetStatus().GetMessage()) + }) + + t.Run("OK", func(t *testing.T) { + node := newTestNode(t, serverPubKey, clientPubKey, cnr, req.Object) + srv := New(noCallObjSvc, node) + + resp, err := srv.Replicate(context.Background(), req) + require.NoError(t, err) + require.EqualValues(t, 0, resp.GetStatus().GetCode()) + require.Empty(t, resp.GetStatus().GetMessage()) + }) +} + +type nopNode struct{} + +func (x nopNode) ForEachContainerNodePublicKeyInLastTwoEpochs(cid.ID, func(pubKey []byte) bool) error { + return nil +} + +func (x nopNode) IsOwnPublicKey([]byte) bool { + return false +} + +func (x nopNode) StoreObject(object.Object) error { + return nil +} + +func BenchmarkServer_Replicate(b *testing.B) { + ctx := context.Background() + var node nopNode + + srv := New(nil, node) + + for _, tc := range []struct { + name string + newSigner func(tb testing.TB) neofscrypto.Signer + }{ + { + name: "ECDSA SHA-512", + newSigner: func(tb testing.TB) neofscrypto.Signer { + return (*neofsecdsa.Signer)(randECDSAPrivateKey(tb)) + }, + }, + { + name: "ECDSA SHA-256 deterministic", + newSigner: func(tb testing.TB) neofscrypto.Signer { + return (*neofsecdsa.SignerRFC6979)(randECDSAPrivateKey(tb)) + }, + }, + { + name: "ECDSA SHA-256 WalletConnect", + newSigner: func(tb testing.TB) neofscrypto.Signer { + return (*neofsecdsa.SignerWalletConnect)(randECDSAPrivateKey(tb)) + }, + }, + } { + b.Run(tc.name, func(b *testing.B) { + req := anyValidRequest(b, tc.newSigner(b), cidtest.ID(), oidtest.ID()) + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + resp, err := srv.Replicate(ctx, req) + require.NoError(b, err) + require.Zero(b, resp.GetStatus().GetCode()) + } + }) + } +} diff --git a/pkg/network/transport/object/grpc/service.go b/pkg/network/transport/object/grpc/service.go index 29765f6126..68e263892d 100644 --- a/pkg/network/transport/object/grpc/service.go +++ b/pkg/network/transport/object/grpc/service.go @@ -7,21 +7,54 @@ import ( "github.com/nspcc-dev/neofs-api-go/v2/object" objectGRPC "github.com/nspcc-dev/neofs-api-go/v2/object/grpc" + status "github.com/nspcc-dev/neofs-api-go/v2/status/grpc" objectSvc "github.com/nspcc-dev/neofs-node/pkg/services/object" "github.com/nspcc-dev/neofs-node/pkg/services/util" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + objectsdk "github.com/nspcc-dev/neofs-sdk-go/object" ) +// Various NeoFS protocol status codes. +const ( + codeInternal = uint32(1024*status.Section_SECTION_FAILURE_COMMON) + uint32(status.CommonFail_INTERNAL) + codeAccessDenied = uint32(1024*status.Section_SECTION_OBJECT) + uint32(status.Object_ACCESS_DENIED) + codeContainerNotFound = uint32(1024*status.Section_SECTION_CONTAINER) + uint32(status.Container_CONTAINER_NOT_FOUND) +) + +// Node represents NeoFS storage node that is served by [Server]. 
+type Node interface {
+	// ForEachContainerNodePublicKeyInLastTwoEpochs iterates over all nodes matching
+	// the referenced container's storage policy at the current and the previous
+	// NeoFS epochs, and passes their public keys into f. Iteration breaks without
+	// an error when f returns false. Keys may be repeated.
+	//
+	// Returns [apistatus.ErrContainerNotFound] if the referenced container is not
+	// found.
+	ForEachContainerNodePublicKeyInLastTwoEpochs(cid.ID, func(pubKey []byte) bool) error
+
+	// IsOwnPublicKey checks whether the given pubKey is assigned to the Node in
+	// the NeoFS network map.
+	IsOwnPublicKey(pubKey []byte) bool
+
+	// StoreObject saves the given NeoFS object into the Node's local object
+	// storage. StoreObject is called only when the Node complies with the
+	// container's storage policy.
+	StoreObject(objectsdk.Object) error
+}
+
 // Server wraps NeoFS API Object service and
 // provides gRPC Object service server interface.
 type Server struct {
-	objectGRPC.UnimplementedObjectServiceServer
 	srv objectSvc.ServiceServer
+
+	node Node
 }

 // New creates, initializes and returns Server instance.
-func New(c objectSvc.ServiceServer) *Server {
+func New(c objectSvc.ServiceServer, node Node) *Server {
 	return &Server{
-		srv: c,
+		srv:  c,
+		node: node,
 	}
 }
diff --git a/pkg/services/object/put/local.go b/pkg/services/object/put/local.go
index 77a0d614c9..db8ea03aff 100644
--- a/pkg/services/object/put/local.go
+++ b/pkg/services/object/put/local.go
@@ -1,11 +1,16 @@
 package putsvc

 import (
+	"bytes"
+	"crypto/sha256"
+	"errors"
 	"fmt"

 	objectCore "github.com/nspcc-dev/neofs-node/pkg/core/object"
+	"github.com/nspcc-dev/neofs-sdk-go/checksum"
 	"github.com/nspcc-dev/neofs-sdk-go/object"
 	oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
+	"github.com/nspcc-dev/tzhash/tz"
 )

 // ObjectStorage is an object storage interface.
@@ -38,26 +43,118 @@ func (t *localTarget) WriteObject(obj *object.Object, meta objectCore.ContentMeta
 }

 func (t *localTarget) Close() (oid.ID, error) {
-	switch t.meta.Type() {
+	err := putObjectLocally(t.storage, t.obj, t.meta)
+	if err != nil {
+		return oid.ID{}, err
+	}
+
+	id, _ := t.obj.ID()
+
+	return id, nil
+}
+
+func putObjectLocally(storage ObjectStorage, obj *object.Object, meta objectCore.ContentMeta) error {
+	switch meta.Type() {
 	case object.TypeTombstone:
-		err := t.storage.Delete(objectCore.AddressOf(t.obj), t.meta.Objects())
+		err := storage.Delete(objectCore.AddressOf(obj), meta.Objects())
 		if err != nil {
-			return oid.ID{}, fmt.Errorf("could not delete objects from tombstone locally: %w", err)
+			return fmt.Errorf("could not delete objects from tombstone locally: %w", err)
 		}
 	case object.TypeLock:
-		err := t.storage.Lock(objectCore.AddressOf(t.obj), t.meta.Objects())
+		err := storage.Lock(objectCore.AddressOf(obj), meta.Objects())
 		if err != nil {
-			return oid.ID{}, fmt.Errorf("could not lock object from lock objects locally: %w", err)
+			return fmt.Errorf("could not lock object from lock objects locally: %w", err)
 		}
 	default:
 		// objects that do not change meta storage
 	}

-	if err := t.storage.Put(t.obj); err != nil {
-		return oid.ID{}, fmt.Errorf("(%T) could not put object to local storage: %w", t, err)
+	if err := storage.Put(obj); err != nil {
+		return fmt.Errorf("could not put object to local storage: %w", err)
 	}

-	id, _ := t.obj.ID()
+	return nil
+}

-	return id, nil
+// ValidateAndStoreObjectLocally checks the format of the given object and, if
+// it's correct, stores it in the underlying local object storage.
+// Serves an operation similar to the local-only [Service.Put].
+func (p *Service) ValidateAndStoreObjectLocally(obj object.Object) error {
+	cnrID, ok := obj.ContainerID()
+	if !ok {
+		return errors.New("missing container ID")
+	}
+
+	cs, csSet := obj.PayloadChecksum()
+	if !csSet {
+		return errors.New("missing payload checksum")
+	}
+
+	csType := cs.Type()
+	switch csType {
+	default:
+		return errors.New("unsupported payload checksum type")
+	case
+		checksum.SHA256,
+		checksum.TZ:
+	}
+
+	maxPayloadSz := p.maxSizeSrc.MaxObjectSize()
+	if maxPayloadSz == 0 {
+		return errors.New("failed to obtain max payload size setting")
+	}
+
+	payload := obj.Payload()
+	payloadSz := obj.PayloadSize()
+	if payloadSz != uint64(len(payload)) {
+		return ErrWrongPayloadSize
+	}
+
+	if payloadSz > maxPayloadSz {
+		return ErrExceedingMaxSize
+	}
+
+	cnr, err := p.cnrSrc.Get(cnrID)
+	if err != nil {
+		return fmt.Errorf("read container by ID: %w", err)
+	}
+
+	if !cnr.Value.IsHomomorphicHashingDisabled() {
+		csHomo, csHomoSet := obj.PayloadHomomorphicHash()
+		switch {
+		case !csHomoSet:
+			return errors.New("missing homomorphic payload checksum")
+		case csHomo.Type() != checksum.TZ:
+			return fmt.Errorf("wrong/unsupported type of homomorphic payload checksum, expected %s", checksum.TZ)
+		case len(csHomo.Value()) != tz.Size:
+			return fmt.Errorf("invalid/unsupported length of %s homomorphic payload checksum, expected %d",
+				csHomo.Type(), tz.Size)
+		}
+	}
+
+	if err := p.fmtValidator.Validate(&obj, false); err != nil {
+		return fmt.Errorf("validate object format: %w", err)
+	}
+
+	objMeta, err := p.fmtValidator.ValidateContent(&obj)
+	if err != nil {
+		return fmt.Errorf("validate payload content: %w", err)
+	}
+
+	switch csType {
+	default:
+		return errors.New("unsupported payload checksum type")
+	case checksum.SHA256:
+		h := sha256.Sum256(payload)
+		if !bytes.Equal(h[:], cs.Value()) {
+			return errors.New("payload SHA-256 checksum mismatch")
+		}
+	case checksum.TZ:
+		h := tz.Sum(payload)
+		if !bytes.Equal(h[:], cs.Value()) {
+			return errors.New("payload Tillich-Zemor checksum mismatch")
+		}
+	}
+
+	return putObjectLocally(p.localStore, &obj, objMeta)
+}
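
The sketches below illustrate the mechanics this patch relies on; they are not part of the change. First, the receive-limit arithmetic in initGRPC guards two overflows: the uint64 addition of the header allowance, and the narrowing to int required by grpc.MaxRecvMsgSize. The same computation as a standalone function, with hypothetical maxHeaderLen and defaultLimit standing in for object.MaxHeaderLen and the server's maxMsgSize:

package main

import (
	"errors"
	"fmt"
	"math"
)

// recvLimit mirrors initGRPC: grow the object size limit by the header
// allowance, saturate instead of wrapping around, never go below the default
// limit, and refuse values that do not fit into int (32-bit platforms).
func recvLimit(maxObjSize, maxHeaderLen, defaultLimit uint64) (int, error) {
	limit := maxObjSize
	if limit < math.MaxUint64-maxHeaderLen { // just in case, always true in practice
		limit += maxHeaderLen
	} else {
		limit = math.MaxUint64
	}
	if limit <= defaultLimit { // do not decrease the default value
		return int(defaultLimit), nil
	}
	if limit > math.MaxInt {
		return 0, errors.New("max object size is bigger than the gRPC server is able to support")
	}
	return int(limit), nil
}

func main() {
	limit, err := recvLimit(64<<20, 16<<10, 4<<20) // 64 MiB objects, 16 KiB header room, 4 MiB default
	fmt.Println(limit, err)
}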
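
Next, the epoch switch in forEachContainerNodePublicKeyInLastTwoEpochs compresses the cache-rotation policy into one switch with a fallthrough. A reduced model of the same decision table, with plain maps standing in for the LRU caches (the last > 0 guard is an addition here; the real code leaves the underflow case to an epoch value that cannot occur in practice):

package main

import "fmt"

type epochCaches struct {
	last      uint64
	cur, prev map[string]int // stand-ins for the per-epoch LRU caches
}

// observe returns the caches usable for the observed epoch and rotates the
// stored ones when the epoch moves forward, like containerNodes does.
func (c *epochCaches) observe(epoch uint64) (cur, prev map[string]int) {
	switch {
	case c.last > 0 && epoch == c.last-1:
		// one epoch behind: the stored "previous" cache serves it,
		// epoch-2 is not cached at all
		return c.prev, nil
	case epoch > c.last:
		if epoch == c.last+1 {
			c.prev = c.cur // single tick: current results remain valid as previous
		} else {
			c.prev = map[string]int{} // a jump invalidates both caches
		}
		c.cur = map[string]int{}
		c.last = epoch
		fallthrough
	case epoch == c.last:
		return c.cur, c.prev
	}
	return nil, nil // older than last-1: nothing is cached
}

func main() {
	c := &epochCaches{last: 5, cur: map[string]int{}, prev: map[string]int{}}
	c.observe(6) // tick: cur is demoted to prev
	c.observe(9) // jump: both caches are reset
	fmt.Println(c.last) // 9
}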
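
The test helper newNetmapWithContainer drives container membership through an attribute filter. The same SDK calls shown standalone, assuming cidtest.ID() is usable outside test files as in the helper (the policy string and attribute name are arbitrary):

package main

import (
	"fmt"

	cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
	"github.com/nspcc-dev/neofs-sdk-go/netmap"
)

func main() {
	// three nodes, two of them carrying the attribute the filter selects on
	nodes := make([]netmap.NodeInfo, 3)
	for i := range nodes {
		nodes[i].SetPublicKey([]byte{byte(i + 1)})
		nodes[i].SetAttribute("attr", "false")
	}
	nodes[0].SetAttribute("attr", "true")
	nodes[2].SetAttribute("attr", "true")

	var networkMap netmap.NetMap
	networkMap.SetNodes(nodes)

	// REP 2: two replicas; the filter narrows candidates to attr=true nodes
	var policy netmap.PlacementPolicy
	if err := policy.DecodeString("REP 2 CBF 1 SELECT 2 FROM F FILTER attr EQ true AS F"); err != nil {
		panic(err)
	}

	nodeSets, err := networkMap.ContainerNodes(policy, cidtest.ID())
	if err != nil {
		panic(err)
	}
	fmt.Println(len(nodeSets), len(nodeSets[0])) // 1 set of 2 nodes
}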
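
Replicate reports failures through NeoFS status codes in the response body rather than gRPC errors, which is why the tests assert on bare numbers like 1024 and 2048. The constants in service.go follow the protocol rule of 1024 codes per section; a minimal illustration, with section numbers copied from the enums used above:

package main

import "fmt"

// NeoFS reserves 1024 codes per status section, so a global status code is
// section*1024 + code-within-section.
const (
	sectionFailureCommon = 1 // status.Section_SECTION_FAILURE_COMMON
	sectionObject        = 2 // status.Section_SECTION_OBJECT
	sectionContainer     = 3 // status.Section_SECTION_CONTAINER
)

func globalCode(section, local uint32) uint32 { return section*1024 + local }

func main() {
	fmt.Println(globalCode(sectionFailureCommon, 0)) // 1024, internal error
	fmt.Println(globalCode(sectionObject, 0))        // 2048, access denied
	fmt.Println(globalCode(sectionContainer, 0))     // 3072, container not found
}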
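
Finally, the sending side. A client-side sketch of pushing a replica over the new RPC, assuming the generated objectGRPC.NewObjectServiceClient constructor exists alongside the server interface used above; the signature covers the binary object ID, matching what the server verifies:

package replication

import (
	"context"
	"fmt"

	objectGRPC "github.com/nspcc-dev/neofs-api-go/v2/object/grpc"
	refs "github.com/nspcc-dev/neofs-api-go/v2/refs/grpc"
	neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

// Replicate pushes an already-stored object to another container node. The
// object is passed in its gRPC message form, objID is its binary ID and signer
// holds this node's key (ECDSA SHA-512 scheme assumed).
func Replicate(ctx context.Context, endpoint string, obj *objectGRPC.Object, objID []byte, signer neofscrypto.Signer) error {
	conn, err := grpc.Dial(endpoint, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return fmt.Errorf("dial %s: %w", endpoint, err)
	}
	defer conn.Close()

	sig, err := signer.Sign(objID) // the server checks this against the object ID
	if err != nil {
		return fmt.Errorf("sign object ID: %w", err)
	}

	resp, err := objectGRPC.NewObjectServiceClient(conn).Replicate(ctx, &objectGRPC.ReplicateRequest{
		Object: obj,
		Signature: &refs.Signature{
			Key:    neofscrypto.PublicKeyBytes(signer.Public()),
			Sign:   sig,
			Scheme: refs.SignatureScheme_ECDSA_SHA512,
		},
	})
	if err != nil {
		return fmt.Errorf("transport: %w", err)
	}
	if code := resp.GetStatus().GetCode(); code != 0 {
		return fmt.Errorf("replication declined: code %d: %s", code, resp.GetStatus().GetMessage())
	}
	return nil
}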