diff --git a/cdc/redo/writer/memory/encoding_worker.go b/cdc/redo/writer/memory/encoding_worker.go index 883ebb13314..8930e7969e7 100644 --- a/cdc/redo/writer/memory/encoding_worker.go +++ b/cdc/redo/writer/memory/encoding_worker.go @@ -103,7 +103,7 @@ type encodingWorkerGroup struct { workerNum int nextWorker atomic.Uint64 - closed chan struct{} + closed chan error } func newEncodingWorkerGroup(cfg *writer.LogWriterConfig) *encodingWorkerGroup { @@ -120,19 +120,20 @@ func newEncodingWorkerGroup(cfg *writer.LogWriterConfig) *encodingWorkerGroup { inputChs: inputChs, outputCh: make(chan *polymorphicRedoEvent, redo.DefaultEncodingOutputChanSize), workerNum: workerNum, - closed: make(chan struct{}), + closed: make(chan error, 1), } } func (e *encodingWorkerGroup) Run(ctx context.Context) (err error) { defer func() { - close(e.closed) + log.Warn("redo encoding workers closed", + zap.String("namespace", e.changefeed.Namespace), + zap.String("changefeed", e.changefeed.ID), + zap.Error(err)) if err != nil && errors.Cause(err) != context.Canceled { - log.Warn("redo fileWorkerGroup closed with error", - zap.String("namespace", e.changefeed.Namespace), - zap.String("changefeed", e.changefeed.ID), - zap.Error(err)) + e.closed <- err } + close(e.closed) }() eg, egCtx := errgroup.WithContext(ctx) for i := 0; i < e.workerNum; i++ { @@ -182,8 +183,8 @@ func (e *encodingWorkerGroup) input( select { case <-ctx.Done(): return ctx.Err() - case <-e.closed: - return errors.ErrRedoWriterStopped.GenWithStackByArgs("encoding worker is closed") + case err := <-e.closed: + return errors.WrapError(errors.ErrRedoWriterStopped, err, "encoding worker is closed") case e.inputChs[idx] <- event: return nil } @@ -195,8 +196,8 @@ func (e *encodingWorkerGroup) output( select { case <-ctx.Done(): return ctx.Err() - case <-e.closed: - return errors.ErrRedoWriterStopped.GenWithStackByArgs("encoding worker is closed") + case err := <-e.closed: + return errors.WrapError(errors.ErrRedoWriterStopped, err, "encoding worker is closed") case e.outputCh <- event: return nil } @@ -220,8 +221,8 @@ func (e *encodingWorkerGroup) FlushAll(ctx context.Context) error { select { case <-ctx.Done(): return ctx.Err() - case <-e.closed: - return errors.ErrRedoWriterStopped.GenWithStackByArgs("encoding worker is closed") + case err := <-e.closed: + return errors.WrapError(errors.ErrRedoWriterStopped, err, "encoding worker is closed") case <-flushCh: } return nil @@ -244,8 +245,8 @@ func (e *encodingWorkerGroup) broadcastAndWaitEncoding(ctx context.Context) erro select { case <-ctx.Done(): return ctx.Err() - case <-e.closed: - return errors.ErrRedoWriterStopped.GenWithStackByArgs("encoding worker is closed") + case err := <-e.closed: + return errors.WrapError(errors.ErrRedoWriterStopped, err, "encoding worker is closed") case <-ch: } } diff --git a/cdc/redo/writer/memory/file_worker.go b/cdc/redo/writer/memory/file_worker.go index d566ddf66c5..2e8fe622de3 100644 --- a/cdc/redo/writer/memory/file_worker.go +++ b/cdc/redo/writer/memory/file_worker.go @@ -155,12 +155,10 @@ func (f *fileWorkerGroup) Run( ) (err error) { defer func() { f.close() - if err != nil && errors.Cause(err) != context.Canceled { - log.Warn("redo file workers closed with error", - zap.String("namespace", f.cfg.ChangeFeedID.Namespace), - zap.String("changefeed", f.cfg.ChangeFeedID.ID), - zap.Error(err)) - } + log.Warn("redo file workers closed", + zap.String("namespace", f.cfg.ChangeFeedID.Namespace), + zap.String("changefeed", f.cfg.ChangeFeedID.ID), + zap.Error(err)) }() eg, egCtx := errgroup.WithContext(ctx) diff --git a/cdc/redo/writer/memory/mem_log_writer_test.go b/cdc/redo/writer/memory/mem_log_writer_test.go index 53943e52a29..d3b72541f78 100644 --- a/cdc/redo/writer/memory/mem_log_writer_test.go +++ b/cdc/redo/writer/memory/mem_log_writer_test.go @@ -95,11 +95,9 @@ func testWriteEvents(t *testing.T, events []writer.RedoEvent) { require.NoError(t, err) require.ErrorIs(t, lw.Close(), context.Canceled) - require.Eventually(t, func() bool { - err = lw.WriteEvents(ctx, events...) - return err != nil - }, 2*time.Second, 10*time.Millisecond) - require.ErrorContains(t, err, "redo log writer stopped") + + err = lw.WriteEvents(ctx, events...) + require.NoError(t, err) err = lw.FlushLog(ctx) - require.ErrorContains(t, err, "redo log writer stopped") + require.NoError(t, err) }