sink, sorter: do not hog memory (#2606)
overvenus authored Aug 26, 2021
1 parent 200d877 commit 142ae26
Showing 2 changed files with 46 additions and 27 deletions.
34 changes: 28 additions & 6 deletions cdc/processor/pipeline/sink.go
@@ -28,7 +28,7 @@ import (
)

const (
-	defaultSyncResolvedBatch = 1024
+	defaultSyncResolvedBatch = 64
)

// TableStatus is status of the table pipeline
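Why 64: the sink node emits its buffered events once defaultSyncResolvedBatch of them have accumulated, and (after this change) the constant also caps the capacity retained by the cleared buffers. Assuming, purely for illustration, on the order of 1 KiB per buffered row event, a 1024-entry buffer could pin about 1 MiB per table; with hundreds of tables per processor that is hundreds of MiB held in sink buffers alone, while a 64-entry cap bounds it near 64 KiB per table.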
@@ -132,7 +132,7 @@ func (n *sinkNode) flushSink(ctx pipeline.NodeContext, resolvedTs model.Ts) (err error) {
	if resolvedTs <= n.checkpointTs {
		return nil
	}
-	if err := n.flushRow2Sink(ctx); err != nil {
+	if err := n.emitRow2Sink(ctx); err != nil {
		return errors.Trace(err)
	}
	checkpointTs, err := n.sink.FlushRowChangedEvents(ctx, resolvedTs)
@@ -178,7 +178,7 @@ func (n *sinkNode) emitEvent(ctx pipeline.NodeContext, event *model.PolymorphicEvent) {
	}

	if len(n.eventBuffer) >= defaultSyncResolvedBatch {
-		if err := n.flushRow2Sink(ctx); err != nil {
+		if err := n.emitRow2Sink(ctx); err != nil {
			return errors.Trace(err)
		}
	}
@@ -249,7 +249,30 @@ func splitUpdateEvent(updateEvent *model.PolymorphicEvent) (*model.PolymorphicEvent, *model.PolymorphicEvent, error) {
	return &deleteEvent, &insertEvent, nil
}

-func (n *sinkNode) flushRow2Sink(ctx pipeline.NodeContext) error {
+// clearBuffers clears the event buffer and the row buffer.
+// It also unrefs the data held by the buffers.
+func (n *sinkNode) clearBuffers() {
+	// Do not hog memory.
+	if cap(n.rowBuffer) > defaultSyncResolvedBatch {
+		n.rowBuffer = make([]*model.RowChangedEvent, 0, defaultSyncResolvedBatch)
+	} else {
+		for i := range n.rowBuffer {
+			n.rowBuffer[i] = nil
+		}
+		n.rowBuffer = n.rowBuffer[:0]
+	}
+
+	if cap(n.eventBuffer) > defaultSyncResolvedBatch {
+		n.eventBuffer = make([]*model.PolymorphicEvent, 0, defaultSyncResolvedBatch)
+	} else {
+		for i := range n.eventBuffer {
+			n.eventBuffer[i] = nil
+		}
+		n.eventBuffer = n.eventBuffer[:0]
+	}
+}
+
+func (n *sinkNode) emitRow2Sink(ctx pipeline.NodeContext) error {
	for _, ev := range n.eventBuffer {
		err := ev.WaitPrepare(ctx)
		if err != nil {
@@ -270,8 +293,7 @@ func (n *sinkNode) flushRow2Sink(ctx pipeline.NodeContext) error {
	if err != nil {
		return errors.Trace(err)
	}
-	n.rowBuffer = n.rowBuffer[:0]
-	n.eventBuffer = n.eventBuffer[:0]
+	n.clearBuffers()
	return nil
}

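The clearBuffers logic above generalizes to any reused slice of pointers: reslicing to [:0] keeps both the backing array and every element reachable, so the elements must be nil-ed out, and an oversized backing array should be dropped entirely. A minimal self-contained sketch of the pattern (names are illustrative, not from the TiCDC codebase):

package main

import "fmt"

// shrinkOrClear resets buf for reuse without hogging memory. If the backing
// array has grown past maxCap, it is dropped and replaced with a small one;
// otherwise the elements are nil-ed out so the GC can reclaim what they
// reference, and the slice is truncated in place.
func shrinkOrClear(buf []*int, maxCap int) []*int {
	if cap(buf) > maxCap {
		return make([]*int, 0, maxCap)
	}
	for i := range buf {
		buf[i] = nil // unref the element; buf[:0] alone would keep it alive
	}
	return buf[:0]
}

func main() {
	buf := make([]*int, 0, 64)
	for i := 0; i < 1000; i++ {
		v := i
		buf = append(buf, &v)
	}
	fmt.Println(cap(buf)) // well above 64 after growth
	buf = shrinkOrClear(buf, 64)
	fmt.Println(cap(buf)) // 64: the oversized array is gone
}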
39 changes: 18 additions & 21 deletions cdc/puller/sorter/file_backend.go
@@ -29,7 +29,7 @@ import (
)

const (
-	fileBufferSize = 32 * 1024 // 32KB
+	fileBufferSize = 4 * 1024 // 4KB
	fileMagic = 0x12345678
	numFileEntriesOffset = 4
	blockMagic = 0xbeefbeef
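fileBufferSize is presumably the size handed to the bufio reader/writer wrapped around each sort file (the call sites are outside this diff), so every open file pins a buffer of this size for as long as its handle lives. A hedged sketch of the trade-off:

package main

import (
	"bufio"
	"log"
	"os"
)

func main() {
	// "example.sort" is a hypothetical file name, not one used by the sorter.
	f, err := os.Open("example.sort")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	// One buffer per open file: with thousands of concurrent sorter files,
	// 32 KiB buffers cost 8x the resident memory of 4 KiB ones, while the
	// larger size only amortizes syscalls slightly better for sequential IO.
	r := bufio.NewReaderSize(f, 4*1024)
	if _, err := r.Peek(1); err != nil {
		log.Fatal(err)
	}
}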
@@ -160,11 +160,10 @@ func (f *fileBackEnd) cleanStats() {
}

type fileBackEndReader struct {
-	backEnd     *fileBackEnd
-	f           *os.File
-	reader      *bufio.Reader
-	rawBytesBuf []byte
-	isEOF       bool
+	backEnd *fileBackEnd
+	f       *os.File
+	reader  *bufio.Reader
+	isEOF   bool

	// to prevent truncation-like corruption
	totalEvents uint64
@@ -237,14 +236,11 @@ func (r *fileBackEndReader) readNext() (*model.PolymorphicEvent, error) {
		return nil, errors.Trace(wrapIOError(err))
	}

-	if cap(r.rawBytesBuf) < int(size) {
-		r.rawBytesBuf = make([]byte, size)
-	} else {
-		r.rawBytesBuf = r.rawBytesBuf[:size]
-	}
+	// Note: do not hold the buffer in the reader, to avoid hogging memory.
+	rawBytesBuf := make([]byte, size)

	// short reads are possible with bufio, hence the need for io.ReadFull
-	n, err := io.ReadFull(r.reader, r.rawBytesBuf)
+	n, err := io.ReadFull(r.reader, rawBytesBuf)
	if err != nil {
		return nil, errors.Trace(wrapIOError(err))
	}
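The old code cached rawBytesBuf on the reader, so after one unusually large event the reader held that high-water-mark allocation until it was closed; allocating per call lets the GC reclaim each buffer as soon as the event is unmarshaled. A self-contained sketch of the same read pattern (the length-prefix framing and names are illustrative):

package main

import (
	"bufio"
	"bytes"
	"fmt"
	"io"
)

// readRecord reads size bytes into a buffer that is local to this call, so
// one oversized record never pins memory for the reader's whole lifetime.
func readRecord(r *bufio.Reader, size int) ([]byte, error) {
	buf := make([]byte, size)
	// bufio can return short reads, so io.ReadFull loops until buf is full.
	if _, err := io.ReadFull(r, buf); err != nil {
		return nil, err
	}
	return buf, nil
}

func main() {
	r := bufio.NewReader(bytes.NewReader([]byte("hello, sorter")))
	rec, err := readRecord(r, 5)
	fmt.Printf("%q %v\n", rec, err) // "hello" <nil>
}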
@@ -254,12 +250,12 @@ }
	}

	event := new(model.PolymorphicEvent)
-	_, err = r.backEnd.serde.unmarshal(event, r.rawBytesBuf)
+	_, err = r.backEnd.serde.unmarshal(event, rawBytesBuf)
	if err != nil {
		return nil, errors.Trace(err)
	}

-	r.readEvents += 1
+	r.readEvents++

	failpoint.Inject("sorterDebug", func() {
		r.readBytes += int64(4 + 4 + int(size))
@@ -323,10 +319,9 @@ func (r *fileBackEndReader) resetAndClose() error {
}

type fileBackEndWriter struct {
-	backEnd     *fileBackEnd
-	f           *os.File
-	writer      *bufio.Writer
-	rawBytesBuf []byte
+	backEnd *fileBackEnd
+	f       *os.File
+	writer  *bufio.Writer

	bytesWritten int64
	eventsWritten int64
Expand All @@ -349,12 +344,14 @@ func (w *fileBackEndWriter) writeFileHeader() error {

func (w *fileBackEndWriter) writeNext(event *model.PolymorphicEvent) error {
	var err error
-	w.rawBytesBuf, err = w.backEnd.serde.marshal(event, w.rawBytesBuf)
+	// Note: do not hold the buffer in the writer, to avoid hogging memory.
+	var rawBytesBuf []byte
+	rawBytesBuf, err = w.backEnd.serde.marshal(event, rawBytesBuf)
	if err != nil {
		return errors.Trace(wrapIOError(err))
	}

-	size := len(w.rawBytesBuf)
+	size := len(rawBytesBuf)
	if size == 0 {
		log.Panic("fileSorterBackEnd: serialized to empty byte array. Bug?")
	}
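On the write side the same idea applies: marshal into a buffer local to the call so the writer never retains the largest event it has ever serialized. A sketch under the same assumptions, with the marshal step stood in by a plain byte slice and the defensive short-write loop mirroring the next hunk:

package main

import (
	"bufio"
	"fmt"
	"os"
)

// writeRecord pushes payload through a bufio.Writer, looping on short
// writes. payload is local to the caller, so nothing outlives the call.
func writeRecord(w *bufio.Writer, payload []byte) error {
	offset := 0
	for offset < len(payload) {
		n, err := w.Write(payload[offset:])
		if err != nil {
			return err
		}
		offset += n
	}
	return nil
}

func main() {
	w := bufio.NewWriterSize(os.Stdout, 4*1024)
	if err := writeRecord(w, []byte("one event\n")); err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}
	w.Flush() // push buffered bytes out before exit
}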
@@ -372,7 +369,7 @@ func (w *fileBackEndWriter) writeNext(event *model.PolymorphicEvent) error {
	// short writes are possible with bufio
	offset := 0
	for offset < size {
-		n, err := w.writer.Write(w.rawBytesBuf[offset:])
+		n, err := w.writer.Write(rawBytesBuf[offset:])
		if err != nil {
			return errors.Trace(wrapIOError(err))
		}
