Skip to content

Commit

Permalink
This is an automated cherry-pick of #8494
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
CharlesCheung96 authored and ti-chi-bot committed Apr 4, 2023
1 parent 0180aed commit 676f0c6
Show file tree
Hide file tree
Showing 7 changed files with 398 additions and 16 deletions.
52 changes: 36 additions & 16 deletions cdc/processor/memquota/mem_quota.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ import (
"go.uber.org/zap"
)

type memConsumeRecord struct {
resolvedTs model.ResolvedTs
size uint64
// MemConsumeRecord is used to trace memory usage.
type MemConsumeRecord struct {
ResolvedTs model.ResolvedTs
Size uint64
}

// MemQuota is used to trace memory usage.
Expand All @@ -49,30 +50,38 @@ type MemQuota struct {

// blockAcquireCond is used to notify the blocked acquire.
blockAcquireCond *sync.Cond
// condMu protects nothing, but sync.Cond needs a mutex.
condMu sync.Mutex

metricTotal prometheus.Gauge
metricUsed prometheus.Gauge

// mu protects the following fields.
mu sync.Mutex
// tableMemory is the memory usage of each table.
<<<<<<< HEAD
tableMemory map[model.TableID][]*memConsumeRecord
=======
tableMemory *spanz.HashMap[[]*MemConsumeRecord]
>>>>>>> 7848c0caad (redo(ticdc): limit memory usage in applier (#8494))
}

// NewMemQuota creates a MemQuota instance.
func NewMemQuota(changefeedID model.ChangeFeedID, totalBytes uint64, comp string) *MemQuota {
m := &MemQuota{
changefeedID: changefeedID,
totalBytes: totalBytes,
metricTotal: MemoryQuota.WithLabelValues(changefeedID.Namespace, changefeedID.ID, "total", comp),
metricUsed: MemoryQuota.WithLabelValues(changefeedID.Namespace, changefeedID.ID, "used", comp),
closeBg: make(chan struct{}, 1),
changefeedID: changefeedID,
totalBytes: totalBytes,
blockAcquireCond: sync.NewCond(&sync.Mutex{}),
metricTotal: MemoryQuota.WithLabelValues(changefeedID.Namespace,
changefeedID.ID, "total", comp),
metricUsed: MemoryQuota.WithLabelValues(changefeedID.Namespace,
changefeedID.ID, "used", comp),
closeBg: make(chan struct{}, 1),

<<<<<<< HEAD
tableMemory: make(map[model.TableID][]*memConsumeRecord),
=======
tableMemory: spanz.NewHashMap[[]*MemConsumeRecord](),
>>>>>>> 7848c0caad (redo(ticdc): limit memory usage in applier (#8494))
}
m.blockAcquireCond = sync.NewCond(&m.condMu)
m.metricTotal.Set(float64(totalBytes))
m.metricUsed.Set(float64(0))

Expand Down Expand Up @@ -125,9 +134,9 @@ func (m *MemQuota) BlockAcquire(nBytes uint64) error {
}
usedBytes := m.usedBytes.Load()
if usedBytes+nBytes > m.totalBytes {
m.condMu.Lock()
m.blockAcquireCond.L.Lock()
m.blockAcquireCond.Wait()
m.condMu.Unlock()
m.blockAcquireCond.L.Unlock()
continue
}
if m.usedBytes.CompareAndSwap(usedBytes, usedBytes+nBytes) {
Expand Down Expand Up @@ -155,7 +164,11 @@ func (m *MemQuota) Refund(nBytes uint64) {
func (m *MemQuota) AddTable(tableID model.TableID) {
m.mu.Lock()
defer m.mu.Unlock()
<<<<<<< HEAD
m.tableMemory[tableID] = make([]*memConsumeRecord, 0, 2)
=======
m.tableMemory.ReplaceOrInsert(span, make([]*MemConsumeRecord, 0, 2))
>>>>>>> 7848c0caad (redo(ticdc): limit memory usage in applier (#8494))
}

// Record records the memory usage of a table.
Expand All @@ -177,10 +190,17 @@ func (m *MemQuota) Record(tableID model.TableID, resolved model.ResolvedTs, nByt
}
return
}
<<<<<<< HEAD
m.tableMemory[tableID] = append(m.tableMemory[tableID], &memConsumeRecord{
resolvedTs: resolved,
size: nBytes,
})
=======
m.tableMemory.ReplaceOrInsert(span, append(m.tableMemory.GetV(span), &MemConsumeRecord{
ResolvedTs: resolved,
Size: nBytes,
}))
>>>>>>> 7848c0caad (redo(ticdc): limit memory usage in applier (#8494))
}

// Release tries to use resolvedTs to release the memory quota.
Expand All @@ -197,11 +217,11 @@ func (m *MemQuota) Release(tableID model.TableID, resolved model.ResolvedTs) {
}
records := m.tableMemory[tableID]
i := sort.Search(len(records), func(i int) bool {
return records[i].resolvedTs.Greater(resolved)
return records[i].ResolvedTs.Greater(resolved)
})
var toRelease uint64 = 0
for j := 0; j < i; j++ {
toRelease += records[j].size
toRelease += records[j].Size
}
m.tableMemory[tableID] = records[i:]
if toRelease == 0 {
Expand Down Expand Up @@ -236,7 +256,7 @@ func (m *MemQuota) Clean(tableID model.TableID) uint64 {
cleaned := uint64(0)
records := m.tableMemory[tableID]
for _, record := range records {
cleaned += record.size
cleaned += record.Size
}
delete(m.tableMemory, tableID)

Expand Down
66 changes: 66 additions & 0 deletions cdc/redo/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,31 @@ import (
"os"
"path/filepath"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/redo/common"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/redo"
<<<<<<< HEAD
"go.uber.org/multierr"
"go.uber.org/zap"
=======
"github.com/pingcap/tiflow/pkg/sink/mysql"
"github.com/pingcap/tiflow/pkg/util"
"go.uber.org/multierr"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

const (
emitBatch = mysql.DefaultMaxTxnRow
defaultReaderChanSize = mysql.DefaultWorkerCount * emitBatch
maxTotalMemoryUsage = 90.0
maxWaitDuration = time.Minute * 2
>>>>>>> 7848c0caad (redo(ticdc): limit memory usage in applier (#8494))
)

// RedoLogReader is a reader abstraction for redo log storage layer
Expand Down Expand Up @@ -277,6 +293,56 @@ func (l *LogReader) ReadNextLog(ctx context.Context, maxNumberOfEvents uint64) (

ld := &logWithIdx{
data: rl,
<<<<<<< HEAD
=======
idx: i,
}
redoLogHeap = append(redoLogHeap, ld)
}
heap.Init(&redoLogHeap)

for redoLogHeap.Len() != 0 {
item := heap.Pop(&redoLogHeap).(*logWithIdx)

switch cfg.fileType {
case redo.RedoRowLogFileType:
row := item.data.RedoRow.Row
// By design, only data in (startTs, endTs] is needed,
// so filter out data that may be beyond the boundary.
if row != nil && row.CommitTs > cfg.startTs && row.CommitTs <= cfg.endTs {
select {
case <-egCtx.Done():
return errors.Trace(egCtx.Err())
case l.rowCh <- row:
}
}
err := util.WaitMemoryAvailable(maxTotalMemoryUsage, maxWaitDuration)
if err != nil {
return errors.Trace(err)
}

case redo.RedoDDLLogFileType:
ddl := item.data.RedoDDL.DDL
if ddl != nil && ddl.CommitTs > cfg.startTs && ddl.CommitTs <= cfg.endTs {
select {
case <-egCtx.Done():
return errors.Trace(egCtx.Err())
case l.ddlCh <- ddl:
}
}
}

// read next and push again
rl, err := fileReaders[item.idx].Read()
if err != nil {
if err != io.EOF {
return errors.Trace(err)
}
continue
}
ld := &logWithIdx{
data: rl,
>>>>>>> 7848c0caad (redo(ticdc): limit memory usage in applier (#8494))
idx: item.idx,
}
heap.Push(&l.rowHeap, ld)
Expand Down
6 changes: 6 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1261,9 +1261,15 @@ error = '''
version is incompatible: %s
'''

<<<<<<< HEAD
["CDC:ErrWaitHandleOperationTimeout"]
error = '''
waiting processor to handle the operation finished timeout
=======
["CDC:ErrWaitFreeMemoryTimeout"]
error = '''
wait free memory timeout
>>>>>>> 7848c0caad (redo(ticdc): limit memory usage in applier (#8494))
'''

["CDC:ErrWorkerPoolGracefulUnregisterTimedOut"]
Expand Down
8 changes: 8 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ require (
github.com/prometheus/client_model v0.3.0
github.com/r3labs/diff v1.1.0
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475
<<<<<<< HEAD
=======
github.com/segmentio/kafka-go v0.4.39-0.20230217181906-f6986fb02ee7
github.com/shirou/gopsutil/v3 v3.23.1
>>>>>>> 7848c0caad (redo(ticdc): limit memory usage in applier (#8494))
github.com/shopspring/decimal v1.3.0
github.com/soheilhy/cmux v0.1.5
github.com/spf13/cobra v1.6.1
Expand Down Expand Up @@ -239,7 +244,10 @@ require (
github.com/rogpeppe/go-internal v1.9.0 // indirect
github.com/rs/cors v1.7.0 // indirect
github.com/sasha-s/go-deadlock v0.2.0 // indirect
<<<<<<< HEAD
github.com/shirou/gopsutil/v3 v3.22.12 // indirect
=======
>>>>>>> 7848c0caad (redo(ticdc): limit memory usage in applier (#8494))
github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749 // indirect
github.com/shurcooL/httpgzip v0.0.0-20190720172056-320755c1c1b0 // indirect
github.com/shurcooL/vfsgen v0.0.0-20200824052919-0d455de96546 // indirect
Expand Down
Loading

0 comments on commit 676f0c6

Please sign in to comment.