Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions grpc/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ func (a *ApiServer) GetClients(m *pbapi.Empty, stream pbapi.Api_GetClientsServer
return nil
}

// LastBackupMetadata returns the last backup metadata so it can be stored in the local filesystem as JSON
func (a *ApiServer) LastBackupMetadata(ctx context.Context, e *pbapi.Empty) (*pb.BackupMetadata, error) {
return a.messagesServer.LastBackupMetadata(), nil
}

// StartBackup starts a backup by calling server's StartBackup gRPC method
// This call waits until the backup finish
func (a *ApiServer) RunBackup(ctx context.Context, opts *pbapi.RunBackupParams) (*pbapi.Error, error) {
Expand Down
81 changes: 36 additions & 45 deletions grpc/server/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,66 +12,38 @@ import (
"github.com/pkg/errors"
)

type ReplicasetMetadata struct {
ReplicasetUUID string `json:"replicaset_uuid"`
ReplicasetName string `json:"replicaset_name"`
DBBackupName string `json:"db_backup_name"`
OplogBackupName string `json:"oplog_backup_name"`
}

type BackupMetadata struct {
StartTs time.Time `json:"start_ts"`
EndTs time.Time `json:"end_ts"`
BackupType pb.BackupType `json:"backup_type"`
OplogStartTime int64 `json:"oplog_start_time"`
LastOplogTs int64 `json:"last_oplog_ts"`
DestinationType pb.DestinationType `json:"destination_type"`
DestinationDir string `json:"destination_dir"`
Cypher pb.Cypher `json:"cypher"`
CompressionType pb.CompressionType `json:"compression_type"`

lock *sync.Mutex `json:"-"`
Replicasets map[string]ReplicasetMetadata `json:"replicas"` // key is replicaset name
metadata *pb.BackupMetadata
lock *sync.Mutex `json:"-"`
}

func NewBackupMetadata(opts *pb.StartBackup) *BackupMetadata {
return &BackupMetadata{
Replicasets: make(map[string]ReplicasetMetadata),
lock: &sync.Mutex{},
StartTs: time.Now(),
BackupType: opts.GetBackupType(),
DestinationType: opts.GetDestinationType(),
DestinationDir: opts.GetDestinationDir(),
CompressionType: opts.GetCompressionType(),
Cypher: opts.GetCypher(),
metadata: &pb.BackupMetadata{
StartTs: time.Now().UTC().Unix(),
BackupType: opts.GetBackupType(),
DestinationType: opts.GetDestinationType(),
DestinationDir: opts.GetDestinationDir(),
CompressionType: opts.GetCompressionType(),
Cypher: opts.GetCypher(),
Replicasets: make(map[string]*pb.ReplicasetMetadata),
},
lock: &sync.Mutex{},
}
}

func LoadMetadataFromFile(name string) (*BackupMetadata, error) {
buf, err := ioutil.ReadFile(name)
if err != nil {
return nil, err
}
metadata := &BackupMetadata{
Replicasets: make(map[string]ReplicasetMetadata),
lock: &sync.Mutex{},
}
err = json.Unmarshal(buf, metadata)
return metadata, nil
}

// AddReplicaset adds backup info for a replicaset using the replicaset name as the key
func (b *BackupMetadata) AddReplicaset(replName, replUUID, dbBackupName, oplogBackupName string) error {
b.lock.Lock()

if _, ok := b.Replicasets[replName]; ok {
if _, ok := b.metadata.Replicasets[replName]; ok {
return fmt.Errorf("Info for replicaset %s already exists", replName)
}

// Key is replicaset name instead of UUID because the UUID is randomly generated so, on a
// new and shiny environment created to restore a backup, the UUID will be different.
// On restore, we will try to restore each replicaset by name to the matching cluster.
b.Replicasets[replName] = ReplicasetMetadata{
b.metadata.Replicasets[replName] = &pb.ReplicasetMetadata{
ReplicasetUUID: replUUID,
ReplicasetName: replName,
DBBackupName: dbBackupName,
Expand All @@ -82,14 +54,33 @@ func (b *BackupMetadata) AddReplicaset(replName, replUUID, dbBackupName, oplogBa
return nil
}

func LoadMetadataFromFile(name string) (*BackupMetadata, error) {
buf, err := ioutil.ReadFile(name)
if err != nil {
return nil, err
}
metadata := &BackupMetadata{
metadata: &pb.BackupMetadata{
Replicasets: make(map[string]*pb.ReplicasetMetadata),
},
lock: &sync.Mutex{},
}
err = json.Unmarshal(buf, &metadata.metadata)
return metadata, nil
}

func (b *BackupMetadata) Metadata() *pb.BackupMetadata {
return b.metadata
}

func (b *BackupMetadata) RemoveReplicaset(replName string) error {
b.lock.Lock()
defer b.lock.Unlock()

if _, ok := b.Replicasets[replName]; !ok {
if _, ok := b.metadata.Replicasets[replName]; !ok {
return fmt.Errorf("Info for replicaset %s doesn't exists", replName)
}
delete(b.Replicasets, replName)
delete(b.metadata.Replicasets, replName)
return nil
}

Expand All @@ -98,7 +89,7 @@ func (b *BackupMetadata) WriteMetadataToFile(name string) error {
b.lock.Lock()
defer b.lock.Unlock()

buf, err := json.MarshalIndent(b, "", " ")
buf, err := json.MarshalIndent(b.metadata, "", " ")
if err != nil {
return errors.Wrap(err, "cannot encode backup metadata")
}
Expand Down
8 changes: 4 additions & 4 deletions grpc/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ func (s *MessagesServer) IsShardedSystem() bool {
return false
}

func (s MessagesServer) LastBackupMetadata() *BackupMetadata {
return s.lastBackupMetadata
func (s MessagesServer) LastBackupMetadata() *pb.BackupMetadata {
return s.lastBackupMetadata.Metadata()
}

func (s *MessagesServer) ReplicasetsRunningDBBackup() map[string]*Client {
Expand Down Expand Up @@ -182,7 +182,7 @@ func (s *MessagesServer) ReplicasetsRunningRestore() map[string]*Client {

// RestoreBackUp will run a restore on each client, using the provided backup metadata to choose the source for each
// replicaset.
func (s *MessagesServer) RestoreBackUp(bm *BackupMetadata, SkipUsersAndRoles bool) error {
func (s *MessagesServer) RestoreBackUp(bm *pb.BackupMetadata, SkipUsersAndRoles bool) error {
clients, err := s.BackupSourceByReplicaset()
if err != nil {
return errors.Wrapf(err, "Cannot start backup restore. Cannot find backup source for replicas")
Expand Down Expand Up @@ -282,7 +282,7 @@ func (s *MessagesServer) StartBackup(opts *pb.StartBackup) error {
})
}

metadataFilename := path.Join(s.workDir, fmt.Sprintf("%s.json", s.lastBackupMetadata.StartTs.Format(time.RFC3339)))
metadataFilename := path.Join(s.workDir, fmt.Sprintf("%s.json", time.Unix(s.lastBackupMetadata.Metadata().StartTs, 0).Format(time.RFC3339)))
err = s.lastBackupMetadata.WriteMetadataToFile(metadataFilename)
if err != nil {
log.Warn("Cannot write metadata file %s: %s", metadataFilename, err)
Expand Down
Loading