Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions grpc/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

type ApiServer struct {
messagesServer *server.MessagesServer
workDir string
}

func NewApiServer(server *server.MessagesServer) *ApiServer {
Expand Down Expand Up @@ -67,19 +68,19 @@ func (a *ApiServer) GetClients(m *pbapi.Empty, stream pbapi.Api_GetClientsServer

// LastBackupMetadata returns the last backup metadata so it can be stored in the local filesystem as JSON
func (a *ApiServer) LastBackupMetadata(ctx context.Context, e *pbapi.Empty) (*pb.BackupMetadata, error) {
return a.messagesServer.LastBackupMetadata(), nil
return a.messagesServer.LastBackupMetadata().Metadata(), nil
}

// StartBackup starts a backup by calling server's StartBackup gRPC method
// This call waits until the backup finish
func (a *ApiServer) RunBackup(ctx context.Context, opts *pbapi.RunBackupParams) (*pbapi.Error, error) {

msg := &pb.StartBackup{
OplogStartTime: time.Now().Unix(),
BackupType: pb.BackupType(opts.BackupType),
DestinationType: pb.DestinationType(opts.DestinationType),
CompressionType: pb.CompressionType(opts.CompressionType),
Cypher: pb.Cypher(opts.Cypher),
NamePrefix: time.Now().UTC().Format(time.RFC3339),
// DBBackupName & OplogBackupName are going to be set in server.go
// We cannot set them here because the backup name will include the replicaset name so, it will
// be different for each client/MongoDB instance
Expand All @@ -90,7 +91,9 @@ func (a *ApiServer) RunBackup(ctx context.Context, opts *pbapi.RunBackupParams)
if err := a.messagesServer.StopBalancer(); err != nil {
return &pbapi.Error{Message: err.Error()}, err
}
logger.Debug("Balancer stopped")

logger.Debug("Starting the backup")
if err := a.messagesServer.StartBackup(msg); err != nil {
return &pbapi.Error{Message: err.Error()}, err
}
Expand All @@ -108,9 +111,15 @@ func (a *ApiServer) RunBackup(ctx context.Context, opts *pbapi.RunBackupParams)
a.messagesServer.WaitOplogBackupFinish()
logger.Debug("Oplog finished")

mdFilename := msg.NamePrefix + ".json"

logger.Debugf("Writing metadata to %s", mdFilename)
a.messagesServer.WriteBackupMetadata(mdFilename)

logger.Debug("Starting the balancer")
if err := a.messagesServer.StartBalancer(); err != nil {
return &pbapi.Error{Message: err.Error()}, err
}
logger.Debug("Balancer started")
return &pbapi.Error{}, nil
}
122 changes: 0 additions & 122 deletions grpc/api/api_test.go

This file was deleted.

16 changes: 16 additions & 0 deletions grpc/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,14 @@ func (c *Client) processStartBalancer() (*pb.ClientMessage, error) {
return nil, err
}
c.logger.Debugf("Balancer has been started by %s", c.nodeName)

out := &pb.ClientMessage{
ClientID: c.id,
Payload: &pb.ClientMessage_AckMsg{AckMsg: &pb.Ack{}},
}
c.logger.Debugf("processStartBalancer Sending ACK message to the gRPC server")
c.streamSend(out)

return nil, nil
}

Expand Down Expand Up @@ -504,7 +512,15 @@ func (c *Client) processStopBalancer() (*pb.ClientMessage, error) {
if err := balancer.StopAndWait(balancerStopRetries, balancerStopTimeout); err != nil {
return nil, err
}

c.logger.Debugf("Balancer has been stopped by %s", c.nodeName)
out := &pb.ClientMessage{
ClientID: c.id,
Payload: &pb.ClientMessage_AckMsg{AckMsg: &pb.Ack{}},
}
c.logger.Debugf("processStopBalancer Sending ACK message to the gRPC server")
c.streamSend(out)

return nil, nil
}

Expand Down
9 changes: 3 additions & 6 deletions grpc/server/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
var (
ClientAlreadyExistsError = fmt.Errorf("Client ID already registered")
UnknownClientID = fmt.Errorf("Unknown client ID")
timeout = 1 * time.Second
timeout = 10 * time.Second
)

type Client struct {
Expand Down Expand Up @@ -203,9 +203,6 @@ func (c *Client) setOplogTailerRunning(status bool) {
}

func (c *Client) setRestoreRunning(status bool) {
fmt.Printf("client %v set restore running lock\n", c.ID)
fmt.Printf("client %v set restore running unlock\n", c.ID)

c.statusLock.Lock()
defer c.statusLock.Unlock()
if status {
Expand Down Expand Up @@ -248,7 +245,7 @@ func (c *Client) startBackup(opts *pb.StartBackup) error {

func (c *Client) startBalancer() error {
err := c.streamSend(&pb.ServerMessage{
Payload: &pb.ServerMessage_StartBalancerMsg{},
Payload: &pb.ServerMessage_StartBalancerMsg{StartBalancerMsg: &pb.StartBalancer{}},
})
if err != nil {
return err
Expand Down Expand Up @@ -284,7 +281,7 @@ func (c *Client) stopBackup() error {

func (c *Client) stopBalancer() error {
err := c.streamSend(&pb.ServerMessage{
Payload: &pb.ServerMessage_StopBalancerMsg{},
Payload: &pb.ServerMessage_StopBalancerMsg{StopBalancerMsg: &pb.StopBalancer{}},
})
if err != nil {
return err
Expand Down
33 changes: 18 additions & 15 deletions grpc/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package server
import (
"context"
"fmt"
"path"
"path/filepath"
"sync"
"time"

Expand Down Expand Up @@ -139,8 +139,8 @@ func (s *MessagesServer) IsShardedSystem() bool {
return false
}

func (s MessagesServer) LastBackupMetadata() *pb.BackupMetadata {
return s.lastBackupMetadata.Metadata()
func (s MessagesServer) LastBackupMetadata() *BackupMetadata {
return s.lastBackupMetadata
}

func (s *MessagesServer) ReplicasetsRunningDBBackup() map[string]*Client {
Expand Down Expand Up @@ -247,7 +247,6 @@ func (s *MessagesServer) StartBackup(opts *pb.StartBackup) error {
return fmt.Errorf("Cannot start a backup while a restore is still running")
}

ts := time.Now().Format(time.RFC3339)
ext := getFileExtension(pb.CompressionType(opts.CompressionType), pb.Cypher(opts.Cypher))

s.lastBackupMetadata = NewBackupMetadata(opts)
Expand All @@ -265,8 +264,8 @@ func (s *MessagesServer) StartBackup(opts *pb.StartBackup) error {
s.logger.Printf("Starting backup for replicaset %q on client %s %s %s", replName, client.ID, client.NodeName, client.NodeType)
s.replicasRunningBackup[replName] = true

dbBackupName := fmt.Sprintf("%s_%s.dump%s", ts, client.ReplicasetName, ext)
oplogBackupName := fmt.Sprintf("%s_%s.oplog%s", ts, client.ReplicasetName, ext)
dbBackupName := fmt.Sprintf("%s_%s.dump%s", opts.NamePrefix, client.ReplicasetName, ext)
oplogBackupName := fmt.Sprintf("%s_%s.oplog%s", opts.NamePrefix, client.ReplicasetName, ext)

s.lastBackupMetadata.AddReplicaset(client.ReplicasetName, client.ReplicasetUUID, dbBackupName, oplogBackupName)

Expand All @@ -282,11 +281,6 @@ func (s *MessagesServer) StartBackup(opts *pb.StartBackup) error {
})
}

metadataFilename := path.Join(s.workDir, fmt.Sprintf("%s.json", time.Unix(s.lastBackupMetadata.Metadata().StartTs, 0).Format(time.RFC3339)))
err = s.lastBackupMetadata.WriteMetadataToFile(metadataFilename)
if err != nil {
log.Warn("Cannot write metadata file %s: %s", metadataFilename, err)
}
return nil
}

Expand Down Expand Up @@ -369,13 +363,22 @@ func (s *MessagesServer) WaitOplogBackupFinish() {
}

func (s *MessagesServer) WaitRestoreFinish() {
//replicasets := s.ReplicasetsRunningRestore()
//if len(replicasets) == 0 {
// return
//}
replicasets := s.ReplicasetsRunningRestore()
if len(replicasets) == 0 {
return
}
<-s.restoreFinishChan
}

func (s *MessagesServer) WriteBackupMetadata(filename string) error {
return s.lastBackupMetadata.WriteMetadataToFile(filepath.Join(s.workDir, filename))
}

// WorkDir returns the server working directory.
func (s *MessagesServer) WorkDir() string {
return s.workDir
}

// ---------------------------------------------------------------------------------------------------------------------
// gRPC methods
// ---------------------------------------------------------------------------------------------------------------------
Expand Down
Loading