Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions pkg/storage/fs/posix/lookup/store_idcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,17 @@ func NewStoreIDCache(o *options.Options) *StoreIDCache {

// Delete removes an entry from the cache
func (c *StoreIDCache) Delete(_ context.Context, spaceID, nodeID string) error {
var rerr error
v, err := c.cache.Read(cacheKey(spaceID, nodeID))
if err == nil {
err := c.cache.Delete(reverseCacheKey(string(v[0].Value)))
if err != nil {
return err
}
rerr = c.cache.Delete(reverseCacheKey(string(v[0].Value)))
}

return c.cache.Delete(cacheKey(spaceID, nodeID))
err = c.cache.Delete(cacheKey(spaceID, nodeID))
if err != nil {
return err
}
return rerr
}

// DeleteByPath removes an entry from the cache
Expand Down
17 changes: 8 additions & 9 deletions pkg/storage/fs/posix/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,20 +48,19 @@ type Options struct {

// New returns a new Options instance for the given configuration
func New(m map[string]interface{}) (*Options, error) {
o := &Options{}
if err := mapstructure.Decode(m, o); err != nil {
err = errors.Wrap(err, "error decoding conf")
return nil, err
}

// default to hybrid metadatabackend for posixfs
if _, ok := m["metadata_backend"]; !ok {
m["metadata_backend"] = "hybrid"
}

// debounced scan delay
if o.ScanDebounceDelay == 0 {
o.ScanDebounceDelay = 10 * time.Millisecond
if _, ok := m["scan_debounce_delay"]; !ok {
m["scan_debounce_delay"] = 10 * time.Millisecond
}

o := &Options{}
if err := mapstructure.Decode(m, o); err != nil {
err = errors.Wrap(err, "error decoding conf")
return nil, err
}

do, err := decomposedoptions.New(m)
Expand Down
99 changes: 54 additions & 45 deletions pkg/storage/fs/posix/tree/assimilation.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,9 @@ func (t *Tree) HandleFileDelete(path string) error {
return err
}
n := node.NewBaseNode(spaceID, id, t.lookup)
if n.InternalPath() != path {
return fmt.Errorf("internal path does not match path")
}

// purge metadata
if err := t.lookup.IDCache.DeleteByPath(context.Background(), path); err != nil {
Expand Down Expand Up @@ -615,54 +618,56 @@ assimilate:

n.SpaceRoot = &node.Node{BaseNode: node.BaseNode{SpaceID: spaceID, ID: spaceID}}

go func() {
// Copy the previous current version to a revision
currentNode := node.NewBaseNode(n.SpaceID, n.ID+node.CurrentIDDelimiter, t.lookup)
currentPath := currentNode.InternalPath()
stat, err := os.Stat(currentPath)
if err != nil {
t.log.Error().Err(err).Str("path", path).Str("currentPath", currentPath).Msg("could not stat current path")
return
}
revisionPath := t.lookup.VersionPath(n.SpaceID, n.ID, stat.ModTime().UTC().Format(time.RFC3339Nano))
if t.options.EnableFSRevisions {
go func() {
// Copy the previous current version to a revision
currentNode := node.NewBaseNode(n.SpaceID, n.ID+node.CurrentIDDelimiter, t.lookup)
currentPath := currentNode.InternalPath()
stat, err := os.Stat(currentPath)
if err != nil {
t.log.Error().Err(err).Str("path", path).Str("currentPath", currentPath).Msg("could not stat current path")
return
}
revisionPath := t.lookup.VersionPath(n.SpaceID, n.ID, stat.ModTime().UTC().Format(time.RFC3339Nano))

err = os.Rename(currentPath, revisionPath)
if err != nil {
t.log.Error().Err(err).Str("path", path).Str("revisionPath", revisionPath).Msg("could not create revision")
return
}
err = os.Rename(currentPath, revisionPath)
if err != nil {
t.log.Error().Err(err).Str("path", path).Str("revisionPath", revisionPath).Msg("could not create revision")
return
}

// Copy the new version to the current version
w, err := os.OpenFile(currentPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0600)
if err != nil {
t.log.Error().Err(err).Str("path", path).Str("currentPath", currentPath).Msg("could not open current path for writing")
return
}
defer w.Close()
r, err := os.OpenFile(n.InternalPath(), os.O_RDONLY, 0600)
if err != nil {
t.log.Error().Err(err).Str("path", path).Msg("could not open file for reading")
return
}
defer r.Close()
// Copy the new version to the current version
w, err := os.OpenFile(currentPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0600)
if err != nil {
t.log.Error().Err(err).Str("path", path).Str("currentPath", currentPath).Msg("could not open current path for writing")
return
}
defer w.Close()
r, err := os.OpenFile(n.InternalPath(), os.O_RDONLY, 0600)
if err != nil {
t.log.Error().Err(err).Str("path", path).Msg("could not open file for reading")
return
}
defer r.Close()

_, err = io.Copy(w, r)
if err != nil {
t.log.Error().Err(err).Str("currentPath", currentPath).Str("path", path).Msg("could not copy new version to current version")
return
}
_, err = io.Copy(w, r)
if err != nil {
t.log.Error().Err(err).Str("currentPath", currentPath).Str("path", path).Msg("could not copy new version to current version")
return
}

err = t.lookup.CopyMetadata(context.Background(), n, currentNode, func(attributeName string, value []byte) (newValue []byte, copy bool) {
return value, strings.HasPrefix(attributeName, prefixes.ChecksumPrefix) ||
attributeName == prefixes.TypeAttr ||
attributeName == prefixes.BlobIDAttr ||
attributeName == prefixes.BlobsizeAttr
}, false)
if err != nil {
t.log.Error().Err(err).Str("currentPath", currentPath).Str("path", path).Msg("failed to copy xattrs to 'current' file")
return
}
}()
err = t.lookup.CopyMetadata(context.Background(), n, currentNode, func(attributeName string, value []byte) (newValue []byte, copy bool) {
return value, strings.HasPrefix(attributeName, prefixes.ChecksumPrefix) ||
attributeName == prefixes.TypeAttr ||
attributeName == prefixes.BlobIDAttr ||
attributeName == prefixes.BlobsizeAttr
}, false)
if err != nil {
t.log.Error().Err(err).Str("currentPath", currentPath).Str("path", path).Msg("failed to copy xattrs to 'current' file")
return
}
}()
}

err = t.Propagate(context.Background(), n, 0)
if err != nil {
Expand Down Expand Up @@ -791,7 +796,11 @@ func (t *Tree) WarmupIDCache(root string, assimilate, onlyDirty bool) error {
t.log.Error().Err(err).Str("path", path).Msg("could not assimilate item")
}
}
return t.setDirty(path, false)

if info.IsDir() {
return t.setDirty(path, false)
}
return nil
})

for dir, size := range sizes {
Expand Down
30 changes: 14 additions & 16 deletions pkg/storage/fs/posix/tree/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,9 +299,11 @@ func (t *Tree) Move(ctx context.Context, oldNode *node.Node, newNode *node.Node)
return errors.Wrap(err, "Decomposedfs: Move: error deleting target node "+newNode.ID)
}
}

// we are moving the node to a new parent, any target has been removed
// bring old node to the new parent
oldParent := oldNode.ParentPath()
newParent := newNode.ParentPath()
if newNode.ID == "" {
newNode.ID = oldNode.ID
}

// update target parentid and name
attribs := node.Attributes{}
Expand All @@ -311,29 +313,25 @@ func (t *Tree) Move(ctx context.Context, oldNode *node.Node, newNode *node.Node)
return errors.Wrap(err, "Decomposedfs: could not update old node attributes")
}

// rename node
err = os.Rename(
filepath.Join(oldNode.ParentPath(), oldNode.Name),
filepath.Join(newNode.ParentPath(), newNode.Name),
)
if err != nil {
return errors.Wrap(err, "Decomposedfs: could not move child")
}

// update the id cache
if newNode.ID == "" {
newNode.ID = oldNode.ID
}
// invalidate old tree
err = t.lookup.IDCache.DeleteByPath(ctx, filepath.Join(oldNode.ParentPath(), oldNode.Name))
if err != nil {
return err
}

if err := t.lookup.CacheID(ctx, newNode.SpaceID, newNode.ID, filepath.Join(newNode.ParentPath(), newNode.Name)); err != nil {
t.log.Error().Err(err).Str("spaceID", newNode.SpaceID).Str("id", newNode.ID).Str("path", filepath.Join(newNode.ParentPath(), newNode.Name)).Msg("could not cache id")
}

// rename node
err = os.Rename(
filepath.Join(oldParent, oldNode.Name),
filepath.Join(newParent, newNode.Name),
)
if err != nil {
return errors.Wrap(err, "Decomposedfs: could not move child")
}

// rename the lock (if it exists)
if _, err := os.Stat(lockFilePath); err == nil {
err = os.Rename(lockFilePath, newNode.LockFilePath())
Expand Down