165 changes: 156 additions & 9 deletions pkg/storage/fs/posix/trashbin/trashbin.go
@@ -21,9 +21,11 @@ package trashbin
import (
"context"
"fmt"
"io/fs"
"os"
"path/filepath"
"strings"
"sync"
"time"

"github.com/rs/zerolog"
@@ -165,7 +167,7 @@ func (tb *Trashbin) MoveToTrash(ctx context.Context, n *node.Node, path string)
return err
}

// 1. "Forget" the node
// 1. "Forget" the node and its children
if err = tb.lu.IDCache.DeleteByPath(ctx, path); err != nil {
return err
}
@@ -327,7 +329,6 @@ func (tb *Trashbin) RestoreRecycleItem(ctx context.Context, spaceID string, key,
}
if id == "" {
return nil, errtypes.NotFound("trashbin: item not found")
}

// update parent id in case it was restored to a different location
@@ -370,35 +371,181 @@ func (tb *Trashbin) RestoreRecycleItem(ctx context.Context, spaceID string, key,

}

// PurgeRecycleItem purges the specified item, all its children and all their revisions.
func (tb *Trashbin) PurgeRecycleItem(ctx context.Context, spaceID, key, relativePath string) error {
_, span := tracer.Start(ctx, "PurgeRecycleItem")
defer span.End()

trashRoot := filepath.Join(tb.lu.InternalPath(spaceID, spaceID), ".Trash")
trashPath := filepath.Clean(filepath.Join(trashRoot, "files", key+".trashitem", relativePath))

type item struct {
path string
isDir bool
}

itemChan := make(chan item, 256) // small buffer to smooth bursts
var dirs []string

// Start walking the directory tree in a separate goroutine
walkErrChan := make(chan error, 1)
go func() {
defer close(itemChan)
defer close(walkErrChan)

err := filepath.WalkDir(trashPath, func(path string, d fs.DirEntry, err error) error {
if err != nil {
return err
}

select {
case <-ctx.Done():
return ctx.Err()
default:
}

it := item{path: path, isDir: d.IsDir()}

// Directories are collected for later filesystem removal
if d.IsDir() {
dirs = append(dirs, path)
}

select {
case <-ctx.Done():
return ctx.Err()
case itemChan <- it:
return nil
}
})

if err != nil && !os.IsNotExist(err) {
walkErrChan <- err
return
}
walkErrChan <- nil
}()

// Start worker pool for metadata purge
wg := sync.WaitGroup{}
for i := 0; i < tb.o.MaxConcurrency; i++ {
wg.Add(1)
go func(ch <-chan item) {
defer wg.Done()
for {
select {
case <-ctx.Done():
tb.log.Info().Msg("context cancelled during purge")
return
case it, ok := <-ch:
if !ok {
return
}

_, id, _, _, err := tb.lu.MetadataBackend().IdentifyPath(ctx, it.path)
if err == nil && id != "" {
trashedNode := &trashNode{spaceID: spaceID, id: id, path: it.path}
if err := tb.lu.MetadataBackend().Purge(ctx, trashedNode); err != nil {
tb.log.Error().Err(err).Str("path", it.path).Str("id", id).Msg("Failed to purge metadata")
}
}

// Delete only files here (directories are deleted later)
if !it.isDir {
if err := os.Remove(it.path); err != nil && !os.IsNotExist(err) {
tb.log.Error().Err(err).Str("path", it.path).Msg("Failed to delete file")
}
}
}
}
}(itemChan)
}

// Wait for all workers and walker to finish
wg.Wait()
if err := <-walkErrChan; err != nil {
return err
}

// Delete directories in reverse order (leaves first)
for i := len(dirs) - 1; i >= 0; i-- {
if err := os.Remove(dirs[i]); err != nil && !os.IsNotExist(err) {
tb.log.Error().Err(err).Str("path", dirs[i]).Msg("Failed to delete directory")
}
}

// Delete trashinfo if purging the root item
cleanPath := filepath.Clean(relativePath)
if cleanPath == "." || cleanPath == "/" {
infoPath := filepath.Join(trashRoot, "info", key+".trashinfo")
if err := os.Remove(infoPath); err != nil && !os.IsNotExist(err) {
tb.log.Error().Err(err).Str("path", infoPath).Msg("Failed to delete trashinfo")
}
}

return nil
}
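
Stripped of the project-specific metadata purge, the concurrency shape of the new PurgeRecycleItem is: one walker goroutine streams entries into a bounded worker pool that unlinks files, while directories are collected and removed leaves-first once the tree is empty. A self-contained sketch under those assumptions (purgeTree, the root path, and the worker count are illustrative placeholders, and the per-item metadata purge is elided):

```go
package main

import (
	"context"
	"io/fs"
	"log"
	"os"
	"path/filepath"
	"sync"
)

func purgeTree(ctx context.Context, root string, workers int) error {
	type item struct {
		path  string
		isDir bool
	}

	itemCh := make(chan item, 256) // small buffer to smooth bursts
	errCh := make(chan error, 1)
	var dirs []string // written only by the walker, read after it finishes

	// Walker: stream entries to the workers, remember directories.
	go func() {
		defer close(itemCh)
		errCh <- filepath.WalkDir(root, func(path string, d fs.DirEntry, err error) error {
			if err != nil {
				return err
			}
			if d.IsDir() {
				dirs = append(dirs, path)
			}
			select {
			case <-ctx.Done():
				return ctx.Err()
			case itemCh <- item{path: path, isDir: d.IsDir()}:
				return nil
			}
		})
	}()

	// Bounded worker pool: delete files only; directories come later.
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for it := range itemCh {
				if !it.isDir {
					if err := os.Remove(it.path); err != nil && !os.IsNotExist(err) {
						log.Printf("remove %s: %v", it.path, err)
					}
				}
			}
		}()
	}
	wg.Wait()
	if err := <-errCh; err != nil && !os.IsNotExist(err) {
		return err
	}

	// WalkDir visits parents before children, so the reverse order is
	// leaves-first and every directory is empty by the time we remove it.
	for i := len(dirs) - 1; i >= 0; i-- {
		if err := os.Remove(dirs[i]); err != nil && !os.IsNotExist(err) {
			log.Printf("rmdir %s: %v", dirs[i], err)
		}
	}
	return nil
}

func main() {
	if err := purgeTree(context.Background(), "/tmp/example-trash", 4); err != nil {
		log.Fatal(err)
	}
}
```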

// EmptyRecycle empties the trash for a given space.
func (tb *Trashbin) EmptyRecycle(ctx context.Context, spaceID string) error {
_, span := tracer.Start(ctx, "EmptyRecycle")
defer span.End()

trashRoot := filepath.Join(tb.lu.InternalPath(spaceID, spaceID), ".Trash")
err := os.RemoveAll(filepath.Clean(filepath.Join(trashRoot, "files")))
filesRoot := filepath.Join(trashRoot, "files")

entries, err := os.ReadDir(filesRoot)
if err != nil {
if os.IsNotExist(err) {
return nil
}
return err
}
return os.RemoveAll(filepath.Clean(filepath.Join(trashRoot, "info")))

type job struct {
key string
}

jobCh := make(chan job, len(entries))

// Enqueue all trash items
for _, entry := range entries {
name := entry.Name()
if !strings.HasSuffix(name, ".trashitem") {
continue
}

key := strings.TrimSuffix(name, ".trashitem")
jobCh <- job{key: key}
}
close(jobCh)

// Start worker pool
wg := sync.WaitGroup{}
for i := 0; i < tb.o.MaxConcurrency; i++ {
wg.Add(1)
go func(ch <-chan job) {
defer wg.Done()
for {
select {
case <-ctx.Done():
tb.log.Info().Msg("context cancelled during EmptyRecycle")
return
case j, ok := <-ch:
if !ok {
return
}

if err := tb.PurgeRecycleItem(ctx, spaceID, j.key, "."); err != nil {
tb.log.Error().Err(err).Str("key", j.key).Msg("Failed to purge trash item")
}
}
}
}(jobCh)
}

wg.Wait()
return nil
}
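
Both the walker and the workers select on ctx.Done(), so callers can bound a potentially long-running purge with a deadline. A hypothetical caller sketch (the function name, tb, and spaceID are assumptions, with context and time imported; note that per-item purge failures are logged by the workers rather than returned):

```go
// Hypothetical caller: the deadline caps how long emptying a large trash
// may run; the workers observe ctx.Done() and stop early on timeout.
func emptyWithDeadline(tb *trashbin.Trashbin, spaceID string) error {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	return tb.EmptyRecycle(ctx, spaceID)
}
```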

func (tb *Trashbin) IsEmpty(ctx context.Context, spaceID string) bool {
10 changes: 5 additions & 5 deletions pkg/storage/pkg/decomposedfs/metadata/hybrid_backend.go
@@ -467,6 +467,9 @@ func (b HybridBackend) Purge(ctx context.Context, n MetadataNode) error {
}
}

// delete the metadata lockfile
_ = os.Remove(b.LockfilePath(n))

return b.metaCache.RemoveMetadata(b.cacheKey(n))
}

@@ -517,11 +520,8 @@ func (b HybridBackend) Lock(n MetadataNode) (UnlockFunc, error) {
}
}
return func() error {
// Warning: do not remove the lockfile or we may lock the same file more than once, https://github.com/opencloud-eu/opencloud/issues/1793
return mlock.Close()
}, nil
}
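
The linked issue is the classic flock pitfall: flock binds to the open file description (the inode behind the fd), not to the path name, so removing the lockfile on unlock lets two lockers succeed at once. A simplified, unix-only demonstration of the hazard (single process for brevity; /tmp/demo.mlock is a placeholder, and this is not the project's lockedfile implementation):

```go
package main

import (
	"fmt"
	"os"
	"syscall"
)

// lock opens (or creates) path and takes an exclusive flock on it.
func lock(path string) (*os.File, error) {
	f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0o644)
	if err != nil {
		return nil, err
	}
	if err := syscall.Flock(int(f.Fd()), syscall.LOCK_EX); err != nil {
		f.Close()
		return nil, err
	}
	return f, nil
}

func main() {
	const path = "/tmp/demo.mlock"

	a, err := lock(path) // A holds the lock on inode I1
	if err != nil {
		panic(err)
	}

	// B opens the same path while A still holds the lock; open() does
	// not block, only flock() would.
	b, err := os.OpenFile(path, os.O_RDWR, 0o644)
	if err != nil {
		panic(err)
	}

	// Old unlock behaviour: release the lock *and remove the file*.
	_ = syscall.Flock(int(a.Fd()), syscall.LOCK_UN)
	_ = a.Close()
	_ = os.Remove(path)

	// B now acquires the lock on the orphaned inode I1 ...
	if err := syscall.Flock(int(b.Fd()), syscall.LOCK_EX); err != nil {
		panic(err)
	}
	// ... while C re-creates the path as a fresh inode I2 and locks that too.
	c, err := lock(path)
	if err != nil {
		panic(err)
	}

	// Two exclusive locks on "the same" lockfile path at the same time.
	fmt.Println("B and C both hold an exclusive lock:", b.Name(), c.Name())
}
```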

17 changes: 12 additions & 5 deletions pkg/storage/pkg/decomposedfs/metadata/messagepack_backend.go
@@ -284,6 +284,16 @@ func (b MessagePackBackend) Purge(_ context.Context, n MetadataNode) error {
if err := b.metaCache.RemoveMetadata(b.cacheKey(n)); err != nil {
return err
}

internalPath := n.InternalPath()
// for trash files always use the path without the timestamp
parts := strings.SplitN(n.GetID(), ".T.", 2)
if len(parts) > 1 {
internalPath = strings.TrimSuffix(internalPath, ".T."+parts[1])
}

_ = os.Remove(internalPath + ".mlock")

return os.Remove(b.MetadataPath(n))
}
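
The suffix handling above is easiest to see with concrete values. A small sketch (the ID and paths are illustrative, not taken from the PR; the hunk itself establishes that trashed nodes carry a ".T.<timestamp>" suffix while the node's .mlock keeps the unsuffixed name):

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// Illustrative values, not taken from the PR:
	id := "9f86d081.T.2024-01-02T15:04:05Z"
	internalPath := "/spaces/ab/cd/nodes/9f86d081.T.2024-01-02T15:04:05Z"

	// Same logic as the hunk above: strip the trash timestamp so the
	// lockfile of the original node is removed, not a nonexistent one.
	parts := strings.SplitN(id, ".T.", 2)
	if len(parts) > 1 {
		internalPath = strings.TrimSuffix(internalPath, ".T."+parts[1])
	}

	fmt.Println(internalPath + ".mlock")
	// Output: /spaces/ab/cd/nodes/9f86d081.mlock
}
```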

@@ -319,11 +329,8 @@ func (b MessagePackBackend) Lock(n MetadataNode) (UnlockFunc, error) {
return nil, err
}
return func() error {
// Warning: do not remove the lockfile or we may lock the same file more than once, https://github.com/opencloud-eu/opencloud/issues/1793
return mlock.Close()
}, nil
}

12 changes: 6 additions & 6 deletions pkg/storage/pkg/decomposedfs/metadata/xattrs_backend.go
@@ -248,6 +248,9 @@ func (b XattrsBackend) Purge(ctx context.Context, n MetadataNode) error {
}
}

// delete the metadata lockfile
_ = os.Remove(b.LockfilePath(n))

return b.metaCache.RemoveMetadata(b.cacheKey(n))
}

@@ -278,17 +281,14 @@ func (b XattrsBackend) Lock(n MetadataNode) (UnlockFunc, error) {
return nil, err
}
return func() error {
// Warning: do not remove the lockfile or we may lock the same file more than once, https://github.com/opencloud-eu/opencloud/issues/1793
return mlock.Close()
}, nil
}

func cleanupLockfile(_ context.Context, f *lockedfile.File) {
_ = f.Close()
// Warning: do not remove the lockfile or we may lock the same file more than once, https://github.com/opencloud-eu/opencloud/issues/1793
}

// AllWithLockedSource reads all extended attributes from the given reader.