Commit dbd8d58

fix test cases

YuJuncen committed Oct 9, 2024
1 parent d521209 commit dbd8d58
Showing 4 changed files with 52 additions and 8 deletions.
4 changes: 1 addition & 3 deletions br/pkg/storage/local.go
@@ -5,12 +5,10 @@ package storage
import (
"bufio"
"context"
stderrors "errors"
"io"
"os"
"path/filepath"
"strings"
"syscall"

"github.com/google/uuid"
"github.com/pingcap/errors"
@@ -51,7 +49,7 @@ func (l *LocalStorage) DeleteFile(_ context.Context, name string) error {
err := os.Remove(path)
if err != nil &&
l.IgnoreEnoentForDelete &&
stderrors.Is(err, syscall.ENOENT) {
os.IsNotExist(err) {
return nil
}
return err
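Replacing `errors.Is(err, syscall.ENOENT)` with `os.IsNotExist(err)` widens the check: it matches not only the raw POSIX errno wrapped by a real `*fs.PathError` but also the `fs.ErrNotExist` sentinel that test doubles and non-POSIX platforms can produce, which is plausibly why the tests needed it. A minimal standalone sketch of the difference (not part of the commit):

package main

import (
	"errors"
	"fmt"
	"io/fs"
	"os"
	"syscall"
)

func main() {
	realErr := os.Remove("/no/such/file") // *fs.PathError wrapping ENOENT on POSIX
	fakeErr := fs.ErrNotExist             // what a mock storage might return instead

	fmt.Println(errors.Is(realErr, syscall.ENOENT)) // true on POSIX, false elsewhere
	fmt.Println(os.IsNotExist(realErr))             // true everywhere
	fmt.Println(errors.Is(fakeErr, syscall.ENOENT)) // false: the sentinel is not the errno
	fmt.Println(os.IsNotExist(fakeErr))             // true
}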
45 changes: 42 additions & 3 deletions br/pkg/stream/stream_metas.go
@@ -35,6 +35,7 @@ const (
baseMigrationSN = 0
baseMigrationName = "BASE"
baseTmp = "BASE_TMP"
metaSuffix = ".meta"
migrationPrefix = "v1/migrations"
)

@@ -735,11 +736,27 @@ func (m MigrationExt) MergeAndMigrateTo(
}

migTo := &result.MigratedTo

// Put the generated BASE in place before we execute any operation.
// Otherwise a user who reads a subset of the migrations may see a broken storage.
//
// As an example, suppose we wrote the new BASE *after* executing:
// | BASE => ∅
// | [1] => Del(foo.log)
// If we run `MergeAndMigrateTo(1)`, delete `foo.log`, and then BR crashes
// before the new BASE is written, a user who reads the BASE version can
// no longer find `foo.log`.
err = m.writeBase(ctx, newBase)
if err != nil {
result.MigratedTo.Warnings = append(result.MigratedTo.Warnings, errors.Annotatef(err, "failed to save the new base"))
result.Warnings = append(
result.MigratedTo.Warnings,
errors.Annotatef(err, "failed to save the merged new base, nothing will happen"),
)
// Put the new BASE here anyway. The caller may want this.
result.NewBase = newBase
return
}

for _, mig := range result.Source {
// Perhaps a phantom migration.
if len(mig.Path) > 0 {
@@ -753,6 +770,12 @@ func (m MigrationExt) MergeAndMigrateTo(
}
}
result.MigratedTo = m.MigrateTo(ctx, newBase, MTMaybeSkipTruncateLog(!config.alwaysRunTruncate && canSkipTruncate))

// Put the final BASE.
err = m.writeBase(ctx, result.MigratedTo.NewBase)
if err != nil {
result.Warnings = append(result.MigratedTo.Warnings, errors.Annotatef(err, "failed to save the new base"))
}
return
}
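The ordering the new comments describe — persist the merged BASE, run the destructive migrations, then persist the final BASE — is essentially a write-ahead pattern. A compressed illustration with stand-in names (`Migration`, `merge`, `persistBase`, and `apply` are not the real BR types):

package stream

// Migration is a stand-in for the protobuf migration message.
type Migration struct{ Deletes []string }

// merge folds b's deletions into a copy of a.
func merge(a, b Migration) Migration {
	a.Deletes = append(append([]string(nil), a.Deletes...), b.Deletes...)
	return a
}

// mergeAndMigrate persists the merged BASE *before* executing any
// destructive edit, so a crash in between leaves a BASE that already
// records every pending deletion instead of dangling file references.
func mergeAndMigrate(
	persistBase func(Migration) error,
	apply func(Migration) error,
	base Migration, migs []Migration,
) error {
	merged := base
	for _, m := range migs {
		merged = merge(merged, m)
	}
	if err := persistBase(merged); err != nil {
		return err // fail early: nothing has been deleted yet
	}
	for _, m := range migs {
		if err := apply(m); err != nil {
			return err // the persisted BASE already covers these deletions
		}
	}
	return nil
}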

@@ -894,13 +917,20 @@ func (m MigrationExt) applyMetaEdit(ctx context.Context, medit *pb.MetaEdit) (er
}

func (m MigrationExt) applyMetaEditTo(ctx context.Context, medit *pb.MetaEdit, metadata *pb.Metadata) error {
if isEmptyEdition(medit) {
// Fast path: skip write back for empty meta edits.
return nil
}

metadata.Files = slices.DeleteFunc(metadata.Files, func(dfi *pb.DataFileInfo) bool {
// Here, `DeletePhysicalFiles` is usually tiny.
// Switch to a hash set for the lookup if this gets slow in the future.
return slices.Contains(medit.DeletePhysicalFiles, dfi.Path)
})
metadata.FileGroups = slices.DeleteFunc(metadata.FileGroups, func(dfg *pb.DataFileGroup) bool {
return slices.Contains(medit.DeletePhysicalFiles, dfg.Path)
del := slices.Contains(medit.DeletePhysicalFiles, dfg.Path)
fmt.Println(medit.Path, medit.DeletePhysicalFiles, dfg.Path, del)
return del
})
for _, group := range metadata.FileGroups {
idx := slices.IndexFunc(medit.DeleteLogicalFiles, func(dsof *pb.DeleteSpansOfFile) bool {
@@ -1095,13 +1125,15 @@ func (m MigrationExt) doTruncateLogs(
wg.Wait()
}

// hookedStorage is a wrapper over the external storage
// that triggers the `BeforeDoWriteBack` hook whenever a metadata file is written.
type hookedStorage struct {
storage.ExternalStorage
metaSet *StreamMetadataSet
}

func (h hookedStorage) WriteFile(ctx context.Context, name string, data []byte) error {
if h.metaSet.BeforeDoWriteBack != nil {
if strings.HasSuffix(name, metaSuffix) && h.metaSet.BeforeDoWriteBack != nil {
meta, err := h.metaSet.Helper.ParseToMetadataHard(data)
if err != nil {
// Note: this may be too strict, but for now the check doesn't seem to fail.
@@ -1117,6 +1149,13 @@ func (h hookedStorage) WriteFile(ctx context.Context, name string, data []byte)
return h.ExternalStorage.WriteFile(ctx, name, data)
}

func (h hookedStorage) DeleteFile(ctx context.Context, name string) error {
if strings.HasSuffix(name, metaSuffix) && h.metaSet.BeforeDoWriteBack != nil {
h.metaSet.BeforeDoWriteBack(name, new(pb.Metadata))
}
return h.ExternalStorage.DeleteFile(ctx, name)
}

func (ms *StreamMetadataSet) hook(s storage.ExternalStorage) hookedStorage {
hooked := hookedStorage{
ExternalStorage: s,
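`hookedStorage` relies on Go's embedding-based decorator pattern: embedding the `storage.ExternalStorage` interface inherits every method, and redefining `WriteFile`/`DeleteFile` intercepts just those two, now gated on the metadata suffix. A self-contained sketch of the pattern (the interface and hook here are simplified stand-ins, not BR's real API):

package main

import (
	"context"
	"fmt"
	"strings"
)

// ExternalStorage is a simplified stand-in for br's storage.ExternalStorage.
type ExternalStorage interface {
	WriteFile(ctx context.Context, name string, data []byte) error
	DeleteFile(ctx context.Context, name string) error
}

type hookedStorage struct {
	ExternalStorage                    // embedding inherits all other methods
	beforeMetaChange func(name string) // stand-in for BeforeDoWriteBack
}

// WriteFile fires the hook for metadata files only, then delegates.
func (h hookedStorage) WriteFile(ctx context.Context, name string, data []byte) error {
	if strings.HasSuffix(name, ".meta") && h.beforeMetaChange != nil {
		h.beforeMetaChange(name)
	}
	return h.ExternalStorage.WriteFile(ctx, name, data)
}

// DeleteFile mirrors WriteFile so metadata deletions also trigger the hook.
func (h hookedStorage) DeleteFile(ctx context.Context, name string) error {
	if strings.HasSuffix(name, ".meta") && h.beforeMetaChange != nil {
		h.beforeMetaChange(name)
	}
	return h.ExternalStorage.DeleteFile(ctx, name)
}

// memStorage is a toy in-memory backend for the demo.
type memStorage struct{ files map[string][]byte }

func (m memStorage) WriteFile(_ context.Context, name string, data []byte) error {
	m.files[name] = data
	return nil
}

func (m memStorage) DeleteFile(_ context.Context, name string) error {
	delete(m.files, name)
	return nil
}

func main() {
	h := hookedStorage{
		ExternalStorage:  memStorage{files: map[string][]byte{}},
		beforeMetaChange: func(name string) { fmt.Println("about to touch", name) },
	}
	_ = h.WriteFile(context.Background(), "v1/backupmeta/1.meta", nil) // hook fires
	_ = h.DeleteFile(context.Background(), "v1/backupmeta/1.meta")    // hook fires
	_ = h.WriteFile(context.Background(), "v1/migrations/0001", nil)  // hook skipped
}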
9 changes: 8 additions & 1 deletion br/pkg/stream/stream_metas_test.go
@@ -84,6 +84,14 @@ func fakeDataFilesV2(s storage.ExternalStorage, base, item int) (result []*backu
Path: path,
MinTs: uint64(i),
MaxTs: uint64(i + 2),
// Make it look non-empty.
DataFilesInfo: []*backuppb.DataFileInfo{
{
RangeOffset: 0,
Length: 1,
},
},
Length: 1,
}
result = append(result, data)
}
@@ -2729,7 +2737,6 @@ func TestRetry(t *testing.T) {
mg := est.MergeAndMigrateTo(ctx, 2)
require.Len(t, mg.Warnings, 1)
require.Error(t, mg.Warnings[0], "this disk remembers nothing")
fmt.Printf("mg: %v\n", mg)
requireMigrationsEqual(t, mg.NewBase, mig(mDel(mN(1), lN(1), lN(2))))

mg = est.MergeAndMigrateTo(ctx, 2)
2 changes: 1 addition & 1 deletion br/pkg/stream/stream_mgr.go
@@ -369,7 +369,7 @@ func FastUnmarshalMetaData(
eg, ectx := errgroup.WithContext(ctx)
opt := &storage.WalkOption{SubDir: GetStreamBackupMetaPrefix()}
err := s.WalkDir(ectx, opt, func(path string, size int64) error {
if !strings.HasSuffix(path, ".meta") {
if !strings.HasSuffix(path, metaSuffix) {
return nil
}
readPath := path