Skip to content

Commit

Permalink
redo(ticdc): limit memory usage in applier (pingcap#8494)
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 committed Apr 4, 2023
1 parent 4f1add5 commit 6619c31
Show file tree
Hide file tree
Showing 8 changed files with 206 additions and 51 deletions.
45 changes: 23 additions & 22 deletions cdc/processor/memquota/mem_quota.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ import (
"go.uber.org/zap"
)

type memConsumeRecord struct {
resolvedTs model.ResolvedTs
size uint64
// MemConsumeRecord is used to trace memory usage.
type MemConsumeRecord struct {
ResolvedTs model.ResolvedTs
Size uint64
}

// MemQuota is used to trace memory usage.
Expand All @@ -49,30 +50,30 @@ type MemQuota struct {

// blockAcquireCond is used to notify the blocked acquire.
blockAcquireCond *sync.Cond
// condMu protects nothing, but sync.Cond needs a mutex.
condMu sync.Mutex

metricTotal prometheus.Gauge
metricUsed prometheus.Gauge

// mu protects the following fields.
mu sync.Mutex
// tableMemory is the memory usage of each table.
tableMemory map[model.TableID][]*memConsumeRecord
tableMemory map[model.TableID][]*MemConsumeRecord
}

// NewMemQuota creates a MemQuota instance.
func NewMemQuota(changefeedID model.ChangeFeedID, totalBytes uint64, comp string) *MemQuota {
m := &MemQuota{
changefeedID: changefeedID,
totalBytes: totalBytes,
metricTotal: MemoryQuota.WithLabelValues(changefeedID.Namespace, changefeedID.ID, "total", comp),
metricUsed: MemoryQuota.WithLabelValues(changefeedID.Namespace, changefeedID.ID, "used", comp),
closeBg: make(chan struct{}, 1),
changefeedID: changefeedID,
totalBytes: totalBytes,
blockAcquireCond: sync.NewCond(&sync.Mutex{}),
metricTotal: MemoryQuota.WithLabelValues(changefeedID.Namespace,
changefeedID.ID, "total", comp),
metricUsed: MemoryQuota.WithLabelValues(changefeedID.Namespace,
changefeedID.ID, "used", comp),
closeBg: make(chan struct{}, 1),

tableMemory: make(map[model.TableID][]*memConsumeRecord),
tableMemory: make(map[model.TableID][]*MemConsumeRecord),
}
m.blockAcquireCond = sync.NewCond(&m.condMu)
m.metricTotal.Set(float64(totalBytes))
m.metricUsed.Set(float64(0))

Expand Down Expand Up @@ -125,9 +126,9 @@ func (m *MemQuota) BlockAcquire(nBytes uint64) error {
}
usedBytes := m.usedBytes.Load()
if usedBytes+nBytes > m.totalBytes {
m.condMu.Lock()
m.blockAcquireCond.L.Lock()
m.blockAcquireCond.Wait()
m.condMu.Unlock()
m.blockAcquireCond.L.Unlock()
continue
}
if m.usedBytes.CompareAndSwap(usedBytes, usedBytes+nBytes) {
Expand Down Expand Up @@ -155,7 +156,7 @@ func (m *MemQuota) Refund(nBytes uint64) {
func (m *MemQuota) AddTable(tableID model.TableID) {
m.mu.Lock()
defer m.mu.Unlock()
m.tableMemory[tableID] = make([]*memConsumeRecord, 0, 2)
m.tableMemory[tableID] = make([]*MemConsumeRecord, 0, 2)
}

// Record records the memory usage of a table.
Expand All @@ -177,9 +178,9 @@ func (m *MemQuota) Record(tableID model.TableID, resolved model.ResolvedTs, nByt
}
return
}
m.tableMemory[tableID] = append(m.tableMemory[tableID], &memConsumeRecord{
resolvedTs: resolved,
size: nBytes,
m.tableMemory[tableID] = append(m.tableMemory[tableID], &MemConsumeRecord{
ResolvedTs: resolved,
Size: nBytes,
})
}

Expand All @@ -197,11 +198,11 @@ func (m *MemQuota) Release(tableID model.TableID, resolved model.ResolvedTs) {
}
records := m.tableMemory[tableID]
i := sort.Search(len(records), func(i int) bool {
return records[i].resolvedTs.Greater(resolved)
return records[i].ResolvedTs.Greater(resolved)
})
var toRelease uint64 = 0
for j := 0; j < i; j++ {
toRelease += records[j].size
toRelease += records[j].Size
}
m.tableMemory[tableID] = records[i:]
if toRelease == 0 {
Expand Down Expand Up @@ -236,7 +237,7 @@ func (m *MemQuota) Clean(tableID model.TableID) uint64 {
cleaned := uint64(0)
records := m.tableMemory[tableID]
for _, record := range records {
cleaned += record.size
cleaned += record.Size
}
delete(m.tableMemory, tableID)

Expand Down
9 changes: 9 additions & 0 deletions cdc/redo/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@ import (
"os"
"path/filepath"
"sync"
"time"

"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/redo/common"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/redo"
"github.com/pingcap/tiflow/pkg/sink/mysql"
"github.com/pingcap/tiflow/pkg/util"
"go.uber.org/multierr"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
Expand All @@ -36,6 +38,8 @@ import (
const (
emitBatch = mysql.DefaultMaxTxnRow
defaultReaderChanSize = mysql.DefaultWorkerCount * emitBatch
maxTotalMemoryUsage = 90.0
maxWaitDuration = time.Minute * 2
)

// RedoLogReader is a reader abstraction for redo log storage layer
Expand Down Expand Up @@ -231,6 +235,11 @@ func (l *LogReader) runReader(egCtx context.Context, cfg *readerConfig) error {
case l.rowCh <- row:
}
}
err := util.WaitMemoryAvailable(maxTotalMemoryUsage, maxWaitDuration)
if err != nil {
return errors.Trace(err)
}

case redo.RedoDDLLogFileType:
ddl := common.LogToDDL(item.data.RedoDDL)
if ddl != nil && ddl.CommitTs > cfg.startTs && ddl.CommitTs <= cfg.endTs {
Expand Down
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1261,6 +1261,11 @@ error = '''
version is incompatible: %s
'''

["CDC:ErrWaitFreeMemoryTimeout"]
error = '''
wait free memory timeout
'''

["CDC:ErrWaitHandleOperationTimeout"]
error = '''
waiting processor to handle the operation finished timeout
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ require (
github.com/prometheus/client_model v0.3.0
github.com/r3labs/diff v1.1.0
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475
github.com/shirou/gopsutil/v3 v3.23.1
github.com/shopspring/decimal v1.3.0
github.com/soheilhy/cmux v0.1.5
github.com/spf13/cobra v1.6.1
Expand Down Expand Up @@ -239,7 +240,6 @@ require (
github.com/rogpeppe/go-internal v1.9.0 // indirect
github.com/rs/cors v1.7.0 // indirect
github.com/sasha-s/go-deadlock v0.2.0 // indirect
github.com/shirou/gopsutil/v3 v3.22.12 // indirect
github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749 // indirect
github.com/shurcooL/httpgzip v0.0.0-20190720172056-320755c1c1b0 // indirect
github.com/shurcooL/vfsgen v0.0.0-20200824052919-0d455de96546 // indirect
Expand Down
5 changes: 2 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1327,8 +1327,8 @@ github.com/sclevine/agouti v3.0.0+incompatible/go.mod h1:b4WX9W9L1sfQKXeJf1mUTLZ
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
github.com/shirou/gopsutil/v3 v3.21.12/go.mod h1:BToYZVTlSVlfazpDDYFnsVZLaoRG+g8ufT6fPQLdJzA=
github.com/shirou/gopsutil/v3 v3.22.12 h1:oG0ns6poeUSxf78JtOsfygNWuEHYYz8hnnNg7P04TJs=
github.com/shirou/gopsutil/v3 v3.22.12/go.mod h1:Xd7P1kwZcp5VW52+9XsirIKd/BROzbb2wdX3Kqlz9uI=
github.com/shirou/gopsutil/v3 v3.23.1 h1:a9KKO+kGLKEvcPIs4W62v0nu3sciVDOOOPUD0Hz7z/4=
github.com/shirou/gopsutil/v3 v3.23.1/go.mod h1:NN6mnm5/0k8jw4cBfCnJtr5L7ErOTg18tMNpgFkn0hA=
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4=
github.com/shopspring/decimal v1.3.0 h1:KK3gWIXskZ2O1U/JNTisNcvH+jveJxZYrjbTsrbbnh8=
github.com/shopspring/decimal v1.3.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
Expand Down Expand Up @@ -1881,7 +1881,6 @@ golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220909162455-aba9fc2a8ff2/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18=
golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
Expand Down
Loading

0 comments on commit 6619c31

Please sign in to comment.