Skip to content

Commit

Permalink
sink(ticdc): cherry pick data loss fix to release 6.5 (#9621)
Browse files Browse the repository at this point in the history
close #9592
  • Loading branch information
CharlesCheung96 authored Aug 22, 2023
1 parent da4a195 commit c0ac9fa
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 11 deletions.
14 changes: 5 additions & 9 deletions cdc/processor/sinkmanager/table_sink_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,12 +288,6 @@ func (t *tableSinkWrapper) asyncStop() bool {
return false
}

func (t *tableSinkWrapper) stop() {
t.markAsClosing()
t.closeAndClearTableSink()
t.markAsClosed()
}

// Return true means the internal table sink has been initialized.
func (t *tableSinkWrapper) initTableSink() bool {
t.tableSink.Lock()
Expand Down Expand Up @@ -328,9 +322,11 @@ func (t *tableSinkWrapper) closeTableSink() {
}

func (t *tableSinkWrapper) asyncCloseAndClearTableSink() bool {
t.asyncCloseTableSink()
t.doTableSinkClear()
return true
closed := t.asyncCloseTableSink()
if closed {
t.doTableSinkClear()
}
return closed
}

func (t *tableSinkWrapper) closeAndClearTableSink() {
Expand Down
33 changes: 31 additions & 2 deletions cdc/processor/sinkmanager/table_sink_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,22 @@ func (m *mockSink) Dead() <-chan struct{} {
return make(chan struct{})
}

type mockDelayedTableSink struct {
tablesink.TableSink

closeCnt int
closeTarget int
}

func (t *mockDelayedTableSink) AsyncClose() bool {
t.closeCnt++
if t.closeCnt >= t.closeTarget {
t.TableSink.Close()
return true
}
return false
}

//nolint:unparam
func createTableSinkWrapper(changefeedID model.ChangeFeedID, tableID model.TableID) (*tableSinkWrapper, *mockSink) {
tableState := tablepb.TableStatePreparing
Expand All @@ -84,13 +100,26 @@ func createTableSinkWrapper(changefeedID model.ChangeFeedID, tableID model.Table
return wrapper, sink
}

func TestTableSinkWrapperClose(t *testing.T) {
func TestTableSinkWrapperStop(t *testing.T) {
t.Parallel()

wrapper, _ := createTableSinkWrapper(model.DefaultChangeFeedID("1"), 1)
wrapper.tableSink.s = &mockDelayedTableSink{
TableSink: wrapper.tableSink.s,
closeCnt: 0,
closeTarget: 10,
}
require.Equal(t, tablepb.TableStatePreparing, wrapper.getState())
wrapper.stop()

closeCnt := 0
for {
closeCnt++
if wrapper.asyncStop() {
break
}
}
require.Equal(t, tablepb.TableStateStopped, wrapper.getState(), "table sink state should be stopped")
require.Equal(t, 10, closeCnt, "table sink should be closed 10 times")
}

func TestUpdateReceivedSorterResolvedTs(t *testing.T) {
Expand Down

0 comments on commit c0ac9fa

Please sign in to comment.