Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

relay: add async/batch relay writer #7580

Merged
merged 22 commits into from
Nov 29, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 119 additions & 5 deletions dm/relay/binlog_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
package relay

import (
"bytes"
"encoding/json"
"fmt"
"os"
"path/filepath"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tiflow/dm/pkg/log"
Expand All @@ -27,6 +29,12 @@ import (
"go.uber.org/zap"
)

const (
bufferSize = 1 * 1024 * 1024 // 1MB
chanSize = 1024
waitTime = 10 * time.Millisecond
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
)

// BinlogWriter is a binlog event writer which writes binlog events to a file.
type BinlogWriter struct {
mu sync.RWMutex
Expand All @@ -38,6 +46,11 @@ type BinlogWriter struct {
filename string

logger log.Logger

input chan []byte
flushWg sync.WaitGroup
errCh chan error
wg sync.WaitGroup
}

// BinlogWriterStatus represents the status of a BinlogWriter.
Expand All @@ -61,6 +74,71 @@ func NewBinlogWriter(logger log.Logger, relayDir string) *BinlogWriter {
return &BinlogWriter{
logger: logger,
relayDir: relayDir,
errCh: make(chan error, 1),
}
}

// run starts the binlog writer.
func (w *BinlogWriter) run() {
var (
buf = &bytes.Buffer{}
errOccurs bool
)

// writeToFile writes buffer to file
writeToFile := func() {
if buf.Len() == 0 {
return
}

w.mu.Lock()
defer w.mu.Unlock()
if w.file == nil {
select {
case w.errCh <- terror.ErrRelayWriterNotOpened.Delegate(errors.New("file not opened")):
default:
}
errOccurs = true
return
}
n, err := w.file.Write(buf.Bytes())
if err != nil {
select {
case w.errCh <- terror.ErrBinlogWriterWriteDataLen.Delegate(err, n):
default:
}
errOccurs = true
return
}
buf.Reset()
}

for {
select {
case bs, ok := <-w.input:
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
if !ok {
if !errOccurs {
writeToFile()
}
return
}
if errOccurs {
continue
}
if bs != nil {
buf.Write(bs)
}
if bs == nil || buf.Len() > bufferSize {
writeToFile()
}
if bs == nil {
w.flushWg.Done()
}
case <-time.After(waitTime):
if !errOccurs {
writeToFile()
}
}
}
}

Expand All @@ -87,10 +165,24 @@ func (w *BinlogWriter) Open(uuid, filename string) error {
w.uuid = uuid
w.filename = filename

w.input = make(chan []byte, chanSize)
w.wg.Add(1)
go func() {
defer w.wg.Done()
w.run()
}()

return nil
}

func (w *BinlogWriter) Close() error {
w.mu.Lock()
if w.input != nil {
close(w.input)
}
w.mu.Unlock()
w.wg.Wait()

w.mu.Lock()
defer w.mu.Unlock()

Expand All @@ -107,22 +199,44 @@ func (w *BinlogWriter) Close() error {
w.offset.Store(0)
w.uuid = ""
w.filename = ""
w.input = nil

if writeErr := w.Error(); writeErr != nil {
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
return writeErr
}
return err
}

func (w *BinlogWriter) Write(rawData []byte) error {
w.mu.RLock()
defer w.mu.RUnlock()

if w.file == nil {
w.mu.RUnlock()
return terror.ErrRelayWriterNotOpened.Delegate(errors.New("file not opened"))
}
input := w.input
w.mu.RUnlock()

input <- rawData
w.offset.Add(int64(len(rawData)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BinlogReader will use this offset to check whether there's need to re-parse binlog, maybe move it after we actual write data.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we use offset to check whether hole exist, so cannot update it later.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fileOffset := w.out.Offset()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use another offset?

return w.Error()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since every Write need to check Error, Error is a hot function and we'd better using fast implementation. I prefer atomic variable like using atomic.Error and CAS to replace nil to an error

}

n, err := w.file.Write(rawData)
w.offset.Add(int64(n))
func (w *BinlogWriter) Flush() error {
w.flushWg.Add(1)
if err := w.Write(nil); err != nil {
return err
}
w.flushWg.Wait()
return w.Error()
}

return terror.ErrBinlogWriterWriteDataLen.Delegate(err, len(rawData))
func (w *BinlogWriter) Error() error {
select {
case err := <-w.errCh:
return err
default:
return nil
}
}

func (w *BinlogWriter) Status() *BinlogWriterStatus {
Expand Down
4 changes: 3 additions & 1 deletion dm/relay/binlog_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ func (t *testBinlogWriterSuite) TestWrite(c *C) {

err = w.Write(data1)
c.Assert(err, IsNil)
err = w.Flush()
c.Assert(err, IsNil)
allData.Write(data1)

fwStatus := w.Status()
Expand All @@ -85,7 +87,7 @@ func (t *testBinlogWriterSuite) TestWrite(c *C) {
c.Assert(err, IsNil)
allData.Write(data2)

c.Assert(w.offset.Load(), Equals, int64(allData.Len()))
c.Assert(w.offset.Load(), LessEqual, int64(allData.Len()))

err = w.Close()
c.Assert(err, IsNil)
Expand Down
4 changes: 4 additions & 0 deletions dm/relay/local_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1234,6 +1234,10 @@ func (m *mockFileWriterForActiveTest) Close() error {
panic("should be used")
}

func (m *mockFileWriterForActiveTest) Flush() error {
panic("should be used")
}

func (m *mockFileWriterForActiveTest) WriteEvent(ev *replication.BinlogEvent) (WResult, error) {
panic("should be used")
}
Expand Down
14 changes: 14 additions & 0 deletions dm/relay/relay_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ type Writer interface {
WriteEvent(ev *replication.BinlogEvent) (WResult, error)
// IsActive check whether given uuid+filename is active binlog file, if true return current file offset
IsActive(uuid, filename string) (bool, int64)
// Flush flushes the binlog writer.
Flush() error
}

// FileWriter implements Writer interface.
Expand Down Expand Up @@ -104,6 +106,11 @@ func (w *FileWriter) WriteEvent(ev *replication.BinlogEvent) (WResult, error) {
}
}

// Flush implements Writer.Flush.
func (w *FileWriter) Flush() error {
return w.out.Flush()
}

// offset returns the current offset of the binlog file.
// it is only used for testing now.
func (w *FileWriter) offset() int64 {
Expand Down Expand Up @@ -145,6 +152,10 @@ func (w *FileWriter) handleFormatDescriptionEvent(ev *replication.BinlogEvent) (
if err != nil {
return WResult{}, terror.Annotatef(err, "write binlog file header for %s", fullName)
}
err = w.Flush()
if err != nil {
return WResult{}, terror.Annotatef(err, "write binlog file header for %s", fullName)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return WResult{}, terror.Annotatef(err, "write binlog file header for %s", fullName)
return WResult{}, terror.Annotatef(err, "flush binlog file for %s", fullName)

}
}

// write the FormatDescriptionEvent if not exists one
Expand Down Expand Up @@ -277,6 +288,9 @@ func (w *FileWriter) handlePotentialHoleOrDuplicate(ev *replication.BinlogEvent)
}

if mayDuplicate {
if err := w.Flush(); err != nil {
return WResult{}, terror.Annotatef(err, "flush before handle duplicate event %v in %s", ev.Header, w.filename.Load())
}
// handle any duplicate events if exist
result, err2 := w.handleDuplicateEventsExist(ev)
if err2 != nil {
Expand Down
4 changes: 4 additions & 0 deletions dm/relay/relay_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ func (t *testFileWriterSuite) TestRotateEventWithFormatDescriptionEvent(c *check
filename2 := filepath.Join(relayDir, uuid, nextFilename)
_, err = os.Stat(filename1)
c.Assert(os.IsNotExist(err), check.IsTrue)
c.Assert(w2.Flush(), check.IsNil)
data, err := os.ReadFile(filename2)
c.Assert(err, check.IsNil)
fileHeaderLen := len(replication.BinLogFileHeader)
Expand Down Expand Up @@ -292,6 +293,7 @@ func (t *testFileWriterSuite) TestRotateEventWithFormatDescriptionEvent(c *check
filename2 = filepath.Join(relayDir, uuid, nextFilename)
_, err = os.Stat(filename2)
c.Assert(os.IsNotExist(err), check.IsTrue)
c.Assert(w3.Flush(), check.IsNil)
data, err = os.ReadFile(filename1)
c.Assert(err, check.IsNil)
c.Assert(len(data), check.Equals, fileHeaderLen+len(formatDescEv.RawData))
Expand Down Expand Up @@ -399,6 +401,7 @@ func (t *testFileWriterSuite) TestWriteMultiEvents(c *check.C) {
c.Assert(result.Ignore, check.IsFalse) // no event is ignored
}

c.Assert(w.Flush(), check.IsNil)
t.verifyFilenameOffset(c, w, filename, int64(allData.Len()))

// read the data back from the file
Expand Down Expand Up @@ -448,6 +451,7 @@ func (t *testFileWriterSuite) TestHandleFileHoleExist(c *check.C) {
result, err = w.WriteEvent(queryEv)
c.Assert(err, check.IsNil)
c.Assert(result.Ignore, check.IsFalse)
c.Assert(w.Flush(), check.IsNil)
fileSize := int64(queryEv.Header.LogPos)
t.verifyFilenameOffset(c, w, filename, fileSize)

Expand Down