Commit 08ed958

*(ticdc): reduce collect table stats frequency (pingcap#8003) (pingca…

ti-chi-bot authored Feb 28, 2023
1 parent db2e700 commit 08ed958
Showing 16 changed files with 243 additions and 111 deletions.
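In brief: the owner's CaptureManager now raises a CollectStats flag only once every CollectStatsTick logical ticks and piggybacks it on the next heartbeat; the agent forwards request.CollectStats into each table's GetTableStatus call; and the processor fills the comparatively expensive tablepb.Stats only when that flag is set, leaving it zero-valued otherwise. A runnable sketch of the gating pattern follows the capture_manager.go hunks below.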
14 changes: 11 additions & 3 deletions cdc/processor/processor.go
@@ -448,7 +448,7 @@ func (p *processor) GetCheckpoint() (checkpointTs, resolvedTs model.Ts) {
 }
 
 // GetTableStatus implements TableExecutor interface
-func (p *processor) GetTableStatus(tableID model.TableID) tablepb.TableStatus {
+func (p *processor) GetTableStatus(tableID model.TableID, collectStat bool) tablepb.TableStatus {
 	if p.pullBasedSinking {
 		state, exist := p.sinkManager.GetTableState(tableID)
 		if !exist {
@@ -458,14 +458,18 @@ func (p *processor) GetTableStatus(tableID model.TableID) tablepb.TableStatus {
 			}
 		}
 		sinkStats := p.sinkManager.GetTableStats(tableID)
+		stats := tablepb.Stats{}
+		if collectStat {
+			stats = p.getStatsFromSourceManagerAndSinkManager(tableID, sinkStats)
+		}
 		return tablepb.TableStatus{
 			TableID: tableID,
 			Checkpoint: tablepb.Checkpoint{
 				CheckpointTs: sinkStats.CheckpointTs,
 				ResolvedTs:   sinkStats.ResolvedTs,
 			},
 			State: state,
-			Stats: p.getStatsFromSourceManagerAndSinkManager(tableID, sinkStats),
+			Stats: stats,
 		}
 	}
 	table, ok := p.tables[tableID]
@@ -475,14 +479,18 @@ func (p *processor) GetTableStatus(tableID model.TableID) tablepb.TableStatus {
 			State:   tablepb.TableStateAbsent,
 		}
 	}
+	stats := tablepb.Stats{}
+	if collectStat {
+		stats = table.Stats()
+	}
 	return tablepb.TableStatus{
 		TableID: tableID,
 		Checkpoint: tablepb.Checkpoint{
 			CheckpointTs: table.CheckpointTs(),
 			ResolvedTs:   table.ResolvedTs(),
 		},
 		State: table.State(),
-		Stats: table.Stats(),
+		Stats: stats,
 	}
 }
4 changes: 2 additions & 2 deletions cdc/scheduler/internal/table_executor.go
@@ -57,6 +57,6 @@ type TableExecutor interface {
 	// called immediately before.
 	GetCheckpoint() (checkpointTs, resolvedTs model.Ts)
 
-	// GetTableStatus return the checkpoint and resolved ts for the given table
-	GetTableStatus(tableID model.TableID) tablepb.TableStatus
+	// GetTableStatus return the checkpoint and resolved ts for the given table.
+	GetTableStatus(tableID model.TableID, collectStat bool) tablepb.TableStatus
 }
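For implementers of the updated interface, here is a minimal sketch of how the new collectStat flag is meant to be honored; mockExecutor and its maps are hypothetical, and only the gating pattern mirrors the processor change above:

```go
import (
	"github.com/pingcap/tiflow/cdc/model"
	"github.com/pingcap/tiflow/cdc/processor/tablepb"
)

// mockExecutor is a hypothetical TableExecutor; only GetTableStatus is shown.
type mockExecutor struct {
	states map[model.TableID]tablepb.TableState
	stats  map[model.TableID]tablepb.Stats // precomputed stats, for illustration
}

// GetTableStatus fills the cheap fields unconditionally and the heavier
// Stats only when the scheduler asked for them.
func (e *mockExecutor) GetTableStatus(
	tableID model.TableID, collectStat bool,
) tablepb.TableStatus {
	stats := tablepb.Stats{}
	if collectStat {
		stats = e.stats[tableID]
	}
	return tablepb.TableStatus{
		TableID: tableID,
		State:   e.states[tableID],
		Stats:   stats,
	}
}
```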
4 changes: 2 additions & 2 deletions cdc/scheduler/internal/v3/agent/agent.go
@@ -237,15 +237,15 @@ func (a *agent) handleMessageHeartbeat(request *schedulepb.Heartbeat) *schedulep…
 	allTables := a.tableM.getAllTables()
 	result := make([]tablepb.TableStatus, 0, len(allTables))
 	for _, table := range allTables {
-		status := table.getTableStatus()
+		status := table.getTableStatus(request.CollectStats)
 		if table.task != nil && table.task.IsRemove {
 			status.State = tablepb.TableStateStopping
 		}
 		result = append(result, status)
 	}
 	for _, tableID := range request.GetTableIDs() {
 		if _, ok := allTables[tableID]; !ok {
-			status := a.tableM.getTableStatus(tableID)
+			status := a.tableM.getTableStatus(tableID, request.CollectStats)
 			result = append(result, status)
 		}
 	}
4 changes: 3 additions & 1 deletion cdc/scheduler/internal/v3/agent/agent_test.go
@@ -964,7 +964,9 @@ func (e *MockTableExecutor) GetCheckpoint() (checkpointTs, resolvedTs model.Ts)
 }
 
 // GetTableStatus implements TableExecutor interface
-func (e *MockTableExecutor) GetTableStatus(tableID model.TableID) tablepb.TableStatus {
+func (e *MockTableExecutor) GetTableStatus(
+	tableID model.TableID, collectStat bool,
+) tablepb.TableStatus {
 	state, ok := e.tables[tableID]
 	if !ok {
 		state = tablepb.TableStateAbsent
32 changes: 17 additions & 15 deletions cdc/scheduler/internal/v3/agent/table.go
@@ -53,7 +53,7 @@ func newTable(
 func (t *table) getAndUpdateTableState() (tablepb.TableState, bool) {
 	oldState := t.state
 
-	meta := t.executor.GetTableStatus(t.id)
+	meta := t.executor.GetTableStatus(t.id, false)
 	t.state = meta.State
 
 	if oldState != t.state {
@@ -68,8 +68,8 @@ func (t *table) getAndUpdateTableState() (tablepb.TableState, bool) {
 	return t.state, false
 }
 
-func (t *table) getTableStatus() tablepb.TableStatus {
-	return t.executor.GetTableStatus(t.id)
+func (t *table) getTableStatus(collectStat bool) tablepb.TableStatus {
+	return t.executor.GetTableStatus(t.id, collectStat)
 }
 
 func newAddTableResponseMessage(status tablepb.TableStatus) *schedulepb.Message {
@@ -113,20 +113,20 @@ func (t *table) handleRemoveTableTask() *schedulepb.Message {
 			zap.String("changefeed", t.changefeedID.ID),
 			zap.Int64("tableID", t.id))
 		t.task = nil
-		return newRemoveTableResponseMessage(t.getTableStatus())
+		return newRemoveTableResponseMessage(t.getTableStatus(false))
 	case tablepb.TableStateStopping, // stopping now is useless
 		tablepb.TableStateStopped:
 		// release table resource, and get the latest checkpoint
 		// this will let the table become `absent`
 		checkpointTs, done := t.executor.IsRemoveTableFinished(t.id)
 		if !done {
 			// actually, this should never be hit, since we know that table is stopped.
-			status := t.getTableStatus()
+			status := t.getTableStatus(false)
 			status.State = tablepb.TableStateStopping
 			return newRemoveTableResponseMessage(status)
 		}
 		t.task = nil
-		status := t.getTableStatus()
+		status := t.getTableStatus(false)
 		status.State = tablepb.TableStateStopped
 		status.Checkpoint.CheckpointTs = checkpointTs
 		return newRemoveTableResponseMessage(status)
@@ -135,7 +135,7 @@ func (t *table) handleRemoveTableTask() *schedulepb.Message {
 		tablepb.TableStateReplicating:
 		done := t.executor.RemoveTable(t.task.TableID)
 		if !done {
-			status := t.getTableStatus()
+			status := t.getTableStatus(false)
 			status.State = tablepb.TableStateStopping
 			return newRemoveTableResponseMessage(status)
 		}
@@ -163,7 +163,7 @@ func (t *table) handleAddTableTask(ctx context.Context) (result *schedulepb.Mess…
 				zap.String("changefeed", t.changefeedID.ID),
 				zap.Int64("tableID", t.id), zap.Any("task", t.task),
 				zap.Error(err))
-			status := t.getTableStatus()
+			status := t.getTableStatus(false)
 			return newAddTableResponseMessage(status), errors.Trace(err)
 		}
 		state, changed = t.getAndUpdateTableState()
@@ -173,7 +173,7 @@ func (t *table) handleAddTableTask(ctx context.Context) (result *schedulepb.Mess…
 			zap.String("changefeed", t.changefeedID.ID),
 			zap.Int64("tableID", t.id), zap.Stringer("state", state))
 		t.task = nil
-		status := t.getTableStatus()
+		status := t.getTableStatus(false)
 		return newAddTableResponseMessage(status), nil
 	case tablepb.TableStatePrepared:
 		if t.task.IsPrepare {
@@ -183,7 +183,7 @@ func (t *table) handleAddTableTask(ctx context.Context) (result *schedulepb.Mess…
 				zap.String("changefeed", t.changefeedID.ID),
 				zap.Int64("tableID", t.id), zap.Stringer("state", state))
 			t.task = nil
-			return newAddTableResponseMessage(t.getTableStatus()), nil
+			return newAddTableResponseMessage(t.getTableStatus(false)), nil
 		}
 
 		if t.task.status == dispatchTableTaskReceived {
@@ -194,15 +194,15 @@ func (t *table) handleAddTableTask(ctx context.Context) (result *schedulepb.Mess…
 					zap.String("changefeed", t.changefeedID.ID),
 					zap.Int64("tableID", t.id), zap.Stringer("state", state),
 					zap.Error(err))
-				status := t.getTableStatus()
+				status := t.getTableStatus(false)
 				return newAddTableResponseMessage(status), errors.Trace(err)
 			}
 			t.task.status = dispatchTableTaskProcessed
 		}
 
 		done := t.executor.IsAddTableFinished(t.task.TableID, false)
 		if !done {
-			return newAddTableResponseMessage(t.getTableStatus()), nil
+			return newAddTableResponseMessage(t.getTableStatus(false)), nil
 		}
 		state, changed = t.getAndUpdateTableState()
 	case tablepb.TableStatePreparing:
@@ -224,7 +224,7 @@ func (t *table) handleAddTableTask(ctx context.Context) (result *schedulepb.Mess…
 			zap.String("changefeed", t.changefeedID.ID),
 			zap.Int64("tableID", t.id))
 		t.task = nil
-		return newAddTableResponseMessage(t.getTableStatus()), nil
+		return newAddTableResponseMessage(t.getTableStatus(false)), nil
 	default:
 		log.Panic("schedulerv3: unknown table state",
 			zap.String("namespace", t.changefeedID.Namespace),
@@ -357,10 +357,12 @@ func (tm *tableManager) dropTable(tableID model.TableID) {
 	delete(tm.tables, tableID)
 }
 
-func (tm *tableManager) getTableStatus(tableID model.TableID) tablepb.TableStatus {
+func (tm *tableManager) getTableStatus(
+	tableID model.TableID, collectStat bool,
+) tablepb.TableStatus {
 	table, ok := tm.getTable(tableID)
 	if ok {
-		return table.getTableStatus()
+		return table.getTableStatus(collectStat)
 	}
 
 	return tablepb.TableStatus{
3 changes: 1 addition & 2 deletions cdc/scheduler/internal/v3/coordinator.go
@@ -96,8 +96,7 @@ func newCoordinator(
 		captureID: captureID,
 		replicationM: replication.NewReplicationManager(
 			cfg.MaxTaskConcurrency, changefeedID),
-		captureM: member.NewCaptureManager(
-			captureID, changefeedID, revision, cfg.HeartbeatTick),
+		captureM: member.NewCaptureManager(captureID, changefeedID, revision, cfg),
 		schedulerM: scheduler.NewSchedulerManager(changefeedID, cfg),
 		changefeedID: changefeedID,
 	}
15 changes: 11 additions & 4 deletions cdc/scheduler/internal/v3/coordinator_bench_test.go
@@ -26,6 +26,7 @@ import (
 	"github.com/pingcap/tiflow/cdc/scheduler/internal/v3/replication"
 	"github.com/pingcap/tiflow/cdc/scheduler/internal/v3/transport"
 	"github.com/pingcap/tiflow/cdc/scheduler/schedulepb"
+	"github.com/pingcap/tiflow/pkg/config"
 	"go.uber.org/zap/zapcore"
 )
 
@@ -69,12 +70,14 @@ func BenchmarkCoordinatorInit(b *testing.B) {
 		for i := 0; i < total; i++ {
 			currentTables = append(currentTables, int64(10000+i))
 		}
+		// Disable heartbeat.
+		cfg := config.NewDefaultSchedulerConfig()
+		cfg.HeartbeatTick = math.MaxInt
 		coord = &coordinator{
 			trans:        transport.NewMockTrans(),
 			replicationM: replication.NewReplicationManager(10, model.ChangeFeedID{}),
-			// Disable heartbeat.
 			captureM: member.NewCaptureManager(
-				"", model.ChangeFeedID{}, schedulepb.OwnerRevision{}, math.MaxInt),
+				"", model.ChangeFeedID{}, schedulepb.OwnerRevision{}, cfg),
 		}
 		name = fmt.Sprintf("InitTable %d", total)
 		return name, coord, currentTables, captures
@@ -91,8 +94,10 @@ func BenchmarkCoordinatorHeartbeat(b *testing.B) {
 		const captureCount = 8
 		captures = map[model.CaptureID]*model.CaptureInfo{}
 		// Always heartbeat.
+		cfg := config.NewDefaultSchedulerConfig()
+		cfg.HeartbeatTick = 1
 		captureM := member.NewCaptureManager(
-			"", model.ChangeFeedID{}, schedulepb.OwnerRevision{}, 0)
+			"", model.ChangeFeedID{}, schedulepb.OwnerRevision{}, cfg)
 		captureM.SetInitializedForTests(true)
 		for i := 0; i < captureCount; i++ {
 			captures[fmt.Sprint(i)] = &model.CaptureInfo{}
@@ -124,8 +129,10 @@ func BenchmarkCoordinatorHeartbeatResponse(b *testing.B) {
 		const captureCount = 8
 		captures = map[model.CaptureID]*model.CaptureInfo{}
 		// Disable heartbeat.
+		cfg := config.NewDefaultSchedulerConfig()
+		cfg.HeartbeatTick = math.MaxInt
 		captureM := member.NewCaptureManager(
-			"", model.ChangeFeedID{}, schedulepb.OwnerRevision{}, math.MaxInt)
+			"", model.ChangeFeedID{}, schedulepb.OwnerRevision{}, cfg)
 		captureM.SetInitializedForTests(true)
 		for i := 0; i < captureCount; i++ {
 			captures[fmt.Sprint(i)] = &model.CaptureInfo{}
10 changes: 8 additions & 2 deletions cdc/scheduler/internal/v3/coordinator_test.go
@@ -45,7 +45,8 @@ func TestCoordinatorSendMsgs(t *testing.T) {
 		captureID: "0",
 		trans:     trans,
 	}
-	coord.captureM = member.NewCaptureManager("", model.ChangeFeedID{}, coord.revision, 0)
+	cfg := config.NewDefaultSchedulerConfig()
+	coord.captureM = member.NewCaptureManager("", model.ChangeFeedID{}, coord.revision, cfg)
 	coord.sendMsgs(
 		ctx, []*schedulepb.Message{{To: "1", MsgType: schedulepb.MsgDispatchTableRequest}})
 
@@ -120,6 +121,7 @@ func TestCoordinatorHeartbeat(t *testing.T) {
 
 	coord := newCoordinator("a", model.ChangeFeedID{}, 1, &config.SchedulerConfig{
 		HeartbeatTick:      math.MaxInt,
+		CollectStatsTick:   math.MaxInt,
 		MaxTaskConcurrency: 1,
 		AddTableBatchSize:  50,
 	})
@@ -178,6 +180,7 @@ func TestCoordinatorAddCapture(t *testing.T) {
 	t.Parallel()
 	coord := newCoordinator("a", model.ChangeFeedID{}, 1, &config.SchedulerConfig{
 		HeartbeatTick:      math.MaxInt,
+		CollectStatsTick:   math.MaxInt,
 		MaxTaskConcurrency: 1,
 	})
 	trans := transport.NewMockTrans()
@@ -234,6 +237,7 @@ func TestCoordinatorRemoveCapture(t *testing.T) {
 
 	coord := newCoordinator("a", model.ChangeFeedID{}, 1, &config.SchedulerConfig{
 		HeartbeatTick:      math.MaxInt,
+		CollectStatsTick:   math.MaxInt,
 		MaxTaskConcurrency: 1,
 		AddTableBatchSize:  50,
 	})
@@ -278,7 +282,8 @@ func TestCoordinatorDrainCapture(t *testing.T) {
 		revision:  schedulepb.OwnerRevision{Revision: 3},
 		captureID: "a",
 	}
-	coord.captureM = member.NewCaptureManager("", model.ChangeFeedID{}, coord.revision, 0)
+	cfg := config.NewDefaultSchedulerConfig()
+	coord.captureM = member.NewCaptureManager("", model.ChangeFeedID{}, coord.revision, cfg)
 
 	coord.captureM.SetInitializedForTests(true)
 	coord.captureM.Captures["a"] = &member.CaptureStatus{State: member.CaptureStateUninitialized}
@@ -325,6 +330,7 @@ func TestCoordinatorAdvanceCheckpoint(t *testing.T) {
 
 	coord := newCoordinator("a", model.ChangeFeedID{}, 1, &config.SchedulerConfig{
 		HeartbeatTick:      math.MaxInt,
+		CollectStatsTick:   math.MaxInt,
 		MaxTaskConcurrency: 1,
 	})
 	trans := transport.NewMockTrans()
28 changes: 18 additions & 10 deletions cdc/scheduler/internal/v3/member/capture_manager.go
@@ -19,6 +19,7 @@ import (
 	"github.com/pingcap/tiflow/cdc/processor/tablepb"
 	"github.com/pingcap/tiflow/cdc/scheduler/internal/v3/replication"
 	"github.com/pingcap/tiflow/cdc/scheduler/schedulepb"
+	"github.com/pingcap/tiflow/pkg/config"
 	"go.uber.org/zap"
 )
 
@@ -121,8 +122,10 @@ type CaptureManager struct {
 	changes *CaptureChanges
 
 	// A logical clock counter, for heartbeat.
-	tickCounter   int
-	heartbeatTick int
+	tickCounter      int
+	heartbeatTick    int
+	collectStatsTick int
+	pendingCollect   bool
 
 	changefeedID model.ChangeFeedID
 	ownerID      model.CaptureID
@@ -131,12 +134,13 @@ type CaptureManager struct {
 // NewCaptureManager returns a new capture manager.
 func NewCaptureManager(
 	ownerID model.CaptureID, changefeedID model.ChangeFeedID,
-	rev schedulepb.OwnerRevision, heartbeatTick int,
+	rev schedulepb.OwnerRevision, cfg *config.SchedulerConfig,
 ) *CaptureManager {
 	return &CaptureManager{
-		OwnerRev:      rev,
-		Captures:      make(map[model.CaptureID]*CaptureStatus),
-		heartbeatTick: heartbeatTick,
+		OwnerRev:         rev,
+		Captures:         make(map[model.CaptureID]*CaptureStatus),
+		heartbeatTick:    cfg.HeartbeatTick,
+		collectStatsTick: cfg.CollectStatsTick,
 
 		changefeedID: changefeedID,
 		ownerID:      ownerID,
@@ -160,16 +164,18 @@ func (c *CaptureManager) checkAllCaptureInitialized() bool {
 	return len(c.Captures) != 0
 }
 
-// Tick advances the logical lock of capture manager and produce heartbeat when
+// Tick advances the logical clock of capture manager and produce heartbeat when
 // necessary.
 func (c *CaptureManager) Tick(
 	reps map[model.TableID]*replication.ReplicationSet, drainingCapture model.CaptureID,
 ) []*schedulepb.Message {
 	c.tickCounter++
-	if c.tickCounter < c.heartbeatTick {
+	if c.tickCounter%c.collectStatsTick == 0 {
+		c.pendingCollect = true
+	}
+	if c.tickCounter%c.heartbeatTick != 0 {
 		return nil
 	}
-	c.tickCounter = 0
 	tables := make(map[model.CaptureID][]model.TableID)
 	for tableID, rep := range reps {
 		for captureID := range rep.Captures {
@@ -185,10 +191,12 @@ func (c *CaptureManager) Tick(
 				TableIDs: tables[to],
 				// IsStopping let the receiver capture know that it should be stopping now.
 				// At the moment, this is triggered by `DrainCapture` scheduler.
-				IsStopping: drainingCapture == to,
+				IsStopping:   drainingCapture == to,
+				CollectStats: c.pendingCollect,
 			},
 		})
 	}
 	c.pendingCollect = false
 	return msgs
 }
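To see the new cadence in isolation, here is a self-contained sketch of the gating pattern the Tick change above implements; the tickGate type and the demo values are illustrative stand-ins, not tiflow code:

```go
package main

import "fmt"

// tickGate mirrors the logical-clock gating in CaptureManager.Tick after
// this commit: a heartbeat fires every heartbeatTick ticks, and a pending
// collect-stats flag is raised every collectStatsTick ticks, then
// piggybacked on the next heartbeat.
type tickGate struct {
	tickCounter      int
	heartbeatTick    int
	collectStatsTick int
	pendingCollect   bool
}

// tick advances the clock once and reports whether to send a heartbeat
// and whether that heartbeat should request stats collection.
func (g *tickGate) tick() (sendHeartbeat, collectStats bool) {
	g.tickCounter++
	if g.tickCounter%g.collectStatsTick == 0 {
		g.pendingCollect = true
	}
	if g.tickCounter%g.heartbeatTick != 0 {
		return false, false
	}
	collect := g.pendingCollect
	g.pendingCollect = false
	return true, collect
}

func main() {
	// Heartbeat every 2 ticks, request stats only every 6 ticks.
	g := &tickGate{heartbeatTick: 2, collectStatsTick: 6}
	for i := 1; i <= 12; i++ {
		hb, collect := g.tick()
		fmt.Printf("tick %2d: heartbeat=%t collectStats=%t\n", i, hb, collect)
	}
}
```

With these values the demo emits a heartbeat on every even tick but sets collectStats=true only on ticks 6 and 12, which is how the commit lowers stats-collection frequency without introducing a second timer. Note that the counter is no longer reset on each heartbeat; both conditions are modulo checks against the same monotonically increasing clock.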
