Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track BackupEngine and BackupStorageHandle errors together. #6350

Merged
merged 8 commits into from
Jun 22, 2020
19 changes: 17 additions & 2 deletions go/vt/mysqlctl/azblobbackupstorage/azblob.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,21 @@ func (bh *AZBlobBackupHandle) Name() string {
return bh.name
}

// RecordError is part of the concurrency.ErrorRecorder interface.
func (bh *AZBlobBackupHandle) RecordError(err error) {
bh.errors.RecordError(err)
}

// HasErrors is part of the concurrency.ErrorRecorder interface.
func (bh *AZBlobBackupHandle) HasErrors() bool {
return bh.errors.HasErrors()
}

// Error is part of the concurrency.ErrorRecorder interface.
func (bh *AZBlobBackupHandle) Error() error {
return bh.errors.Error()
}

// AddFile implements BackupHandle.
func (bh *AZBlobBackupHandle) AddFile(ctx context.Context, filename string, filesize int64) (io.WriteCloser, error) {
if bh.readOnly {
Expand Down Expand Up @@ -156,7 +171,7 @@ func (bh *AZBlobBackupHandle) AddFile(ctx context.Context, filename string, file
})
if err != nil {
reader.CloseWithError(err)
bh.errors.RecordError(err)
bh.RecordError(err)
}
}()

Expand All @@ -169,7 +184,7 @@ func (bh *AZBlobBackupHandle) EndBackup(ctx context.Context) error {
return fmt.Errorf("EndBackup cannot be called on read-only backup")
}
bh.waitGroup.Wait()
return bh.errors.Error()
return bh.Error()
}

// AbortBackup implements BackupHandle.
Expand Down
5 changes: 5 additions & 0 deletions go/vt/mysqlctl/backupstorage/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"io"

"golang.org/x/net/context"
"vitess.io/vitess/go/vt/concurrency"
)

var (
Expand Down Expand Up @@ -74,6 +75,10 @@ type BackupHandle interface {
// The context is valid for the duration of the reads, until the
// ReadCloser is closed.
ReadFile(ctx context.Context, filename string) (io.ReadCloser, error)

// concurrency.ErrorRecorder is embedded here to coordinate reporting and
// handling of errors among all the components involved in taking a backup.
concurrency.ErrorRecorder
}

// BackupStorage is the interface to the storage system
Expand Down
18 changes: 13 additions & 5 deletions go/vt/mysqlctl/builtinbackupengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,6 @@ func (be *BuiltinBackupEngine) backupFiles(ctx context.Context, params BackupPar

// Backup with the provided concurrency.
sema := sync2.NewSemaphore(params.Concurrency, 0)
rec := concurrency.AllErrorRecorder{}
wg := sync.WaitGroup{}
for i := range fes {
wg.Add(1)
Expand All @@ -287,19 +286,28 @@ func (be *BuiltinBackupEngine) backupFiles(ctx context.Context, params BackupPar
// encountered an error.
sema.Acquire()
defer sema.Release()
if rec.HasErrors() {
if bh.HasErrors() {
return
}

// Backup the individual file.
name := fmt.Sprintf("%v", i)
rec.RecordError(be.backupFile(ctx, params, bh, &fes[i], name))
bh.RecordError(be.backupFile(ctx, params, bh, &fes[i], name))
}(i)
}

wg.Wait()
if rec.HasErrors() {
return rec.Error()

// BackupHandle supports the ErrorRecorder interface for tracking errors
// across any goroutines that fan out to take the backup. This means that we
// don't need a local error recorder and can put everything through the bh.
//
// This handles the scenario where bh.AddFile() encounters an error asynchronously,
// which ordinarily would be lost in the context of `be.backupFile`, i.e. if an
// error were encountered
// [here](https://github.com/vitessio/vitess/blob/d26b6c7975b12a87364e471e2e2dfa4e253c2a5b/go/vt/mysqlctl/s3backupstorage/s3.go#L139-L142).
if bh.HasErrors() {
return bh.Error()
}

// open the MANIFEST
Expand Down
19 changes: 17 additions & 2 deletions go/vt/mysqlctl/cephbackupstorage/ceph.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,21 @@ type CephBackupHandle struct {
waitGroup sync.WaitGroup
}

// RecordError is part of the concurrency.ErrorRecorder interface.
func (bh *CephBackupHandle) RecordError(err error) {
bh.errors.RecordError(err)
}

// HasErrors is part of the concurrency.ErrorRecorder interface.
func (bh *CephBackupHandle) HasErrors() bool {
return bh.errors.HasErrors()
}

// Error is part of the concurrency.ErrorRecorder interface.
func (bh *CephBackupHandle) Error() error {
return bh.errors.Error()
}

// Directory implements BackupHandle.
func (bh *CephBackupHandle) Directory() string {
return bh.dir
Expand Down Expand Up @@ -94,7 +109,7 @@ func (bh *CephBackupHandle) AddFile(ctx context.Context, filename string, filesi
// Signal the writer that an error occurred, in case it's not done writing yet.
reader.CloseWithError(err)
// In case the error happened after the writer finished, we need to remember it.
bh.errors.RecordError(err)
bh.RecordError(err)
}
}()
// Give our caller the write end of the pipe.
Expand All @@ -108,7 +123,7 @@ func (bh *CephBackupHandle) EndBackup(ctx context.Context) error {
}
bh.waitGroup.Wait()
// Return the saved PutObject() errors, if any.
return bh.errors.Error()
return bh.Error()
}

// AbortBackup implements BackupHandle.
Expand Down
17 changes: 17 additions & 0 deletions go/vt/mysqlctl/filebackupstorage/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"golang.org/x/net/context"

"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/mysqlctl/backupstorage"
)

Expand All @@ -43,6 +44,22 @@ type FileBackupHandle struct {
dir string
name string
readOnly bool
errors concurrency.AllErrorRecorder
}

// RecordError is part of the concurrency.ErrorRecorder interface.
func (fbh *FileBackupHandle) RecordError(err error) {
fbh.errors.RecordError(err)
}

// HasErrors is part of the concurrency.ErrorRecorder interface.
func (fbh *FileBackupHandle) HasErrors() bool {
return fbh.errors.HasErrors()
}

// Error is part of the concurrency.ErrorRecorder interface.
func (fbh *FileBackupHandle) Error() error {
return fbh.errors.Error()
}

// Directory is part of the BackupHandle interface
Expand Down
17 changes: 17 additions & 0 deletions go/vt/mysqlctl/gcsbackupstorage/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"google.golang.org/api/option"

"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/mysqlctl/backupstorage"
)

Expand All @@ -51,6 +52,22 @@ type GCSBackupHandle struct {
dir string
name string
readOnly bool
errors concurrency.AllErrorRecorder
}

// RecordError is part of the concurrency.ErrorRecorder interface.
func (bh *GCSBackupHandle) RecordError(err error) {
bh.errors.RecordError(err)
}

// HasErrors is part of the concurrency.ErrorRecorder interface.
func (bh *GCSBackupHandle) HasErrors() bool {
return bh.errors.HasErrors()
}

// Error is part of the concurrency.ErrorRecorder interface.
func (bh *GCSBackupHandle) Error() error {
return bh.errors.Error()
}

// Directory implements BackupHandle.
Expand Down
22 changes: 19 additions & 3 deletions go/vt/mysqlctl/s3backupstorage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"golang.org/x/net/context"

Expand Down Expand Up @@ -85,7 +86,7 @@ var logNameMap logNameToLogLevel

// S3BackupHandle implements the backupstorage.BackupHandle interface.
type S3BackupHandle struct {
client *s3.S3
client s3iface.S3API
bs *S3BackupStorage
dir string
name string
Expand All @@ -104,6 +105,21 @@ func (bh *S3BackupHandle) Name() string {
return bh.name
}

// RecordError is part of the concurrency.ErrorRecorder interface.
func (bh *S3BackupHandle) RecordError(err error) {
bh.errors.RecordError(err)
}

// HasErrors is part of the concurrency.ErrorRecorder interface.
func (bh *S3BackupHandle) HasErrors() bool {
return bh.errors.HasErrors()
}

// Error is part of the concurrency.ErrorRecorder interface.
func (bh *S3BackupHandle) Error() error {
return bh.errors.Error()
}

// AddFile is part of the backupstorage.BackupHandle interface.
func (bh *S3BackupHandle) AddFile(ctx context.Context, filename string, filesize int64) (io.WriteCloser, error) {
if bh.readOnly {
Expand Down Expand Up @@ -143,7 +159,7 @@ func (bh *S3BackupHandle) AddFile(ctx context.Context, filename string, filesize
})
if err != nil {
reader.CloseWithError(err)
bh.errors.RecordError(err)
bh.RecordError(err)
}
}()

Expand All @@ -156,7 +172,7 @@ func (bh *S3BackupHandle) EndBackup(ctx context.Context) error {
return fmt.Errorf("EndBackup cannot be called on read-only backup")
}
bh.waitGroup.Wait()
return bh.errors.Error()
return bh.Error()
}

// AbortBackup is part of the backupstorage.BackupHandle interface.
Expand Down
43 changes: 43 additions & 0 deletions go/vt/mysqlctl/s3backupstorage/s3_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package s3backupstorage

import (
"errors"
"net/http"
"testing"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

type s3ErrorClient struct{ s3iface.S3API }

func (s3errclient *s3ErrorClient) PutObjectRequest(in *s3.PutObjectInput) (*request.Request, *s3.PutObjectOutput) {
req := request.Request{
HTTPRequest: &http.Request{}, // without this we segfault \_(ツ)_/¯ (see https://github.com/aws/aws-sdk-go/blob/v1.28.8/aws/request/request_context.go#L13)
Error: errors.New("some error"), // this forces req.Send() (which is called by the uploader) to always return non-nil error
}

return &req, &s3.PutObjectOutput{}
}

func TestAddFileError(t *testing.T) {
bh := &S3BackupHandle{client: &s3ErrorClient{}, readOnly: false}

wc, err := bh.AddFile(aws.BackgroundContext(), "somefile", 100000)
require.NoErrorf(t, err, "AddFile() expected no error, got %s", err)
assert.NotEqual(t, nil, wc, "AddFile() expected non-nil WriteCloser")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's also an assert.NotNil, but I'll accept this.


n, err := wc.Write([]byte("here are some bytes"))
require.NoErrorf(t, err, "TestAddFile() could not write to uploader, got %d bytes written, err %s", n, err)

err = wc.Close()
require.NoErrorf(t, err, "TestAddFile() could not close writer, got %s", err)

bh.waitGroup.Wait() // wait for the goroutine to finish, at which point it should have recorded an error

require.Equal(t, bh.HasErrors(), true, "AddFile() expected bh to record async error but did not")
}