Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions grpc/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -672,9 +672,8 @@ func (c *Client) processStartBackup(msg *pb.StartBackup) {
// Wait until we have at least one document from the tailer to start the backup only after we have
// documents in the oplog tailer.
log.Debug("Waiting oplog first doc")
timeout, err := c.oplogTailer.WaitUntilFirstDocWithTimeout(time.Duration(10 * time.Second))
if err != nil {
err := errors.Wrapf(err, "Cannot read from the oplog tailer: (timeout after %v)", timeout)
if err := c.oplogTailer.WaitUntilFirstDoc(); err != nil {
err := errors.Wrapf(err, "Cannot read from the oplog tailer")
c.oplogTailer.Cancel()
c.logger.Error(err)
finishMsg := &pb.OplogBackupFinishStatus{
Expand Down
98 changes: 50 additions & 48 deletions internal/oplog/oplog_tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ type chanDataTye []byte
type OplogTail struct {
session *mgo.Session
oplogCollection string
startOplogTimestamp *bson.MongoTimestamp
lastOplogTimestamp *bson.MongoTimestamp
stopAtTimestampt *bson.MongoTimestamp
startOplogTimestamp bson.MongoTimestamp
lastOplogTimestamp bson.MongoTimestamp
// This is an int64 and not a bson.MongoTimestamp because we need to use atomic.LoadInt64 (or StoreInt64)
// for performance.
stopAtTimestampt int64

totalSize uint64
docsCount uint64
Expand Down Expand Up @@ -67,7 +69,7 @@ func OpenAt(session *mgo.Session, t time.Time, c uint32) (*OplogTail, error) {
if err != nil {
return nil, err
}
ot.startOplogTimestamp = &mongoTimestamp
ot.startOplogTimestamp = mongoTimestamp
go ot.tail()
return ot, nil
}
Expand Down Expand Up @@ -126,7 +128,7 @@ func open(session *mgo.Session) (*OplogTail, error) {
return ot, nil
}

func (ot *OplogTail) LastOplogTimestamp() *bson.MongoTimestamp {
func (ot *OplogTail) LastOplogTimestamp() bson.MongoTimestamp {
return ot.lastOplogTimestamp
}

Expand Down Expand Up @@ -168,7 +170,7 @@ func (ot *OplogTail) Close() error {
}

ot.lock.Lock()
ot.stopAtTimestampt = &ismaster.IsMasterDoc().LastWrite.OpTime.Ts
atomic.StoreInt64(&ot.stopAtTimestampt, int64(ismaster.IsMasterDoc().LastWrite.OpTime.Ts))
ot.lock.Unlock()

ot.wg.Wait()
Expand All @@ -182,7 +184,7 @@ func (ot *OplogTail) CloseAt(ts bson.MongoTimestamp) error {
}

ot.lock.Lock()
ot.stopAtTimestampt = &ts
atomic.StoreInt64(&ot.stopAtTimestampt, int64(ts))
ot.lock.Unlock()

ot.wg.Wait()
Expand All @@ -205,7 +207,6 @@ func (ot *OplogTail) setRunning(state bool) {
func (ot *OplogTail) tail() {
ot.wg.Add(1)
defer ot.wg.Done()

once := &sync.Once{}

iter := ot.makeIterator()
Expand All @@ -218,61 +219,62 @@ func (ot *OplogTail) tail() {
}
result := bson.Raw{}

if iter.Next(&result) {
oplog := mdbstructs.OplogTimestampOnly{}
data := bson.M{}
err := result.Unmarshal(&data)
err = result.Unmarshal(&oplog)
if err == nil {
ot.lastOplogTimestamp = &oplog.Timestamp
if ot.startOplogTimestamp == nil {
ot.startOplogTimestamp = &oplog.Timestamp
}
ot.lock.Lock()
if ot.stopAtTimestampt != nil {
if ot.lastOplogTimestamp != nil {
if *ot.lastOplogTimestamp > *ot.stopAtTimestampt {
iter.Close()
ot.lock.Unlock()
close(ot.readerStopChan)
return
}
}
}
ot.lock.Unlock()
iter.Next(&result)

once.Do(func() { close(ot.startedReadChan) })
ot.dataChan <- result.Data
continue
}
log.Fatalf("cannot unmarshal oplog doc: %s", err)
if iter.Err() != nil {
iter.Close()
iter = ot.makeIterator()
continue
}
ot.lock.Lock()

once.Do(func() { close(ot.startedReadChan) })

if iter.Timeout() {
if ot.stopAtTimestampt != nil {
ot.lock.Lock()
if ot.stopAtTimestampt != 0 {
iter.Close()
ot.lock.Unlock()
return
}
ot.lock.Unlock()
continue
}
ot.lock.Unlock()
if iter.Err() != nil {
iter.Close()
iter = ot.makeIterator()

once.Do(func() { close(ot.startedReadChan) })

data := bson.M{}
if err := result.Unmarshal(&data); err == nil {
ot.lastOplogTimestamp = data["ts"].(bson.MongoTimestamp)
if ot.startOplogTimestamp == 0 {
ot.startOplogTimestamp = ot.lastOplogTimestamp
}
if atomic.LoadInt64(&ot.stopAtTimestampt) != 0 {
if ot.lastOplogTimestamp != 0 {
if ot.lastOplogTimestamp > bson.MongoTimestamp(ot.stopAtTimestampt) {
iter.Close()
close(ot.readerStopChan)
return
}
}
}

ot.dataChan <- result.Data
} else {
log.Fatalf("cannot unmarshal oplog doc: %s", err)
}
}
}

func (ot *OplogTail) getStopAtTimestamp() *bson.MongoTimestamp {
func (ot *OplogTail) getStopAtTimestamp() bson.MongoTimestamp {
ot.lock.Lock()
defer ot.lock.Unlock()
return ot.stopAtTimestampt
return bson.MongoTimestamp(ot.stopAtTimestampt)
}

func (ot *OplogTail) setStopAtTimestamp(ts bson.MongoTimestamp) {
ot.lock.Lock()
defer ot.lock.Unlock()
ot.stopAtTimestampt = &ts
ot.stopAtTimestampt = int64(ts)
}

// TODO
Expand All @@ -296,12 +298,12 @@ func (ot *OplogTail) tailQuery() bson.M {
query := bson.M{"op": bson.M{"$ne": mdbstructs.OperationNoop}}

ot.lock.Lock()
if ot.lastOplogTimestamp != nil {
query["ts"] = bson.M{"$gt": *ot.lastOplogTimestamp}
if ot.lastOplogTimestamp != 0 {
query["ts"] = bson.M{"$gt": ot.lastOplogTimestamp}
ot.lock.Unlock()
return query
} else if ot.startOplogTimestamp != nil {
query["ts"] = bson.M{"$gte": *ot.startOplogTimestamp}
} else if ot.startOplogTimestamp != 0 {
query["ts"] = bson.M{"$gte": ot.startOplogTimestamp}
ot.lock.Unlock()
return query
}
Expand Down
62 changes: 62 additions & 0 deletions tests/general_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,68 @@ func TestBackup1(t *testing.T) {
d.Stop()
}

// TestBackupWithNoOplogActivity runs a full logical backup against a replica
// set that receives no writes after the initial data load, then checks that
// exactly one backup-metadata entry is produced. NOTE(review): this appears to
// be a regression test for the oplog tailer change (WaitUntilFirstDoc replacing
// the timeout-based wait) — confirm against the PR description.
func TestBackupWithNoOplogActivity(t *testing.T) {
	tmpDir := path.Join(os.TempDir(), "dump_test")
	os.RemoveAll(tmpDir)       // Cleanup before start. Don't check for errors. The path might not exist
	defer os.RemoveAll(tmpDir) // Clean up after testing.
	err := os.MkdirAll(tmpDir, os.ModePerm)
	if err != nil {
		t.Fatalf("Cannot create temp dir %s: %s", tmpDir, err)
	}
	log.Printf("Using %s as the temporary directory", tmpDir)

	d, err := testGrpc.NewGrpcDaemon(context.Background(), tmpDir, t, nil)
	if err != nil {
		t.Fatalf("cannot start a new gRPC daemon/clients group: %s", err)
	}

	s1Session, err := mgo.DialWithInfo(testutils.PrimaryDialInfo(t, testutils.MongoDBShard1ReplsetName))
	if err != nil {
		// t.Fatalf (not log.Fatalf): log.Fatalf would os.Exit, skipping the
		// deferred tmpDir cleanup and aborting the whole test binary.
		t.Fatalf("Cannot connect to the DB: %s", err)
	}
	defer s1Session.Close()
	s1Session.SetMode(mgo.Strong, true)
	generateDataToBackup(t, s1Session)

	backupNamePrefix := time.Now().UTC().Format(time.RFC3339)

	err = d.MessagesServer.StartBackup(&pb.StartBackup{
		BackupType:      pb.BackupType_BACKUP_TYPE_LOGICAL,
		DestinationType: pb.DestinationType_DESTINATION_TYPE_FILE,
		CompressionType: pb.CompressionType_COMPRESSION_TYPE_NO_COMPRESSION,
		Cypher:          pb.Cypher_CYPHER_NO_CYPHER,
		OplogStartTime:  time.Now().UTC().Unix(),
		NamePrefix:      backupNamePrefix,
		Description:     "general_test_backup",
	})
	if err != nil {
		t.Fatalf("Cannot start backup: %s", err)
	}
	d.MessagesServer.WaitBackupFinish()

	// Test list backups
	log.Debug("Testing backup metadata")
	mdFilename := backupNamePrefix + ".json"
	d.MessagesServer.WriteBackupMetadata(mdFilename)
	bms, err := d.MessagesServer.ListBackups()
	if err != nil {
		t.Errorf("Cannot get backups metadata listing: %s", err)
	} else {
		if bms == nil {
			t.Errorf("Backups metadata listing is nil")
		} else {
			if len(bms) != 1 {
				// Report the actual count; the listing could hold 0 or >1 entries.
				t.Errorf("Expected 1 backup metadata entry, got %d", len(bms))
			}
			if _, ok := bms[mdFilename]; !ok {
				t.Errorf("Backup metadata for %q doesn't exists", mdFilename)
			}
		}
	}

	cleanupDBForRestore(t, s1Session)

	d.Stop()
}
func testRestoreWithMetadata(t *testing.T, d *testGrpc.GrpcDaemon, md *pb.BackupMetadata) {
if err := d.MessagesServer.RestoreBackUp(md, true); err != nil {
t.Errorf("Cannot restore using backup metadata: %s", err)
Expand Down