Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 24 additions & 17 deletions grpc/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"sync"
"time"

"github.com/apex/log"
"github.com/globalsign/mgo"
"github.com/globalsign/mgo/bson"
"github.com/golang/snappy"
Expand All @@ -27,6 +26,7 @@ import (
"github.com/pierrec/lz4"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
)

Expand Down Expand Up @@ -432,8 +432,9 @@ func (c *Client) processGetBackupSource() {
ClientId: c.id,
Payload: &pb.ClientMessage_BackupSourceMsg{BackupSourceMsg: &pb.BackupSource{SourceClient: c.nodeName}},
}
c.logger.Debugf("Sending GetBackupSource response to the RPC server: %+v", *msg)
c.streamSend(msg)
if err := c.streamSend(msg); err != nil {
log.Errorf("cannot send processGetBackupSource error message: %s", err)
}
return
}

Expand All @@ -459,7 +460,9 @@ func (c *Client) processGetBackupSource() {
Payload: &pb.ClientMessage_BackupSourceMsg{BackupSourceMsg: &pb.BackupSource{SourceClient: winner}},
}
c.logger.Debugf("%s -> Sending GetBackupSource response to the RPC server: %+v (winner: %q)", c.nodeName, *msg, winner)
c.streamSend(msg)
if err := c.streamSend(msg); err != nil {
log.Errorf("cannot send processGetBackupSource message to the server: %s", err)
}
}

func (c *Client) processLastOplogTs() error {
Expand Down Expand Up @@ -529,7 +532,9 @@ func (c *Client) processListReplicasets() error {
ClientId: c.id,
Payload: &pb.ClientMessage_ReplicasetsMsg{ReplicasetsMsg: &pb.Replicasets{Replicasets: replicasets}},
}
c.streamSend(msg)
if err := c.streamSend(msg); err != nil {
log.Errorf("cannot send processListReplicasets message to the server: %v", err)
}
return nil
}

Expand Down Expand Up @@ -568,6 +573,8 @@ func (c *Client) processRestore(msg *pb.RestoreBackup) error {
c.lock.Unlock()
}()

c.sendACK()

if err := c.restoreDBDump(msg); err != nil {
err := errors.Wrap(err, "cannot restore DB backup")
c.sendRestoreComplete(err)
Expand Down Expand Up @@ -640,7 +647,9 @@ func (c *Client) processStartBackup(msg *pb.StartBackup) {
c.sendError(fmt.Errorf("%s is not a directory", c.backupDir))
return
}

// Send the ACK message and do the work in the background. When the process finishes, it will send the
// gRPC messages to signal that the backup has been completed.
c.sendACK()
// There is a delay when starting a new goroutine. We need to instantiate c.oplogTailer here; otherwise,
// if we run go c.runOplogBackup(msg) and then WaitUntilFirstDoc(), the oplogTailer can be nil because
// of the delay.
Expand All @@ -663,14 +672,6 @@ func (c *Client) processStartBackup(msg *pb.StartBackup) {
// documents in the oplog tailer.
c.oplogTailer.WaitUntilFirstDoc()
go c.runDBBackup(msg)

response := &pb.ClientMessage{
ClientId: c.id,
Payload: &pb.ClientMessage_AckMsg{AckMsg: &pb.Ack{}},
}
if err = c.streamSend(response); err != nil {
c.logger.Errorf("processStartBackup error: cannot stream response to the server: %s. Out message: %+v. In message type: %T", err, *response, response.Payload)
}
}

func (c *Client) processStartBalancer() (*pb.ClientMessage, error) {
Expand All @@ -688,7 +689,9 @@ func (c *Client) processStartBalancer() (*pb.ClientMessage, error) {
Payload: &pb.ClientMessage_AckMsg{AckMsg: &pb.Ack{}},
}
c.logger.Debugf("processStartBalancer Sending ACK message to the gRPC server")
c.streamSend(out)
if err := c.streamSend(out); err != nil {
log.Errorf("cannot send processStartBalancer response to the server: %s", err)
}

return nil, nil
}
Expand Down Expand Up @@ -741,7 +744,9 @@ func (c *Client) processStopBalancer() (*pb.ClientMessage, error) {
Payload: &pb.ClientMessage_AckMsg{AckMsg: &pb.Ack{}},
}
c.logger.Debugf("processStopBalancer Sending ACK message to the gRPC server")
c.streamSend(out)
if err := c.streamSend(out); err != nil {
log.Errorf("cannot send processStopBalancer response to the server: %s", err)
}

return nil, nil
}
Expand All @@ -753,7 +758,9 @@ func (c *Client) processStopOplogTail(msg *pb.StopOplogTail) {
Payload: &pb.ClientMessage_AckMsg{AckMsg: &pb.Ack{}},
}
c.logger.Debugf("Sending ACK message to the gRPC server")
c.streamSend(out)
if err := c.streamSend(out); err != nil {
log.Errorf("cannot send processStopOplogTail ACK to the server: %s", err)
}

c.setOplogBackupRunning(false)

Expand Down
9 changes: 5 additions & 4 deletions grpc/server/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package server

import (
"fmt"
"runtime/debug"
"sync"
"time"

Expand All @@ -13,7 +14,8 @@ import (
// Package-level error values and the stream-read timeout durations.
var (
	// ClientAlreadyExistsError is the error value carrying the
	// "Client ID already registered" message.
	ClientAlreadyExistsError = fmt.Errorf("Client ID already registered")
	// UnknownClientID is the error value carrying the "Unknown client ID" message.
	UnknownClientID = fmt.Errorf("Unknown client ID")
	// timeout is a 10-second duration passed to time.After in streamRecv.
	// NOTE(review): this has the same value as Timeout below and looks like the
	// pre-change line of this diff — confirm whether both are meant to exist.
	timeout = 10 * time.Second
	// Timeout is a 10-second duration (expressed in milliseconds) passed to
	// time.After in streamRecv. It is exported so that tests can change it.
	Timeout = 10000 * time.Millisecond
)

type Client struct {
Expand Down Expand Up @@ -204,7 +206,6 @@ func (c *Client) getPrimaryLastOplogTs() (int64, error) {
return 0, errors.Wrapf(err, "cannot get LastOplogTs from the primary node %s", c.NodeName)
}

fmt.Printf("getPrimaryLastOplogTs %T %+v\n", response.Payload, response.Payload)
switch response.Payload.(type) {
case *pb.ClientMessage_ErrorMsg:
return 0, fmt.Errorf("Cannot list shards on client %s: %s", c.NodeName, response.GetErrorMsg())
Expand Down Expand Up @@ -431,8 +432,8 @@ func (c *Client) streamRecv() (*pb.ClientMessage, error) {
select {
case msg := <-c.streamRecvChan:
return msg, nil
case <-time.After(timeout):
return nil, fmt.Errorf("Timeout reading from the stream")
case <-time.After(Timeout):
return nil, fmt.Errorf("Timeout reading from the stream: \n" + string(debug.Stack()))
}
}

Expand Down
2 changes: 1 addition & 1 deletion grpc/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func (s *MessagesServer) RestoreBackUp(bm *pb.BackupMetadata, skipUsersAndRoles
}

s.reset()
//s.setRestoreRunning(true)
s.setRestoreRunning(true)

for replName, client := range clients {
s.logger.Infof("Starting restore for replicaset %q on client %s %s %s", replName, client.ID, client.NodeName, client.NodeType)
Expand Down
51 changes: 51 additions & 0 deletions internal/oplog/oplog_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,71 @@ import (
"github.com/prometheus/common/log"
)

// checker is the signature shared by noCheck and check: it takes two
// timestamps (named ts and stopAt in both implementations, in that order) and
// returns a bool. An OplogApply stores one in its fcheck field.
// NOTE(review): the only fcheck call visible in Run is commented out — confirm
// whether the result is meant to stop the apply loop.
type checker func(bson.MongoTimestamp, bson.MongoTimestamp) bool

// OplogApply reads documents from a BSON reader and runs each one against a
// MongoDB session as an applyOps command (see Run).
type OplogApply struct {
	// dbSession is the session commands are run on; the constructors set it to
	// a clone of the session they are given.
	dbSession *mgo.Session
	// bsonReader is the source Run reads documents from via UnmarshalNext.
	bsonReader bsonfile.BSONReader
	// lock is allocated by the constructors.
	// NOTE(review): what it guards is not visible in this part of the file.
	lock *sync.Mutex
	// docsCount is reset to 0 at the start of Run.
	// presumably counts the documents processed — confirm in the rest of Run.
	docsCount int64
	// stopAtTs is -1 when built with NewOplogApply, or the caller-supplied
	// timestamp when built with NewOplogApplyUntil.
	stopAtTs bson.MongoTimestamp
	// fcheck is noCheck (NewOplogApply) or check (NewOplogApplyUntil).
	fcheck checker
}

// NewOplogApply returns an OplogApply that reads documents from r and runs
// them on a clone of session. No stop timestamp is configured: stopAtTs is set
// to -1 and the checker is noCheck, which always returns false.
// The returned error is always nil.
func NewOplogApply(session *mgo.Session, r bsonfile.BSONReader) (*OplogApply, error) {
	applier := new(OplogApply)
	applier.bsonReader = r
	applier.dbSession = session.Clone()
	applier.lock = new(sync.Mutex)
	applier.stopAtTs = -1
	applier.fcheck = noCheck
	return applier, nil
}

// NewOplogApplyUntil returns an OplogApply that reads documents from r and
// runs them on a clone of session, recording stopAtTs as the stop timestamp
// and check as the checker (check returns true for timestamps strictly greater
// than the stop timestamp).
// The returned error is always nil.
// NOTE(review): the fcheck call visible in Run is commented out, so the stop
// timestamp may not be enforced yet — confirm.
func NewOplogApplyUntil(session *mgo.Session, r bsonfile.BSONReader, stopAtTs bson.MongoTimestamp) (*OplogApply, error) {
	applier := new(OplogApply)
	applier.bsonReader = r
	applier.dbSession = session.Clone()
	applier.lock = new(sync.Mutex)
	applier.stopAtTs = stopAtTs
	applier.fcheck = check
	return applier, nil
}

// noCheck is the checker installed by NewOplogApply. It ignores both
// timestamps and always returns false.
func noCheck(bson.MongoTimestamp, bson.MongoTimestamp) bool {
	return false
}

// check is the checker installed by NewOplogApplyUntil. It returns true when
// ts is strictly greater than stopAt, and false when ts is equal to or less
// than stopAt.
func check(ts, stopAt bson.MongoTimestamp) bool {
	return ts > stopAt
}

func (oa *OplogApply) Run() error {
oa.docsCount = 0
for {
/* dest:
bson.M{
"o": bson.M{
"_id": "[\xef<\x19f\xa11V\xec5>*",
"id": int(49),
"name": "name_049",
},
"ts": bson.MongoTimestamp(6624579654957137951),
"t": int64(1),
"h": int64(-8478451192930320621),
"v": int(2),
"op": "i",
"ns": "test.test_collection",
"wall": time.Time{
wall: 0x1d905c0,
ext: 63678001945,
loc: (*time.Location)(nil),
},
}
*/
dest := bson.M{}
if err := oa.bsonReader.UnmarshalNext(dest); err != nil {
if err == io.EOF {
Expand All @@ -37,6 +84,10 @@ func (oa *OplogApply) Run() error {
return err
}

//if oa.fcheck(dest["ts"].(bson.MongoTimestamp), oa.stopAtTs) {
// return nil
//}

result := bson.M{}
err := oa.dbSession.Run(bson.M{"applyOps": []bson.M{dest}}, result)
if err != nil {
Expand Down