Skip to content

Commit ce8f1df

Browse files
authored
Merge pull request #56 from MegaByte875/fix_download_bug
2 parents d6b6f97 + 0c562a5 commit ce8f1df

File tree

1 file changed

+22
-8
lines changed

1 file changed

+22
-8
lines changed

pkg/storage/gs.go

+22-8
Original file line numberDiff line numberDiff line change
@@ -183,9 +183,7 @@ func (g *GS) ListDir(ctx context.Context, uri string) ([]string, error) {
183183
}
184184

185185
bucket := b.GetGs().Bucket
186-
path := b.GetGs().Path
187-
prefix := getPrefix(path)
188-
186+
prefix := getPrefix(b.GetGs().Path)
189187
names, err := g.readdirNames(bucket, prefix)
190188
if err != nil {
191189
return nil, err
@@ -197,10 +195,12 @@ func (g *GS) ListDir(ctx context.Context, uri string) ([]string, error) {
197195
func (g *GS) RemoveDir(ctx context.Context, uri string) error {
198196
pathInfo, err := g.gcsFS.Stat(uri)
199197
if errors.Is(err, gcsfs.ErrFileNotFound) {
198+
log.Errorf("URI %s not found", uri)
200199
// return early if file doesn't exist
201200
return nil
202201
}
203202
if err != nil {
203+
log.Errorf("stat URI %s failed: %v", uri, err)
204204
return err
205205
}
206206

@@ -272,12 +272,15 @@ func (g *GS) uploadToStorage(key, file string) error {
272272
//}
273273
//o = o.If(storage.Conditions{GenerationMatch: attrs.Generation})
274274
wc := o.NewWriter(context.Background())
275-
if _, err = io.Copy(wc, f); err != nil {
275+
written, err := io.Copy(wc, f)
276+
if err != nil {
276277
return fmt.Errorf("io.Copy: %w", err)
277278
}
278279
if err := wc.Close(); err != nil {
279280
return fmt.Errorf("Writer.Close: %w", err)
280281
}
282+
283+
log.Infof("Upload from %s to %s successfully, bytes=%d", file, key, written)
281284
return nil
282285
}
283286

@@ -302,7 +305,7 @@ func (g *GS) uploadPrefix(prefix, localDir string) error {
302305
}
303306
}
304307

305-
log.Debugf("Upload from %s to %s recursively.", localDir, g.backend.Uri())
308+
log.Infof("Upload from %s to %s recursively", localDir, g.backend.Uri())
306309
return nil
307310
}
308311

@@ -334,14 +337,16 @@ func (g *GS) downloadToFile(file, key string) error {
334337
}
335338
defer rc.Close()
336339

337-
if _, err := io.Copy(f, rc); err != nil {
340+
written, err := io.Copy(f, rc)
341+
if err != nil {
338342
return fmt.Errorf("io.Copy: %w", err)
339343
}
340344

341345
if err = f.Close(); err != nil {
342346
return fmt.Errorf("f.Close: %w", err)
343347
}
344348

349+
log.Infof("Download from %s to %s successfully, bytes=%d", key, file, written)
345350
return nil
346351
}
347352

@@ -352,14 +357,16 @@ func (g *GS) downloadPrefix(localPath, uri, baseUri string) error {
352357
// localPath: /tmp
353358
// uri: gs://nebula-2024/BACKUP_2024_01_20_01_20_56/data/2/3/...
354359
// baseUri: gs://nebula-2024/BACKUP_2024_01_20_01_20_56/
355-
// object key: BACKUP_2023_11_14_04_49_54/data/2/3/data/000009.sst
360+
// object key: BACKUP_2024_01_20_01_20_56/data/2/3/data/000009.sst
356361
func (g *GS) downloadToDir(localPath, uri, baseUri string) error {
357362
pathInfo, err := g.gcsFS.Stat(uri)
358363
if errors.Is(err, gcsfs.ErrFileNotFound) {
364+
log.Errorf("URI %s not found", uri)
359365
// return early if file doesn't exist
360366
return nil
361367
}
362368
if err != nil {
369+
log.Errorf("stat URI %s failed: %v", uri, err)
363370
return err
364371
}
365372

@@ -390,7 +397,7 @@ func (g *GS) downloadToDir(localPath, uri, baseUri string) error {
390397
return err
391398
}
392399
for _, name := range names {
393-
subUri := uri + "/" + name
400+
subUri := getSubUri(uri, name)
394401
err = g.downloadToDir(localPath, subUri, baseUri)
395402
if err != nil {
396403
return err
@@ -432,6 +439,13 @@ func (g *GS) readdirNames(bucket, prefix string) ([]string, error) {
432439
return names, nil
433440
}
434441

442+
func getSubUri(uri, name string) string {
443+
if strings.HasSuffix(uri, "/") {
444+
return uri + name
445+
}
446+
return uri + "/" + name
447+
}
448+
435449
func getObjectKey(uri string) (string, error) {
436450
u, err := url.Parse(uri)
437451
if err != nil {

0 commit comments

Comments
 (0)