From c0ac9fa5efef8d78a3d53d2ad5c0512ab34f347b Mon Sep 17 00:00:00 2001 From: CharlesCheung <61726649+CharlesCheung96@users.noreply.github.com> Date: Tue, 22 Aug 2023 10:23:03 +0800 Subject: [PATCH] sink(ticdc): cherry pick data loss fix to release 6.5 (#9621) close pingcap/tiflow#9592 --- .../sinkmanager/table_sink_wrapper.go | 14 +++----- .../sinkmanager/table_sink_wrapper_test.go | 33 +++++++++++++++++-- 2 files changed, 36 insertions(+), 11 deletions(-) diff --git a/cdc/processor/sinkmanager/table_sink_wrapper.go b/cdc/processor/sinkmanager/table_sink_wrapper.go index df7a727ff4f..90c3fc38d59 100644 --- a/cdc/processor/sinkmanager/table_sink_wrapper.go +++ b/cdc/processor/sinkmanager/table_sink_wrapper.go @@ -288,12 +288,6 @@ func (t *tableSinkWrapper) asyncStop() bool { return false } -func (t *tableSinkWrapper) stop() { - t.markAsClosing() - t.closeAndClearTableSink() - t.markAsClosed() -} - // Return true means the internal table sink has been initialized. func (t *tableSinkWrapper) initTableSink() bool { t.tableSink.Lock() @@ -328,9 +322,11 @@ func (t *tableSinkWrapper) closeTableSink() { } func (t *tableSinkWrapper) asyncCloseAndClearTableSink() bool { - t.asyncCloseTableSink() - t.doTableSinkClear() - return true + closed := t.asyncCloseTableSink() + if closed { + t.doTableSinkClear() + } + return closed } func (t *tableSinkWrapper) closeAndClearTableSink() { diff --git a/cdc/processor/sinkmanager/table_sink_wrapper_test.go b/cdc/processor/sinkmanager/table_sink_wrapper_test.go index 6e97ddee192..e3314ac1df1 100644 --- a/cdc/processor/sinkmanager/table_sink_wrapper_test.go +++ b/cdc/processor/sinkmanager/table_sink_wrapper_test.go @@ -65,6 +65,22 @@ func (m *mockSink) Dead() <-chan struct{} { return make(chan struct{}) } +type mockDelayedTableSink struct { + tablesink.TableSink + + closeCnt int + closeTarget int +} + +func (t *mockDelayedTableSink) AsyncClose() bool { + t.closeCnt++ + if t.closeCnt >= t.closeTarget { + t.TableSink.Close() + return true + } + return false +} + //nolint:unparam func createTableSinkWrapper(changefeedID model.ChangeFeedID, tableID model.TableID) (*tableSinkWrapper, *mockSink) { tableState := tablepb.TableStatePreparing @@ -84,13 +100,26 @@ func createTableSinkWrapper(changefeedID model.ChangeFeedID, tableID model.Table return wrapper, sink } -func TestTableSinkWrapperClose(t *testing.T) { +func TestTableSinkWrapperStop(t *testing.T) { t.Parallel() wrapper, _ := createTableSinkWrapper(model.DefaultChangeFeedID("1"), 1) + wrapper.tableSink.s = &mockDelayedTableSink{ + TableSink: wrapper.tableSink.s, + closeCnt: 0, + closeTarget: 10, + } require.Equal(t, tablepb.TableStatePreparing, wrapper.getState()) - wrapper.stop() + + closeCnt := 0 + for { + closeCnt++ + if wrapper.asyncStop() { + break + } + } require.Equal(t, tablepb.TableStateStopped, wrapper.getState(), "table sink state should be stopped") + require.Equal(t, 10, closeCnt, "table sink should be closed 10 times") } func TestUpdateReceivedSorterResolvedTs(t *testing.T) {