Skip to content

Commit

Permalink
redo(ticdc): limit worker number in redo reader (#8296) (#8308)
Browse files Browse the repository at this point in the history
close #8085
  • Loading branch information
ti-chi-bot authored Feb 21, 2023
1 parent d791b17 commit 7831ce2
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 7 deletions.
19 changes: 14 additions & 5 deletions cdc/redo/reader/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,21 +81,21 @@ func newReader(ctx context.Context, cfg *readerConfig) ([]fileReader, error) {
if cfg == nil {
return nil, cerror.WrapError(cerror.ErrRedoConfigInvalid, errors.New("readerConfig can not be nil"))
}
if cfg.workerNums == 0 {
cfg.workerNums = defaultWorkerNum
}

if cfg.useExternalStorage {
extStorage, err := redo.InitExternalStorage(ctx, cfg.uri)
if err != nil {
return nil, err
}

err = downLoadToLocal(ctx, cfg.dir, extStorage, cfg.fileType)
err = downLoadToLocal(ctx, cfg.dir, extStorage, cfg.fileType, cfg.workerNums)
if err != nil {
return nil, cerror.WrapError(cerror.ErrRedoDownloadFailed, err)
}
}
if cfg.workerNums == 0 {
cfg.workerNums = defaultWorkerNum
}

rr, err := openSelectedFiles(ctx, cfg.dir, cfg.fileType, cfg.startTs, cfg.workerNums)
if err != nil {
Expand Down Expand Up @@ -141,17 +141,26 @@ func selectDownLoadFile(
}

func downLoadToLocal(
ctx context.Context, dir string, extStorage storage.ExternalStorage, fixedType string,
ctx context.Context, dir string,
extStorage storage.ExternalStorage,
fixedType string, workerNum int,
) error {
files, err := selectDownLoadFile(ctx, extStorage, fixedType)
if err != nil {
return err
}

limit := make(chan struct{}, workerNum)
eg, eCtx := errgroup.WithContext(ctx)
for _, file := range files {
select {
case <-ctx.Done():
return ctx.Err()
case limit <- struct{}{}:
}
f := file
eg.Go(func() error {
defer func() { <-limit }()
data, err := extStorage.ReadFile(eCtx, f)
if err != nil {
return cerror.WrapError(cerror.ErrExternalStorageAPI, err)
Expand Down
5 changes: 4 additions & 1 deletion cdc/redo/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ func newLogReader(ctx context.Context, cfg *LogReaderConfig) (*LogReader, error)
if cfg == nil {
return nil, cerror.WrapError(cerror.ErrRedoConfigInvalid, errors.New("LogReaderConfig can not be nil"))
}
if cfg.WorkerNums == 0 {
cfg.WorkerNums = defaultWorkerNum
}

logReader := &LogReader{
cfg: cfg,
Expand All @@ -120,7 +123,7 @@ func newLogReader(ctx context.Context, cfg *LogReaderConfig) (*LogReader, error)
if err != nil {
return nil, cerror.WrapError(cerror.ErrRedoFileOp, err)
}
err = downLoadToLocal(ctx, cfg.Dir, extStorage, redo.RedoMetaFileType)
err = downLoadToLocal(ctx, cfg.Dir, extStorage, redo.RedoMetaFileType, cfg.WorkerNums)
if err != nil {
return nil, cerror.WrapError(cerror.ErrRedoDownloadFailed, err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/redo/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func newMetaOptions() *metaOptions {
return &metaOptions{}
}

// run runs the `redo apply` command.
// run runs the `redo meta` command.
func (o *metaOptions) run(cmd *cobra.Command) error {
ctx := cmdcontext.GetDefaultContext()

Expand Down

0 comments on commit 7831ce2

Please sign in to comment.