Skip to content

Commit

Permalink
scheduler, processor(ticdc): advance redo resolvedTs before start sink (#9277)
Browse files Browse the repository at this point in the history

close #9172
  • Loading branch information
CharlesCheung96 authored Jun 21, 2023
1 parent 5c368b5 commit b5dc8fa
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 33 deletions.
15 changes: 13 additions & 2 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ var _ scheduler.TableExecutor = (*processor)(nil)
// 2. Prepare phase for 2 phase scheduling, `isPrepare` should be true.
// 3. Replicating phase for 2 phase scheduling, `isPrepare` should be false
func (p *processor) AddTableSpan(
ctx context.Context, span tablepb.Span, startTs model.Ts, isPrepare bool,
ctx context.Context, span tablepb.Span, startTs model.Ts, isPrepare bool, barrier *schedulepb.Barrier,
) (bool, error) {
if !p.checkReadyForMessages() {
return false, nil
Expand Down Expand Up @@ -143,6 +143,16 @@ func (p *processor) AddTableSpan(
// table is `prepared`, and an `isPrepare = false` request indicates that the old table should
// have been stopped on the original capture already, so it's safe to start replicating data now.
if !isPrepare {
if p.redo.r.Enabled() {
var redoResolvedTs model.Ts
if barrier != nil {
redoResolvedTs = barrier.GlobalBarrierTs
} else {
stats := p.sinkManager.r.GetTableStats(span)
redoResolvedTs = stats.BarrierTs
}
p.redo.r.StartTable(span, redoResolvedTs)
}
if err := p.sinkManager.r.StartTable(span, startTs); err != nil {
return false, errors.Trace(err)
}
Expand Down Expand Up @@ -222,7 +232,8 @@ func (p *processor) IsAddTableSpanFinished(span tablepb.Span, isPrepare bool) bo
var tableResolvedTs, tableCheckpointTs uint64
var state tablepb.TableState
done := func() bool {
state, alreadyExist := p.sinkManager.r.GetTableState(span)
var alreadyExist bool
state, alreadyExist = p.sinkManager.r.GetTableState(span)
if alreadyExist {
stats := p.sinkManager.r.GetTableStats(span)
tableResolvedTs = stats.ResolvedTs
Expand Down
20 changes: 10 additions & 10 deletions cdc/processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func TestTableExecutorAddingTableIndirectly(t *testing.T) {

// table-1: `preparing` -> `prepared` -> `replicating`
span := spanz.TableIDToComparableSpan(1)
ok, err := p.AddTableSpan(ctx, span, 20, true)
ok, err := p.AddTableSpan(ctx, span, 20, true, nil)
require.NoError(t, err)
require.True(t, ok)
p.sinkManager.r.UpdateBarrierTs(20, nil)
Expand Down Expand Up @@ -252,7 +252,7 @@ func TestTableExecutorAddingTableIndirectly(t *testing.T) {
require.True(t, ok)
require.Equal(t, tablepb.TableStatePrepared, state)

ok, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(1), 30, true)
ok, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(1), 30, true, nil)
require.NoError(t, err)
require.True(t, ok)
stats = p.sinkManager.r.GetTableStats(span)
Expand All @@ -261,7 +261,7 @@ func TestTableExecutorAddingTableIndirectly(t *testing.T) {
require.Equal(t, model.Ts(20), stats.BarrierTs)

// Start to replicate table-1.
ok, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(1), 30, false)
ok, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(1), 30, false, nil)
require.NoError(t, err)
require.True(t, ok)

Expand Down Expand Up @@ -357,10 +357,10 @@ func TestProcessorClose(t *testing.T) {
tester.MustApplyPatches()

// add tables
done, err := p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(1), 20, false)
done, err := p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(1), 20, false, nil)
require.Nil(t, err)
require.True(t, done)
done, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(2), 30, false)
done, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(2), 30, false, nil)
require.Nil(t, err)
require.True(t, done)

Expand Down Expand Up @@ -396,10 +396,10 @@ func TestProcessorClose(t *testing.T) {
tester.MustApplyPatches()

// add tables
done, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(1), 20, false)
done, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(1), 20, false, nil)
require.Nil(t, err)
require.True(t, done)
done, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(2), 30, false)
done, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(2), 30, false, nil)
require.Nil(t, err)
require.True(t, done)
err = p.Tick(ctx)
Expand Down Expand Up @@ -441,10 +441,10 @@ func TestPositionDeleted(t *testing.T) {
tester.MustApplyPatches()

// add table
done, err := p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(1), 30, false)
done, err := p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(1), 30, false, nil)
require.Nil(t, err)
require.True(t, done)
done, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(2), 40, false)
done, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(2), 40, false, nil)
require.Nil(t, err)
require.True(t, done)

Expand Down Expand Up @@ -550,7 +550,7 @@ func TestUpdateBarrierTs(t *testing.T) {
tester.MustApplyPatches()

span := spanz.TableIDToComparableSpan(1)
done, err := p.AddTableSpan(ctx, span, 5, false)
done, err := p.AddTableSpan(ctx, span, 5, false, nil)
require.True(t, done)
require.Nil(t, err)
err = p.Tick(ctx)
Expand Down
6 changes: 6 additions & 0 deletions cdc/processor/sinkmanager/redo_log_advancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ func (m *mockRedoDMLManager) UpdateResolvedTs(ctx context.Context,
return nil
}

// StartTable stores resolvedTs in m.resolvedTss under the key span.TableID,
// replacing whatever value was stored for that table before. The map write is
// performed while holding m.mu.
func (m *mockRedoDMLManager) StartTable(span tablepb.Span, resolvedTs uint64) {
	m.mu.Lock()
	defer m.mu.Unlock()

	tableID := span.TableID
	m.resolvedTss[tableID] = resolvedTs
}

func (m *mockRedoDMLManager) GetResolvedTs(span tablepb.Span) model.Ts {
m.mu.Lock()
defer m.mu.Unlock()
Expand Down
7 changes: 7 additions & 0 deletions cdc/redo/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func (m *ddlManager) GetResolvedTs() model.Ts {
type DMLManager interface {
redoManager
AddTable(span tablepb.Span, startTs uint64)
StartTable(span tablepb.Span, startTs uint64)
RemoveTable(span tablepb.Span)
UpdateResolvedTs(ctx context.Context, span tablepb.Span, resolvedTs uint64) error
GetResolvedTs(span tablepb.Span) model.Ts
Expand Down Expand Up @@ -305,6 +306,12 @@ func (m *logManager) emitRedoEvents(
})
}

// StartTable starts a table, which means the table is ready to emit redo events.
//
// It forwards span and resolvedTs unchanged to onResolvedTsMsg and returns
// nothing to the caller.
// NOTE(review): presumably this seeds the table's redo resolved ts before any
// later UpdateResolvedTs call — confirm against onResolvedTsMsg.
//
// Note that this function should only be called once when adding a new table
// to the processor.
func (m *logManager) StartTable(span tablepb.Span, resolvedTs uint64) {
m.onResolvedTsMsg(span, resolvedTs)
}

// UpdateResolvedTs asynchronously updates resolved ts of a single table.
func (m *logManager) UpdateResolvedTs(
ctx context.Context,
Expand Down
3 changes: 2 additions & 1 deletion cdc/scheduler/internal/table_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/tablepb"
"github.com/pingcap/tiflow/cdc/scheduler/schedulepb"
)

// TableExecutor is an abstraction for "Processor".
Expand All @@ -30,7 +31,7 @@ type TableExecutor interface {
// if `isPrepare` is true, the 1st phase of the 2 phase scheduling protocol.
// if `isPrepare` is false, the 2nd phase.
AddTableSpan(
ctx context.Context, span tablepb.Span, startTs model.Ts, isPrepare bool,
ctx context.Context, span tablepb.Span, startTs model.Ts, isPrepare bool, barrier *schedulepb.Barrier,
) (done bool, err error)

// IsAddTableSpanFinished makes sure the requested table span is in the proper status
Expand Down
2 changes: 1 addition & 1 deletion cdc/scheduler/internal/v3/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func (a *agent) Tick(ctx context.Context) (*schedulepb.Barrier, error) {
return nil, errors.Trace(err)
}

responses, err := a.tableM.poll(ctx)
responses, err := a.tableM.poll(ctx, barrier)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
24 changes: 12 additions & 12 deletions cdc/scheduler/internal/v3/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func TestAgentHandleMessageDispatchTable(t *testing.T) {
// remove table not exist
ctx := context.Background()
a.handleMessageDispatchTableRequest(removeTableRequest, processorEpoch)
responses, err := a.tableM.poll(ctx)
responses, err := a.tableM.poll(ctx, &schedulepb.Barrier{})
require.NoError(t, err)
require.Len(t, responses, 0)

Expand All @@ -183,7 +183,7 @@ func TestAgentHandleMessageDispatchTable(t *testing.T) {
mockTableExecutor.On("AddTableSpan", mock.Anything, mock.Anything,
mock.Anything, mock.Anything).Return(false, nil)
a.handleMessageDispatchTableRequest(addTableRequest, processorEpoch)
responses, err = a.tableM.poll(ctx)
responses, err = a.tableM.poll(ctx, &schedulepb.Barrier{})
require.NoError(t, err)
require.Len(t, responses, 1)

Expand All @@ -203,14 +203,14 @@ func TestAgentHandleMessageDispatchTable(t *testing.T) {
mockTableExecutor.On("IsAddTableSpanFinished", mock.Anything,
mock.Anything, mock.Anything).Return(false, nil)
a.handleMessageDispatchTableRequest(addTableRequest, processorEpoch)
_, err = a.tableM.poll(ctx)
_, err = a.tableM.poll(ctx, &schedulepb.Barrier{})
require.NoError(t, err)

mockTableExecutor.ExpectedCalls = mockTableExecutor.ExpectedCalls[:1]
mockTableExecutor.On("IsAddTableSpanFinished", mock.Anything,
mock.Anything, mock.Anything).Return(true, nil)
a.handleMessageDispatchTableRequest(addTableRequest, processorEpoch)
responses, err = a.tableM.poll(ctx)
responses, err = a.tableM.poll(ctx, &schedulepb.Barrier{})
require.NoError(t, err)
require.Len(t, responses, 1)

Expand All @@ -232,7 +232,7 @@ func TestAgentHandleMessageDispatchTable(t *testing.T) {
mock.Anything, mock.Anything).Return(false, nil)

a.handleMessageDispatchTableRequest(addTableRequest, processorEpoch)
responses, err = a.tableM.poll(ctx)
responses, err = a.tableM.poll(ctx, &schedulepb.Barrier{})
require.NoError(t, err)
require.Len(t, responses, 1)

Expand All @@ -247,7 +247,7 @@ func TestAgentHandleMessageDispatchTable(t *testing.T) {
mockTableExecutor.On("IsAddTableSpanFinished", mock.Anything,
mock.Anything, mock.Anything).Return(true, nil)
a.handleMessageDispatchTableRequest(addTableRequest, processorEpoch)
responses, err = a.tableM.poll(ctx)
responses, err = a.tableM.poll(ctx, &schedulepb.Barrier{})
require.NoError(t, err)
require.Len(t, responses, 1)

Expand All @@ -262,7 +262,7 @@ func TestAgentHandleMessageDispatchTable(t *testing.T) {
Return(false)
// remove table in the replicating state failed, should still in replicating.
a.handleMessageDispatchTableRequest(removeTableRequest, processorEpoch)
responses, err = a.tableM.poll(ctx)
responses, err = a.tableM.poll(ctx, &schedulepb.Barrier{})
require.NoError(t, err)
require.Len(t, responses, 1)
removeTableResponse, ok := responses[0].DispatchTableResponse.
Expand All @@ -279,7 +279,7 @@ func TestAgentHandleMessageDispatchTable(t *testing.T) {
Return(3, false)
// remove table in the replicating state failed, should still in replicating.
a.handleMessageDispatchTableRequest(removeTableRequest, processorEpoch)
responses, err = a.tableM.poll(ctx)
responses, err = a.tableM.poll(ctx, &schedulepb.Barrier{})
require.NoError(t, err)
require.Len(t, responses, 1)
removeTableResponse, ok = responses[0].DispatchTableResponse.
Expand All @@ -293,7 +293,7 @@ func TestAgentHandleMessageDispatchTable(t *testing.T) {
Return(3, true)
// remove table in the replicating state success, should in stopped
a.handleMessageDispatchTableRequest(removeTableRequest, processorEpoch)
responses, err = a.tableM.poll(ctx)
responses, err = a.tableM.poll(ctx, &schedulepb.Barrier{})
require.NoError(t, err)
require.Len(t, responses, 1)
removeTableResponse, ok = responses[0].DispatchTableResponse.
Expand Down Expand Up @@ -809,7 +809,7 @@ func TestAgentCommitAddTableDuringStopping(t *testing.T) {

// Prepare add table is still in-progress.
mockTableExecutor.
On("AddTableSpan", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
On("AddTableSpan", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(true, nil).Once()
mockTableExecutor.
On("IsAddTableSpanFinished", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Expand All @@ -819,7 +819,7 @@ func TestAgentCommitAddTableDuringStopping(t *testing.T) {
require.Len(t, trans.SendBuffer, 0)

mockTableExecutor.
On("AddTableSpan", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
On("AddTableSpan", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(true, nil).Once()
mockTableExecutor.
On("IsAddTableSpanFinished", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Expand Down Expand Up @@ -1069,7 +1069,7 @@ func newMockTableExecutor() *MockTableExecutor {

// AddTableSpan adds a table span to the executor.
func (e *MockTableExecutor) AddTableSpan(
ctx context.Context, span tablepb.Span, startTs model.Ts, isPrepare bool,
ctx context.Context, span tablepb.Span, startTs model.Ts, isPrepare bool, _ *schedulepb.Barrier,
) (bool, error) {
log.Info("AddTableSpan",
zap.String("span", span.String()),
Expand Down
14 changes: 7 additions & 7 deletions cdc/scheduler/internal/v3/agent/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,14 +170,14 @@ func (t *tableSpan) handleRemoveTableTask() *schedulepb.Message {
}

func (t *tableSpan) handleAddTableTask(
ctx context.Context,
ctx context.Context, barrier *schedulepb.Barrier,
) (result *schedulepb.Message, err error) {
state, _ := t.getAndUpdateTableSpanState()
changed := true
for changed {
switch state {
case tablepb.TableStateAbsent:
done, err := t.executor.AddTableSpan(ctx, t.task.Span, t.task.StartTs, t.task.IsPrepare)
done, err := t.executor.AddTableSpan(ctx, t.task.Span, t.task.StartTs, t.task.IsPrepare, barrier)
if err != nil || !done {
log.Warn("schedulerv3: agent add table failed",
zap.String("namespace", t.changefeedID.Namespace),
Expand Down Expand Up @@ -208,7 +208,7 @@ func (t *tableSpan) handleAddTableTask(
}

if t.task.status == dispatchTableTaskReceived {
done, err := t.executor.AddTableSpan(ctx, t.task.Span, t.task.StartTs, false)
done, err := t.executor.AddTableSpan(ctx, t.task.Span, t.task.StartTs, false, barrier)
if err != nil || !done {
log.Warn("schedulerv3: agent add table failed",
zap.String("namespace", t.changefeedID.Namespace),
Expand Down Expand Up @@ -283,14 +283,14 @@ func (t *tableSpan) injectDispatchTableTask(task *dispatchTableTask) {
zap.Any("ignoredTask", task))
}

func (t *tableSpan) poll(ctx context.Context) (*schedulepb.Message, error) {
func (t *tableSpan) poll(ctx context.Context, barrier *schedulepb.Barrier) (*schedulepb.Message, error) {
if t.task == nil {
return nil, nil
}
if t.task.IsRemove {
return t.handleRemoveTableTask(), nil
}
return t.handleAddTableTask(ctx)
return t.handleAddTableTask(ctx, barrier)
}

type tableSpanManager struct {
Expand All @@ -310,12 +310,12 @@ func newTableSpanManager(
}
}

func (tm *tableSpanManager) poll(ctx context.Context) ([]*schedulepb.Message, error) {
func (tm *tableSpanManager) poll(ctx context.Context, barrier *schedulepb.Barrier) ([]*schedulepb.Message, error) {
result := make([]*schedulepb.Message, 0)
var err error
toBeDropped := []tablepb.Span{}
tm.tables.Ascend(func(span tablepb.Span, table *tableSpan) bool {
message, err1 := table.poll(ctx)
message, err1 := table.poll(ctx, barrier)
if err != nil {
err = errors.Trace(err1)
return false
Expand Down

0 comments on commit b5dc8fa

Please sign in to comment.