Skip to content

Commit

Permalink
add fileOffset
Browse files Browse the repository at this point in the history
  • Loading branch information
GMHDBJD committed Nov 25, 2022
1 parent 9057ed7 commit 4453b50
Showing 1 changed file with 15 additions and 8 deletions.
23 changes: 15 additions & 8 deletions dm/relay/binlog_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,15 @@ const (
// BinlogWriter is a binlog event writer which writes binlog events to a file.
// Open/Write/Close cannot be called concurrently.
type BinlogWriter struct {
offset atomic.Int64
file *os.File
relayDir string
uuid atomic.String
filename atomic.String
err atomic.Error
// offset means the offset we add to binlog writer
offset atomic.Int64
// fileOffset means the offset we finish write to the file
fileOffset atomic.Int64
file *os.File
relayDir string
uuid atomic.String
filename atomic.String
err atomic.Error

logger log.Logger

Expand Down Expand Up @@ -98,6 +101,7 @@ func (w *BinlogWriter) run() {
errOccurs = true
return
}
w.fileOffset.Add(int64(n))
buf.Reset()
}

Expand Down Expand Up @@ -136,7 +140,9 @@ func (w *BinlogWriter) Open(uuid, filename string) error {
return terror.ErrBinlogWriterGetFileStat.Delegate(err, f.Name())
}

w.offset.Store(fs.Size())
size := fs.Size()
w.offset.Store(size)
w.fileOffset.Store(size)
w.file = f
w.uuid.Store(uuid)
w.filename.Store(filename)
Expand Down Expand Up @@ -168,6 +174,7 @@ func (w *BinlogWriter) Close() error {

w.file = nil
w.offset.Store(0)
w.fileOffset.Store(0)
w.uuid.Store("")
w.filename.Store("")
w.input = nil
Expand Down Expand Up @@ -204,5 +211,5 @@ func (w *BinlogWriter) Offset() int64 {
}

func (w *BinlogWriter) isActive(uuid, filename string) (bool, int64) {
return uuid == w.uuid.Load() && filename == w.filename.Load(), w.offset.Load()
return uuid == w.uuid.Load() && filename == w.filename.Load(), w.fileOffset.Load()
}

0 comments on commit 4453b50

Please sign in to comment.