Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track BackupEngine and BackupStorageHandle errors together. #6350

Merged
merged 8 commits into from
Jun 22, 2020
Next Next commit
Use the same error recorder in backupengine as backuphandle
(Yes, I need to document, and also fix the interface implementation for
all the other implementations of BackupHandle)

Signed-off-by: Andrew Mason <amason@slack-corp.com>
  • Loading branch information
ajm188 committed Jun 21, 2020
commit 61965380acf1e94d650a91c90f0e2f5563eb9e8b
5 changes: 5 additions & 0 deletions go/vt/mysqlctl/backupstorage/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"io"

"golang.org/x/net/context"
"vitess.io/vitess/go/vt/concurrency"
)

var (
Expand Down Expand Up @@ -74,6 +75,10 @@ type BackupHandle interface {
// The context is valid for the duration of the reads, until the
// ReadCloser is closed.
ReadFile(ctx context.Context, filename string) (io.ReadCloser, error)

// concurrency.ErrorRecorder is embedded here to coordinate reporting and
// handling of errors among all the components involved in taking a backup.
concurrency.ErrorRecorder
}

// BackupStorage is the interface to the storage system
Expand Down
18 changes: 13 additions & 5 deletions go/vt/mysqlctl/builtinbackupengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,6 @@ func (be *BuiltinBackupEngine) backupFiles(ctx context.Context, params BackupPar

// Backup with the provided concurrency.
sema := sync2.NewSemaphore(params.Concurrency, 0)
rec := concurrency.AllErrorRecorder{}
wg := sync.WaitGroup{}
for i := range fes {
wg.Add(1)
Expand All @@ -287,19 +286,28 @@ func (be *BuiltinBackupEngine) backupFiles(ctx context.Context, params BackupPar
// encountered an error.
sema.Acquire()
defer sema.Release()
if rec.HasErrors() {
if bh.HasErrors() {
return
}

// Backup the individual file.
name := fmt.Sprintf("%v", i)
rec.RecordError(be.backupFile(ctx, params, bh, &fes[i], name))
bh.RecordError(be.backupFile(ctx, params, bh, &fes[i], name))
}(i)
}

wg.Wait()
if rec.HasErrors() {
return rec.Error()

// BackupHandles now support the ErrorRecorder interface for tracking errors
// across any goroutines that fan out to take the backup. This means that we
// can discard the local error recorder and put everything through the bh.
//
// This mitigates the scenario where bh.AddFile() encounters an error asynchronously,
// which ordinarily would be lost in the context of `be.backupFile`, i.e. if an
// error were encoutered
// [here](https://github.com/vitessio/vitess/blob/d26b6c7975b12a87364e471e2e2dfa4e253c2a5b/go/vt/mysqlctl/s3backupstorage/s3.go#L139-L142).
if bh.HasErrors() {
return bh.Error()
}

// open the MANIFEST
Expand Down
15 changes: 15 additions & 0 deletions go/vt/mysqlctl/s3backupstorage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,21 @@ func (bh *S3BackupHandle) Name() string {
return bh.name
}

// RecordError is part of the concurrency.ErrorRecorder interface.
func (bh *S3BackupHandle) RecordError(err error) {
bh.errors.RecordError(err)
}

// HasErrors is part of the concurrency.ErrorRecorder interface.
func (bh *S3BackupHandle) HasErrors() bool {
return bh.errors.HasErrors()
}

// Error is part of the concurrency.ErrorRecorder interface.
func (bh *S3BackupHandle) Error() error {
return bh.errors.Error()
}

// AddFile is part of the backupstorage.BackupHandle interface.
func (bh *S3BackupHandle) AddFile(ctx context.Context, filename string, filesize int64) (io.WriteCloser, error) {
if bh.readOnly {
Expand Down