-
Notifications
You must be signed in to change notification settings - Fork 288
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
relay: add async/batch relay writer #7580
Changes from 9 commits
dc9edc5
a13cb43
7f9bfd6
20c870a
177060e
51e9fc7
ab28ad7
0ee15f3
9057ed7
4453b50
6ed8b7c
2a44a25
e3fa741
b0b75a0
63dfc31
71a7896
d0641b4
60cea13
570fdb6
4d0a324
b190706
95d11ea
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -14,6 +14,7 @@ | |||
package relay | ||||
|
||||
import ( | ||||
"bytes" | ||||
"encoding/json" | ||||
"fmt" | ||||
"os" | ||||
|
@@ -27,17 +28,26 @@ import ( | |||
"go.uber.org/zap" | ||||
) | ||||
|
||||
const ( | ||||
bufferSize = 1 * 1024 * 1024 // 1MB | ||||
chanSize = 1024 | ||||
) | ||||
|
||||
// BinlogWriter is a binlog event writer which writes binlog events to a file. | ||||
// Open/Write/Close cannot be called concurrently. | ||||
type BinlogWriter struct { | ||||
mu sync.RWMutex | ||||
|
||||
offset atomic.Int64 | ||||
file *os.File | ||||
relayDir string | ||||
uuid string | ||||
filename string | ||||
uuid atomic.String | ||||
filename atomic.String | ||||
err atomic.Error | ||||
|
||||
logger log.Logger | ||||
|
||||
input chan []byte | ||||
flushWg sync.WaitGroup | ||||
wg sync.WaitGroup | ||||
} | ||||
|
||||
// BinlogWriterStatus represents the status of a BinlogWriter. | ||||
|
@@ -64,6 +74,53 @@ func NewBinlogWriter(logger log.Logger, relayDir string) *BinlogWriter { | |||
} | ||||
} | ||||
|
||||
// run starts the binlog writer. | ||||
func (w *BinlogWriter) run() { | ||||
var ( | ||||
buf = &bytes.Buffer{} | ||||
errOccurs bool | ||||
) | ||||
|
||||
// writeToFile writes buffer to file | ||||
writeToFile := func() { | ||||
if buf.Len() == 0 { | ||||
return | ||||
} | ||||
|
||||
if w.file == nil { | ||||
w.err.CompareAndSwap(nil, terror.ErrRelayWriterNotOpened.Delegate(errors.New("file not opened"))) | ||||
errOccurs = true | ||||
return | ||||
} | ||||
n, err := w.file.Write(buf.Bytes()) | ||||
if err != nil { | ||||
w.err.CompareAndSwap(nil, terror.ErrBinlogWriterWriteDataLen.Delegate(err, n)) | ||||
errOccurs = true | ||||
return | ||||
} | ||||
buf.Reset() | ||||
} | ||||
|
||||
for bs := range w.input { | ||||
if errOccurs { | ||||
continue | ||||
} | ||||
if bs != nil { | ||||
buf.Write(bs) | ||||
lance6716 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||
} | ||||
// we use bs = nil to mean flush | ||||
if bs == nil || buf.Len() > bufferSize || len(w.input) == 0 { | ||||
D3Hunter marked this conversation as resolved.
Show resolved
Hide resolved
|
||||
writeToFile() | ||||
} | ||||
if bs == nil { | ||||
w.flushWg.Done() | ||||
} | ||||
} | ||||
if !errOccurs { | ||||
writeToFile() | ||||
} | ||||
} | ||||
|
||||
func (w *BinlogWriter) Open(uuid, filename string) error { | ||||
fullName := filepath.Join(w.relayDir, uuid, filename) | ||||
f, err := os.OpenFile(fullName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0o600) | ||||
|
@@ -79,58 +136,65 @@ func (w *BinlogWriter) Open(uuid, filename string) error { | |||
return terror.ErrBinlogWriterGetFileStat.Delegate(err, f.Name()) | ||||
} | ||||
|
||||
w.mu.Lock() | ||||
defer w.mu.Unlock() | ||||
|
||||
w.offset.Store(fs.Size()) | ||||
w.file = f | ||||
w.uuid = uuid | ||||
w.filename = filename | ||||
w.uuid.Store(uuid) | ||||
w.filename.Store(filename) | ||||
|
||||
w.input = make(chan []byte, chanSize) | ||||
w.wg.Add(1) | ||||
go func() { | ||||
defer w.wg.Done() | ||||
w.run() | ||||
}() | ||||
|
||||
return nil | ||||
} | ||||
|
||||
func (w *BinlogWriter) Close() error { | ||||
w.mu.Lock() | ||||
defer w.mu.Unlock() | ||||
if w.input != nil { | ||||
close(w.input) | ||||
} | ||||
w.wg.Wait() | ||||
|
||||
var err error | ||||
if w.file != nil { | ||||
err2 := w.file.Sync() // try sync manually before close. | ||||
if err2 != nil { | ||||
w.logger.Error("fail to flush buffered data", zap.String("component", "file writer"), zap.Error(err2)) | ||||
if err := w.file.Sync(); err != nil { | ||||
w.logger.Error("fail to flush buffered data", zap.String("component", "file writer"), zap.Error(err)) | ||||
} | ||||
if err := w.file.Close(); err != nil { | ||||
w.err.CompareAndSwap(nil, err) | ||||
} | ||||
err = w.file.Close() | ||||
} | ||||
|
||||
w.file = nil | ||||
w.offset.Store(0) | ||||
w.uuid = "" | ||||
w.filename = "" | ||||
|
||||
return err | ||||
w.uuid.Store("") | ||||
w.filename.Store("") | ||||
w.input = nil | ||||
return w.err.Swap(nil) | ||||
} | ||||
|
||||
func (w *BinlogWriter) Write(rawData []byte) error { | ||||
w.mu.RLock() | ||||
defer w.mu.RUnlock() | ||||
|
||||
if w.file == nil { | ||||
return terror.ErrRelayWriterNotOpened.Delegate(errors.New("file not opened")) | ||||
} | ||||
w.input <- rawData | ||||
w.offset.Add(int64(len(rawData))) | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we use offset to check whether hole exist, so cannot update it later. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. tiflow/dm/relay/relay_writer.go Line 304 in d98f885
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. use another offset? |
||||
return w.err.Load() | ||||
} | ||||
|
||||
n, err := w.file.Write(rawData) | ||||
w.offset.Add(int64(n)) | ||||
|
||||
return terror.ErrBinlogWriterWriteDataLen.Delegate(err, len(rawData)) | ||||
func (w *BinlogWriter) Flush() error { | ||||
w.flushWg.Add(1) | ||||
if err := w.Write(nil); err != nil { | ||||
return err | ||||
} | ||||
w.flushWg.Wait() | ||||
return w.err.Load() | ||||
} | ||||
|
||||
func (w *BinlogWriter) Status() *BinlogWriterStatus { | ||||
w.mu.RLock() | ||||
defer w.mu.RUnlock() | ||||
|
||||
return &BinlogWriterStatus{ | ||||
Filename: w.filename, | ||||
Filename: w.filename.Load(), | ||||
Offset: w.offset.Load(), | ||||
} | ||||
} | ||||
|
@@ -140,7 +204,5 @@ func (w *BinlogWriter) Offset() int64 { | |||
} | ||||
|
||||
func (w *BinlogWriter) isActive(uuid, filename string) (bool, int64) { | ||||
w.mu.RLock() | ||||
defer w.mu.RUnlock() | ||||
return uuid == w.uuid && filename == w.filename, w.offset.Load() | ||||
return uuid == w.uuid.Load() && filename == w.filename.Load(), w.offset.Load() | ||||
} |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -57,6 +57,8 @@ type Writer interface { | |||||
WriteEvent(ev *replication.BinlogEvent) (WResult, error) | ||||||
// IsActive check whether given uuid+filename is active binlog file, if true return current file offset | ||||||
IsActive(uuid, filename string) (bool, int64) | ||||||
// Flush flushes the binlog writer. | ||||||
Flush() error | ||||||
} | ||||||
|
||||||
// FileWriter implements Writer interface. | ||||||
|
@@ -104,6 +106,11 @@ func (w *FileWriter) WriteEvent(ev *replication.BinlogEvent) (WResult, error) { | |||||
} | ||||||
} | ||||||
|
||||||
// Flush implements Writer.Flush. | ||||||
func (w *FileWriter) Flush() error { | ||||||
return w.out.Flush() | ||||||
} | ||||||
|
||||||
// offset returns the current offset of the binlog file. | ||||||
// it is only used for testing now. | ||||||
func (w *FileWriter) offset() int64 { | ||||||
|
@@ -145,6 +152,10 @@ func (w *FileWriter) handleFormatDescriptionEvent(ev *replication.BinlogEvent) ( | |||||
if err != nil { | ||||||
return WResult{}, terror.Annotatef(err, "write binlog file header for %s", fullName) | ||||||
} | ||||||
err = w.Flush() | ||||||
if err != nil { | ||||||
return WResult{}, terror.Annotatef(err, "write binlog file header for %s", fullName) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
} | ||||||
} | ||||||
|
||||||
// write the FormatDescriptionEvent if not exists one | ||||||
|
@@ -277,6 +288,9 @@ func (w *FileWriter) handlePotentialHoleOrDuplicate(ev *replication.BinlogEvent) | |||||
} | ||||||
|
||||||
if mayDuplicate { | ||||||
if err := w.Flush(); err != nil { | ||||||
return WResult{}, terror.Annotatef(err, "flush before handle duplicate event %v in %s", ev.Header, w.filename.Load()) | ||||||
} | ||||||
// handle any duplicate events if exist | ||||||
result, err2 := w.handleDuplicateEventsExist(ev) | ||||||
if err2 != nil { | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sorry seems the
nil
for CAS is not what I expectedCan we use the
nilErr
variable or simplyStore
? but in the latter we will store the last error not the first one.