Skip to content

Commit 8bb1105

Browse files
PMB-54
1 parent 680b2d0 commit 8bb1105

File tree

7 files changed

+200
-28
lines changed

7 files changed

+200
-28
lines changed

cli/mongodb-backupd/main.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ type cliOptions struct {
2323
app *kingpin.Application
2424
cmd string
2525
tls *bool
26+
workDir *string
2627
certFile *string
2728
keyFile *string
2829
grpcPort *int
@@ -46,6 +47,7 @@ func processCliParams() (*cliOptions, error) {
4647
opts := &cliOptions{
4748
app: app,
4849
tls: app.Flag("tls", "Enable TLS").Bool(),
50+
workDir: app.Flag("work-dir", "Working directory for backup metadata").String(),
4951
certFile: app.Flag("cert-file", "Cert file for gRPC client connections").String(),
5052
keyFile: app.Flag("key-file", "Key file for gRPC client connections").String(),
5153
grpcPort: app.Flag("grpc-port", "Listening port for client connections").Default(defaultGrpcPort).Int(),
@@ -97,7 +99,7 @@ func main() {
9799
wg := &sync.WaitGroup{}
98100

99101
grpcServer := grpc.NewServer(grpcOpts...)
100-
messagesServer := server.NewMessagesServer(log)
102+
messagesServer := server.NewMessagesServer(*opts.workDir, log)
101103
pb.RegisterMessagesServer(grpcServer, messagesServer)
102104

103105
wg.Add(1)

grpc/api/api.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,11 @@ func (a *ApiServer) RunBackup(ctx context.Context, opts *pbapi.RunBackupParams)
8080
Cypher: pb.Cypher(opts.Cypher),
8181
}
8282

83+
logger.Debug("Stopping the balancer")
84+
if err := a.messagesServer.StopBalancer(); err != nil {
85+
return &pbapi.Error{Message: err.Error()}, err
86+
}
87+
8388
if err := a.messagesServer.StartBackup(msg); err != nil {
8489
return &pbapi.Error{Message: err.Error()}, err
8590
}
@@ -97,5 +102,9 @@ func (a *ApiServer) RunBackup(ctx context.Context, opts *pbapi.RunBackupParams)
97102
a.messagesServer.WaitOplogBackupFinish()
98103
logger.Debug("Oplog finished")
99104

105+
logger.Debug("Starting the balancer")
106+
if err := a.messagesServer.StartBalancer(); err != nil {
107+
return &pbapi.Error{Message: err.Error()}, err
108+
}
100109
return &pbapi.Error{}, nil
101110
}

grpc/server/clients.go

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,10 +134,32 @@ func (c *Client) Status() (pb.Status, error) {
134134
return *statusMsg, nil
135135
}
136136

137+
func (c *Client) startBalancer() error {
138+
err := c.streamSend(&pb.ServerMessage{
139+
Payload: &pb.ServerMessage_StartBalancerMsg{},
140+
})
141+
if err != nil {
142+
return err
143+
}
144+
msg, err := c.streamRecv()
145+
if err != nil {
146+
return err
147+
}
148+
149+
switch msg.Payload.(type) {
150+
case *pb.ClientMessage_AckMsg:
151+
return nil
152+
case *pb.ClientMessage_ErrorMsg:
153+
errMsg := msg.GetErrorMsg()
154+
return fmt.Errorf("%s", errMsg.Message)
155+
}
156+
return fmt.Errorf("unknown respose type %T", msg)
157+
}
158+
137159
func (c *Client) stopBalancer() error {
138160
err := c.streamSend(&pb.ServerMessage{
139161
Type: pb.ServerMessage_STOP_BALANCER,
140-
Payload: &pb.ServerMessage_EmptyMsg{},
162+
Payload: &pb.ServerMessage_StopBalancerMsg{},
141163
})
142164
if err != nil {
143165
return err

grpc/server/metadata.go

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package server
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
"io/ioutil"
7+
"os"
8+
"sync"
9+
"time"
10+
11+
pb "github.com/percona/mongodb-backup/proto/messages"
12+
"github.com/pkg/errors"
13+
)
14+
15+
// ReplicasetMetadata is the per-replicaset entry stored inside the backup
// metadata. It is serialized to JSON using the field tags below.
type ReplicasetMetadata struct {
	// ReplicasetUUID is the UUID of the replicaset that was backed up.
	ReplicasetUUID string `json:"replicaset_uuid"`
	// ReplicasetName is the name of the replicaset that was backed up.
	ReplicasetName string `json:"replicaset_name"`
	// DestinationName is the name of the backup destination used for this replicaset.
	DestinationName string `json:"destination_name"`
}
20+
21+
type BackupMetadata struct {
22+
StartTs time.Time `json:"start_ts"`
23+
EndTs time.Time `json:"end_ts"`
24+
BackupType pb.BackupType `json:"backup_type"`
25+
OplogStartTime int64 `json:"oplog_start_time"`
26+
LastOplogTs int64 `json:"last_oplog_ts"`
27+
DestinationType pb.DestinationType `json:"destination_type"`
28+
DestinationDir string `json:"destination_dir"`
29+
Cypher pb.Cypher `json:"cypher"`
30+
CompressionType pb.CompressionType `json:"compression_type"`
31+
32+
lock *sync.Mutex `json:"-"`
33+
Replicasets map[string]ReplicasetMetadata `json:"replicas"` // key is replicaset name
34+
}
35+
36+
func NewBackupMetadata() *BackupMetadata {
37+
return &BackupMetadata{
38+
Replicasets: make(map[string]ReplicasetMetadata),
39+
lock: &sync.Mutex{},
40+
}
41+
}
42+
43+
func LoadMetadataFromFile(name string) (*BackupMetadata, error) {
44+
buf, err := ioutil.ReadFile(name)
45+
if err != nil {
46+
return nil, err
47+
}
48+
metadata := &BackupMetadata{
49+
Replicasets: make(map[string]ReplicasetMetadata),
50+
lock: &sync.Mutex{},
51+
}
52+
err = json.Unmarshal(buf, metadata)
53+
return metadata, nil
54+
}
55+
56+
// AddReplicaset adds backup info for a replicaset using the replicaset name as the key
57+
func (b *BackupMetadata) AddReplicaset(replName, replUUID, destinationName string) error {
58+
b.lock.Lock()
59+
60+
if _, ok := b.Replicasets[replName]; ok {
61+
return fmt.Errorf("Info for replicaset %s already exists", replName)
62+
}
63+
64+
b.Replicasets[replName] = ReplicasetMetadata{
65+
ReplicasetUUID: replUUID,
66+
ReplicasetName: replName,
67+
DestinationName: destinationName,
68+
}
69+
70+
b.lock.Unlock()
71+
return nil
72+
}
73+
74+
func (b *BackupMetadata) RemoveReplicaset(replName string) error {
75+
b.lock.Lock()
76+
defer b.lock.Unlock()
77+
78+
if _, ok := b.Replicasets[replName]; !ok {
79+
return fmt.Errorf("Info for replicaset %s doesn't exists", replName)
80+
}
81+
delete(b.Replicasets, replName)
82+
return nil
83+
}
84+
85+
// WriteMetadataToFile writes the backup metadata to a file as JSON
86+
func (b *BackupMetadata) WriteMetadataToFile(name string) error {
87+
b.lock.Lock()
88+
defer b.lock.Unlock()
89+
90+
buf, err := json.Marshal(b)
91+
if err != nil {
92+
return errors.Wrap(err, "cannot encode backup metadata")
93+
}
94+
if err = ioutil.WriteFile(name, buf, os.ModePerm); err != nil {
95+
return err
96+
}
97+
return nil
98+
}

grpc/server/server.go

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package server
33
import (
44
"context"
55
"fmt"
6+
"path"
67
"sync"
78
"time"
89

@@ -29,13 +30,15 @@ type MessagesServer struct {
2930
oplogBackupRunning bool
3031
err error
3132
//
33+
workDir string
34+
lastBackupMetadata *BackupMetadata
3235
clientDisconnetedChan chan string
3336
dbBackupFinishChan chan interface{}
3437
oplogBackupFinishChan chan interface{}
3538
logger *logrus.Logger
3639
}
3740

38-
func NewMessagesServer(logger *logrus.Logger) *MessagesServer {
41+
func NewMessagesServer(workDir string, logger *logrus.Logger) *MessagesServer {
3942
if logger == nil {
4043
logger = logrus.New()
4144
logger.SetLevel(logrus.StandardLogger().Level)
@@ -53,6 +56,7 @@ func NewMessagesServer(logger *logrus.Logger) *MessagesServer {
5356
dbBackupFinishChan: bfc,
5457
oplogBackupFinishChan: ofc,
5558
replicasRunningBackup: make(map[string]bool),
59+
workDir: workDir,
5660
logger: logger,
5761
}
5862

@@ -186,6 +190,17 @@ func (s *MessagesServer) StartBackup(opts *pb.StartBackup) error {
186190
if s.isBackupRunning() {
187191
return fmt.Errorf("Backup is already running")
188192
}
193+
194+
s.lastBackupMetadata = NewBackupMetadata()
195+
196+
s.lastBackupMetadata.StartTs = time.Now()
197+
s.lastBackupMetadata.Replicasets = make(map[string]ReplicasetMetadata)
198+
s.lastBackupMetadata.BackupType = opts.GetBackupType()
199+
s.lastBackupMetadata.DestinationType = opts.GetDestinationType()
200+
s.lastBackupMetadata.DestinationDir = opts.GetDestinationDir()
201+
s.lastBackupMetadata.CompressionType = opts.GetCompressionType()
202+
s.lastBackupMetadata.Cypher = opts.GetCypher()
203+
189204
clients, err := s.BackupSourceByReplicaset()
190205
if err != nil {
191206
return errors.Wrapf(err, "Cannot start backup. Cannot find backup source for replicas")
@@ -198,7 +213,10 @@ func (s *MessagesServer) StartBackup(opts *pb.StartBackup) error {
198213
for replName, client := range clients {
199214
s.logger.Printf("Starting backup for replicaset %q on client %s %s %s", replName, client.ID, client.NodeName, client.NodeType)
200215
s.replicasRunningBackup[replName] = true
201-
destinationName := fmt.Sprintf("%s_%s_%s", time.Now().Format("2006-01-02_15.04.05"), client.ReplicasetName, opts.GetDestinationName())
216+
destinationName := fmt.Sprintf("%s_%s_%s", s.lastBackupMetadata.StartTs.Format("2006-01-02_15.04.05"), client.ReplicasetName, opts.GetDestinationName())
217+
218+
s.lastBackupMetadata.AddReplicaset(client.ReplicasetName, client.ReplicasetUUID, destinationName)
219+
202220
client.startBackup(&pb.StartBackup{
203221
BackupType: opts.GetBackupType(),
204222
DestinationType: opts.GetDestinationType(),
@@ -210,9 +228,28 @@ func (s *MessagesServer) StartBackup(opts *pb.StartBackup) error {
210228
})
211229
}
212230

231+
// Persist the metadata in the work dir, in a file named after the backup start time.
metadataFilename := path.Join(s.workDir, fmt.Sprintf("%s.json", s.lastBackupMetadata.StartTs.Format("2006-01-02_15.04.05")))
err = s.lastBackupMetadata.WriteMetadataToFile(metadataFilename)
if err != nil {
	// Best effort: a failure to write the metadata file is only logged.
	// Warnf, not Warn: the message contains format verbs that must be expanded.
	log.Warnf("Cannot write metadata file %s: %s", metadataFilename, err)
}
213236
return nil
214237
}
215238

239+
// StartBalancer restarts the balancer if this is a sharded system
240+
func (s *MessagesServer) StartBalancer() error {
241+
s.lock.Lock()
242+
defer s.lock.Unlock()
243+
for _, client := range s.clients {
244+
if client.NodeType == pb.NodeType_MONGOS {
245+
return client.startBalancer()
246+
}
247+
}
248+
// This is not a sharded system. There is nothing to do.
249+
return nil
250+
}
251+
252+
// StopBalancer stops the balancer if this is a sharded system
216253
func (s *MessagesServer) StopBalancer() error {
217254
s.lock.Lock()
218255
defer s.lock.Unlock()
@@ -221,7 +258,8 @@ func (s *MessagesServer) StopBalancer() error {
221258
return client.stopBalancer()
222259
}
223260
}
224-
return fmt.Errorf("No MongoS server found to stop the balancer")
261+
// This is not a sharded system. There is nothing to do.
262+
return nil
225263
}
226264

227265
func (s *MessagesServer) cancelBackup() error {

internal/testutils/grpc-daeamon.go

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@ import (
44
"context"
55
"fmt"
66
"net"
7-
"os"
8-
"path"
97
"sync"
108
"testing"
119
"time"
@@ -43,7 +41,7 @@ type GrpcDaemon struct {
4341
clients []*client.Client
4442
}
4543

46-
func NewGrpcDaemon(ctx context.Context, t *testing.T, logger *logrus.Logger) (*GrpcDaemon, error) {
44+
func NewGrpcDaemon(ctx context.Context, workDir string, t *testing.T, logger *logrus.Logger) (*GrpcDaemon, error) {
4745
if logger == nil {
4846
logger = logrus.New()
4947
logger.SetLevel(logrus.StandardLogger().Level)
@@ -56,13 +54,7 @@ func NewGrpcDaemon(ctx context.Context, t *testing.T, logger *logrus.Logger) (*G
5654
logger: logger,
5755
lock: &sync.Mutex{},
5856
}
59-
60-
tmpDir := path.Join(os.TempDir(), "dump_test")
61-
os.RemoveAll(tmpDir) // Don't check for errors. The path might not exist
62-
err := os.MkdirAll(tmpDir, os.ModePerm)
63-
if err != nil {
64-
return nil, fmt.Errorf("Cannot create temp dir %q, %s", tmpDir, err)
65-
}
57+
var err error
6658

6759
// Start the grpc server
6860
d.msgListener, err = net.Listen("tcp", fmt.Sprintf("localhost:%s", TEST_GRPC_MESSAGES_PORT))
@@ -73,7 +65,7 @@ func NewGrpcDaemon(ctx context.Context, t *testing.T, logger *logrus.Logger) (*G
7365
d.ctx, d.cancelFunc = context.WithCancel(ctx)
7466
// This is the server/agents gRPC server
7567
d.grpcServer4Clients = grpc.NewServer(opts...)
76-
d.MessagesServer = server.NewMessagesServer(logger)
68+
d.MessagesServer = server.NewMessagesServer(workDir, logger)
7769
pb.RegisterMessagesServer(d.grpcServer4Clients, d.MessagesServer)
7870

7971
d.wg.Add(1)
@@ -128,7 +120,7 @@ func NewGrpcDaemon(ctx context.Context, t *testing.T, logger *logrus.Logger) (*G
128120
ReplicasetName: di.ReplicaSetName,
129121
}
130122

131-
client, err := client.NewClient(d.ctx, tmpDir, dbConnOpts, client.SSLOptions{}, clientConn, logger)
123+
client, err := client.NewClient(d.ctx, workDir, dbConnOpts, client.SSLOptions{}, clientConn, logger)
132124
if err != nil {
133125
return nil, fmt.Errorf("Cannot create an agent instance %s: %s", agentID, err)
134126
}

0 commit comments

Comments
 (0)