Skip to content

Commit

Permalink
Fix/relay requests (#3014)
Browse files Browse the repository at this point in the history
  • Loading branch information
roman-khimov authored Nov 14, 2024
2 parents b7714ae + 3ea9280 commit 31fa3b4
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ attribute, which is used for container domain name in NNS contracts (#2954)
- RPC client reconnection failures leading to complete SN failure (#2797)
- `meta.DB.Open(readOnly)` moves metabase in RO mode (#3000)
- Panic in event listener related to inability to switch RPC node (#2970)
- Non-container nodes never check placement policy on PUT, SEARCH requests (#3014)

### Changed
- `ObjectService`'s `Put` RPC handler caches up to 10K lists of per-object sorted container nodes (#2901)
Expand Down
1 change: 1 addition & 0 deletions pkg/services/object/put/distributed.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ func (x placementIterator) iterateNodesForObject(obj oid.ID, f func(nodeDesc) er
return
}
}); err != nil {
wg.Done()
svcutil.LogWorkerPoolError(x.log, "PUT", err)
err = fmt.Errorf("submit next job to save an object to the worker pool: %w", err)
if e, _ := lastRespErr.Load().(error); e != nil {
Expand Down
15 changes: 15 additions & 0 deletions pkg/services/object/put/v2/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ import (
rawclient "github.com/nspcc-dev/neofs-api-go/v2/rpc/client"
sessionV2 "github.com/nspcc-dev/neofs-api-go/v2/session"
"github.com/nspcc-dev/neofs-api-go/v2/signature"
"github.com/nspcc-dev/neofs-api-go/v2/status"
"github.com/nspcc-dev/neofs-node/pkg/core/client"
"github.com/nspcc-dev/neofs-node/pkg/network"
"github.com/nspcc-dev/neofs-node/pkg/services/object/internal"
internalclient "github.com/nspcc-dev/neofs-node/pkg/services/object/internal/client"
putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put"
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
)

type streamer struct {
Expand Down Expand Up @@ -178,8 +180,21 @@ func (s *streamer) relayRequest(info client.NodeInfo, c client.MultiAddressClien
err = fmt.Errorf("response verification failed: %w", err)
}

err = checkStatus(resp.GetMetaHeader().GetStatus())
if err != nil {
err = fmt.Errorf("remote node response: %w", err)
}

return
})

return firstErr
}

func checkStatus(stV2 *status.Status) error {
if !status.IsSuccess(stV2.Code()) {
return apistatus.ErrorFromV2(stV2)
}

return nil
}
14 changes: 14 additions & 0 deletions pkg/services/object/search/v2/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@ import (
rpcclient "github.com/nspcc-dev/neofs-api-go/v2/rpc/client"
"github.com/nspcc-dev/neofs-api-go/v2/session"
"github.com/nspcc-dev/neofs-api-go/v2/signature"
"github.com/nspcc-dev/neofs-api-go/v2/status"
"github.com/nspcc-dev/neofs-node/pkg/core/client"
"github.com/nspcc-dev/neofs-node/pkg/network"
objectSvc "github.com/nspcc-dev/neofs-node/pkg/services/object"
"github.com/nspcc-dev/neofs-node/pkg/services/object/internal"
searchsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/search"
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
Expand Down Expand Up @@ -116,6 +118,10 @@ func (s *Service) toPrm(req *objectV2.SearchRequest, stream objectSvc.SearchStre
return nil, fmt.Errorf("could not verify %T: %w", resp, err)
}

if err := checkStatus(resp.GetMetaHeader().GetStatus()); err != nil {
return nil, fmt.Errorf("remote node response: %w", err)
}

chunk := resp.GetBody().GetIDList()
var id oid.ID

Expand All @@ -139,6 +145,14 @@ func (s *Service) toPrm(req *objectV2.SearchRequest, stream objectSvc.SearchStre
return p, nil
}

func checkStatus(stV2 *status.Status) error {
if !status.IsSuccess(stV2.Code()) {
return apistatus.ErrorFromV2(stV2)
}

return nil
}

func groupAddressRequestForwarder(f func(network.Address, client.MultiAddressClient, []byte) ([]oid.ID, error)) searchsvc.RequestForwarder {
return func(info client.NodeInfo, c client.MultiAddressClient) ([]oid.ID, error) {
var (
Expand Down

0 comments on commit 31fa3b4

Please sign in to comment.