From 6619c31b280ae0f905448e174e167728c4dcaf80 Mon Sep 17 00:00:00 2001
From: CharlesCheung <61726649+CharlesCheung96@users.noreply.github.com>
Date: Thu, 16 Mar 2023 10:50:39 +0800
Subject: [PATCH] redo(ticdc): limit memory usage in applier (#8494)

ref pingcap/tiflow#8056
---
 cdc/processor/memquota/mem_quota.go |  45 ++++-----
 cdc/redo/reader/reader.go           |   9 ++
 errors.toml                         |   5 +
 go.mod                              |   2 +-
 go.sum                              |   5 +-
 pkg/applier/redo.go                 | 140 +++++++++++++++++++++++-----
 pkg/errors/cdc_errors.go            |   4 +
 pkg/util/memory_checker.go          |  47 ++++++++++
 8 files changed, 206 insertions(+), 51 deletions(-)
 create mode 100644 pkg/util/memory_checker.go

diff --git a/cdc/processor/memquota/mem_quota.go b/cdc/processor/memquota/mem_quota.go
index 7c442114b85..77e7eedddd7 100644
--- a/cdc/processor/memquota/mem_quota.go
+++ b/cdc/processor/memquota/mem_quota.go
@@ -26,9 +26,10 @@ import (
 	"go.uber.org/zap"
 )
 
-type memConsumeRecord struct {
-	resolvedTs model.ResolvedTs
-	size       uint64
+// MemConsumeRecord is used to trace memory usage.
+type MemConsumeRecord struct {
+	ResolvedTs model.ResolvedTs
+	Size       uint64
 }
 
 // MemQuota is used to trace memory usage.
@@ -49,8 +50,6 @@ type MemQuota struct {
 
 	// blockAcquireCond is used to notify the blocked acquire.
 	blockAcquireCond *sync.Cond
-	// condMu protects nothing, but sync.Cond needs a mutex.
-	condMu sync.Mutex
 
 	metricTotal prometheus.Gauge
 	metricUsed  prometheus.Gauge
@@ -58,21 +57,23 @@ type MemQuota struct {
 	// mu protects the following fields.
 	mu sync.Mutex
 	// tableMemory is the memory usage of each table.
-	tableMemory map[model.TableID][]*memConsumeRecord
+	tableMemory map[model.TableID][]*MemConsumeRecord
 }
 
 // NewMemQuota creates a MemQuota instance.
 func NewMemQuota(changefeedID model.ChangeFeedID, totalBytes uint64, comp string) *MemQuota {
 	m := &MemQuota{
-		changefeedID: changefeedID,
-		totalBytes:   totalBytes,
-		metricTotal:  MemoryQuota.WithLabelValues(changefeedID.Namespace, changefeedID.ID, "total", comp),
-		metricUsed:   MemoryQuota.WithLabelValues(changefeedID.Namespace, changefeedID.ID, "used", comp),
-		closeBg:      make(chan struct{}, 1),
+		changefeedID:     changefeedID,
+		totalBytes:       totalBytes,
+		blockAcquireCond: sync.NewCond(&sync.Mutex{}),
+		metricTotal: MemoryQuota.WithLabelValues(changefeedID.Namespace,
+			changefeedID.ID, "total", comp),
+		metricUsed: MemoryQuota.WithLabelValues(changefeedID.Namespace,
+			changefeedID.ID, "used", comp),
+		closeBg: make(chan struct{}, 1),
 
-		tableMemory: make(map[model.TableID][]*memConsumeRecord),
+		tableMemory: make(map[model.TableID][]*MemConsumeRecord),
 	}
-	m.blockAcquireCond = sync.NewCond(&m.condMu)
 	m.metricTotal.Set(float64(totalBytes))
 	m.metricUsed.Set(float64(0))
 
@@ -125,9 +126,9 @@ func (m *MemQuota) BlockAcquire(nBytes uint64) error {
 		}
 		usedBytes := m.usedBytes.Load()
 		if usedBytes+nBytes > m.totalBytes {
-			m.condMu.Lock()
+			m.blockAcquireCond.L.Lock()
 			m.blockAcquireCond.Wait()
-			m.condMu.Unlock()
+			m.blockAcquireCond.L.Unlock()
 			continue
 		}
 		if m.usedBytes.CompareAndSwap(usedBytes, usedBytes+nBytes) {
@@ -155,7 +156,7 @@ func (m *MemQuota) Refund(nBytes uint64) {
 func (m *MemQuota) AddTable(tableID model.TableID) {
 	m.mu.Lock()
 	defer m.mu.Unlock()
-	m.tableMemory[tableID] = make([]*memConsumeRecord, 0, 2)
+	m.tableMemory[tableID] = make([]*MemConsumeRecord, 0, 2)
 }
 
 // Record records the memory usage of a table.
@@ -177,9 +178,9 @@ func (m *MemQuota) Record(tableID model.TableID, resolved model.ResolvedTs, nByt
 		}
 		return
 	}
-	m.tableMemory[tableID] = append(m.tableMemory[tableID], &memConsumeRecord{
-		resolvedTs: resolved,
-		size:       nBytes,
+	m.tableMemory[tableID] = append(m.tableMemory[tableID], &MemConsumeRecord{
+		ResolvedTs: resolved,
+		Size:       nBytes,
 	})
 }
 
@@ -197,11 +198,11 @@ func (m *MemQuota) Release(tableID model.TableID, resolved model.ResolvedTs) {
 	}
 	records := m.tableMemory[tableID]
 	i := sort.Search(len(records), func(i int) bool {
-		return records[i].resolvedTs.Greater(resolved)
+		return records[i].ResolvedTs.Greater(resolved)
 	})
 	var toRelease uint64 = 0
 	for j := 0; j < i; j++ {
-		toRelease += records[j].size
+		toRelease += records[j].Size
 	}
 	m.tableMemory[tableID] = records[i:]
 	if toRelease == 0 {
@@ -236,7 +237,7 @@ func (m *MemQuota) Clean(tableID model.TableID) uint64 {
 	cleaned := uint64(0)
 	records := m.tableMemory[tableID]
 	for _, record := range records {
-		cleaned += record.size
+		cleaned += record.Size
 	}
 	delete(m.tableMemory, tableID)
 
diff --git a/cdc/redo/reader/reader.go b/cdc/redo/reader/reader.go
index 484ba165d4b..6153aea25f2 100644
--- a/cdc/redo/reader/reader.go
+++ b/cdc/redo/reader/reader.go
@@ -21,6 +21,7 @@ import (
 	"os"
 	"path/filepath"
 	"sync"
+	"time"
 
 	"github.com/pingcap/log"
 	"github.com/pingcap/tiflow/cdc/model"
@@ -28,6 +29,7 @@ import (
 	"github.com/pingcap/tiflow/pkg/errors"
 	"github.com/pingcap/tiflow/pkg/redo"
 	"github.com/pingcap/tiflow/pkg/sink/mysql"
+	"github.com/pingcap/tiflow/pkg/util"
 	"go.uber.org/multierr"
 	"go.uber.org/zap"
 	"golang.org/x/sync/errgroup"
@@ -36,6 +38,8 @@ import (
 const (
 	emitBatch             = mysql.DefaultMaxTxnRow
 	defaultReaderChanSize = mysql.DefaultWorkerCount * emitBatch
+	maxTotalMemoryUsage   = 90.0
+	maxWaitDuration       = time.Minute * 2
 )
 
 // RedoLogReader is a reader abstraction for redo log storage layer
@@ -231,6 +235,11 @@ func (l *LogReader) runReader(egCtx context.Context, cfg *readerConfig) error {
 				case l.rowCh <- row:
 				}
 			}
+			err := util.WaitMemoryAvailable(maxTotalMemoryUsage, maxWaitDuration)
+			if err != nil {
+				return errors.Trace(err)
+			}
+
 		case redo.RedoDDLLogFileType:
 			ddl := common.LogToDDL(item.data.RedoDDL)
 			if ddl != nil && ddl.CommitTs > cfg.startTs && ddl.CommitTs <= cfg.endTs {
diff --git a/errors.toml b/errors.toml
index 83ddf59b127..d46d18f98f9 100755
--- a/errors.toml
+++ b/errors.toml
@@ -1261,6 +1261,11 @@ error = '''
 version is incompatible: %s
 '''
 
+["CDC:ErrWaitFreeMemoryTimeout"]
+error = '''
+wait free memory timeout
+'''
+
 ["CDC:ErrWaitHandleOperationTimeout"]
 error = '''
 waiting processor to handle the operation finished timeout
diff --git a/go.mod b/go.mod
index d2e5b66c9cf..3eb260a53d8 100644
--- a/go.mod
+++ b/go.mod
@@ -67,6 +67,7 @@ require (
 	github.com/prometheus/client_model v0.3.0
 	github.com/r3labs/diff v1.1.0
 	github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475
+	github.com/shirou/gopsutil/v3 v3.23.1
 	github.com/shopspring/decimal v1.3.0
 	github.com/soheilhy/cmux v0.1.5
 	github.com/spf13/cobra v1.6.1
@@ -239,7 +240,6 @@ require (
 	github.com/rogpeppe/go-internal v1.9.0 // indirect
 	github.com/rs/cors v1.7.0 // indirect
 	github.com/sasha-s/go-deadlock v0.2.0 // indirect
-	github.com/shirou/gopsutil/v3 v3.22.12 // indirect
 	github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749 // indirect
 	github.com/shurcooL/httpgzip v0.0.0-20190720172056-320755c1c1b0 // indirect
 	github.com/shurcooL/vfsgen v0.0.0-20200824052919-0d455de96546 // indirect
diff --git a/go.sum b/go.sum
index 73316209b89..82dd3ba506d 100644
--- a/go.sum
+++ b/go.sum
@@ -1327,8 +1327,8 @@ github.com/sclevine/agouti v3.0.0+incompatible/go.mod h1:b4WX9W9L1sfQKXeJf1mUTLZ
 github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
 github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
 github.com/shirou/gopsutil/v3 v3.21.12/go.mod h1:BToYZVTlSVlfazpDDYFnsVZLaoRG+g8ufT6fPQLdJzA=
-github.com/shirou/gopsutil/v3 v3.22.12 h1:oG0ns6poeUSxf78JtOsfygNWuEHYYz8hnnNg7P04TJs=
-github.com/shirou/gopsutil/v3 v3.22.12/go.mod h1:Xd7P1kwZcp5VW52+9XsirIKd/BROzbb2wdX3Kqlz9uI=
+github.com/shirou/gopsutil/v3 v3.23.1 h1:a9KKO+kGLKEvcPIs4W62v0nu3sciVDOOOPUD0Hz7z/4=
+github.com/shirou/gopsutil/v3 v3.23.1/go.mod h1:NN6mnm5/0k8jw4cBfCnJtr5L7ErOTg18tMNpgFkn0hA=
 github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4=
 github.com/shopspring/decimal v1.3.0 h1:KK3gWIXskZ2O1U/JNTisNcvH+jveJxZYrjbTsrbbnh8=
 github.com/shopspring/decimal v1.3.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
@@ -1881,7 +1881,6 @@ golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBc
 golang.org/x/sys v0.0.0-20220909162455-aba9fc2a8ff2/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18=
 golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
diff --git a/pkg/applier/redo.go b/pkg/applier/redo.go
index 5562c6e0006..54f402f3e70 100644
--- a/pkg/applier/redo.go
+++ b/pkg/applier/redo.go
@@ -22,6 +22,7 @@ import (
 	timodel "github.com/pingcap/tidb/parser/model"
 	"github.com/pingcap/tiflow/cdc/contextutil"
 	"github.com/pingcap/tiflow/cdc/model"
+	"github.com/pingcap/tiflow/cdc/processor/memquota"
 	"github.com/pingcap/tiflow/cdc/redo/reader"
 	"github.com/pingcap/tiflow/cdc/sinkv2/ddlsink"
 	ddlfactory "github.com/pingcap/tiflow/cdc/sinkv2/ddlsink/factory"
@@ -67,12 +68,15 @@ type RedoApplier struct {
 	ddlSink         ddlsink.DDLEventSink
 	appliedDDLCount uint64
 
+	memQuota     *memquota.MemQuota
+	pendingQuota uint64
+
 	// sinkFactory is used to create table sinks.
 	sinkFactory *dmlfactory.SinkFactory
 	// tableSinks is a map from tableID to table sink.
 	// We create it when we need it, and close it after we finish applying the redo logs.
 	tableSinks map[model.TableID]tablesink.TableSink
-	tableResolvedTsMap map[model.TableID]model.ResolvedTs
+	tableResolvedTsMap map[model.TableID]*memquota.MemConsumeRecord
 
 	appliedLogCount uint64
 	errCh           chan error
@@ -128,10 +132,25 @@ func (ra *RedoApplier) initSink(ctx context.Context) (err error) {
 	}
 
 	ra.tableSinks = make(map[model.TableID]tablesink.TableSink)
-	ra.tableResolvedTsMap = make(map[model.TableID]model.ResolvedTs)
+	ra.tableResolvedTsMap = make(map[model.TableID]*memquota.MemConsumeRecord)
 	return nil
 }
 
+func (ra *RedoApplier) bgReleaseQuota(ctx context.Context) error {
+	ticker := time.NewTicker(time.Second)
+	for {
+		select {
+		case <-ctx.Done():
+			return errors.Trace(ctx.Err())
+		case <-ticker.C:
+			for tableID, tableSink := range ra.tableSinks {
+				checkpointTs := tableSink.GetCheckpointTs()
+				ra.memQuota.Release(tableID, checkpointTs)
+			}
+		}
+	}
+}
+
 func (ra *RedoApplier) consumeLogs(ctx context.Context) error {
 	checkpointTs, resolvedTs, err := ra.rd.ReadMeta(ctx)
 	if err != nil {
@@ -200,6 +219,42 @@ func (ra *RedoApplier) consumeLogs(ctx context.Context) error {
 	return errApplyFinished
 }
 
+func (ra *RedoApplier) resetQuota(rowSize uint64) error {
+	if rowSize >= config.DefaultChangefeedMemoryQuota || rowSize < ra.pendingQuota {
+		log.Panic("row size exceeds memory quota",
+			zap.Uint64("rowSize", rowSize),
+			zap.Uint64("memoryQuota", config.DefaultChangefeedMemoryQuota))
+	}
+
+	// flush all tables before acquire new quota
+	for tableID, tableRecord := range ra.tableResolvedTsMap {
+		if !tableRecord.ResolvedTs.IsBatchMode() {
+			log.Panic("table resolved ts should always be in batch mode when apply redo log")
+		}
+
+		if err := ra.tableSinks[tableID].UpdateResolvedTs(tableRecord.ResolvedTs); err != nil {
+			return err
+		}
+		ra.memQuota.Record(tableID,
+			tableRecord.ResolvedTs, tableRecord.Size)
+
+		// reset new record
+		ra.tableResolvedTsMap[tableID] = &memquota.MemConsumeRecord{
+			ResolvedTs: tableRecord.ResolvedTs.AdvanceBatch(),
+			Size:       0,
+		}
+	}
+
+	oldQuota := ra.pendingQuota
+	ra.pendingQuota = rowSize * mysql.DefaultMaxTxnRow
+	if ra.pendingQuota > config.DefaultChangefeedMemoryQuota {
+		ra.pendingQuota = config.DefaultChangefeedMemoryQuota
+	} else if ra.pendingQuota < 64*1024 {
+		ra.pendingQuota = 64 * 1024
+	}
+	return ra.memQuota.BlockAcquire(ra.pendingQuota - oldQuota)
+}
+
 func (ra *RedoApplier) applyDDL(
 	ctx context.Context, ddl *model.DDLEvent, checkpointTs uint64,
 ) error {
@@ -239,6 +294,14 @@ func (ra *RedoApplier) applyDDL(
 func (ra *RedoApplier) applyRow(
 	row *model.RowChangedEvent, checkpointTs model.Ts,
 ) error {
+	rowSize := uint64(row.ApproximateBytes())
+	if rowSize > ra.pendingQuota {
+		if err := ra.resetQuota(uint64(row.ApproximateBytes())); err != nil {
+			return err
+		}
+	}
+	ra.pendingQuota -= rowSize
+
 	tableID := row.Table.TableID
 	if _, ok := ra.tableSinks[tableID]; !ok {
 		tableSink := ra.sinkFactory.CreateTableSink(
@@ -249,17 +312,30 @@ func (ra *RedoApplier) applyRow(
 		ra.tableSinks[tableID] = tableSink
 	}
 	if _, ok := ra.tableResolvedTsMap[tableID]; !ok {
-		ra.tableResolvedTsMap[tableID] = model.NewResolvedTs(checkpointTs)
+		ra.tableResolvedTsMap[tableID] = &memquota.MemConsumeRecord{
+			ResolvedTs: model.ResolvedTs{
+				Mode:    model.BatchResolvedMode,
+				Ts:      checkpointTs,
+				BatchID: 1,
+			},
+			Size: 0,
+		}
 	}
+
 	ra.tableSinks[tableID].AppendRowChangedEvents(row)
-	if row.CommitTs > ra.tableResolvedTsMap[tableID].Ts {
+	record := ra.tableResolvedTsMap[tableID]
+	record.Size += rowSize
+	if row.CommitTs > record.ResolvedTs.Ts {
 		// Use batch resolvedTs to flush data as quickly as possible.
-		ra.tableResolvedTsMap[tableID] = model.ResolvedTs{
-			Mode:    model.BatchResolvedMode,
-			Ts:      row.CommitTs,
-			BatchID: 1,
+		ra.tableResolvedTsMap[tableID] = &memquota.MemConsumeRecord{
+			ResolvedTs: model.ResolvedTs{
+				Mode:    model.BatchResolvedMode,
+				Ts:      row.CommitTs,
+				BatchID: 1,
+			},
+			Size: record.Size,
 		}
-	} else if row.CommitTs < ra.tableResolvedTsMap[tableID].Ts {
+	} else if row.CommitTs < ra.tableResolvedTsMap[tableID].ResolvedTs.Ts {
 		log.Panic("commit ts of redo log regressed",
 			zap.Int64("tableID", tableID),
 			zap.Uint64("commitTs", row.CommitTs),
@@ -267,16 +343,6 @@ func (ra *RedoApplier) applyRow(
 	}
 
 	ra.appliedLogCount++
-	if ra.appliedLogCount%mysql.DefaultMaxTxnRow == 0 {
-		for tableID, tableResolvedTs := range ra.tableResolvedTsMap {
-			if err := ra.tableSinks[tableID].UpdateResolvedTs(tableResolvedTs); err != nil {
-				return err
-			}
-			if tableResolvedTs.IsBatchMode() {
-				ra.tableResolvedTsMap[tableID] = tableResolvedTs.AdvanceBatch()
-			}
-		}
-	}
 	return nil
 }
 
@@ -286,13 +352,23 @@ func (ra *RedoApplier) waitTableFlush(
 	ticker := time.NewTicker(warnDuration)
 	defer ticker.Stop()
 
-	resolvedTs := model.NewResolvedTs(rts)
-	ra.tableResolvedTsMap[tableID] = resolvedTs
-	if err := ra.tableSinks[tableID].UpdateResolvedTs(resolvedTs); err != nil {
+	ra.tableResolvedTsMap[tableID] = &memquota.MemConsumeRecord{
+		ResolvedTs: model.ResolvedTs{
+			Mode:    model.BatchResolvedMode,
+			Ts:      rts,
+			BatchID: 1,
+		},
+		Size: ra.tableResolvedTsMap[tableID].Size,
+	}
+	tableRecord := ra.tableResolvedTsMap[tableID]
+	if err := ra.tableSinks[tableID].UpdateResolvedTs(tableRecord.ResolvedTs); err != nil {
 		return err
 	}
+	ra.memQuota.Record(tableID,
+		tableRecord.ResolvedTs, tableRecord.Size)
+
 	// Make sure all events are flushed to downstream.
-	for !ra.tableSinks[tableID].GetCheckpointTs().EqualOrGreater(resolvedTs) {
+	for !ra.tableSinks[tableID].GetCheckpointTs().EqualOrGreater(tableRecord.ResolvedTs) {
 		select {
 		case <-ctx.Done():
 			return errors.Trace(ctx.Err())
@@ -300,13 +376,19 @@ func (ra *RedoApplier) waitTableFlush(
 			log.Warn(
 				"Table sink is not catching up with resolved ts for a long time",
 				zap.Int64("tableID", tableID),
-				zap.Any("resolvedTs", resolvedTs),
+				zap.Any("resolvedTs", tableRecord.ResolvedTs),
 				zap.Any("checkpointTs", ra.tableSinks[tableID].GetCheckpointTs()),
 			)
 		default:
 			time.Sleep(flushWaitDuration)
 		}
 	}
+
+	// reset new record
+	ra.tableResolvedTsMap[tableID] = &memquota.MemConsumeRecord{
+		ResolvedTs: tableRecord.ResolvedTs.AdvanceBatch(),
+		Size:       0,
+	}
 	return nil
 }
 
@@ -333,13 +415,21 @@ func (ra *RedoApplier) ReadMeta(ctx context.Context) (checkpointTs uint64, resol
 func (ra *RedoApplier) Apply(egCtx context.Context) (err error) {
 	eg, egCtx := errgroup.WithContext(egCtx)
 	egCtx = contextutil.PutRoleInCtx(egCtx, util.RoleRedoLogApplier)
+
 	if ra.rd, err = createRedoReader(egCtx, ra.cfg); err != nil {
 		return err
 	}
-	eg.Go(func() error { return ra.rd.Run(egCtx) })
+	eg.Go(func() error {
+		return ra.rd.Run(egCtx)
+	})
+
+	ra.memQuota = memquota.NewMemQuota(model.DefaultChangeFeedID(applierChangefeed),
+		config.DefaultChangefeedMemoryQuota, "sink")
+	defer ra.memQuota.Close()
+	eg.Go(func() error {
+		return ra.bgReleaseQuota(egCtx)
+	})
+
 	eg.Go(func() error {
 		return ra.consumeLogs(egCtx)
 	})
diff --git a/pkg/errors/cdc_errors.go b/pkg/errors/cdc_errors.go
index 404a90e7978..015b1381ddf 100644
--- a/pkg/errors/cdc_errors.go
+++ b/pkg/errors/cdc_errors.go
@@ -485,6 +485,10 @@ var (
 	ErrDiskFull = errors.Normalize(
 		"failed to preallocate file because disk is full",
 		errors.RFCCodeText("CDC:ErrDiskFull"))
+	ErrWaitFreeMemoryTimeout = errors.Normalize(
+		"wait free memory timeout",
+		errors.RFCCodeText("CDC:ErrWaitFreeMemoryTimeout"),
+	)
 
 	// encode/decode, data format and data integrity errors
 	ErrInvalidRecordKey = errors.Normalize(
diff --git a/pkg/util/memory_checker.go b/pkg/util/memory_checker.go
new file mode 100644
index 00000000000..8132edadc85
--- /dev/null
+++ b/pkg/util/memory_checker.go
@@ -0,0 +1,47 @@
+// Copyright 2023 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package util
+
+import (
+	"time"
+
+	"github.com/pingcap/tiflow/pkg/errors"
+	"github.com/shirou/gopsutil/v3/mem"
+)
+
+// CheckMemoryUsage checks if the memory usage is less than the limit.
+func CheckMemoryUsage(limit float64) (bool, error) {
+	stat, err := mem.VirtualMemory()
+	if err != nil {
+		return false, err
+	}
+	return stat.UsedPercent < limit, nil
+}
+
+// WaitMemoryAvailable waits until the memory usage is less than the limit.
+func WaitMemoryAvailable(limit float64, timeout time.Duration) error {
+	start := time.Now()
+	for {
+		hasFreeMemory, err := CheckMemoryUsage(limit)
+		if err != nil {
+			return err
+		}
+		if hasFreeMemory {
+			return nil
+		}
+		if time.Since(start) > timeout {
+			return errors.ErrWaitFreeMemoryTimeout.GenWithStackByArgs()
+		}
+	}
+}
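
Illustration (not part of the patch): the change makes the applier account for memory with the now-exported memquota API in an acquire, record, release cycle. resetQuota blocks on MemQuota.BlockAcquire before more rows are buffered, applyRow and waitTableFlush attribute the buffered bytes to a batch resolved ts via MemQuota.Record, and bgReleaseQuota returns bytes with MemQuota.Release as table sink checkpoints advance. The sketch below walks one such cycle against the exported API only; the changefeed name, table ID, 1 MiB budget, sizes, and timestamp are made-up example values, not values taken from this patch.

	package main

	import (
		"fmt"

		"github.com/pingcap/tiflow/cdc/model"
		"github.com/pingcap/tiflow/cdc/processor/memquota"
	)

	func main() {
		// Hypothetical 1 MiB budget; the applier itself passes
		// config.DefaultChangefeedMemoryQuota.
		quota := memquota.NewMemQuota(
			model.DefaultChangeFeedID("quota-example"), 1024*1024, "sink")
		defer quota.Close()

		tableID := model.TableID(1)
		quota.AddTable(tableID)

		// 1. Block until the budget can cover the bytes about to be buffered
		//    (resetQuota does this for a whole pending batch at once).
		if err := quota.BlockAcquire(64 * 1024); err != nil {
			fmt.Println("acquire failed:", err)
			return
		}

		// 2. Attribute the acquired bytes to a batch resolved ts once the rows
		//    have been handed to the table sink.
		resolved := model.ResolvedTs{Mode: model.BatchResolvedMode, Ts: 100, BatchID: 1}
		quota.Record(tableID, resolved, 64*1024)

		// 3. Give the bytes back once the sink checkpoint reaches the recorded
		//    resolved ts (bgReleaseQuota does this once per second).
		quota.Release(tableID, resolved.AdvanceBatch())
		fmt.Println("memory returned to the quota")
	}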