Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion internal/oplog/oplog_apply_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,11 @@ func TestBasicApplyLog(t *testing.T) {
// It seems like the oplog tailing is delayed so calling ot.Close() inmediatelly
// make the oplog tailer to stop before reading all documents
if alternateOplogTest {
for ot.Count() < int64(docCount) {
for ot.Count() < uint64(docCount) {
time.Sleep(time.Second)
}
}

ot.Close()

wg.Wait()
Expand Down
87 changes: 66 additions & 21 deletions internal/oplog/oplog_tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"io"
"sync"
"sync/atomic"
"time"

"github.com/globalsign/mgo"
Expand All @@ -20,12 +21,14 @@ type OplogTail struct {
oplogCollection string
startOplogTimestamp *bson.MongoTimestamp
lastOplogTimestamp *bson.MongoTimestamp
stopAtTimestampt *bson.MongoTimestamp

totalSize int64
docsCount int64
totalSize uint64
docsCount uint64
remainingBytes []byte
nextChunkPosition int

wg *sync.WaitGroup
dataChan chan chanDataTye
stopChan chan bool
readFunc func([]byte) (int, error)
Expand Down Expand Up @@ -82,6 +85,7 @@ func open(session *mgo.Session) (*OplogTail, error) {
dataChan: make(chan chanDataTye, 1),
stopChan: make(chan bool),
running: true,
wg: &sync.WaitGroup{},
}
ot.readFunc = makeReader(ot)
return ot, nil
Expand All @@ -92,29 +96,44 @@ func open(session *mgo.Session) (*OplogTail, error) {
func (ot *OplogTail) Read(buf []byte) (int, error) {
n, err := ot.readFunc(buf)
if err == nil {
ot.docsCount++
ot.totalSize += int64(n)
atomic.AddUint64(&ot.docsCount, 1)
atomic.AddUint64(&ot.totalSize, uint64(n))
}
return n, err
}

func (ot *OplogTail) Size() int64 {
return ot.totalSize
func (ot *OplogTail) Size() uint64 {
return atomic.LoadUint64(&ot.totalSize)
}

func (ot *OplogTail) Count() int64 {
return ot.docsCount
func (ot *OplogTail) Count() uint64 {
return atomic.LoadUint64(&ot.docsCount)
}

// Cancel stopts the tailer immediatelly without waiting the tailer to reach the
// document having timestamp = IsMasterDoc().LastWrite.OpTime.Ts
func (ot *OplogTail) Cancel() {
close(ot.stopChan)
ot.wg.Wait()
}

func (ot *OplogTail) Close() error {
if ot.isRunning() {
close(ot.stopChan)
return nil
if !ot.isRunning() {
return fmt.Errorf("Tailer is already closed")
}
if ot.session != nil {
ot.session.Close()

ismaster, err := cluster.NewIsMaster(ot.session)
if err != nil {
close(ot.stopChan)
return fmt.Errorf("Cannot get master doc LastWrite.Optime: %s", err)
}
return fmt.Errorf("Tailer is already closed")

ot.lock.Lock()
ot.stopAtTimestampt = &ismaster.IsMasterDoc().LastWrite.OpTime.Ts
ot.lock.Unlock()

ot.wg.Wait()
return nil
}

func (ot *OplogTail) isRunning() bool {
Expand All @@ -130,9 +149,11 @@ func (ot *OplogTail) setRunning(state bool) {
}

func (ot *OplogTail) tail() {
col := ot.session.DB(oplogDB).C(ot.oplogCollection)
comment := "github.com/percona/mongodb-backup/internal/oplog.(*OplogTail).tail()"
iter := col.Find(ot.tailQuery()).LogReplay().Comment(comment).Batch(mgoIterBatch).Prefetch(mgoIterPrefetch).Tail(-time.Second)
ot.wg.Add(1)
defer ot.wg.Done()
defer close(ot.stopChan)

iter := ot.makeIterator()
for {
select {
case <-ot.stopChan:
Expand All @@ -151,18 +172,42 @@ func (ot *OplogTail) tail() {
ot.startOplogTimestamp = &oplog.Timestamp
}
ot.lastOplogTimestamp = &oplog.Timestamp
continue
}
}
ot.lock.Lock()
if ot.stopAtTimestampt != nil && ot.lastOplogTimestamp != nil &&
ot.lastOplogTimestamp.Time().After(ot.stopAtTimestampt.Time()) {
iter.Close()
ot.lock.Unlock()
return
}
if iter.Timeout() {
if ot.stopAtTimestampt != nil {
iter.Close()
ot.lock.Unlock()
return
}
}
//if iter.Timeout() {
// continue
//}
ot.lock.Unlock()
if iter.Err() != nil {
iter.Close()
iter = ot.makeIterator()
}
iter = col.Find(ot.tailQuery()).LogReplay().Comment(comment).Batch(mgoIterBatch).Prefetch(mgoIterPrefetch).Iter()
}
}

// TODO
// Maybe if stopAtTimestampt is nil, we can replace the timeout by -1 to avoid restarting the
// tailer query unnecessarily but I am following the two rules in the matter of optimization:
// Rule 1: Don't do it.
// Rule 2 (for experts only). Don't do it yet
func (ot *OplogTail) makeIterator() *mgo.Iter {
col := ot.session.DB(oplogDB).C(ot.oplogCollection)
comment := "github.com/percona/mongodb-backup/internal/oplog.(*OplogTail).tail()"
return col.Find(ot.tailQuery()).LogReplay().Comment(comment).Batch(mgoIterBatch).Prefetch(mgoIterPrefetch).Tail(1 * time.Second)
}

// tailQuery returns a bson.M query filter for the oplog tail
// Criteria:
// 1. If 'lastOplogTimestamp' is defined, tail all non-noop oplogs with 'ts' $gt that ts
Expand Down
9 changes: 2 additions & 7 deletions internal/oplog/oplog_tail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,8 @@ const (
)

var (
keepSamples bool
samplesDir string
alternateOplogTest bool
keepSamples bool
samplesDir string
)

func generateOplogTraffic(t *testing.T, session *mgo.Session, stop chan bool) {
Expand All @@ -54,7 +53,6 @@ func generateOplogTraffic(t *testing.T, session *mgo.Session, stop chan bool) {

func TestMain(m *testing.M) {
flag.BoolVar(&keepSamples, "keep-samples", false, "Keep generated bson files")
flag.BoolVar(&alternateOplogTest, "alternate-oplog-test", false, "Use alternate method for oplog tailer test")
flag.Parse()

// Get root repository path using Git
Expand Down Expand Up @@ -396,9 +394,6 @@ func TestReadIntoSmallBuffer(t *testing.T) {
if err != nil {
break
}
if testing.Verbose() {
fmt.Printf("Got %d bytes: %q\n", n, string(buf[:n]))
}
totalSize += n
result = append(result, buf[:n]...)
}
Expand Down