Skip to content

Commit 71f4dc6

Browse files
committed
feat(rst): cleanup incomplete downloads
1 parent 0fed2b6 commit 71f4dc6

File tree

2 files changed

+30
-12
lines changed

2 files changed

+30
-12
lines changed

common/rst/rst.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -327,8 +327,8 @@ func IsFileLocked(lockedInfo *flex.JobLockedInfo) bool {
327327
return lockedInfo != nil && lockedInfo.ReadWriteLocked
328328
}
329329

330-
// FileExists returns whether the file exists.
331-
func FileExists(lockedInfo *flex.JobLockedInfo) bool {
330+
// FileExistedBeforeJob returns whether the file exists when the job was created.
331+
func FileExistedBeforeJob(lockedInfo *flex.JobLockedInfo) bool {
332332
return lockedInfo.Exists
333333
}
334334

@@ -378,7 +378,7 @@ func BuildJobRequest(ctx context.Context, client Provider, mountPoint filesystem
378378
}
379379

380380
lockedInfo := cfg.LockedInfo
381-
if !IsFileLocked(lockedInfo) && FileExists(lockedInfo) {
381+
if !IsFileLocked(lockedInfo) && FileExistedBeforeJob(lockedInfo) {
382382
return getRequestWithFailedPrecondition("path lock has not been acquired")
383383

384384
}
@@ -397,7 +397,7 @@ func BuildJobRequest(ctx context.Context, client Provider, mountPoint filesystem
397397
}
398398

399399
if cfg.Download && cfg.RemotePath == "" {
400-
if !FileExists(lockedInfo) {
400+
if !FileExistedBeforeJob(lockedInfo) {
401401
return getRequestWithFailedPrecondition(fmt.Sprintf("unable to determine remote path: %s", fs.ErrNotExist.Error()))
402402
}
403403

@@ -464,7 +464,7 @@ func PrepareFileStateForWorkRequests(ctx context.Context, client Provider, mount
464464

465465
alreadySynced := IsFileAlreadySynced(lockedInfo)
466466
if cfg.StubLocal {
467-
if (cfg.Download && (cfg.Overwrite || !FileExists(lockedInfo))) || alreadySynced {
467+
if (cfg.Download && (cfg.Overwrite || !FileExistedBeforeJob(lockedInfo))) || alreadySynced {
468468
if mappings == nil {
469469
mappings, err = util.GetMappings(ctx)
470470
if err != nil && !errors.Is(err, util.ErrMappingRSTs) {
@@ -492,11 +492,11 @@ func PrepareFileStateForWorkRequests(ctx context.Context, client Provider, mount
492492
return ErrJobAlreadyOffloaded
493493
}
494494

495-
if cfg.Download && !cfg.Overwrite && FileExists(lockedInfo) {
495+
if cfg.Download && !cfg.Overwrite && FileExistedBeforeJob(lockedInfo) {
496496
err = fmt.Errorf("download would overwrite existing path but the overwrite flag was not set: %w", fs.ErrExist)
497497
return
498498
}
499-
} else if FileExists(lockedInfo) {
499+
} else if FileExistedBeforeJob(lockedInfo) {
500500
if alreadySynced {
501501
return GetErrJobAlreadyCompleteWithMtime(lockedInfo.Mtime.AsTime())
502502
}
@@ -552,8 +552,8 @@ func PrepareFileStateForWorkRequests(ctx context.Context, client Provider, mount
552552
err = fmt.Errorf("failed to collect information for new file: %w", err)
553553
return
554554
}
555+
lockedInfo.SetExists(false) // Setting to false since the file did not previously exist.
555556
lockedInfo.SetReadWriteLocked(info.ReadWriteLocked)
556-
lockedInfo.SetExists(info.Exists)
557557
lockedInfo.SetSize(info.Size)
558558
lockedInfo.SetMtime(info.Mtime)
559559
lockedInfo.SetMode(info.Mode)
@@ -592,7 +592,7 @@ func GetLockedInfo(ctx context.Context, mountPoint filesystem.Provider, mappings
592592
}
593593
return
594594
}
595-
lockedInfo.Exists = true
595+
lockedInfo.SetExists(true)
596596

597597
if rstIds == nil {
598598
rstIds = entryInfo.Entry.Remote.RSTIDs

common/rst/s3.go

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -424,9 +424,27 @@ func (r *S3Client) completeSyncWorkRequests_Download(ctx context.Context, job *b
424424
}
425425
job.SetStopMtime(timestamppb.New(mtime))
426426

427-
// Skip checking the file was modified if we were told to abort since the mtime may not have
428-
// been set correctly anyway given the error check is skipped above.
429-
if !abort {
427+
if abort {
428+
// If the file previously existed, it is converted into a stub file. Otherwise, the file is
429+
// deleted.
430+
lockedInfo := sync.LockedInfo
431+
if FileExistedBeforeJob(lockedInfo) {
432+
mappings, err := util.GetMappings(ctx)
433+
if err != nil {
434+
return fmt.Errorf("failed to leave stub file: %w", err)
435+
}
436+
437+
err = CreateOffloadedDataFile(ctx, r.mountPoint, mappings, request.Path, sync.RemotePath, request.RemoteStorageTarget, true)
438+
if err != nil {
439+
return fmt.Errorf("failed to leave stub file: %w", err)
440+
}
441+
job.GetStatus().SetState(beeremote.Job_OFFLOADED)
442+
} else {
443+
r.mountPoint.Remove(request.Path)
444+
}
445+
} else {
446+
// Skip checking the file was modified if we were told to abort since the mtime may not have
447+
// been set correctly anyway given the error check is skipped above.
430448
start := job.GetStartMtime().AsTime()
431449
stop := job.GetStopMtime().AsTime()
432450
if !start.Equal(stop) {

0 commit comments

Comments
 (0)