From 5d08b9778cb1d1d433cd48c831570c7be61c7065 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Wed, 20 Dec 2023 17:30:23 +0800 Subject: [PATCH] s3: fix s3 concurrent uploader will overwrite error (#48163) (#49220) close pingcap/tidb#48164 --- br/pkg/storage/s3.go | 5 +++-- br/pkg/storage/s3_test.go | 18 ++++++++++++++++++ 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/br/pkg/storage/s3.go b/br/pkg/storage/s3.go index 614c04a795bb5..8cc6e1970aff9 100644 --- a/br/pkg/storage/s3.go +++ b/br/pkg/storage/s3.go @@ -954,9 +954,10 @@ func (rs *S3Storage) Create(ctx context.Context, name string, option *WriterOpti s3Writer.wg.Add(1) go func() { _, err := up.UploadWithContext(ctx, upParams) - err1 := rd.Close() + // like a channel we only let sender close the pipe in happy path if err != nil { - log.Warn("upload to s3 failed", zap.String("filename", name), zap.Error(err), zap.Error(err1)) + log.Warn("upload to s3 failed", zap.String("filename", name), zap.Error(err)) + _ = rd.CloseWithError(err) } s3Writer.err = err s3Writer.wg.Done() diff --git a/br/pkg/storage/s3_test.go b/br/pkg/storage/s3_test.go index a067f0f71d0c5..52468dbe32a9b 100644 --- a/br/pkg/storage/s3_test.go +++ b/br/pkg/storage/s3_test.go @@ -477,6 +477,24 @@ func TestWriteNoError(t *testing.T) { require.NoError(t, err) } +func TestMultiUploadErrorNotOverwritten(t *testing.T) { + s := createS3Suite(t) + ctx := aws.BackgroundContext() + + s.s3.EXPECT(). + CreateMultipartUploadWithContext(ctx, gomock.Any(), gomock.Any()). + Return(nil, errors.New("mock error")) + + w, err := s.storage.Create(ctx, "file", &WriterOption{Concurrency: 2}) + require.NoError(t, err) + // data should be larger than 5MB to trigger CreateMultipartUploadWithContext path + data := make([]byte, 5*1024*1024+6716) + n, err := w.Write(ctx, data) + require.NoError(t, err) + require.Equal(t, 5*1024*1024+6716, n) + require.ErrorContains(t, w.Close(ctx), "mock error") +} + // TestReadNoError ensures the ReadFile API issues a GetObject request and correctly // read the entire body. func TestReadNoError(t *testing.T) {