Skip to content

Commit dd80834

Browse files
New restore method
1 parent deecd17 commit dd80834

File tree

7 files changed

+46
-38
lines changed

7 files changed

+46
-38
lines changed

cli/pbm-coordinator/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ const (
5858
defaultAPIPort = 10001
5959
defaultShutdownTimeout = 5 // Seconds
6060
defaultClientsLogging = true
61-
defaultDebugMode = true
61+
defaultDebugMode = false
6262
defaultWorkDir = "~/percona-backup-mongodb"
6363
)
6464

grpc/api/api.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package api
22

33
import (
44
"context"
5+
"os"
56
"path/filepath"
67
"time"
78

@@ -28,7 +29,9 @@ var (
2829
)
2930

3031
func init() {
31-
logger.SetLevel(logrus.DebugLevel)
32+
if os.Getenv("DEBUG") == "1" {
33+
logger.SetLevel(logrus.DebugLevel)
34+
}
3235
}
3336

3437
func (a *ApiServer) GetClients(m *pbapi.Empty, stream pbapi.Api_GetClientsServer) error {

grpc/client/client.go

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ import (
1717
"github.com/globalsign/mgo"
1818
"github.com/globalsign/mgo/bson"
1919
"github.com/golang/snappy"
20-
"github.com/kr/pretty"
2120
"github.com/percona/percona-backup-mongodb/bsonfile"
2221
"github.com/percona/percona-backup-mongodb/internal/awsutils"
2322
"github.com/percona/percona-backup-mongodb/internal/backup/dumper"
@@ -364,11 +363,10 @@ func (c *Client) processIncommingServerMessages() {
364363
case *pb.ServerMessage_ListReplicasets:
365364
c.processListReplicasets()
366365
case *pb.ServerMessage_PingMsg:
367-
c.processPing()
368-
// msg := c.processPing()
369-
//if err := c.streamSend(msg); err != nil {
370-
// c.logger.Errorf("Cannot stream ping response to the server: %s. Out message: %+v. In message type: %T", err, *msg, msg.Payload)
371-
//}
366+
msg := c.processPing()
367+
if err := c.streamSend(msg); err != nil {
368+
c.logger.Errorf("Cannot stream ping response to the server: %s. Out message: %+v. In message type: %T", err, *msg, msg.Payload)
369+
}
372370
continue
373371
case *pb.ServerMessage_CanRestoreBackupMsg:
374372
msg, err := c.processCanRestoreBackup(msg.GetCanRestoreBackupMsg())
@@ -626,7 +624,7 @@ func (c *Client) processListReplicasets() error {
626624

627625
func (c *Client) processPing() *pb.ClientMessage {
628626
c.logger.Debug("Received Ping command")
629-
//c.updateClientInfo()
627+
c.updateClientInfo()
630628

631629
pongMsg := &pb.Pong{
632630
Timestamp: time.Now().Unix(),
@@ -658,7 +656,6 @@ func (c *Client) processRestore(msg *pb.RestoreBackup) error {
658656
}()
659657

660658
c.sendACK()
661-
pretty.Print(msg)
662659

663660
if err := c.restoreDBDump(msg); err != nil {
664661
err := errors.Wrap(err, "cannot restore DB backup")
@@ -911,7 +908,6 @@ func (c *Client) runDBBackup(msg *pb.StartBackup) {
911908
fw, err := os.Create(path.Join(c.backupDir, msg.GetDbBackupName()))
912909
if err != nil {
913910
log.Errorf("Cannot create backup file: %s", err)
914-
// TODO Stream error msg to the server
915911
}
916912
writers = append(writers, fw)
917913
}

grpc/server/clients.go

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -267,25 +267,24 @@ func (c *Client) listReplicasets() ([]string, error) {
267267
}
268268

269269
func (c *Client) ping() error {
270-
//c.logger.Debug("sending ping")
270+
c.logger.Debug("sending ping")
271271
err := c.streamSend(&pb.ServerMessage{Payload: &pb.ServerMessage_PingMsg{PingMsg: &pb.Ping{}}})
272272
if err != nil {
273273
return errors.Wrap(err, "clients.go -> ping()")
274274
}
275275

276-
//msg, err := c.streamRecv()
277-
//if err != nil {
278-
// return errors.Wrapf(err, "ping client %s (%s)", c.ID, c.NodeName)
279-
//}
280-
//pretty.Println(msg)
281-
//pongMsg := msg.GetPongMsg()
282-
//c.statusLock.Lock()
283-
//c.NodeType = pongMsg.GetNodeType()
284-
//c.ReplicasetUUID = pongMsg.GetReplicaSetUuid()
285-
//c.ReplicasetVersion = pongMsg.GetReplicaSetVersion()
286-
//c.isTailing = pongMsg.GetIsTailing()
287-
//c.lastTailedTimestamp = pongMsg.GetLastTailedTimestamp()
288-
//c.statusLock.Unlock()
276+
msg, err := c.streamRecv()
277+
if err != nil {
278+
return errors.Wrapf(err, "ping client %s (%s)", c.ID, c.NodeName)
279+
}
280+
pongMsg := msg.GetPongMsg()
281+
c.statusLock.Lock()
282+
c.NodeType = pongMsg.GetNodeType()
283+
c.ReplicasetUUID = pongMsg.GetReplicaSetUuid()
284+
c.ReplicasetVersion = pongMsg.GetReplicaSetVersion()
285+
c.isTailing = pongMsg.GetIsTailing()
286+
c.lastTailedTimestamp = pongMsg.GetLastTailedTimestamp()
287+
c.statusLock.Unlock()
289288

290289
return nil
291290
}

grpc/server/server.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -137,12 +137,6 @@ func (s *MessagesServer) RestoreSourcesByReplicaset(bm *pb.BackupMetadata) (map[
137137
sources[resp.Replicaset] = s
138138
}
139139
}
140-
for rs, source := range sources {
141-
fmt.Printf("RS: %v\n", rs)
142-
fmt.Printf("Client: %v\n", source.Client.NodeName)
143-
fmt.Printf("Host: %v\n", source.Host)
144-
fmt.Printf("Port: %v\n", source.Port)
145-
}
146140
return sources, nil
147141
}
148142

@@ -330,9 +324,6 @@ func (s *MessagesServer) RestoreBackUp(bm *pb.BackupMetadata, skipUsersAndRoles
330324
// Ping will also update the status and if it is primary or secondary
331325
for _, source := range clients {
332326
source.Client.ping()
333-
fmt.Printf("Client ID : %v\n", source.Client.ID)
334-
fmt.Printf("Node Name : %v\n", source.Client.NodeName)
335-
fmt.Printf("Is primary : %v\n", source.Client.isPrimary)
336327
}
337328

338329
for replName, source := range clients {

internal/backup/dumper/dumper.go

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/mongodb/mongo-tools/common/options"
99
"github.com/mongodb/mongo-tools/mongodump"
1010
"github.com/pkg/errors"
11+
"github.com/sirupsen/logrus"
1112
)
1213

1314
type MongodumpInput struct {
@@ -32,6 +33,21 @@ type Mongodump struct {
3233
//
3334
lock *sync.Mutex
3435
running bool
36+
37+
mdumpLogger []byte
38+
}
39+
40+
type log2LogrusWriter struct {
41+
entry *logrus.Entry
42+
}
43+
44+
func (w *log2LogrusWriter) Write(b []byte) (int, error) {
45+
n := len(b)
46+
if n > 0 && b[n-1] == '\n' {
47+
b = b[:n-1]
48+
}
49+
w.entry.Info(string(b))
50+
return n, nil
3551
}
3652

3753
func NewMongodump(i *MongodumpInput) (*Mongodump, error) {
@@ -82,12 +98,15 @@ func NewMongodump(i *MongodumpInput) (*Mongodump, error) {
8298

8399
dump.OutputWriter = i.Writer
84100

85-
return &Mongodump{
101+
mongoDump := &Mongodump{
86102
MongodumpInput: i,
87103
mongodump: dump,
88104
lock: &sync.Mutex{},
89105
running: false,
90-
}, nil
106+
}
107+
//mdumpLogger.SetWriter(logrus.StandardLogger().Writer())
108+
109+
return mongoDump, nil
91110
}
92111

93112
func (md *Mongodump) LastError() error {

tests/general_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -356,7 +356,7 @@ func TestValidateReplicasetAgents(t *testing.T) {
356356
time.Sleep(10 * time.Second)
357357
for _, client := range d.Clients() {
358358
if client.ReplicasetName() == "rs1" {
359-
fmt.Printf("Stopping client: %s, rs: %s\n", client.NodeName(), client.ReplicasetName())
359+
log.Infof("Stopping client: %s, rs: %s\n", client.NodeName(), client.ReplicasetName())
360360
client.Stop()
361361
time.Sleep(1 * time.Second)
362362
}
@@ -466,7 +466,7 @@ func TestBackup1(t *testing.T) {
466466
t.Fatalf("Cannot start backup: %s", err)
467467
}
468468

469-
fmt.Printf("starting backup: %s\n", backupNamePrefix)
469+
log.Infof("starting backup: %s\n", backupNamePrefix)
470470

471471
d.MessagesServer.WaitBackupFinish()
472472
err = d.MessagesServer.StopOplogTail()

0 commit comments

Comments
 (0)