-
Notifications
You must be signed in to change notification settings - Fork 38
/
copy.go
129 lines (118 loc) · 3.66 KB
/
copy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package afs
import (
"context"
"github.com/pkg/errors"
"github.com/viant/afs/file"
"github.com/viant/afs/option"
"github.com/viant/afs/storage"
"github.com/viant/afs/url"
"io"
"io/ioutil"
"os"
"path"
"strings"
)
// updateDestURL decides whether the source file name has to be appended to destURL.
//
// destURL is returned untouched when its last path element equals the source
// file name or carries the same extension (this includes the case where
// neither name has an extension). Otherwise, when the source name has an
// extension of at most 5 bytes (dot included) that does not occur anywhere in
// the destination name, the source name is appended to the destination path.
// In every remaining case the destination is rebuilt from its base URL and
// path without appending anything.
func (s *service) updateDestURL(sourceURL, destURL string) string {
	_, srcName := path.Split(url.Path(sourceURL))
	base, target := url.Base(destURL, file.Scheme)
	_, dstName := path.Split(target)

	srcExt := path.Ext(srcName)
	if dstName == srcName || path.Ext(dstName) == srcExt {
		return destURL
	}

	// An extension longer than 5 bytes is treated as "not a real extension".
	if srcExt == "" || len(srcExt) > 5 || strings.Contains(dstName, srcExt) {
		return url.Join(base, target)
	}
	return url.Join(base, path.Join(target, srcName))
}
// copy transfers the resource at sourceURL to destURL.
//
// It first resolves the source object, then picks a modifier from destOptions
// or, failing that, from srcOptions (in which case *srcOptions is replaced by
// the remaining options returned by option.Assign). When both URLs share a
// scheme, no modifier is set, the walker is the service itself, the source
// manager implements storage.Copier and IsAuthChanged reports false, the copy
// is delegated to that Copier. Otherwise the source is walked with walker and
// every visited entry is passed to the upload function obtained from
// uploader.Uploader.
//
// The returned error is the wrapped lookup error, the manager/Copier error,
// the Uploader error, the Walk error, or - if none of those occurred - the
// error from closing the uploader.
func (s *service) copy(ctx context.Context, sourceURL, destURL string, srcOptions *option.Source, destOptions *option.Dest,
walker storage.Walker, uploader storage.BatchUploader) (err error) {
source, err := s.Object(ctx, sourceURL, *srcOptions...)
if err != nil {
return errors.Wrapf(err, "source not found: %v", sourceURL)
}
// Prefer a modifier supplied with the destination options; fall back to the source options.
var modifier option.Modifier
option.Assign(*destOptions, &modifier)
if modifier == nil {
if remaining, ok := option.Assign(*srcOptions, &modifier); ok {
*srcOptions = remaining
}
}
_, isInternalWalker := walker.(*service)
mappedName := ""
if source.IsDir() || !isInternalWalker {
// NOTE(review): the error assigned here is never inspected - it is overwritten by the
// Uploader call below, and the Copier branch returns its own error. Confirm whether
// ignoring a failed Create is intentional before changing it.
err = s.Create(ctx, destURL, source.Mode()|os.ModeDir, source.IsDir(), *destOptions...)
} else {
// Non-directory source with the internal walker: destURL is split and mappedName is
// later used as the name of the walked entry (presumably the destination file name - verify url.Split).
destURL, mappedName = url.Split(destURL, file.Scheme)
}
if url.IsSchemeEquals(sourceURL, destURL) && modifier == nil && isInternalWalker {
// err declared here shadows the named result for the rest of this if block.
sourceManager, err := s.manager(ctx, sourceURL, *srcOptions)
if err != nil {
return err
}
if copier, ok := sourceManager.(storage.Copier); ok {
// NOTE(review): destURL is re-joined with mappedName before the auth check; if
// IsAuthChanged returns true, execution falls through to the uploader path with
// this re-joined destURL while mappedName is still applied to walked entries - confirm this is intended.
if mappedName != "" {
destURL = url.Join(destURL, mappedName)
}
if !s.IsAuthChanged(ctx, sourceManager, sourceURL, *destOptions) {
return copier.Copy(ctx, sourceURL, destURL, *srcOptions...)
}
}
}
upload, closer, err := uploader.Uploader(ctx, destURL, *destOptions...)
if err != nil {
return err
}
// Always close the uploader; its Close error is reported only when nothing failed earlier.
defer func() {
closeErr := closer.Close()
if err == nil {
err = closeErr
}
}()
err = walker.Walk(ctx, sourceURL, func(ctx context.Context, baseURL string, parent string, info os.FileInfo, reader io.Reader) (toContinue bool, err error) {
// Rename the visited entry to mappedName while keeping its size, mode, modification time and dir flag.
if mappedName != "" {
info = file.NewInfo(mappedName, info.Size(), info.Mode(), info.ModTime(), info.IsDir())
}
// The modifier is applied only to entries that come with a reader.
if modifier != nil && reader != nil {
info, reader, err = modifier(parent, info, ioutil.NopCloser(reader))
if err != nil {
return false, err
}
}
err = upload(ctx, parent, info, reader)
// Keep walking only while uploads succeed.
return err == nil, err
}, *srcOptions...)
return err
}
// Copy copies the resource at sourceURL to destURL.
//
// Both URLs are normalized with file.Scheme as the default scheme. The
// supplied options are sorted into source options, destination options, a
// matcher, a modifier, a storage.Walker and a storage.BatchUploader; a non-nil
// matcher and modifier are appended to the source options. The service itself
// is used as walker and/or uploader when none is supplied. When the walker is
// the service, destURL is first adjusted by updateDestURL. The actual transfer
// is delegated to copy, whose error is returned.
func (s *service) Copy(ctx context.Context, sourceURL, destURL string, options ...storage.Option) (err error) {
	sourceURL = url.Normalize(sourceURL, file.Scheme)
	destURL = url.Normalize(destURL, file.Scheme)

	srcOpts := option.NewSource()
	dstOpts := option.NewDest()
	var (
		walker   storage.Walker
		uploader storage.BatchUploader
	)
	matcher, modify := option.GetWalkOptions(options)
	option.Assign(options, &srcOpts, &dstOpts, &matcher, &walker, &uploader, &modify)

	// Walk-related options travel with the source options.
	if matcher != nil {
		*srcOpts = append(*srcOpts, matcher)
	}
	if modify != nil {
		*srcOpts = append(*srcOpts, modify)
	}

	// Default to the service for walking and uploading.
	if walker == nil {
		walker = s
	}
	if uploader == nil {
		uploader = s
	}

	if _, internal := walker.(*service); internal {
		destURL = s.updateDestURL(sourceURL, destURL)
	}
	return s.copy(ctx, sourceURL, destURL, srcOpts, dstOpts, walker, uploader)
}