Skip to content

Commit

Permalink
redo(ticdc): fix resolvedTs regression in applier's memory quota reco…
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 authored and 3AceShowHand committed Apr 3, 2023
1 parent 16ff3a2 commit 1dc3fc3
Showing 1 changed file with 18 additions and 7 deletions.
25 changes: 18 additions & 7 deletions pkg/applier/redo.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ func (ra *RedoApplier) initSink(ctx context.Context) (err error) {

func (ra *RedoApplier) bgReleaseQuota(ctx context.Context) error {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -313,6 +314,7 @@ func (ra *RedoApplier) applyRow(
ra.tableSinks[tableID] = tableSink
}
if _, ok := ra.tableResolvedTsMap[tableID]; !ok {
// Initialize table record using checkpointTs.
ra.tableResolvedTsMap[tableID] = &memquota.MemConsumeRecord{
ResolvedTs: model.ResolvedTs{
Mode: model.BatchResolvedMode,
Expand Down Expand Up @@ -353,14 +355,23 @@ func (ra *RedoApplier) waitTableFlush(
ticker := time.NewTicker(warnDuration)
defer ticker.Stop()

ra.tableResolvedTsMap[tableID] = &memquota.MemConsumeRecord{
ResolvedTs: model.ResolvedTs{
Mode: model.BatchResolvedMode,
Ts: rts,
BatchID: 1,
},
Size: ra.tableResolvedTsMap[tableID].Size,
oldTableRecord := ra.tableResolvedTsMap[tableID]
if oldTableRecord.ResolvedTs.Ts < rts {
// Use new batch resolvedTs to flush data.
ra.tableResolvedTsMap[tableID] = &memquota.MemConsumeRecord{
ResolvedTs: model.ResolvedTs{
Mode: model.BatchResolvedMode,
Ts: rts,
BatchID: 1,
},
Size: ra.tableResolvedTsMap[tableID].Size,
}
} else if oldTableRecord.ResolvedTs.Ts > rts {
log.Panic("resolved ts of redo log regressed",
zap.Any("oldResolvedTs", oldTableRecord),
zap.Any("newResolvedTs", rts))
}

tableRecord := ra.tableResolvedTsMap[tableID]
if err := ra.tableSinks[tableID].UpdateResolvedTs(tableRecord.ResolvedTs); err != nil {
return err
Expand Down

0 comments on commit 1dc3fc3

Please sign in to comment.