Skip to content

Commit

Permalink
sink(ticdc): fix data loss (#9618) (#9628)
Browse files Browse the repository at this point in the history
close #9592
  • Loading branch information
ti-chi-bot authored Aug 23, 2023
1 parent a754418 commit 8196f8a
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 11 deletions.
14 changes: 5 additions & 9 deletions cdc/processor/sinkmanager/table_sink_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,12 +289,6 @@ func (t *tableSinkWrapper) asyncStop() bool {
return false
}

func (t *tableSinkWrapper) stop() {
t.markAsClosing()
t.closeAndClearTableSink()
t.markAsClosed()
}

// Return true means the internal table sink has been initialized.
func (t *tableSinkWrapper) initTableSink() bool {
t.tableSink.Lock()
Expand Down Expand Up @@ -329,9 +323,11 @@ func (t *tableSinkWrapper) closeTableSink() {
}

func (t *tableSinkWrapper) asyncCloseAndClearTableSink() bool {
t.asyncCloseTableSink()
t.doTableSinkClear()
return true
closed := t.asyncCloseTableSink()
if closed {
t.doTableSinkClear()
}
return closed
}

func (t *tableSinkWrapper) closeAndClearTableSink() {
Expand Down
33 changes: 31 additions & 2 deletions cdc/processor/sinkmanager/table_sink_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,22 @@ func (m *mockSink) AckAllEvents() {
}
}

type mockDelayedTableSink struct {
tablesink.TableSink

closeCnt int
closeTarget int
}

func (t *mockDelayedTableSink) AsyncClose() bool {
t.closeCnt++
if t.closeCnt >= t.closeTarget {
t.TableSink.Close()
return true
}
return false
}

//nolint:unparam
func createTableSinkWrapper(
changefeedID model.ChangeFeedID, span tablepb.Span,
Expand All @@ -96,14 +112,27 @@ func createTableSinkWrapper(
return wrapper, sink
}

func TestTableSinkWrapperClose(t *testing.T) {
func TestTableSinkWrapperStop(t *testing.T) {
t.Parallel()

wrapper, _ := createTableSinkWrapper(
model.DefaultChangeFeedID("1"), spanz.TableIDToComparableSpan(1))
wrapper.tableSink.s = &mockDelayedTableSink{
TableSink: wrapper.tableSink.s,
closeCnt: 0,
closeTarget: 10,
}
require.Equal(t, tablepb.TableStatePreparing, wrapper.getState())
wrapper.stop()

closeCnt := 0
for {
closeCnt++
if wrapper.asyncStop() {
break
}
}
require.Equal(t, tablepb.TableStateStopped, wrapper.getState(), "table sink state should be stopped")
require.Equal(t, 10, closeCnt, "table sink should be closed 10 times")
}

func TestUpdateReceivedSorterResolvedTs(t *testing.T) {
Expand Down

0 comments on commit 8196f8a

Please sign in to comment.