From 46baa928fe2ed3a7952b29c45a70817e3921c289 Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Tue, 21 Mar 2023 23:23:48 +0800 Subject: [PATCH] wait all event flushed before finish aplly operation --- pkg/applier/redo.go | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/pkg/applier/redo.go b/pkg/applier/redo.go index f6489100b9c..ddfbd8b292f 100644 --- a/pkg/applier/redo.go +++ b/pkg/applier/redo.go @@ -318,6 +318,7 @@ func (ra *RedoApplier) applyRow( ra.tableSinks[tableID] = tableSink } if _, ok := ra.tableResolvedTsMap[tableID]; !ok { + // Initialize table record using checkpointTs. ra.tableResolvedTsMap[tableID] = &memquota.MemConsumeRecord{ ResolvedTs: model.ResolvedTs{ Mode: model.BatchResolvedMode, @@ -358,14 +359,23 @@ func (ra *RedoApplier) waitTableFlush( ticker := time.NewTicker(warnDuration) defer ticker.Stop() - ra.tableResolvedTsMap[tableID] = &memquota.MemConsumeRecord{ - ResolvedTs: model.ResolvedTs{ - Mode: model.BatchResolvedMode, - Ts: rts, - BatchID: 1, - }, - Size: ra.tableResolvedTsMap[tableID].Size, + oldTableRecord := ra.tableResolvedTsMap[tableID] + if oldTableRecord.ResolvedTs.Ts < rts { + // Use new batch resolvedTs to flush data. + ra.tableResolvedTsMap[tableID] = &memquota.MemConsumeRecord{ + ResolvedTs: model.ResolvedTs{ + Mode: model.BatchResolvedMode, + Ts: rts, + BatchID: 1, + }, + Size: ra.tableResolvedTsMap[tableID].Size, + } + } else if oldTableRecord.ResolvedTs.Ts > rts { + log.Panic("resolved ts of redo log regressed", + zap.Any("oldResolvedTs", oldTableRecord), + zap.Any("newResolvedTs", rts)) } + tableRecord := ra.tableResolvedTsMap[tableID] if err := ra.tableSinks[tableID].UpdateResolvedTs(tableRecord.ResolvedTs); err != nil { return err