This is an automated cherry-pick of #8620
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
CharlesCheung96 authored and ti-chi-bot committed Mar 22, 2023
1 parent 1b0f286 commit 10640aa
Showing 1 changed file with 217 additions and 0 deletions.
pkg/applier/redo.go (217 additions, 0 deletions)
@@ -103,6 +103,41 @@ func (ra *RedoApplier) catchError(ctx context.Context) error {
	}
}

<<<<<<< HEAD
=======
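// initSink builds the DML sink factory and the DDL sink from ra.cfg.SinkURI
// with the default replica config, and initializes the per-table sink map and
// the per-table resolved-ts / memory-consumption records.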
func (ra *RedoApplier) initSink(ctx context.Context) (err error) {
	replicaConfig := config.GetDefaultReplicaConfig()
	ra.sinkFactory, err = dmlfactory.New(ctx, ra.cfg.SinkURI, replicaConfig, ra.errCh)
	if err != nil {
		return err
	}
	ra.ddlSink, err = ddlfactory.New(ctx, ra.cfg.SinkURI, replicaConfig)
	if err != nil {
		return err
	}

	ra.tableSinks = make(map[model.TableID]tablesink.TableSink)
	ra.tableResolvedTsMap = make(map[model.TableID]*memquota.MemConsumeRecord)
	return nil
}

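// bgReleaseQuota runs in the background and, once per second, releases the
// memory quota recorded at or below each table sink's current checkpoint ts.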
func (ra *RedoApplier) bgReleaseQuota(ctx context.Context) error {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return errors.Trace(ctx.Err())
		case <-ticker.C:
			for tableID, tableSink := range ra.tableSinks {
				checkpointTs := tableSink.GetCheckpointTs()
				ra.memQuota.Release(spanz.TableIDToComparableSpan(tableID), checkpointTs)
			}
		}
	}
}

>>>>>>> def41a61d5 (redo(ticdc): fix resolvedTs regression in applier's memory quota record (#8620))
func (ra *RedoApplier) consumeLogs(ctx context.Context) error {
	checkpointTs, resolvedTs, err := ra.rd.ReadMeta(ctx)
	if err != nil {
@@ -168,6 +203,7 @@ func (ra *RedoApplier) consumeLogs(ctx context.Context) error {
		}
	}

<<<<<<< HEAD
	const warnDuration = 3 * time.Minute
	const flushWaitDuration = 200 * time.Millisecond
	ticker := time.NewTicker(warnDuration)
@@ -176,6 +212,187 @@ func (ra *RedoApplier) consumeLogs(ctx context.Context) error {
		resolvedTs := model.NewResolvedTs(resolvedTs)
		if err := ra.tableSinks[tableID].UpdateResolvedTs(resolvedTs); err != nil {
			return err
=======
log.Info("apply redo log finishes",
zap.Uint64("appliedLogCount", ra.appliedLogCount),
zap.Uint64("appliedDDLCount", ra.appliedDDLCount),
zap.Uint64("currentCheckpoint", resolvedTs))
return errApplyFinished
}

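// resetQuota flushes every table's pending batch-mode resolved ts, records the
// consumed memory against the quota, advances each record to the next batch,
// and then blocks until enough new quota is acquired for the upcoming rows.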
func (ra *RedoApplier) resetQuota(rowSize uint64) error {
	if rowSize >= config.DefaultChangefeedMemoryQuota || rowSize < ra.pendingQuota {
		log.Panic("row size exceeds memory quota",
			zap.Uint64("rowSize", rowSize),
			zap.Uint64("memoryQuota", config.DefaultChangefeedMemoryQuota))
	}

	// flush all tables before acquiring new quota
	for tableID, tableRecord := range ra.tableResolvedTsMap {
		if !tableRecord.ResolvedTs.IsBatchMode() {
			log.Panic("table resolved ts should always be in batch mode when apply redo log")
		}

		if err := ra.tableSinks[tableID].UpdateResolvedTs(tableRecord.ResolvedTs); err != nil {
			return err
		}
		ra.memQuota.Record(spanz.TableIDToComparableSpan(tableID),
			tableRecord.ResolvedTs, tableRecord.Size)

		// reset new record
		ra.tableResolvedTsMap[tableID] = &memquota.MemConsumeRecord{
			ResolvedTs: tableRecord.ResolvedTs.AdvanceBatch(),
			Size:       0,
		}
	}

	oldQuota := ra.pendingQuota
	ra.pendingQuota = rowSize * mysql.DefaultMaxTxnRow
	if ra.pendingQuota > config.DefaultChangefeedMemoryQuota {
		ra.pendingQuota = config.DefaultChangefeedMemoryQuota
	} else if ra.pendingQuota < 64*1024 {
		ra.pendingQuota = 64 * 1024
	}
	return ra.memQuota.BlockAcquire(ra.pendingQuota - oldQuota)
}

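// applyDDL applies a DDL event from the redo log. Unsupported DDLs at the
// checkpoint ts and DDLs without table info are skipped; otherwise all table
// sinks are flushed up to the DDL's commit ts before the DDL is written to
// the DDL sink.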
func (ra *RedoApplier) applyDDL(
	ctx context.Context, ddl *model.DDLEvent, checkpointTs uint64,
) error {
	shouldSkip := func() bool {
		if ddl.CommitTs == checkpointTs {
			if _, ok := unsupportedDDL[ddl.Type]; ok {
				log.Error("ignore unsupported DDL", zap.Any("ddl", ddl))
				return true
			}
		}
		if ddl.TableInfo == nil {
			// Note this could only happen when using an old version of cdc, and the commit ts
			// of the DDL should be equal to checkpoint ts or resolved ts.
			log.Warn("ignore DDL without table info", zap.Any("ddl", ddl))
			return true
		}
		return false
	}
	if shouldSkip() {
		return nil
	}
	log.Warn("apply DDL", zap.Any("ddl", ddl))
	// Wait for all tables to flush data before applying DDL.
	// TODO: only block tables that are affected by this DDL.
	for tableID := range ra.tableSinks {
		if err := ra.waitTableFlush(ctx, tableID, ddl.CommitTs); err != nil {
			return err
		}
	}
	if err := ra.ddlSink.WriteDDLEvent(ctx, ddl); err != nil {
		return err
	}
	ra.appliedDDLCount++
	return nil
}

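// applyRow appends a row changed event to its table sink, acquiring more
// memory quota via resetQuota when the pending quota is insufficient. The
// table's batch-mode resolved ts record is advanced to the row's commit ts,
// and a commit-ts regression triggers a panic.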
func (ra *RedoApplier) applyRow(
	row *model.RowChangedEvent, checkpointTs model.Ts,
) error {
	rowSize := uint64(row.ApproximateBytes())
	if rowSize > ra.pendingQuota {
		if err := ra.resetQuota(uint64(row.ApproximateBytes())); err != nil {
			return err
		}
	}
	ra.pendingQuota -= rowSize

	tableID := row.Table.TableID
	if _, ok := ra.tableSinks[tableID]; !ok {
		tableSink := ra.sinkFactory.CreateTableSink(
			model.DefaultChangeFeedID(applierChangefeed),
			spanz.TableIDToComparableSpan(tableID),
			prometheus.NewCounter(prometheus.CounterOpts{}),
		)
		ra.tableSinks[tableID] = tableSink
	}
	if _, ok := ra.tableResolvedTsMap[tableID]; !ok {
		// Initialize table record using checkpointTs.
		ra.tableResolvedTsMap[tableID] = &memquota.MemConsumeRecord{
			ResolvedTs: model.ResolvedTs{
				Mode:    model.BatchResolvedMode,
				Ts:      checkpointTs,
				BatchID: 1,
			},
			Size: 0,
		}
	}

	ra.tableSinks[tableID].AppendRowChangedEvents(row)
	record := ra.tableResolvedTsMap[tableID]
	record.Size += rowSize
	if row.CommitTs > record.ResolvedTs.Ts {
		// Use batch resolvedTs to flush data as quickly as possible.
		ra.tableResolvedTsMap[tableID] = &memquota.MemConsumeRecord{
			ResolvedTs: model.ResolvedTs{
				Mode:    model.BatchResolvedMode,
				Ts:      row.CommitTs,
				BatchID: 1,
			},
			Size: record.Size,
		}
	} else if row.CommitTs < ra.tableResolvedTsMap[tableID].ResolvedTs.Ts {
		log.Panic("commit ts of redo log regressed",
			zap.Int64("tableID", tableID),
			zap.Uint64("commitTs", row.CommitTs),
			zap.Any("resolvedTs", ra.tableResolvedTsMap[tableID]))
	}

	ra.appliedLogCount++
	return nil
}

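// waitTableFlush advances the table's batch-mode resolved ts to rts (panicking
// if rts regresses), records the consumed memory, and blocks until the table
// sink's checkpoint ts catches up with the resolved ts.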
func (ra *RedoApplier) waitTableFlush(
	ctx context.Context, tableID model.TableID, rts model.Ts,
) error {
	ticker := time.NewTicker(warnDuration)
	defer ticker.Stop()

	oldTableRecord := ra.tableResolvedTsMap[tableID]
	if oldTableRecord.ResolvedTs.Ts < rts {
		// Use new batch resolvedTs to flush data.
		ra.tableResolvedTsMap[tableID] = &memquota.MemConsumeRecord{
			ResolvedTs: model.ResolvedTs{
				Mode:    model.BatchResolvedMode,
				Ts:      rts,
				BatchID: 1,
			},
			Size: ra.tableResolvedTsMap[tableID].Size,
		}
	} else if oldTableRecord.ResolvedTs.Ts > rts {
		log.Panic("resolved ts of redo log regressed",
			zap.Any("oldResolvedTs", oldTableRecord),
			zap.Any("newResolvedTs", rts))
	}

	tableRecord := ra.tableResolvedTsMap[tableID]
	if err := ra.tableSinks[tableID].UpdateResolvedTs(tableRecord.ResolvedTs); err != nil {
		return err
	}
	ra.memQuota.Record(spanz.TableIDToComparableSpan(tableID),
		tableRecord.ResolvedTs, tableRecord.Size)

	// Make sure all events are flushed to downstream.
	for !ra.tableSinks[tableID].GetCheckpointTs().EqualOrGreater(tableRecord.ResolvedTs) {
		select {
		case <-ctx.Done():
			return errors.Trace(ctx.Err())
		case <-ticker.C:
			log.Warn(
				"Table sink is not catching up with resolved ts for a long time",
				zap.Int64("tableID", tableID),
				zap.Any("resolvedTs", tableRecord.ResolvedTs),
				zap.Any("checkpointTs", ra.tableSinks[tableID].GetCheckpointTs()),
			)
		default:
			time.Sleep(flushWaitDuration)
>>>>>>> def41a61d5 (redo(ticdc): fix resolvedTs regression in applier's memory quota record (#8620))
		}
		// Make sure all events are flushed to downstream.
		for !ra.tableSinks[tableID].GetCheckpointTs().EqualOrGreater(resolvedTs) {

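The cherry-picked change keeps each table's resolved ts in a batch-mode record (a Ts plus a BatchID) and panics when a redo entry would move that ts backwards. Below is a minimal, self-contained sketch of that invariant; the types and method names are illustrative stand-ins, not the actual ticdc model package, and the batch handling only loosely mirrors AdvanceBatch in the real applier.

package main

import "fmt"

// resolvedTs is a toy batch-mode resolved ts: the same Ts may be flushed in
// several batches, but Ts itself must never move backwards.
type resolvedTs struct {
	Ts      uint64
	BatchID uint64
}

// advance moves the record to a larger commit ts (resetting the batch) and
// rejects a smaller one as a regression; an equal ts leaves it unchanged.
func (r resolvedTs) advance(commitTs uint64) (resolvedTs, error) {
	switch {
	case commitTs > r.Ts:
		return resolvedTs{Ts: commitTs, BatchID: 1}, nil
	case commitTs == r.Ts:
		return r, nil
	default:
		return r, fmt.Errorf("commit ts %d regresses below resolved ts %d", commitTs, r.Ts)
	}
}

// nextBatch starts a new batch at the same ts, loosely mirroring how the real
// record is advanced after its quota is flushed and re-acquired.
func (r resolvedTs) nextBatch() resolvedTs {
	return resolvedTs{Ts: r.Ts, BatchID: r.BatchID + 1}
}

func main() {
	r := resolvedTs{Ts: 100, BatchID: 1}
	for _, commitTs := range []uint64{100, 120, 90} {
		next, err := r.advance(commitTs)
		if err != nil {
			fmt.Println("rejected:", err)
			continue
		}
		r = next
		fmt.Printf("record advanced to ts=%d batch=%d\n", r.Ts, r.BatchID)
	}
	r = r.nextBatch()
	fmt.Printf("new batch at ts=%d batch=%d\n", r.Ts, r.BatchID)
}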