Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 104 additions & 67 deletions grpc/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,17 @@ type Client struct {
nodeName string
clusterID string
backupDir string
mongoDumper *dumper.Mongodump
oplogTailer *oplog.OplogTail
mdbSession *mgo.Session
grpcClient pb.MessagesClient
connOpts ConnectionOptions
sslOpts SSLOptions
logger *logrus.Logger

mdbSession *mgo.Session
mgoDI *mgo.DialInfo
connOpts ConnectionOptions
sslOpts SSLOptions

mongoDumper *dumper.Mongodump
oplogTailer *oplog.OplogTail
logger *logrus.Logger
grpcClient pb.MessagesClient
dbReconnectChan chan struct{}
//
lock *sync.Mutex
running bool
Expand Down Expand Up @@ -83,6 +87,8 @@ type shardsMap struct {
var (
balancerStopRetries = 3
balancerStopTimeout = 30 * time.Second
dbReconnectInterval = 30 * time.Second
dbPingInterval = 60 * time.Second
)

func NewClient(ctx context.Context, backupDir string, mdbConnOpts ConnectionOptions, mdbSSLOpts SSLOptions,
Expand All @@ -94,6 +100,13 @@ func NewClient(ctx context.Context, backupDir string, mdbConnOpts ConnectionOpti
logger.Out = logrus.StandardLogger().Out
}

grpcClient := pb.NewMessagesClient(conn)

stream, err := grpcClient.MessagesChat(ctx)
if err != nil {
return nil, errors.Wrap(err, "cannot connect to the gRPC server")
}

di := &mgo.DialInfo{
Addrs: []string{mdbConnOpts.Host + ":" + mdbConnOpts.Port},
Username: mdbConnOpts.User,
Expand All @@ -105,68 +118,21 @@ func NewClient(ctx context.Context, backupDir string, mdbConnOpts ConnectionOpti
FailFast: true,
Direct: true,
}
mdbSession, err := mgo.DialWithInfo(di)

mdbSession.SetMode(mgo.Eventual, true)

nodeType, nodeName, err := getNodeTypeAndName(mdbSession)
if err != nil {
return nil, errors.Wrap(err, "Cannot get node type")
}

bi, err := mdbSession.BuildInfo()
if err != nil {
return nil, errors.Wrapf(err, "Cannot get build info")
}
if !bi.VersionAtLeast(3, 4) {
return nil, fmt.Errorf("You need at least MongoDB version 3.4 to run this tool")
}

var replicasetName, replicasetID string

if nodeType != pb.NodeType_MONGOS {
replset, err := cluster.NewReplset(mdbSession)
if err != nil {
return nil, fmt.Errorf("Cannot create a new replicaset instance: %s", err)
}
replicasetName = replset.Name()
replicasetID = replset.ID().Hex()
} else {
nodeName = di.Addrs[0]
}

clusterIDString := ""
if clusterID, _ := cluster.GetClusterID(mdbSession); clusterID != nil {
clusterIDString = clusterID.Hex()
}

grpcClient := pb.NewMessagesClient(conn)

stream, err := grpcClient.MessagesChat(ctx)
if err != nil {
return nil, errors.Wrap(err, "cannot connect to the gRPC server")
}

c := &Client{
id: nodeName,
ctx: ctx,
backupDir: backupDir,
replicasetName: replicasetName,
replicasetID: replicasetID,
grpcClient: grpcClient,
mdbSession: mdbSession,
nodeName: nodeName,
nodeType: nodeType,
clusterID: clusterIDString,
stream: stream,
ctx: ctx,
backupDir: backupDir,
grpcClient: grpcClient,
stream: stream,
status: pb.Status{
BackupType: pb.BackupType_LOGICAL,
},
connOpts: mdbConnOpts,
sslOpts: mdbSSLOpts,
logger: logger,
lock: &sync.Mutex{},
running: true,
connOpts: mdbConnOpts,
sslOpts: mdbSSLOpts,
logger: logger,
lock: &sync.Mutex{},
running: true,
mgoDI: di,
dbReconnectChan: make(chan struct{}),
// This lock is used to sync the access to the stream Send() method.
// For example, if the backup is running, we can receive a Ping request from
// the server but while we are sending the Ping response, the backup can finish
Expand All @@ -177,16 +143,61 @@ func NewClient(ctx context.Context, backupDir string, mdbConnOpts ConnectionOpti
streamLock: &sync.Mutex{},
}

if err := c.dbConnect(); err != nil {
return nil, errors.Wrap(err, "cannot connect to the database")
}

if err := c.register(); err != nil {
return nil, err
}

// start listening server messages
go c.processIncommingServerMessages()
go c.dbWatchdog()

return c, nil
}

func (c *Client) dbConnect() (err error) {
c.mdbSession, err = mgo.DialWithInfo(c.mgoDI)
if err != nil {
return err
}
c.mdbSession.SetMode(mgo.Eventual, true)

c.nodeType, c.nodeName, err = getNodeTypeAndName(c.mdbSession)
if err != nil {
return errors.Wrap(err, "Cannot get node type")
}
// Review this ID. We need to decide if we want a random ID or not
c.id = c.nodeName

bi, err := c.mdbSession.BuildInfo()
if err != nil {
return errors.Wrapf(err, "Cannot get build info")
}
if !bi.VersionAtLeast(3, 4) {
return fmt.Errorf("You need at least MongoDB version 3.4 to run this tool")
}

if c.nodeType != pb.NodeType_MONGOS {
replset, err := cluster.NewReplset(c.mdbSession)
if err != nil {
return fmt.Errorf("Cannot create a new replicaset instance: %s", err)
}
c.replicasetName = replset.Name()
c.replicasetID = replset.ID().Hex()
} else {
c.nodeName = c.mgoDI.Addrs[0]
}

if clusterID, _ := cluster.GetClusterID(c.mdbSession); clusterID != nil {
c.clusterID = clusterID.Hex()
}

return nil
}

func (c *Client) ID() string {
return c.id
}
Expand Down Expand Up @@ -294,8 +305,6 @@ func (c *Client) processIncommingServerMessages() {
continue
}

//var response *pb.ClientMessage

c.logger.Debugf("Incoming message: %+v", msg)
switch msg.Payload.(type) {
case *pb.ServerMessage_GetStatusMsg:
Expand Down Expand Up @@ -329,6 +338,34 @@ func (c *Client) processIncommingServerMessages() {
}
}

func (c *Client) dbWatchdog() {
for {
select {
case <-time.After(dbPingInterval):
if err := c.mdbSession.Ping(); err != nil {
c.dbReconnect()
}
case <-c.dbReconnectChan:
c.dbReconnect()
case <-c.ctx.Done():
return
}
}
}

func (c *Client) dbReconnect() {
for {
if err := c.dbConnect(); err == nil {
return
}
select {
case <-time.After(dbReconnectInterval):
case <-c.ctx.Done():
return
}
}
}

func (c *Client) processCancelBackup() error {
err := c.mongoDumper.Stop()
c.oplogTailer.Cancel()
Expand Down