Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 91 additions & 38 deletions pkg/storage/fs/posix/blobstore/blobstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,21 @@
package blobstore

import (
"bufio"
"context"
"fmt"
"io"
"os"
"path/filepath"
"time"

"github.com/opencloud-eu/reva/v2/pkg/storage/pkg/decomposedfs/node"
"github.com/pkg/errors"
"github.com/pkg/xattr"

"github.com/opencloud-eu/reva/v2/pkg/storage/pkg/decomposedfs/node"
)

const (
TMPDir = ".oc-tmp"
)

// Blobstore provides an interface to an filesystem based blobstore
Expand All @@ -41,61 +49,106 @@ func New(root string) (*Blobstore, error) {
}, nil
}

// Upload stores some data in the blobstore under the given key
func (bs *Blobstore) Upload(node *node.Node, source, copyTarget string) error {
path := node.InternalPath()
// Upload is responsible for transferring data from a source file (upload) to its final location;
// the file operation is done atomically using a temporary file followed by a rename
func (bs *Blobstore) Upload(n *node.Node, source, copyTarget string) error {
tempName := filepath.Join(n.SpaceRoot.InternalPath(), TMPDir, filepath.Base(source))

// preserve the mtime of the file
fi, _ := os.Stat(path)
// there is no guarantee that the space root TMPDir exists at this point, so we create the directory if needed
if err := os.MkdirAll(filepath.Dir(tempName), 0700); err != nil {
return err
}

file, err := os.Open(source)
sourceFile, err := os.Open(source)
if err != nil {
return errors.Wrap(err, "Decomposedfs: posix blobstore: Can not open source file to upload")
return fmt.Errorf("failed to open source file '%s': %v", source, err)
}
defer file.Close()
defer func() {
_ = sourceFile.Close()
}()

f, err := os.OpenFile(node.InternalPath(), os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0700)
tempFile, err := os.OpenFile(tempName, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0700)
if err != nil {
return errors.Wrapf(err, "could not open blob '%s' for writing", node.InternalPath())
return fmt.Errorf("unable to create temp file '%s': %v", tempName, err)
}
defer f.Close()

w := bufio.NewWriter(f)
_, err = w.ReadFrom(file)
if err != nil {
return errors.Wrapf(err, "could not write blob '%s'", node.InternalPath())
if _, err := tempFile.ReadFrom(sourceFile); err != nil {
return fmt.Errorf("failed to write data from source file '%s' to temp file '%s' - %v", source, tempName, err)
}
err = w.Flush()
if err != nil {
return err

if err := tempFile.Sync(); err != nil {
return fmt.Errorf("failed to sync temp file '%s' - %v", tempName, err)
}

if err := tempFile.Close(); err != nil {
return fmt.Errorf("failed to close temp file '%s' - %v", tempName, err)
}
err = os.Chtimes(path, fi.ModTime(), fi.ModTime())

nodeAttributes, err := n.Xattrs(context.Background())
if err != nil {
return err
return fmt.Errorf("failed to get xattrs for node '%s': %v", n.InternalPath(), err)
}

if copyTarget != "" {
// also "upload" the file to a local path, e.g. for keeping the "current" version of the file
err := os.MkdirAll(filepath.Dir(copyTarget), 0700)
if err != nil {
return err
var mtime *time.Time
for k, v := range nodeAttributes {
if err := xattr.Set(tempName, k, v); err != nil {
return fmt.Errorf("failed to set xattr '%s' on temp file '%s' - %v", k, tempName, err)
}

_, err = file.Seek(0, 0)
if err != nil {
return err
if k == "user.oc.mtime" {
tv, err := time.Parse(time.RFC3339Nano, string(v))
if err == nil {
mtime = &tv
}
}
}

copyFile, err := os.OpenFile(copyTarget, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0600)
if err != nil {
return errors.Wrapf(err, "could not open copy target '%s' for writing", copyTarget)
// the extended attributes should always contain a mtime, but in case they don't, we fetch it from the node
if mtime == nil {
switch nodeMtime, err := n.GetMTime(context.Background()); {
case err != nil:
return fmt.Errorf("failed to get mtime for node '%s' - %v", n.InternalPath(), err)
default:
mtime = &nodeMtime
}
defer copyFile.Close()

_, err = copyFile.ReadFrom(file)
if err != nil {
return errors.Wrapf(err, "could not write blob copy of '%s' to '%s'", node.InternalPath(), copyTarget)
}
}

// etags rely on the id and the mtime, so we need to ensure the mtime is set correctly
if err := os.Chtimes(tempName, *mtime, *mtime); err != nil {
return fmt.Errorf("failed to set mtime on temp file '%s' - %v", tempName, err)
}

// atomically move the file to its final location,
// on Windows systems (unsupported oc os) os.Rename is not atomic
if err := os.Rename(tempName, n.InternalPath()); err != nil {
return fmt.Errorf("failed to move temp file '%s' to node '%s' - %v", tempName, n.InternalPath(), err)
}

// upload successfully, now handle the copy target if set
if copyTarget == "" {
return nil
}

// also "upload" the file to a local path, e.g., for keeping the "current" version of the file
if err := os.MkdirAll(filepath.Dir(copyTarget), 0700); err != nil {
return err
}

if _, err := sourceFile.Seek(0, 0); err != nil {
return err
}

copyFile, err := os.OpenFile(copyTarget, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0600)
if err != nil {
return errors.Wrapf(err, "could not open copy target '%s' for writing", copyTarget)
}
defer func() {
_ = copyFile.Close()
}()

if _, err := copyFile.ReadFrom(sourceFile); err != nil {
return errors.Wrapf(err, "could not write blob copy of '%s' to '%s'", n.InternalPath(), copyTarget)
}

return nil
Expand Down
8 changes: 7 additions & 1 deletion pkg/storage/fs/posix/tree/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,11 @@ import (
"golang.org/x/sync/errgroup"

provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"

"github.com/opencloud-eu/reva/v2/pkg/appctx"
"github.com/opencloud-eu/reva/v2/pkg/errtypes"
"github.com/opencloud-eu/reva/v2/pkg/events"
"github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/blobstore"
"github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/lookup"
"github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/options"
"github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/trashbin"
Expand Down Expand Up @@ -724,6 +726,10 @@ func (t *Tree) isIndex(path string) bool {
return strings.HasPrefix(path, filepath.Join(t.options.Root, "indexes"))
}

func (t *Tree) isTemporary(path string) bool {
return path == blobstore.TMPDir
}

func (t *Tree) isRootPath(path string) bool {
return path == t.options.Root ||
path == t.personalSpacesRoot ||
Expand All @@ -736,7 +742,7 @@ func (t *Tree) isSpaceRoot(path string) bool {
}

func (t *Tree) isInternal(path string) bool {
return t.isIndex(path) || strings.Contains(path, lookup.MetadataDir)
return t.isIndex(path) || strings.Contains(path, lookup.MetadataDir) || t.isTemporary(path)
}

func isLockFile(path string) bool {
Expand Down
11 changes: 11 additions & 0 deletions pkg/storage/pkg/decomposedfs/upload/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,17 @@ func (session *DecomposedFsSession) Finalize(ctx context.Context) (err error) {
revisionNode := node.New(session.SpaceID(), session.NodeID(), "", "", session.Size(), session.ID(),
provider.ResourceType_RESOURCE_TYPE_FILE, session.SpaceOwner(), session.store.lu)

switch spaceRoot, err := session.store.lu.NodeFromSpaceID(ctx, session.SpaceID()); {
case err != nil:
return fmt.Errorf("failed to get space root for space id %s: %v", session.SpaceID(), err)
case spaceRoot == nil:
return fmt.Errorf("space root for space id %s not found", session.SpaceID())
case spaceRoot.InternalPath() == "":
return fmt.Errorf("space root for space id %s has no valid internal path", session.SpaceID())
default:
revisionNode.SpaceRoot = spaceRoot
}

// lock the node before writing the blob
unlock, err := session.store.lu.MetadataBackend().Lock(revisionNode)
if err != nil {
Expand Down