diff --git a/cdc/processor/pipeline/sink.go b/cdc/processor/pipeline/sink.go index e1f4e34266a..1367cc68e54 100644 --- a/cdc/processor/pipeline/sink.go +++ b/cdc/processor/pipeline/sink.go @@ -28,7 +28,7 @@ import ( ) const ( - defaultSyncResolvedBatch = 1024 + defaultSyncResolvedBatch = 64 ) // TableStatus is status of the table pipeline @@ -132,7 +132,7 @@ func (n *sinkNode) flushSink(ctx pipeline.NodeContext, resolvedTs model.Ts) (err if resolvedTs <= n.checkpointTs { return nil } - if err := n.flushRow2Sink(ctx); err != nil { + if err := n.emitRow2Sink(ctx); err != nil { return errors.Trace(err) } checkpointTs, err := n.sink.FlushRowChangedEvents(ctx, resolvedTs) @@ -178,7 +178,7 @@ func (n *sinkNode) emitEvent(ctx pipeline.NodeContext, event *model.PolymorphicE } if len(n.eventBuffer) >= defaultSyncResolvedBatch { - if err := n.flushRow2Sink(ctx); err != nil { + if err := n.emitRow2Sink(ctx); err != nil { return errors.Trace(err) } } @@ -249,7 +249,30 @@ func splitUpdateEvent(updateEvent *model.PolymorphicEvent) (*model.PolymorphicEv return &deleteEvent, &insertEvent, nil } -func (n *sinkNode) flushRow2Sink(ctx pipeline.NodeContext) error { +// clear event buffer and row buffer. +// Also, it unrefs data that are held by buffers. +func (n *sinkNode) clearBuffers() { + // Do not hog memory. 
+ if cap(n.rowBuffer) > defaultSyncResolvedBatch { + n.rowBuffer = make([]*model.RowChangedEvent, 0, defaultSyncResolvedBatch) + } else { + for i := range n.rowBuffer { + n.rowBuffer[i] = nil + } + n.rowBuffer = n.rowBuffer[:0] + } + + if cap(n.eventBuffer) > defaultSyncResolvedBatch { + n.eventBuffer = make([]*model.PolymorphicEvent, 0, defaultSyncResolvedBatch) + } else { + for i := range n.eventBuffer { + n.eventBuffer[i] = nil + } + n.eventBuffer = n.eventBuffer[:0] + } +} + +func (n *sinkNode) emitRow2Sink(ctx pipeline.NodeContext) error { for _, ev := range n.eventBuffer { err := ev.WaitPrepare(ctx) if err != nil { @@ -270,8 +293,7 @@ func (n *sinkNode) flushRow2Sink(ctx pipeline.NodeContext) error { if err != nil { return errors.Trace(err) } - n.rowBuffer = n.rowBuffer[:0] - n.eventBuffer = n.eventBuffer[:0] + n.clearBuffers() return nil } diff --git a/cdc/puller/sorter/file_backend.go b/cdc/puller/sorter/file_backend.go index 6ded3d1aaa1..14b50679b17 100644 --- a/cdc/puller/sorter/file_backend.go +++ b/cdc/puller/sorter/file_backend.go @@ -29,7 +29,7 @@ import ( ) const ( - fileBufferSize = 32 * 1024 // 32KB + fileBufferSize = 4 * 1024 // 4KB fileMagic = 0x12345678 numFileEntriesOffset = 4 blockMagic = 0xbeefbeef @@ -160,11 +160,10 @@ func (f *fileBackEnd) cleanStats() { } type fileBackEndReader struct { - backEnd *fileBackEnd - f *os.File - reader *bufio.Reader - rawBytesBuf []byte - isEOF bool + backEnd *fileBackEnd + f *os.File + reader *bufio.Reader + isEOF bool // to prevent truncation-like corruption totalEvents uint64 @@ -237,14 +236,11 @@ func (r *fileBackEndReader) readNext() (*model.PolymorphicEvent, error) { return nil, errors.Trace(wrapIOError(err)) } - if cap(r.rawBytesBuf) < int(size) { - r.rawBytesBuf = make([]byte, size) - } else { - r.rawBytesBuf = r.rawBytesBuf[:size] - } + // Note, do not hold the buffer in reader to avoid hogging memory. 
+ rawBytesBuf := make([]byte, size) // short reads are possible with bufio, hence the need for io.ReadFull - n, err := io.ReadFull(r.reader, r.rawBytesBuf) + n, err := io.ReadFull(r.reader, rawBytesBuf) if err != nil { return nil, errors.Trace(wrapIOError(err)) } @@ -254,12 +250,12 @@ func (r *fileBackEndReader) readNext() (*model.PolymorphicEvent, error) { } event := new(model.PolymorphicEvent) - _, err = r.backEnd.serde.unmarshal(event, r.rawBytesBuf) + _, err = r.backEnd.serde.unmarshal(event, rawBytesBuf) if err != nil { return nil, errors.Trace(err) } - r.readEvents += 1 + r.readEvents++ failpoint.Inject("sorterDebug", func() { r.readBytes += int64(4 + 4 + int(size)) @@ -323,10 +319,9 @@ func (r *fileBackEndReader) resetAndClose() error { } type fileBackEndWriter struct { - backEnd *fileBackEnd - f *os.File - writer *bufio.Writer - rawBytesBuf []byte + backEnd *fileBackEnd + f *os.File + writer *bufio.Writer bytesWritten int64 eventsWritten int64 @@ -349,12 +344,14 @@ func (w *fileBackEndWriter) writeFileHeader() error { func (w *fileBackEndWriter) writeNext(event *model.PolymorphicEvent) error { var err error - w.rawBytesBuf, err = w.backEnd.serde.marshal(event, w.rawBytesBuf) + // Note, do not hold the buffer in writer to avoid hogging memory. + var rawBytesBuf []byte + rawBytesBuf, err = w.backEnd.serde.marshal(event, rawBytesBuf) if err != nil { return errors.Trace(wrapIOError(err)) } - size := len(w.rawBytesBuf) + size := len(rawBytesBuf) if size == 0 { log.Panic("fileSorterBackEnd: serialized to empty byte array. Bug?") } @@ -372,7 +369,7 @@ func (w *fileBackEndWriter) writeNext(event *model.PolymorphicEvent) error { // short writes are possible with bufio offset := 0 for offset < size { - n, err := w.writer.Write(w.rawBytesBuf[offset:]) + n, err := w.writer.Write(rawBytesBuf[offset:]) if err != nil { return errors.Trace(wrapIOError(err)) }