Skip to content

Commit

Permalink
wait all event flushed before finish aplly operation
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 committed Mar 21, 2023
1 parent 33581b6 commit 46baa92
Showing 1 changed file with 17 additions and 7 deletions.
24 changes: 17 additions & 7 deletions pkg/applier/redo.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ func (ra *RedoApplier) applyRow(
ra.tableSinks[tableID] = tableSink
}
if _, ok := ra.tableResolvedTsMap[tableID]; !ok {
// Initialize table record using checkpointTs.
ra.tableResolvedTsMap[tableID] = &memquota.MemConsumeRecord{
ResolvedTs: model.ResolvedTs{
Mode: model.BatchResolvedMode,
Expand Down Expand Up @@ -358,14 +359,23 @@ func (ra *RedoApplier) waitTableFlush(
ticker := time.NewTicker(warnDuration)
defer ticker.Stop()

ra.tableResolvedTsMap[tableID] = &memquota.MemConsumeRecord{
ResolvedTs: model.ResolvedTs{
Mode: model.BatchResolvedMode,
Ts: rts,
BatchID: 1,
},
Size: ra.tableResolvedTsMap[tableID].Size,
oldTableRecord := ra.tableResolvedTsMap[tableID]
if oldTableRecord.ResolvedTs.Ts < rts {
// Use new batch resolvedTs to flush data.
ra.tableResolvedTsMap[tableID] = &memquota.MemConsumeRecord{
ResolvedTs: model.ResolvedTs{
Mode: model.BatchResolvedMode,
Ts: rts,
BatchID: 1,
},
Size: ra.tableResolvedTsMap[tableID].Size,
}
} else if oldTableRecord.ResolvedTs.Ts > rts {
log.Panic("resolved ts of redo log regressed",
zap.Any("oldResolvedTs", oldTableRecord),
zap.Any("newResolvedTs", rts))
}

tableRecord := ra.tableResolvedTsMap[tableID]
if err := ra.tableSinks[tableID].UpdateResolvedTs(tableRecord.ResolvedTs); err != nil {
return err
Expand Down

0 comments on commit 46baa92

Please sign in to comment.