Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cli/pbm-coordinator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ const (
defaultAPIPort = 10001
defaultShutdownTimeout = 5 // Seconds
defaultClientsLogging = true
defaultDebugMode = true
defaultDebugMode = false
defaultWorkDir = "~/percona-backup-mongodb"
)

Expand Down
5 changes: 4 additions & 1 deletion grpc/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package api

import (
"context"
"os"
"path/filepath"
"time"

Expand All @@ -28,7 +29,9 @@ var (
)

func init() {
logger.SetLevel(logrus.DebugLevel)
if os.Getenv("DEBUG") == "1" {
logger.SetLevel(logrus.DebugLevel)
}
}

func (a *ApiServer) GetClients(m *pbapi.Empty, stream pbapi.Api_GetClientsServer) error {
Expand Down
120 changes: 101 additions & 19 deletions grpc/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,17 @@ import (
"io/ioutil"
"os"
"path"
"path/filepath"
"strings"
"sync"
"time"

"github.com/aws/aws-sdk-go/service/s3"
"github.com/globalsign/mgo"
"github.com/globalsign/mgo/bson"
"github.com/golang/snappy"
"github.com/percona/percona-backup-mongodb/bsonfile"
"github.com/percona/percona-backup-mongodb/internal/awsutils"
"github.com/percona/percona-backup-mongodb/internal/backup/dumper"
"github.com/percona/percona-backup-mongodb/internal/cluster"
"github.com/percona/percona-backup-mongodb/internal/oplog"
Expand Down Expand Up @@ -360,7 +363,26 @@ func (c *Client) processIncommingServerMessages() {
case *pb.ServerMessage_ListReplicasets:
c.processListReplicasets()
case *pb.ServerMessage_PingMsg:
c.processPing()
msg := c.processPing()
if err := c.streamSend(msg); err != nil {
c.logger.Errorf("Cannot stream ping response to the server: %s. Out message: %+v. In message type: %T", err, *msg, msg.Payload)
}
continue
case *pb.ServerMessage_CanRestoreBackupMsg:
msg, err := c.processCanRestoreBackup(msg.GetCanRestoreBackupMsg())
if err != nil {
errMsg := &pb.ClientMessage{
ClientId: c.id,
Payload: &pb.ClientMessage_ErrorMsg{ErrorMsg: &pb.Error{Message: err.Error()}},
}
if err = c.streamSend(errMsg); err != nil {
c.logger.Errorf("Cannot send error response (%+v) to the RPC server: %s", msg, err)
}
continue
}
if err = c.streamSend(msg); err != nil {
c.logger.Errorf("Cannot send CanRestoreBackup response (%+v) to the RPC server: %s", msg, err)
}
//
case *pb.ServerMessage_StartBackupMsg:
startBackupMsg := msg.GetStartBackupMsg()
Expand Down Expand Up @@ -429,6 +451,63 @@ func (c *Client) processCancelBackup() error {
return err
}

func (c *Client) processCanRestoreBackup(msg *pb.CanRestoreBackup) (*pb.ClientMessage, error) {
var err error
c.updateClientInfo()
resp := &pb.CanRestoreBackupResponse{
ClientId: c.id,
IsPrimary: c.isMasterDoc.IsMaster && c.isMasterDoc.SetName != "" && c.isMasterDoc.Msg != "isdbgrid",
Replicaset: c.ReplicasetName(),
Host: c.connOpts.Host,
Port: c.connOpts.Port,
}

switch msg.DestinationType {
case pb.DestinationType_DESTINATION_TYPE_FILE:
resp.CanRestore, err = c.checkCanRestoreLocal(msg)
case pb.DestinationType_DESTINATION_TYPE_AWS:
resp.CanRestore, err = c.checkCanRestoreS3(msg)
}
if err != nil {
return nil, err
}

outMsg := &pb.ClientMessage{
ClientId: c.id,
Payload: &pb.ClientMessage_CanRestoreBackupMsg{CanRestoreBackupMsg: resp},
}

return outMsg, nil
}

func (c *Client) checkCanRestoreLocal(msg *pb.CanRestoreBackup) (bool, error) {
path := filepath.Join(c.backupDir, msg.GetDestinationDir(), msg.GetBackupName())
if _, err := os.Stat(path); err != nil {
if os.IsNotExist(err) {
return false, nil
}
return false, err
}
return true, nil
}

func (c *Client) checkCanRestoreS3(msg *pb.CanRestoreBackup) (bool, error) {
awsSession, err := awsutils.GetAWSSession()
if err != nil {
return false, fmt.Errorf("Cannot get AWS session: %s", err)
}
svc := s3.New(awsSession)
_, err = awsutils.S3Stat(svc, msg.GetDestinationDir(), msg.GetBackupName())
if err != nil {
if err == awsutils.FileNotFoundError {
return false, nil
} else {
return false, fmt.Errorf("Cannot check if backup exists in S3: %s", err)
}
}
return true, nil
}

func (c *Client) processGetBackupSource() {
c.logger.Debug("Received GetBackupSource command")
r, err := cluster.NewReplset(c.mdbSession)
Expand Down Expand Up @@ -543,9 +622,10 @@ func (c *Client) processListReplicasets() error {
return nil
}

func (c *Client) processPing() {
func (c *Client) processPing() *pb.ClientMessage {
c.logger.Debug("Received Ping command")
c.updateClientInfo()

pongMsg := &pb.Pong{
Timestamp: time.Now().Unix(),
NodeType: c.nodeType,
Expand All @@ -561,10 +641,7 @@ func (c *Client) processPing() {
ClientId: c.id,
Payload: &pb.ClientMessage_PongMsg{PongMsg: pongMsg},
}

if err := c.streamSend(msg); err != nil {
c.logger.Errorf("Cannot stream response to the server: %s. Out message: %+v. In message type: %T", err, *msg, msg.Payload)
}
return msg
}

func (c *Client) processRestore(msg *pb.RestoreBackup) error {
Expand Down Expand Up @@ -720,6 +797,11 @@ func (c *Client) processStatus() {
c.logger.Debug("Received Status command")
c.lock.Lock()

isMaster, err := cluster.NewIsMaster(c.mdbSession)
if err != nil {
log.Errorf("Cannot get IsMaster for processStatus")
}

msg := &pb.ClientMessage{
ClientId: c.id,
Payload: &pb.ClientMessage_StatusMsg{
Expand All @@ -738,6 +820,7 @@ func (c *Client) processStatus() {
CompressionType: c.status.CompressionType,
Cypher: c.status.Cypher,
StartOplogTs: c.status.StartOplogTs,
IsPrimary: isMaster.IsMasterDoc().IsMaster && isMaster.IsMasterDoc().SetName != "" && c.isMasterDoc.Msg != "isdbgrid",
},
},
}
Expand Down Expand Up @@ -825,7 +908,6 @@ func (c *Client) runDBBackup(msg *pb.StartBackup) {
fw, err := os.Create(path.Join(c.backupDir, msg.GetDbBackupName()))
if err != nil {
log.Errorf("Cannot create backup file: %s", err)
// TODO Stream error msg to the server
}
writers = append(writers, fw)
}
Expand Down Expand Up @@ -1087,12 +1169,12 @@ func getNodeType(isMaster *cluster.IsMaster) pb.NodeType {
return pb.NodeType_NODE_TYPE_MONGOD
}

func (c *Client) restoreDBDump(opts *pb.RestoreBackup) (err error) {
func (c *Client) restoreDBDump(msg *pb.RestoreBackup) (err error) {
readers := []io.ReadCloser{}

switch opts.SourceType {
switch msg.SourceType {
case pb.DestinationType_DESTINATION_TYPE_FILE:
reader, err := os.Open(path.Join(c.backupDir, opts.DbSourceName))
reader, err := os.Open(path.Join(c.backupDir, msg.DbSourceName))
if err != nil {
return errors.Wrap(err, "cannot open restore source file")
}
Expand All @@ -1101,7 +1183,7 @@ func (c *Client) restoreDBDump(opts *pb.RestoreBackup) (err error) {
return fmt.Errorf("Restoring from sources other than file is not implemented yet")
}

switch opts.GetCompressionType() {
switch msg.GetCompressionType() {
case pb.CompressionType_COMPRESSION_TYPE_GZIP:
gzr, err := gzip.NewReader(readers[len(readers)-1])
if err != nil {
Expand All @@ -1126,8 +1208,8 @@ func (c *Client) restoreDBDump(opts *pb.RestoreBackup) (err error) {
input := &restore.MongoRestoreInput{
Archive: "-",
DryRun: false,
Host: c.connOpts.Host,
Port: c.connOpts.Port,
Host: msg.Host,
Port: msg.Port,
Username: c.connOpts.User,
Password: c.connOpts.Password,
Gzip: false,
Expand All @@ -1137,7 +1219,7 @@ func (c *Client) restoreDBDump(opts *pb.RestoreBackup) (err error) {
// A real restore would be applied to a just created and empty instance and it should be
// configured to run without user authentication.
// For testing purposes, we can skip restoring users and roles.
SkipUsersAndRoles: opts.SkipUsersAndRoles,
SkipUsersAndRoles: msg.SkipUsersAndRoles,
}

r, err := restore.NewMongoRestore(input)
Expand All @@ -1156,13 +1238,13 @@ func (c *Client) restoreDBDump(opts *pb.RestoreBackup) (err error) {
return nil
}

func (c *Client) restoreOplog(opts *pb.RestoreBackup) (err error) {
func (c *Client) restoreOplog(msg *pb.RestoreBackup) (err error) {
// var reader bsonfile.BSONReader
readers := []io.ReadCloser{}

switch opts.SourceType {
switch msg.SourceType {
case pb.DestinationType_DESTINATION_TYPE_FILE:
filer, err := os.Open(path.Join(c.backupDir, opts.OplogSourceName))
filer, err := os.Open(path.Join(c.backupDir, msg.OplogSourceName))
if err != nil {
return errors.Wrap(err, "cannot open oplog restore source file")
}
Expand All @@ -1171,7 +1253,7 @@ func (c *Client) restoreOplog(opts *pb.RestoreBackup) (err error) {
return fmt.Errorf("Restoring oplogs from sources other than file is not implemented yet")
}

switch opts.GetCompressionType() {
switch msg.GetCompressionType() {
case pb.CompressionType_COMPRESSION_TYPE_GZIP:
gzr, err := gzip.NewReader(readers[len(readers)-1])
if err != nil {
Expand All @@ -1189,7 +1271,7 @@ func (c *Client) restoreOplog(opts *pb.RestoreBackup) (err error) {
bsonReader, err := bsonfile.NewBSONReader(readers[len(readers)-1])

di := &mgo.DialInfo{
Addrs: []string{fmt.Sprintf("%s:%s", c.connOpts.Host, c.connOpts.Port)},
Addrs: []string{fmt.Sprintf("%s:%s", msg.Host, msg.Port)},
Username: c.connOpts.User,
Password: c.connOpts.Password,
}
Expand Down
27 changes: 26 additions & 1 deletion grpc/server/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,30 @@ func newClient(id string, registerMsg *pb.Register, stream pb.Messages_MessagesC
return client
}

func (c *Client) CanRestoreBackup(backupType pb.BackupType, destinationType pb.DestinationType, bucket, name string) (pb.CanRestoreBackupResponse, error) {
if err := c.streamSend(&pb.ServerMessage{
Payload: &pb.ServerMessage_CanRestoreBackupMsg{
CanRestoreBackupMsg: &pb.CanRestoreBackup{
BackupType: backupType,
DestinationType: destinationType,
DestinationDir: bucket,
BackupName: name,
},
},
}); err != nil {
return pb.CanRestoreBackupResponse{}, err
}
msg, err := c.streamRecv()
if err != nil {
return pb.CanRestoreBackupResponse{}, err
}
if canRestoreBackupMsg := msg.GetCanRestoreBackupMsg(); canRestoreBackupMsg != nil {
return *canRestoreBackupMsg, nil
}

return pb.CanRestoreBackupResponse{}, fmt.Errorf("Cannot get CanRestoreBackup Response (response is nil)")
}

// newClient creates a new client in the gRPC server. This client is the one that will handle communications with the
// real client (agent). The method is not exported because only the gRPC server should be able to create a new client.
// func newClient(id, clusterID, nodeName, replicasetUUID, replicasetName string, nodeType pb.NodeType,
Expand Down Expand Up @@ -253,7 +277,6 @@ func (c *Client) ping() error {
if err != nil {
return errors.Wrapf(err, "ping client %s (%s)", c.ID, c.NodeName)
}

pongMsg := msg.GetPongMsg()
c.statusLock.Lock()
c.NodeType = pongMsg.GetNodeType()
Expand All @@ -279,6 +302,8 @@ func (c *Client) restoreBackup(msg *pb.RestoreBackup) error {
Cypher: msg.Cypher,
OplogStartTime: msg.OplogStartTime,
SkipUsersAndRoles: msg.SkipUsersAndRoles,
Host: msg.Host,
Port: msg.Port,
},
},
}
Expand Down
Loading