Skip to content

Commit

Permalink
[nspcc-dev#1978] cli/object: Gather all related object in delete session
Browse files Browse the repository at this point in the history
Object removal session should reflect all objects related to the
removing one.

Make `OpenSessionViaClient` to gather the split members of the original
object in order to spread the session to them.

Signed-off-by: Leonard Lyubich <ctulhurider@gmail.com>
  • Loading branch information
cthulhu-rider committed Oct 27, 2022
1 parent 3c6daa2 commit 24dd213
Show file tree
Hide file tree
Showing 2 changed files with 171 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ Changelog for NeoFS Node
- Inability to provide session to NeoFS CLI in a NeoFS-binary format (#1933)
- `neofs-adm` now works correctly with a committee of more than 4 nodes (#1949, #1959)
- Closing a shard now waits until GC background workers stop (#1964)
- Missing object relatives in object removal session opened by NeoFS CLI (#1978)

### Removed
### Updated
Expand Down
177 changes: 170 additions & 7 deletions cmd/neofs-cli/modules/object/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/nspcc-dev/neofs-sdk-go/client"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
neofsecdsa "github.com/nspcc-dev/neofs-sdk-go/crypto/ecdsa"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/nspcc-dev/neofs-sdk-go/session"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -204,7 +205,12 @@ func ReadOrOpenSessionViaClient(cmd *cobra.Command, dst SessionPrm, cli *client.
return
}

finalizeSession(cmd, dst, tok, key, cnr, obj)
var objs []oid.ID
if obj != nil {
objs = []oid.ID{*obj}
}

finalizeSession(cmd, dst, tok, key, cnr, objs...)
dst.SetClient(cli)
}

Expand All @@ -222,7 +228,26 @@ func OpenSession(cmd *cobra.Command, dst SessionPrm, key *ecdsa.PrivateKey, cnr
//
// *internal.PutObjectPrm
// *internal.DeleteObjectPrm
//
// If provided SessionPrm is of type internal.DeleteObjectPrm, OpenSessionViaClient
// spreads the session to all object's relatives.
func OpenSessionViaClient(cmd *cobra.Command, dst SessionPrm, cli *client.Client, key *ecdsa.PrivateKey, cnr cid.ID, obj *oid.ID) {
var objs []oid.ID

if obj != nil {
if _, ok := dst.(*internal.DeleteObjectPrm); ok {
common.PrintVerbose("Collecting relatives of the removal object...")

rels := collectObjectRelatives(cmd, cli, cnr, *obj)

if len(rels) == 0 {
objs = []oid.ID{*obj}
} else {
objs = append(rels, *obj)
}
}
}

var tok session.Object

const sessionLifetime = 10 // in NeoFS epochs
Expand All @@ -234,21 +259,21 @@ func OpenSessionViaClient(cmd *cobra.Command, dst SessionPrm, cli *client.Client

common.PrintVerbose("Session successfully opened.")

finalizeSession(cmd, dst, &tok, key, cnr, obj)
finalizeSession(cmd, dst, &tok, key, cnr, objs...)

dst.SetClient(cli)
}

// specifies session verb, binds the session to the given container and limits
// the session by the given object (if specified). After all data is written,
// the session by the given objects (if specified). After all data is written,
// signs session using provided private key and writes the session into the
// given SessionPrm.
//
// SessionPrm MUST be one of:
//
// *internal.PutObjectPrm
// *internal.DeleteObjectPrm
func finalizeSession(cmd *cobra.Command, dst SessionPrm, tok *session.Object, key *ecdsa.PrivateKey, cnr cid.ID, obj *oid.ID) {
func finalizeSession(cmd *cobra.Command, dst SessionPrm, tok *session.Object, key *ecdsa.PrivateKey, cnr cid.ID, objs ...oid.ID) {
common.PrintVerbose("Finalizing session token...")

switch dst.(type) {
Expand All @@ -265,9 +290,9 @@ func finalizeSession(cmd *cobra.Command, dst SessionPrm, tok *session.Object, ke
common.PrintVerbose("Binding session to container %s...", cnr)

tok.BindContainer(cnr)
if obj != nil {
common.PrintVerbose("Limiting session by object %s...", obj)
tok.LimitByObjects(*obj)
if len(objs) > 0 {
common.PrintVerbose("Limiting session by the objects %v...", objs)
tok.LimitByObjects(objs...)
}

common.PrintVerbose("Signing session...")
Expand All @@ -284,3 +309,141 @@ func finalizeSession(cmd *cobra.Command, dst SessionPrm, tok *session.Object, ke
func initFlagSession(cmd *cobra.Command, verb string) {
commonflags.InitSession(cmd, "object "+verb)
}

// collects and returns all relatives of the given object stored in the specified
// container. Empty result without an error means lack of relationship in the
// container.
//
// The object itself is not included in the result.
func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID, obj oid.ID) []oid.ID {
common.PrintVerbose("Fetching raw object header...")

// request raw header first
var addrObj oid.Address
addrObj.SetContainer(cnr)
addrObj.SetObject(obj)

var prmHead internal.HeadObjectPrm
prmHead.SetClient(cli)
prmHead.SetAddress(addrObj)
prmHead.SetRawFlag(true)

_, err := internal.HeadObject(prmHead)

var errSplit *object.SplitInfoError

switch {
default:
common.ExitOnErr(cmd, "failed to get raw object header: %w", err)
case err == nil:
common.PrintVerbose("Raw header received - object is singular.")
return nil
case errors.As(err, &errSplit):
common.PrintVerbose("Split information received - object is virtual.")
}

splitInfo := errSplit.SplitInfo()

// collect split chain by the descending ease of operations (ease is evaluated heuristically).
// If any approach fails, we don't try the next since we assume that it will fail too.

if idLinking, ok := splitInfo.Link(); ok {
common.PrintVerbose("Collecting split members using linking object %s...", idLinking)

addrObj.SetObject(idLinking)
prmHead.SetAddress(addrObj)
prmHead.SetRawFlag(false)
// client is already set

res, err := internal.HeadObject(prmHead)
common.ExitOnErr(cmd, "failed to get linking object's header: %w", err)

children := res.Header().Children()

common.PrintVerbose("Received split members from the linking object: %v", children)

// include linking object
return append(children, idLinking)
}

if idSplit := splitInfo.SplitID(); idSplit != nil {
common.PrintVerbose("Collecting split members by split ID...")

var query object.SearchFilters
query.AddSplitIDFilter(object.MatchStringEqual, idSplit)

var prm internal.SearchObjectsPrm
prm.SetContainerID(cnr)
prm.SetClient(cli)
prm.SetFilters(query)

res, err := internal.SearchObjects(prm)
common.ExitOnErr(cmd, "failed to search objects by split ID: %w", err)

members := res.IDList()

common.PrintVerbose("Found objects by split ID: %v", res.IDList())

return members
}

idMember, ok := splitInfo.LastPart()
if !ok {
common.ExitOnErr(cmd, "", errors.New("missing any data in received object split information"))
}

common.PrintVerbose("Traverse the object split chain in reverse...", idMember)

var res *internal.HeadObjectRes
chain := []oid.ID{idMember}
chainSet := map[oid.ID]struct{}{idMember: {}}

prmHead.SetRawFlag(false)
// split members are almost definitely singular, but don't get hung up on it

for {
common.PrintVerbose("Reading previous element of the split chain member %s...", idMember)

addrObj.SetObject(idMember)

res, err = internal.HeadObject(prmHead)
common.ExitOnErr(cmd, "failed to read split chain member's header: %w", err)

idMember, ok = res.Header().PreviousID()
if !ok {
common.PrintVerbose("Chain ended.")
break
}

if _, ok = chainSet[idMember]; ok {
common.ExitOnErr(cmd, "", fmt.Errorf("duplicated member in the split chain %s", idMember))
}

chain = append(chain, idMember)
chainSet[idMember] = struct{}{}
}

common.PrintVerbose("Looking for a linking object...")

var query object.SearchFilters
query.AddParentIDFilter(object.MatchStringEqual, obj)

var prmSearch internal.SearchObjectsPrm
prmSearch.SetClient(cli)
prmSearch.SetContainerID(cnr)
prmSearch.SetFilters(query)

resSearch, err := internal.SearchObjects(prmSearch)
common.ExitOnErr(cmd, "failed to find object children: %w", err)

list := resSearch.IDList()

for i := range list {
if _, ok = chainSet[list[i]]; !ok {
common.PrintVerbose("Found one more related object %s.", list[i])
chain = append(chain, list[i])
}
}

return chain
}

0 comments on commit 24dd213

Please sign in to comment.