Skip to content

Commit

Permalink
patched cross manager single asset move
Browse files Browse the repository at this point in the history
  • Loading branch information
adrianwit@gmail.com authored and adrianwit@gmail.com committed Aug 27, 2019
1 parent 96d5c79 commit e5d8ad4
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 5 deletions.
10 changes: 9 additions & 1 deletion base/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,24 @@ import (
"io"
"os"
"path"
"strings"
"sync/atomic"
)

type uploader struct {
storage.Manager
}

func (u *uploader) Uploader(ctx context.Context, URL string, options ...storage.Option) (storage.Upload, io.Closer, error) {
index := int32(0)
handler := func(ctx context.Context, parent string, info os.FileInfo, reader io.Reader) error {
location := path.Join(parent, info.Name())

if atomic.AddInt32(&index, 1) == 1 {
if strings.HasSuffix(URL, location) {
URL = string(URL[:len(URL)-len(location)])
}
}
URL := url.Join(URL, location)
if info.Mode()&os.ModeSymlink > 0 {
if rawInfo, ok := info.(*file.Info); ok && rawInfo.Linkname != "" {
Expand All @@ -26,7 +35,6 @@ func (u *uploader) Uploader(ctx context.Context, URL string, options ...storage.
if info.IsDir() {
return u.Manager.Create(ctx, URL, info.Mode(), info.IsDir(), options...)
}

return u.Manager.Upload(ctx, URL, info.Mode(), reader, options...)
}
return handler, u.Manager, nil
Expand Down
6 changes: 5 additions & 1 deletion copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ func (s *service) updateDestURL(sourceURL, destURL string) string {
if destName == sourceName {
return destURL
}
if len(path.Ext(sourceName)) != len(path.Ext(destName)) {
destPath = path.Join(destPath, sourceName)
}
return url.Join(baseURL, destPath)
}

func (s *service) copy(ctx context.Context, sourceURL, destURL string, srcOptions *option.Source, destOptions *option.Dest,
walker storage.Walker, uploader storage.BatchUploader) error {
destURL = s.updateDestURL(sourceURL, destURL)
object, err := s.Object(ctx, sourceURL, *srcOptions...)
destOpts := *destOptions
if err == nil && object.IsDir() {
Expand All @@ -34,6 +36,7 @@ func (s *service) copy(ctx context.Context, sourceURL, destURL string, srcOption
if err != nil {
return err
}

upload, closer, err := uploader.Uploader(ctx, destURL, destOpts...)
if err != nil {
return err
Expand All @@ -53,6 +56,7 @@ func (s *service) Copy(ctx context.Context, sourceURL, destURL string, options .
destURL = url.Normalize(destURL, file.Scheme)
sourceOptions := option.NewSource()
destOptions := option.NewDest()
destURL = s.updateDestURL(sourceURL, destURL)
var walker storage.Walker
var uploader storage.BatchUploader
var matcher option.Matcher
Expand Down
1 change: 1 addition & 0 deletions move.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ func (s *service) Move(ctx context.Context, sourceURL, destURL string, options .
destURL = url.Normalize(destURL, file.Scheme)
sourceScheme := url.Scheme(sourceURL, file.Scheme)
destScheme := url.Scheme(destURL, file.Scheme)
destURL = s.updateDestURL(sourceURL, destURL)

sourceOptions := option.NewSource()
destOptions := option.NewDest()
Expand Down
33 changes: 30 additions & 3 deletions move_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/viant/afs/asset"
"github.com/viant/afs/option"
"github.com/viant/afs/storage"
"github.com/viant/afs/url"
"os"
"path"
"testing"
Expand All @@ -25,6 +26,17 @@ func TestService_Move(t *testing.T) {
destOptions []storage.Option
sourceOptions []storage.Option
}{

{
description: "single file move",
source: path.Join(baseDir, "service_move_00/src"),
dest: path.Join(baseDir, "service_move_00/dst"),

assets: []*asset.Resource{
asset.NewFile("asset1.txt", []byte("test 1"), 0644),
},
},

{
description: "mover move",
source: path.Join(baseDir, "service_move_01/src"),
Expand All @@ -45,6 +57,7 @@ func TestService_Move(t *testing.T) {
asset.NewFile("asset2.txt", []byte("test 2"), 0644),
},
},

{
description: "crosss storage move: mem to file",
source: "mem://" + path.Join(baseDir, "service_move_03/src"),
Expand All @@ -54,6 +67,15 @@ func TestService_Move(t *testing.T) {
asset.NewFile("asset2.txt", []byte("test 2"), 0644),
},
},

{
description: "memory move",
source: "mem://" + path.Join(baseDir, "service_move_04/src"),
dest: "mem://" + path.Join(baseDir, "service_move_04/dst"),
assets: []*asset.Resource{
asset.NewFile("asset10.txt", []byte("test 1"), 0644),
},
},
}

for _, useCase := range useCases {
Expand All @@ -62,17 +84,24 @@ func TestService_Move(t *testing.T) {
if !assert.Nil(t, err, useCase.description) {
continue
}

err = asset.Create(srcManager, useCase.source, useCase.assets)
if !assert.Nil(t, err, useCase.description) {
continue
}

destManager, err := Manager(useCase.dest, useCase.destOptions...)
if !assert.Nil(t, err, useCase.description) {
continue
}
_ = asset.Cleanup(destManager, useCase.dest)

err = service.Move(ctx, useCase.source, useCase.dest, option.NewSource(useCase.sourceOptions...), option.NewDest(useCase.destOptions...))
source := useCase.source
if len(useCase.assets) == 1 {
source = url.Join(source, useCase.assets[0].Name)
}

err = service.Move(ctx, source, useCase.dest, option.NewSource(useCase.sourceOptions...), option.NewDest(useCase.destOptions...))
assert.Nil(t, err, useCase.description)

actuals, err := asset.Load(destManager, useCase.dest)
Expand All @@ -90,8 +119,6 @@ func TestService_Move(t *testing.T) {
}

_ = service.CloseAll()
//_ = asset.Cleanup(srcManager, useCase.source)
//_ = asset.Cleanup(destManager, useCase.dest)
}

}

0 comments on commit e5d8ad4

Please sign in to comment.