diff --git a/dm/relay/binlog_writer.go b/dm/relay/binlog_writer.go index 1f503935dae..8bbc67b9f0e 100644 --- a/dm/relay/binlog_writer.go +++ b/dm/relay/binlog_writer.go @@ -36,12 +36,15 @@ const ( // BinlogWriter is a binlog event writer which writes binlog events to a file. // Open/Write/Close cannot be called concurrently. type BinlogWriter struct { - offset atomic.Int64 - file *os.File - relayDir string - uuid atomic.String - filename atomic.String - err atomic.Error + // offset means the offset we add to binlog writer + offset atomic.Int64 + // fileOffset means the offset we finish write to the file + fileOffset atomic.Int64 + file *os.File + relayDir string + uuid atomic.String + filename atomic.String + err atomic.Error logger log.Logger @@ -98,6 +101,7 @@ func (w *BinlogWriter) run() { errOccurs = true return } + w.fileOffset.Add(int64(n)) buf.Reset() } @@ -136,7 +140,9 @@ func (w *BinlogWriter) Open(uuid, filename string) error { return terror.ErrBinlogWriterGetFileStat.Delegate(err, f.Name()) } - w.offset.Store(fs.Size()) + size := fs.Size() + w.offset.Store(size) + w.fileOffset.Store(size) w.file = f w.uuid.Store(uuid) w.filename.Store(filename) @@ -168,6 +174,7 @@ func (w *BinlogWriter) Close() error { w.file = nil w.offset.Store(0) + w.fileOffset.Store(0) w.uuid.Store("") w.filename.Store("") w.input = nil @@ -204,5 +211,5 @@ func (w *BinlogWriter) Offset() int64 { } func (w *BinlogWriter) isActive(uuid, filename string) (bool, int64) { - return uuid == w.uuid.Load() && filename == w.filename.Load(), w.offset.Load() + return uuid == w.uuid.Load() && filename == w.filename.Load(), w.fileOffset.Load() }