diff --git a/pkg/applier/redo.go b/pkg/applier/redo.go index 54f402f3e70..7e2af0a15bc 100644 --- a/pkg/applier/redo.go +++ b/pkg/applier/redo.go @@ -138,6 +138,7 @@ func (ra *RedoApplier) initSink(ctx context.Context) (err error) { func (ra *RedoApplier) bgReleaseQuota(ctx context.Context) error { ticker := time.NewTicker(time.Second) + defer ticker.Stop() for { select { case <-ctx.Done(): @@ -312,6 +313,7 @@ func (ra *RedoApplier) applyRow( ra.tableSinks[tableID] = tableSink } if _, ok := ra.tableResolvedTsMap[tableID]; !ok { + // Initialize table record using checkpointTs. ra.tableResolvedTsMap[tableID] = &memquota.MemConsumeRecord{ ResolvedTs: model.ResolvedTs{ Mode: model.BatchResolvedMode, @@ -352,14 +354,23 @@ func (ra *RedoApplier) waitTableFlush( ticker := time.NewTicker(warnDuration) defer ticker.Stop() - ra.tableResolvedTsMap[tableID] = &memquota.MemConsumeRecord{ - ResolvedTs: model.ResolvedTs{ - Mode: model.BatchResolvedMode, - Ts: rts, - BatchID: 1, - }, - Size: ra.tableResolvedTsMap[tableID].Size, + oldTableRecord := ra.tableResolvedTsMap[tableID] + if oldTableRecord.ResolvedTs.Ts < rts { + // Use new batch resolvedTs to flush data. + ra.tableResolvedTsMap[tableID] = &memquota.MemConsumeRecord{ + ResolvedTs: model.ResolvedTs{ + Mode: model.BatchResolvedMode, + Ts: rts, + BatchID: 1, + }, + Size: ra.tableResolvedTsMap[tableID].Size, + } + } else if oldTableRecord.ResolvedTs.Ts > rts { + log.Panic("resolved ts of redo log regressed", + zap.Any("oldResolvedTs", oldTableRecord), + zap.Any("newResolvedTs", rts)) } + tableRecord := ra.tableResolvedTsMap[tableID] if err := ra.tableSinks[tableID].UpdateResolvedTs(tableRecord.ResolvedTs); err != nil { return err