From 5f74919292047ae66b82eca4e43888ff7d1e89bb Mon Sep 17 00:00:00 2001 From: CharlesCheung <61726649+CharlesCheung96@users.noreply.github.com> Date: Sun, 2 Apr 2023 16:22:56 +0800 Subject: [PATCH] redo(ticdc): simplify reader initialization (#8407) ref pingcap/tiflow#8056 --- cdc/redo/reader/file.go | 316 ++++++++++++--------------------- cdc/redo/reader/file_test.go | 254 +++++--------------------- cdc/redo/reader/reader.go | 103 +++++------ cdc/redo/reader/reader_test.go | 172 ++++++++---------- pkg/applier/redo.go | 11 +- 5 files changed, 280 insertions(+), 576 deletions(-) diff --git a/cdc/redo/reader/file.go b/cdc/redo/reader/file.go index 71ccfd63acf..71d723835ce 100644 --- a/cdc/redo/reader/file.go +++ b/cdc/redo/reader/file.go @@ -16,6 +16,7 @@ package reader import ( "bufio" + "bytes" "container/heap" "context" "encoding/binary" @@ -24,6 +25,7 @@ import ( "net/url" "os" "path/filepath" + "strings" "sync" "time" @@ -35,7 +37,6 @@ import ( "github.com/pingcap/tiflow/cdc/redo/writer/file" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/redo" - "go.uber.org/multierr" "go.uber.org/zap" "golang.org/x/sync/errgroup" ) @@ -47,7 +48,7 @@ const ( // defaultWorkerNum is the num of workers used to sort the log file to sorted file, // will load the file to memory first then write the sorted file to disk // the memory used is defaultWorkerNum * defaultMaxLogSize (64 * megabyte) total - defaultWorkerNum = 50 + defaultWorkerNum = 16 ) type fileReader interface { @@ -70,47 +71,38 @@ type readerConfig struct { type reader struct { cfg *readerConfig mu sync.Mutex - br *bufio.Reader + br io.Reader fileName string closer io.Closer // lastValidOff file offset following the last valid decoded record lastValidOff int64 } -func newReader(ctx context.Context, cfg *readerConfig) ([]fileReader, error) { +func newReaders(ctx context.Context, cfg *readerConfig) ([]fileReader, error) { if cfg == nil { return nil, cerror.WrapError(cerror.ErrRedoConfigInvalid, errors.New("readerConfig can not be nil")) } + if !cfg.useExternalStorage { + log.Panic("external storage is not enabled, please check your configuration") + } if cfg.workerNums == 0 { cfg.workerNums = defaultWorkerNum } start := time.Now() - if cfg.useExternalStorage { - extStorage, err := redo.InitExternalStorage(ctx, cfg.uri) - if err != nil { - return nil, err - } - - err = downLoadToLocal(ctx, cfg.dir, extStorage, cfg.fileType, cfg.workerNums) - if err != nil { - return nil, cerror.WrapError(cerror.ErrRedoDownloadFailed, err) - } - } - - rr, err := openSelectedFiles(ctx, cfg.dir, cfg.fileType, cfg.startTs, cfg.workerNums) + sortedFiles, err := downLoadAndSortFiles(ctx, cfg) if err != nil { return nil, err } readers := []fileReader{} - for i := range rr { + for i := range sortedFiles { readers = append(readers, &reader{ cfg: cfg, - br: bufio.NewReader(rr[i]), - fileName: rr[i].(*os.File).Name(), - closer: rr[i], + br: bufio.NewReader(sortedFiles[i]), + fileName: sortedFiles[i].(*os.File).Name(), + closer: sortedFiles[i], }) } @@ -120,134 +112,99 @@ func newReader(ctx context.Context, cfg *readerConfig) ([]fileReader, error) { return readers, nil } -func selectDownLoadFile( - ctx context.Context, extStorage storage.ExternalStorage, fixedType string, -) ([]string, error) { - files := []string{} - err := extStorage.WalkDir(ctx, &storage.WalkOption{}, - func(path string, size int64) error { - fileName := filepath.Base(path) - _, fileType, err := redo.ParseLogFileName(fileName) - if err != nil { - return err - } - - if fileType == fixedType { - files = append(files, path) - } - return nil - }) +func downLoadAndSortFiles(ctx context.Context, cfg *readerConfig) ([]io.ReadCloser, error) { + dir := cfg.dir + // create temp dir in local storage + err := os.MkdirAll(dir, redo.DefaultDirMode) if err != nil { - return nil, cerror.WrapError(cerror.ErrExternalStorageAPI, err) + return nil, cerror.WrapError(cerror.ErrRedoFileOp, err) } - return files, nil -} - -func downLoadToLocal( - ctx context.Context, dir string, - extStorage storage.ExternalStorage, - fixedType string, workerNum int, -) error { - files, err := selectDownLoadFile(ctx, extStorage, fixedType) + // get all files + extStorage, err := redo.InitExternalStorage(ctx, cfg.uri) if err != nil { - return err + return nil, err + } + files, err := selectDownLoadFile(ctx, extStorage, cfg.fileType, cfg.startTs) + if err != nil { + return nil, err } - limit := make(chan struct{}, workerNum) + limit := make(chan struct{}, cfg.workerNums) eg, eCtx := errgroup.WithContext(ctx) + sortedFileNames := make([]string, 0, len(files)) for _, file := range files { select { - case <-ctx.Done(): - return ctx.Err() + case <-eCtx.Done(): + return nil, eCtx.Err() case limit <- struct{}{}: } - f := file + + fileName := file + if strings.HasSuffix(fileName, redo.SortLogEXT) { + log.Panic("should not download sorted log file") + } + sortedFileNames = append(sortedFileNames, getSortedFileName(fileName)) eg.Go(func() error { defer func() { <-limit }() - data, err := extStorage.ReadFile(eCtx, f) - if err != nil { - return cerror.WrapError(cerror.ErrExternalStorageAPI, err) - } - - err = os.MkdirAll(dir, redo.DefaultDirMode) - if err != nil { - return cerror.WrapError(cerror.ErrRedoFileOp, err) - } - path := filepath.Join(dir, f) - err = os.WriteFile(path, data, redo.DefaultFileMode) - return cerror.WrapError(cerror.ErrRedoFileOp, err) + return sortAndWriteFile(ctx, extStorage, fileName, cfg) }) } - - return eg.Wait() -} - -func openSelectedFiles(ctx context.Context, dir, fixedType string, startTs uint64, workerNum int) ([]io.ReadCloser, error) { - files, err := os.ReadDir(dir) - if err != nil { - return nil, cerror.WrapError(cerror.ErrRedoFileOp, errors.Annotatef(err, "can't read log file directory: %s", dir)) - } - - sortedFileList := map[string]bool{} - for _, file := range files { - if filepath.Ext(file.Name()) == redo.SortLogEXT { - sortedFileList[file.Name()] = false - } + if err := eg.Wait(); err != nil { + return nil, err } - logFiles := []io.ReadCloser{} - unSortedFile := []string{} - for _, f := range files { - name := f.Name() - ret, err := shouldOpen(startTs, name, fixedType) + // open all sorted files + ret := []io.ReadCloser{} + for _, sortedFileName := range sortedFileNames { + path := filepath.Join(dir, sortedFileName) + f, err := os.OpenFile(path, os.O_RDONLY, redo.DefaultFileMode) if err != nil { - log.Warn("check selected log file fail", - zap.String("logFile", name), - zap.Error(err)) - continue - } - - if ret { - sortedName := name - if filepath.Ext(sortedName) != redo.SortLogEXT { - sortedName += redo.SortLogEXT - } - if opened, ok := sortedFileList[sortedName]; ok { - if opened { - continue - } - } else { - unSortedFile = append(unSortedFile, name) + if os.IsNotExist(err) { continue } - path := filepath.Join(dir, sortedName) - file, err := openReadFile(path) - if err != nil { - return nil, cerror.WrapError(cerror.ErrRedoFileOp, errors.Annotate(err, "can't open redo logfile")) - } - logFiles = append(logFiles, file) - sortedFileList[sortedName] = true + return nil, cerror.WrapError(cerror.ErrRedoFileOp, err) } + ret = append(ret, f) } + return ret, nil +} + +func getSortedFileName(name string) string { + return filepath.Base(name) + redo.SortLogEXT +} - sortFiles, err := createSortedFiles(ctx, dir, unSortedFile, workerNum) +func selectDownLoadFile( + ctx context.Context, extStorage storage.ExternalStorage, + fixedType string, startTs uint64, +) ([]string, error) { + files := []string{} + // add changefeed filter and endTs filter + err := extStorage.WalkDir(ctx, &storage.WalkOption{}, + func(path string, size int64) error { + fileName := filepath.Base(path) + ret, err := shouldOpen(startTs, fileName, fixedType) + if err != nil { + log.Warn("check selected log file fail", + zap.String("logFile", fileName), + zap.Error(err)) + return err + } + if ret { + files = append(files, path) + } + return nil + }) if err != nil { - return nil, err + return nil, cerror.WrapError(cerror.ErrExternalStorageAPI, err) } - logFiles = append(logFiles, sortFiles...) - return logFiles, nil -} -func openReadFile(name string) (*os.File, error) { - return os.OpenFile(name, os.O_RDONLY, redo.DefaultFileMode) + return files, nil } -func readFile(file *os.File) (logHeap, error) { +func readAllFromBuffer(buf []byte) (logHeap, error) { r := &reader{ - br: bufio.NewReader(file), - fileName: file.Name(), - closer: file, + br: bytes.NewReader(buf), } defer r.Close() @@ -266,19 +223,54 @@ func readFile(file *os.File) (logHeap, error) { return h, nil } -// writFile if not safely closed, the sorted file will end up with .sort.tmp as the file name suffix -func writFile(ctx context.Context, dir, name string, h logHeap) error { - cfg := &writer.LogWriterConfig{ - Dir: dir, +// sortAndWriteFile read file from external storage, then sort the file and write +// to local storage. +func sortAndWriteFile( + egCtx context.Context, + extStorage storage.ExternalStorage, + fileName string, cfg *readerConfig, +) error { + sortedName := getSortedFileName(fileName) + writerCfg := &writer.LogWriterConfig{ + Dir: cfg.dir, MaxLogSizeInBytes: math.MaxInt32, } - w, err := file.NewFileWriter(ctx, cfg, writer.WithLogFileName(func() string { return name })) + w, err := file.NewFileWriter(egCtx, writerCfg, writer.WithLogFileName(func() string { + return sortedName + })) if err != nil { return err } + fileContent, err := extStorage.ReadFile(egCtx, fileName) + if err != nil { + return cerror.WrapError(cerror.ErrExternalStorageAPI, err) + } + if len(fileContent) == 0 { + log.Warn("download file is empty", zap.String("file", fileName)) + return nil + } + + // sort data + h, err := readAllFromBuffer(fileContent) + if err != nil { + return err + } + heap.Init(&h) for h.Len() != 0 { item := heap.Pop(&h).(*logWithIdx).data + // This is min commitTs in log heap. + if item.GetCommitTs() > cfg.endTs { + // If the commitTs is greater than endTs, we should stop sorting + // and ignore the rest of the logs. + log.Info("ignore logs which commitTs is greater than resolvedTs", + zap.Any("filename", fileName), zap.Uint64("endTs", cfg.endTs)) + break + } + if item.GetCommitTs() <= cfg.startTs { + // If the commitTs is equal or less than startTs, we should skip this log. + continue + } data, err := item.MarshalMsg(nil) if err != nil { return cerror.WrapError(cerror.ErrMarshalFailed, err) @@ -292,80 +284,6 @@ func writFile(ctx context.Context, dir, name string, h logHeap) error { return w.Close() } -func createSortedFiles(ctx context.Context, dir string, names []string, workerNum int) ([]io.ReadCloser, error) { - logFiles := []io.ReadCloser{} - errCh := make(chan error) - retCh := make(chan io.ReadCloser) - - var errs error - i := 0 - for i != len(names) { - nn := []string{} - for i < len(names) { - if len(nn) < workerNum { - nn = append(nn, names[i]) - i++ - continue - } - break - } - - for i := 0; i < len(nn); i++ { - go createSortedFile(ctx, dir, nn[i], errCh, retCh) - } - for i := 0; i < len(nn); i++ { - select { - case err := <-errCh: - errs = multierr.Append(errs, err) - case ret := <-retCh: - if ret != nil { - logFiles = append(logFiles, ret) - } - } - } - if errs != nil { - return nil, errs - } - } - - return logFiles, nil -} - -func createSortedFile(ctx context.Context, dir string, name string, errCh chan error, retCh chan io.ReadCloser) { - path := filepath.Join(dir, name) - file, err := openReadFile(path) - if err != nil { - errCh <- cerror.WrapError(cerror.ErrRedoFileOp, errors.Annotate(err, "can't open redo logfile")) - return - } - - h, err := readFile(file) - if err != nil { - errCh <- err - return - } - - heap.Init(&h) - if h.Len() == 0 { - retCh <- nil - return - } - - sortFileName := name + redo.SortLogEXT - err = writFile(ctx, dir, sortFileName, h) - if err != nil { - errCh <- err - return - } - - file, err = openReadFile(filepath.Join(dir, sortFileName)) - if err != nil { - errCh <- cerror.WrapError(cerror.ErrRedoFileOp, errors.Annotate(err, "can't open redo logfile")) - return - } - retCh <- file -} - func shouldOpen(startTs uint64, name, fixedType string) (bool, error) { // .sort.tmp will return error commitTs, fileType, err := redo.ParseLogFileName(name) diff --git a/cdc/redo/reader/file_test.go b/cdc/redo/reader/file_test.go index 614117c888a..02a4d96cd03 100644 --- a/cdc/redo/reader/file_test.go +++ b/cdc/redo/reader/file_test.go @@ -14,238 +14,66 @@ package reader import ( - "bufio" + "context" "fmt" "io" - "os" - "path/filepath" + "net/url" "testing" - "time" - "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/cdc/redo/writer" - "github.com/pingcap/tiflow/cdc/redo/writer/file" + "github.com/pingcap/log" "github.com/pingcap/tiflow/pkg/redo" - "github.com/pingcap/tiflow/pkg/uuid" "github.com/stretchr/testify/require" - "golang.org/x/net/context" ) func TestReaderNewReader(t *testing.T) { - _, err := newReader(context.Background(), nil) + _, err := newReaders(context.Background(), nil) require.NotNil(t, err) dir := t.TempDir() - _, err = newReader(context.Background(), &readerConfig{dir: dir}) - require.Nil(t, err) -} - -func TestReaderRead(t *testing.T) { - dir := t.TempDir() - - cfg := &writer.LogWriterConfig{ - MaxLogSizeInBytes: 100000, - Dir: dir, - ChangeFeedID: model.DefaultChangeFeedID("test-cf"), - CaptureID: "cp", - LogType: redo.RedoRowLogFileType, - } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - uuidGen := uuid.NewConstGenerator("const-uuid") - w, err := file.NewFileWriter(ctx, cfg, - writer.WithUUIDGenerator(func() uuid.Generator { return uuidGen }), - ) - require.Nil(t, err) - log := &model.RedoLog{ - RedoRow: &model.RedoRowChangedEvent{Row: &model.RowChangedEvent{CommitTs: 1123}}, - } - data, err := log.MarshalMsg(nil) - require.Nil(t, err) - w.AdvanceTs(11) - _, err = w.Write(data) - require.Nil(t, err) - err = w.Close() - require.Nil(t, err) - require.True(t, !w.IsRunning()) - fileName := fmt.Sprintf(redo.RedoLogFileFormatV1, cfg.CaptureID, - cfg.ChangeFeedID.ID, - cfg.LogType, 11, uuidGen.NewString(), redo.LogEXT) - path := filepath.Join(cfg.Dir, fileName) - info, err := os.Stat(path) - require.Nil(t, err) - require.Equal(t, fileName, info.Name()) - - r, err := newReader(ctx, &readerConfig{ - dir: dir, - startTs: 1, - endTs: 12, - fileType: redo.RedoRowLogFileType, + require.Panics(t, func() { + _, err = newReaders(context.Background(), &readerConfig{dir: dir}) }) - require.Nil(t, err) - require.Equal(t, 1, len(r)) - defer r[0].Close() //nolint:errcheck - log, err = r[0].Read() - require.Nil(t, err) - require.EqualValues(t, 1123, log.RedoRow.Row.CommitTs) - time.Sleep(1001 * time.Millisecond) } -func TestReaderOpenSelectedFiles(t *testing.T) { +func TestFileReaderRead(t *testing.T) { dir := t.TempDir() - ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cfg := &writer.LogWriterConfig{ - MaxLogSizeInBytes: 100000, - Dir: dir, - } - uuidGen := uuid.NewGenerator() - fileName := fmt.Sprintf(redo.RedoLogFileFormatV2, "cp", - "default", "test-cf", redo.RedoDDLLogFileType, 11, - uuidGen.NewString(), redo.LogEXT+redo.TmpEXT) - w, err := file.NewFileWriter(ctx, cfg, writer.WithLogFileName(func() string { - return fileName - })) - require.Nil(t, err) - log := &model.RedoLog{ - RedoRow: &model.RedoRowChangedEvent{Row: &model.RowChangedEvent{CommitTs: 11}}, - } - data, err := log.MarshalMsg(nil) - require.Nil(t, err) - _, err = w.Write(data) - require.Nil(t, err) - log = &model.RedoLog{ - RedoRow: &model.RedoRowChangedEvent{Row: &model.RowChangedEvent{CommitTs: 10}}, - } - data, err = log.MarshalMsg(nil) - require.Nil(t, err) - _, err = w.Write(data) - require.Nil(t, err) - err = w.Close() - require.Nil(t, err) - path := filepath.Join(cfg.Dir, fileName) - f, err := os.Open(path) - require.Nil(t, err) - - // no data, wil not open - fileName = fmt.Sprintf(redo.RedoLogFileFormatV2, "cp", - "default", "test-cf11", redo.RedoDDLLogFileType, 10, - uuidGen.NewString(), redo.LogEXT) - path = filepath.Join(dir, fileName) - _, err = os.Create(path) - require.Nil(t, err) - // SortLogEXT, wil open - fileName = fmt.Sprintf(redo.RedoLogFileFormatV2, "cp", "default", - "test-cf111", redo.RedoDDLLogFileType, 10, uuidGen.NewString(), - redo.LogEXT) + redo.SortLogEXT - path = filepath.Join(dir, fileName) - f1, err := os.Create(path) - require.Nil(t, err) - - dir1 := t.TempDir() - fileName = fmt.Sprintf(redo.RedoLogFileFormatV2, "cp", "default", "test-cf", - redo.RedoDDLLogFileType, 11, uuidGen.NewString(), redo.LogEXT+"test") - path = filepath.Join(dir1, fileName) - _, err = os.Create(path) - require.Nil(t, err) - - type arg struct { - dir, fixedName string - startTs uint64 + uri, err := url.Parse(fmt.Sprintf("file://%s", dir)) + require.NoError(t, err) + cfg := &readerConfig{ + dir: t.TempDir(), + startTs: 10, + endTs: 12, + fileType: redo.RedoRowLogFileType, + uri: *uri, + useExternalStorage: true, } - - tests := []struct { - name string - args arg - wantRet []io.ReadCloser - wantErr string - }{ - { - name: "dir not exist", - args: arg{ - dir: dir + "test", - fixedName: redo.RedoDDLLogFileType, - startTs: 0, - }, - wantErr: ".*CDC:ErrRedoFileOp*.", - }, - { - name: "happy", - args: arg{ - dir: dir, - fixedName: redo.RedoDDLLogFileType, - startTs: 0, - }, - wantRet: []io.ReadCloser{f, f1}, - }, - { - name: "wrong ts", - args: arg{ - dir: dir, - fixedName: redo.RedoDDLLogFileType, - startTs: 12, - }, - wantRet: []io.ReadCloser{f}, - }, - { - name: "wrong fixedName", - args: arg{ - dir: dir, - fixedName: redo.RedoDDLLogFileType + "test", - startTs: 0, - }, - }, - { - name: "wrong ext", - args: arg{ - dir: dir1, - fixedName: redo.RedoDDLLogFileType, - startTs: 0, - }, - }, - } - - for _, tt := range tests { - ret, err := openSelectedFiles(ctx, tt.args.dir, tt.args.fixedName, tt.args.startTs, 100) - if tt.wantErr == "" { - require.Nil(t, err, tt.name) - require.Equal(t, len(tt.wantRet), len(ret), tt.name) - for _, closer := range tt.wantRet { - name := closer.(*os.File).Name() - if filepath.Ext(name) != redo.SortLogEXT { - name += redo.SortLogEXT - } - contains := false - for _, r := range ret { - if r.(*os.File).Name() == name { - contains = true - break - } - } - require.Equal(t, true, contains, tt.name) - } - var preTs uint64 = 0 - for _, r := range ret { - r := &reader{ - br: bufio.NewReader(r), - fileName: r.(*os.File).Name(), - closer: r, - } - for { - rl, err := r.Read() - if err == io.EOF { - break - } - require.Greater(t, rl.RedoRow.Row.CommitTs, preTs, tt.name) - preTs = rl.RedoRow.Row.CommitTs - } - } - } else { - require.Regexp(t, tt.wantErr, err.Error(), tt.name) - } + // log file with maxCommitTs<=startTs, fileter when download file + genLogFile(ctx, t, dir, redo.RedoRowLogFileType, 1, cfg.startTs) + // normal log file, include [10, 11, 12] and [11, 12, ... 20] + genLogFile(ctx, t, dir, redo.RedoRowLogFileType, cfg.startTs, cfg.endTs+2) + genLogFile(ctx, t, dir, redo.RedoRowLogFileType, cfg.endTs-1, 20) + // log file with minCommitTs>endTs, filtered when sort file + genLogFile(ctx, t, dir, redo.RedoRowLogFileType, 2000, 2023) + + log.Info("start to read redo log files") + readers, err := newReaders(ctx, cfg) + require.NoError(t, err) + require.Equal(t, 2, len(readers)) + defer readers[0].Close() //nolint:errcheck + + for _, r := range readers { + log, err := r.Read() + require.NoError(t, err) + require.EqualValues(t, 11, log.RedoRow.Row.CommitTs) + log, err = r.Read() + require.NoError(t, err) + require.EqualValues(t, 12, log.RedoRow.Row.CommitTs) + log, err = r.Read() + require.Nil(t, log) + require.ErrorIs(t, err, io.EOF) + require.NoError(t, r.Close()) } - time.Sleep(1001 * time.Millisecond) } diff --git a/cdc/redo/reader/reader.go b/cdc/redo/reader/reader.go index 6153aea25f2..ad0d740aaec 100644 --- a/cdc/redo/reader/reader.go +++ b/cdc/redo/reader/reader.go @@ -19,8 +19,7 @@ import ( "io" "net/url" "os" - "path/filepath" - "sync" + "strings" "time" "github.com/pingcap/log" @@ -86,12 +85,10 @@ type LogReaderConfig struct { // LogReader implement RedoLogReader interface type LogReader struct { - cfg *LogReaderConfig - meta *common.LogMeta - rowCh chan *model.RowChangedEvent - ddlCh chan *model.DDLEvent - metaLock sync.Mutex - sync.Mutex + cfg *LogReaderConfig + meta *common.LogMeta + rowCh chan *model.RowChangedEvent + ddlCh chan *model.DDLEvent } // newLogReader creates a LogReader instance. @@ -112,20 +109,12 @@ func newLogReader(ctx context.Context, cfg *LogReaderConfig) (*LogReader, error) rowCh: make(chan *model.RowChangedEvent, defaultReaderChanSize), ddlCh: make(chan *model.DDLEvent, defaultReaderChanSize), } - if cfg.UseExternalStorage { - extStorage, err := redo.InitExternalStorage(ctx, cfg.URI) - if err != nil { - return nil, err - } - // remove logs in local dir first, if have logs left belongs to previous changefeed with the same name may have error when apply logs - err = os.RemoveAll(cfg.Dir) - if err != nil { - return nil, errors.WrapError(errors.ErrRedoFileOp, err) - } - err = downLoadToLocal(ctx, cfg.Dir, extStorage, redo.RedoMetaFileType, cfg.WorkerNums) - if err != nil { - return nil, errors.WrapError(errors.ErrRedoDownloadFailed, err) - } + // remove logs in local dir first, if have logs left belongs to previous changefeed with the same name may have error when apply logs + if err := os.RemoveAll(cfg.Dir); err != nil { + return nil, errors.WrapError(errors.ErrRedoFileOp, err) + } + if err := logReader.initMeta(ctx); err != nil { + return nil, err } return logReader, nil } @@ -139,10 +128,7 @@ func (l *LogReader) Run(ctx context.Context) error { } if l.meta == nil { - _, _, err := l.ReadMeta(ctx) - if err != nil { - return err - } + return errors.Trace(errors.ErrRedoMetaFileNotFound.GenWithStackByArgs(l.cfg.Dir)) } eg, egCtx := errgroup.WithContext(ctx) @@ -184,7 +170,7 @@ func (l *LogReader) runDDLReader(egCtx context.Context) error { } func (l *LogReader) runReader(egCtx context.Context, cfg *readerConfig) error { - fileReaders, err := newReader(egCtx, cfg) + fileReaders, err := newReaders(egCtx, cfg) if err != nil { return errors.Trace(err) } @@ -288,50 +274,45 @@ func (l *LogReader) ReadNextDDL(ctx context.Context) (*model.DDLEvent, error) { } } -// ReadMeta implement ReadMeta interface -func (l *LogReader) ReadMeta(ctx context.Context) (checkpointTs, resolvedTs uint64, err error) { +func (l *LogReader) initMeta(ctx context.Context) error { select { case <-ctx.Done(): - return 0, 0, errors.Trace(ctx.Err()) + return errors.Trace(ctx.Err()) default: } - - l.metaLock.Lock() - defer l.metaLock.Unlock() - - if l.meta != nil { - return l.meta.CheckpointTs, l.meta.ResolvedTs, nil - } - - files, err := os.ReadDir(l.cfg.Dir) + extStorage, err := redo.InitExternalStorage(ctx, l.cfg.URI) if err != nil { - err = errors.Annotate(err, "can't read log file directory") - return 0, 0, errors.WrapError(errors.ErrRedoFileOp, err) + return err } - metas := make([]*common.LogMeta, 0, 64) - for _, file := range files { - if filepath.Ext(file.Name()) == redo.MetaEXT { - path := filepath.Join(l.cfg.Dir, file.Name()) - fileData, err := os.ReadFile(path) - if err != nil { - return 0, 0, errors.WrapError(errors.ErrRedoFileOp, err) - } + err = extStorage.WalkDir(ctx, nil, func(path string, size int64) error { + if !strings.HasSuffix(path, redo.MetaEXT) { + return nil + } - log.Debug("unmarshal redo meta", zap.Int("size", len(fileData))) - meta := &common.LogMeta{} - _, err = meta.UnmarshalMsg(fileData) + data, err := extStorage.ReadFile(ctx, path) + if err != nil && !util.IsNotExistInExtStorage(err) { + return err + } + if len(data) != 0 { + var meta common.LogMeta + _, err = meta.UnmarshalMsg(data) if err != nil { - return 0, 0, errors.WrapError(errors.ErrRedoFileOp, err) + return err } - metas = append(metas, meta) + metas = append(metas, &meta) } + return nil + }) + if err != nil { + return errors.WrapError(errors.ErrRedoMetaInitialize, + errors.Annotate(err, "read meta file fail")) } - if len(metas) == 0 { - return 0, 0, errors.ErrRedoMetaFileNotFound.GenWithStackByArgs(l.cfg.Dir) + return errors.ErrRedoMetaFileNotFound.GenWithStackByArgs(l.cfg.Dir) } + var checkpointTs, resolvedTs uint64 common.ParseMeta(metas, &checkpointTs, &resolvedTs) if resolvedTs < checkpointTs { log.Panic("in all meta files, resolvedTs is less than checkpointTs", @@ -339,7 +320,15 @@ func (l *LogReader) ReadMeta(ctx context.Context) (checkpointTs, resolvedTs uint zap.Uint64("checkpointTs", checkpointTs)) } l.meta = &common.LogMeta{CheckpointTs: checkpointTs, ResolvedTs: resolvedTs} - return + return nil +} + +// ReadMeta implement ReadMeta interface +func (l *LogReader) ReadMeta(ctx context.Context) (checkpointTs, resolvedTs uint64, err error) { + if l.meta == nil { + return 0, 0, errors.Trace(errors.ErrRedoMetaFileNotFound.GenWithStackByArgs(l.cfg.Dir)) + } + return l.meta.CheckpointTs, l.meta.ResolvedTs, nil } type logWithIdx struct { diff --git a/cdc/redo/reader/reader_test.go b/cdc/redo/reader/reader_test.go index 930a9abc4ef..4abb597e048 100644 --- a/cdc/redo/reader/reader_test.go +++ b/cdc/redo/reader/reader_test.go @@ -22,10 +22,7 @@ import ( "testing" "time" - "github.com/golang/mock/gomock" "github.com/google/uuid" - mockstorage "github.com/pingcap/tidb/br/pkg/mock/storage" - "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/redo/common" "github.com/pingcap/tiflow/cdc/redo/writer" @@ -35,48 +32,10 @@ import ( "golang.org/x/sync/errgroup" ) -func TestNewLogReader(t *testing.T) { - t.Parallel() - - _, err := newLogReader(context.Background(), nil) - require.NotNil(t, err) - - _, err = newLogReader(context.Background(), &LogReaderConfig{}) - require.Nil(t, err) - - dir := t.TempDir() - - s3URI, err := url.Parse("s3://logbucket/test-changefeed?endpoint=http://111/") - require.Nil(t, err) - - origin := redo.InitExternalStorage - defer func() { - redo.InitExternalStorage = origin - }() - controller := gomock.NewController(t) - mockStorage := mockstorage.NewMockExternalStorage(controller) - // no file to download - mockStorage.EXPECT().WalkDir(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) - redo.InitExternalStorage = func( - ctx context.Context, uri url.URL, - ) (storage.ExternalStorage, error) { - return mockStorage, nil - } - - // after init should rm the dir - _, err = newLogReader(context.Background(), &LogReaderConfig{ - UseExternalStorage: true, - Dir: dir, - URI: *s3URI, - }) - require.Nil(t, err) - _, err = os.Stat(dir) - require.True(t, os.IsNotExist(err)) -} - func genLogFile( ctx context.Context, t *testing.T, - dir string, logType string, maxCommitTs uint64, + dir string, logType string, + minCommitTs, maxCommitTs uint64, ) { cfg := &writer.LogWriterConfig{ MaxLogSizeInBytes: 100000, @@ -88,23 +47,27 @@ func genLogFile( return fileName })) require.Nil(t, err) - log := &model.RedoLog{ - RedoRow: &model.RedoRowChangedEvent{}, - RedoDDL: &model.RedoDDLEvent{}, - } if logType == redo.RedoRowLogFileType { - log.RedoRow.Row = &model.RowChangedEvent{CommitTs: maxCommitTs} + // generate unsorted logs + for ts := maxCommitTs; ts >= minCommitTs; ts-- { + event := &model.RowChangedEvent{CommitTs: ts} + log := event.ToRedoLog() + rawData, err := log.MarshalMsg(nil) + require.Nil(t, err) + _, err = w.Write(rawData) + require.Nil(t, err) + } } else if logType == redo.RedoDDLLogFileType { - log.RedoDDL.DDL = &model.DDLEvent{ + event := &model.DDLEvent{ CommitTs: maxCommitTs, TableInfo: &model.TableInfo{}, } - log.Type = model.RedoLogTypeDDL + log := event.ToRedoLog() + rawData, err := log.MarshalMsg(nil) + require.Nil(t, err) + _, err = w.Write(rawData) + require.Nil(t, err) } - rawData, err := log.MarshalMsg(nil) - require.Nil(t, err) - _, err = w.Write(rawData) - require.Nil(t, err) err = w.Close() require.Nil(t, err) } @@ -120,16 +83,22 @@ func TestReadLogs(t *testing.T) { ResolvedTs: 100, } for _, logType := range []string{redo.RedoRowLogFileType, redo.RedoDDLLogFileType} { - genLogFile(ctx, t, dir, logType, meta.CheckpointTs) - genLogFile(ctx, t, dir, logType, meta.CheckpointTs) - genLogFile(ctx, t, dir, logType, 12) - genLogFile(ctx, t, dir, logType, meta.ResolvedTs) + genLogFile(ctx, t, dir, logType, meta.CheckpointTs, meta.CheckpointTs) + genLogFile(ctx, t, dir, logType, meta.CheckpointTs, meta.CheckpointTs) + genLogFile(ctx, t, dir, logType, 12, 12) + genLogFile(ctx, t, dir, logType, meta.ResolvedTs, meta.ResolvedTs) } expectedRows := []uint64{12, meta.ResolvedTs} expectedDDLs := []uint64{meta.CheckpointTs, meta.CheckpointTs, 12, meta.ResolvedTs} + uri, err := url.Parse(fmt.Sprintf("file://%s", dir)) + require.NoError(t, err) r := &LogReader{ - cfg: &LogReaderConfig{Dir: dir}, + cfg: &LogReaderConfig{ + Dir: t.TempDir(), + URI: *uri, + UseExternalStorage: true, + }, meta: meta, rowCh: make(chan *model.RowChangedEvent, defaultReaderChanSize), ddlCh: make(chan *model.DDLEvent, defaultReaderChanSize), @@ -165,14 +134,20 @@ func TestLogReaderClose(t *testing.T) { ResolvedTs: 100, } for _, logType := range []string{redo.RedoRowLogFileType, redo.RedoDDLLogFileType} { - genLogFile(ctx, t, dir, logType, meta.CheckpointTs) - genLogFile(ctx, t, dir, logType, meta.CheckpointTs) - genLogFile(ctx, t, dir, logType, 12) - genLogFile(ctx, t, dir, logType, meta.ResolvedTs) + genLogFile(ctx, t, dir, logType, meta.CheckpointTs, meta.CheckpointTs) + genLogFile(ctx, t, dir, logType, meta.CheckpointTs, meta.CheckpointTs) + genLogFile(ctx, t, dir, logType, 12, 12) + genLogFile(ctx, t, dir, logType, meta.ResolvedTs, meta.CheckpointTs) } + uri, err := url.Parse(fmt.Sprintf("file://%s", dir)) + require.NoError(t, err) r := &LogReader{ - cfg: &LogReaderConfig{Dir: dir}, + cfg: &LogReaderConfig{ + Dir: t.TempDir(), + URI: *uri, + UseExternalStorage: true, + }, meta: meta, rowCh: make(chan *model.RowChangedEvent, 1), ddlCh: make(chan *model.DDLEvent, 1), @@ -182,44 +157,23 @@ func TestLogReaderClose(t *testing.T) { return r.Run(egCtx) }) + time.Sleep(2 * time.Second) cancel() require.ErrorIs(t, eg.Wait(), context.Canceled) } -func TestLogReaderReadMeta(t *testing.T) { - dir := t.TempDir() +func TestNewLogReaderAndReadMeta(t *testing.T) { + t.Parallel() - fileName := fmt.Sprintf("%s_%s_%d_%s%s", "cp", - "test-changefeed", - time.Now().Unix(), redo.RedoMetaFileType, redo.MetaEXT) - path := filepath.Join(dir, fileName) - f, err := os.Create(path) - require.Nil(t, err) - meta := &common.LogMeta{ + dir := t.TempDir() + genMetaFile(t, dir, &common.LogMeta{ CheckpointTs: 11, ResolvedTs: 22, - } - data, err := meta.MarshalMsg(nil) - require.Nil(t, err) - _, err = f.Write(data) - require.Nil(t, err) - - fileName = fmt.Sprintf("%s_%s_%d_%s%s", "cp1", - "test-changefeed", - time.Now().Unix(), redo.RedoMetaFileType, redo.MetaEXT) - path = filepath.Join(dir, fileName) - f, err = os.Create(path) - require.Nil(t, err) - meta = &common.LogMeta{ + }) + genMetaFile(t, dir, &common.LogMeta{ CheckpointTs: 12, ResolvedTs: 21, - } - data, err = meta.MarshalMsg(nil) - require.Nil(t, err) - _, err = f.Write(data) - require.Nil(t, err) - - dir1 := t.TempDir() + }) tests := []struct { name string @@ -235,13 +189,13 @@ func TestLogReaderReadMeta(t *testing.T) { }, { name: "no meta file", - dir: dir1, + dir: t.TempDir(), wantErr: ".*no redo meta file found in dir*.", }, { name: "wrong dir", dir: "xxx", - wantErr: ".*can't read log file directory*.", + wantErr: ".*fail to open storage for redo log*.", }, { name: "context cancel", @@ -252,24 +206,40 @@ func TestLogReaderReadMeta(t *testing.T) { }, } for _, tt := range tests { - l := &LogReader{ - cfg: &LogReaderConfig{ - Dir: tt.dir, - }, - } ctx := context.Background() if tt.name == "context cancel" { ctx1, cancel := context.WithCancel(context.Background()) cancel() ctx = ctx1 } - cts, rts, err := l.ReadMeta(ctx) + uriStr := fmt.Sprintf("file://%s", tt.dir) + uri, err := url.Parse(uriStr) + require.Nil(t, err) + l, err := newLogReader(ctx, &LogReaderConfig{ + Dir: t.TempDir(), + URI: *uri, + UseExternalStorage: redo.IsExternalStorage(uri.Scheme), + }) if tt.wantErr != "" { require.Regexp(t, tt.wantErr, err, tt.name) } else { + require.Nil(t, err, tt.name) + cts, rts, err := l.ReadMeta(ctx) require.Nil(t, err, tt.name) require.Equal(t, tt.wantCheckpointTs, cts, tt.name) require.Equal(t, tt.wantResolvedTs, rts, tt.name) } } } + +func genMetaFile(t *testing.T, dir string, meta *common.LogMeta) { + fileName := fmt.Sprintf(redo.RedoMetaFileFormat, "capture", "default", + "changefeed", redo.RedoMetaFileType, uuid.NewString(), redo.MetaEXT) + path := filepath.Join(dir, fileName) + f, err := os.Create(path) + require.Nil(t, err) + data, err := meta.MarshalMsg(nil) + require.Nil(t, err) + _, err = f.Write(data) + require.Nil(t, err) +} diff --git a/pkg/applier/redo.go b/pkg/applier/redo.go index 0a89c82ea34..1d32d35d0bc 100644 --- a/pkg/applier/redo.go +++ b/pkg/applier/redo.go @@ -97,15 +97,14 @@ func (rac *RedoApplierConfig) toLogReaderConfig() (string, *reader.LogReaderConf if err != nil { return "", nil, errors.WrapError(errors.ErrConsistentStorage, err) } + if redo.IsLocalStorage(uri.Scheme) { + uri.Scheme = "file" + } cfg := &reader.LogReaderConfig{ - Dir: uri.Path, + URI: *uri, + Dir: rac.Dir, UseExternalStorage: redo.IsExternalStorage(uri.Scheme), } - if cfg.UseExternalStorage { - cfg.URI = *uri - // If use external storage as backend, applier will download redo logs to local dir. - cfg.Dir = rac.Dir - } return uri.Scheme, cfg, nil }