From 8196f8a6dfdbd15f89f5a211ae57bd5fca6f21d0 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Wed, 23 Aug 2023 10:53:04 +0800 Subject: [PATCH] sink(ticdc): fix data loss (#9618) (#9628) close pingcap/tiflow#9592 --- .../sinkmanager/table_sink_wrapper.go | 14 +++----- .../sinkmanager/table_sink_wrapper_test.go | 33 +++++++++++++++++-- 2 files changed, 36 insertions(+), 11 deletions(-) diff --git a/cdc/processor/sinkmanager/table_sink_wrapper.go b/cdc/processor/sinkmanager/table_sink_wrapper.go index f01e03c53f2..1f6f7dca3c7 100644 --- a/cdc/processor/sinkmanager/table_sink_wrapper.go +++ b/cdc/processor/sinkmanager/table_sink_wrapper.go @@ -289,12 +289,6 @@ func (t *tableSinkWrapper) asyncStop() bool { return false } -func (t *tableSinkWrapper) stop() { - t.markAsClosing() - t.closeAndClearTableSink() - t.markAsClosed() -} - // Return true means the internal table sink has been initialized. func (t *tableSinkWrapper) initTableSink() bool { t.tableSink.Lock() @@ -329,9 +323,11 @@ func (t *tableSinkWrapper) closeTableSink() { } func (t *tableSinkWrapper) asyncCloseAndClearTableSink() bool { - t.asyncCloseTableSink() - t.doTableSinkClear() - return true + closed := t.asyncCloseTableSink() + if closed { + t.doTableSinkClear() + } + return closed } func (t *tableSinkWrapper) closeAndClearTableSink() { diff --git a/cdc/processor/sinkmanager/table_sink_wrapper_test.go b/cdc/processor/sinkmanager/table_sink_wrapper_test.go index 206d02d32fb..c5dd31efdd9 100644 --- a/cdc/processor/sinkmanager/table_sink_wrapper_test.go +++ b/cdc/processor/sinkmanager/table_sink_wrapper_test.go @@ -74,6 +74,22 @@ func (m *mockSink) AckAllEvents() { } } +type mockDelayedTableSink struct { + tablesink.TableSink + + closeCnt int + closeTarget int +} + +func (t *mockDelayedTableSink) AsyncClose() bool { + t.closeCnt++ + if t.closeCnt >= t.closeTarget { + t.TableSink.Close() + return true + } + return false +} + //nolint:unparam func createTableSinkWrapper( changefeedID model.ChangeFeedID, span tablepb.Span, @@ -96,14 +112,27 @@ func createTableSinkWrapper( return wrapper, sink } -func TestTableSinkWrapperClose(t *testing.T) { +func TestTableSinkWrapperStop(t *testing.T) { t.Parallel() wrapper, _ := createTableSinkWrapper( model.DefaultChangeFeedID("1"), spanz.TableIDToComparableSpan(1)) + wrapper.tableSink.s = &mockDelayedTableSink{ + TableSink: wrapper.tableSink.s, + closeCnt: 0, + closeTarget: 10, + } require.Equal(t, tablepb.TableStatePreparing, wrapper.getState()) - wrapper.stop() + + closeCnt := 0 + for { + closeCnt++ + if wrapper.asyncStop() { + break + } + } require.Equal(t, tablepb.TableStateStopped, wrapper.getState(), "table sink state should be stopped") + require.Equal(t, 10, closeCnt, "table sink should be closed 10 times") } func TestUpdateReceivedSorterResolvedTs(t *testing.T) {