Skip to content

Commit 7e9a6f8

Browse files
Merge pull request #88 from percona/PMB-31
PMB-31 Support start backup from the cli
2 parents 0ffa5d1 + 4b8abb1 commit 7e9a6f8

File tree

20 files changed

+3050
-887
lines changed

20 files changed

+3050
-887
lines changed

.env

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
AWS_ACCESS_KEY_ID=
22
AWS_SECRET_ACCESS_KEY=
33
GOCACHE=off
4-
GOLANG_DOCKERHUB_TAG=1.11-stretch
4+
GOLANG_DOCKERHUB_TAG=1.10-stretch
55
TEST_MONGODB_ADMIN_USERNAME=admin
66
TEST_MONGODB_ADMIN_PASSWORD=admin123456
77
TEST_MONGODB_USERNAME=test

cli/mongodb-backup-admin/main.go

Lines changed: 80 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,27 +4,32 @@ import (
44
"context"
55
"fmt"
66
"io"
7-
"log"
87
"os"
9-
"strings"
8+
"sort"
109
"time"
1110

1211
"github.com/alecthomas/kingpin"
1312
pbapi "github.com/percona/mongodb-backup/proto/api"
1413
"github.com/pkg/errors"
14+
log "github.com/sirupsen/logrus"
1515
"google.golang.org/grpc"
1616
"google.golang.org/grpc/credentials"
1717
"google.golang.org/grpc/testdata"
1818
)
1919

2020
type cliOptions struct {
2121
app *kingpin.Application
22-
clientID *string
2322
tls *bool
2423
caFile *string
2524
serverAddr *string
2625

2726
listClients *kingpin.CmdClause
27+
28+
startBackup *kingpin.CmdClause
29+
backupType *string
30+
destinationType *string
31+
compressionAlgorithm *string
32+
encryptionAlgorithm *string
2833
}
2934

3035
func main() {
@@ -56,18 +61,30 @@ func main() {
5661
}
5762
defer conn.Close()
5863

64+
apiClient := pbapi.NewApiClient(conn)
65+
if err != nil {
66+
log.Fatalf("Cannot connect to the API: %s", err)
67+
}
68+
5969
switch cmd {
6070
case "list-agents":
61-
clients, err := getConnectedAgents(conn)
71+
clients, err := connectedAgents(conn)
6272
if err != nil {
63-
log.Fatal(err)
73+
log.Debugf("Cannot connect to the gRPC server: %s", err)
74+
log.Fatal("Cannot connect to the gRPC server")
6475
}
6576
printConnectedAgents(clients)
77+
case "start-backup":
78+
err := startBackup(apiClient, opts)
79+
if err != nil {
80+
log.Fatal(err)
81+
log.Fatalf("Cannot connect to the gRPC server: %s", err)
82+
}
6683
}
6784

6885
}
6986

70-
func getConnectedAgents(conn *grpc.ClientConn) ([]*pbapi.Client, error) {
87+
func connectedAgents(conn *grpc.ClientConn) ([]*pbapi.Client, error) {
7188
apiClient := pbapi.NewApiClient(conn)
7289
stream, err := apiClient.GetClients(context.Background(), &pbapi.Empty{})
7390
if err != nil {
@@ -84,6 +101,7 @@ func getConnectedAgents(conn *grpc.ClientConn) ([]*pbapi.Client, error) {
84101
}
85102
clients = append(clients, msg)
86103
}
104+
sort.Slice(clients, func(i, j int) bool { return clients[i].NodeName < clients[j].NodeName })
87105
return clients, nil
88106
}
89107

@@ -92,21 +110,69 @@ func printConnectedAgents(clients []*pbapi.Client) {
92110
fmt.Println("There are no agents connected to the backup master")
93111
return
94112
}
95-
fmt.Println(strings.Repeat("-", 100))
113+
fmt.Println(" Node Name - Node Type - Replicaset - Backup Running - Last Seen")
96114
for _, client := range clients {
97-
fmt.Printf("%s - %s - %v\n", client.ClientID, client.Status, time.Unix(client.LastSeen, 0))
115+
runningBackup := "false"
116+
if client.Status.GetRunningDBBackup() {
117+
runningBackup = "true"
118+
}
119+
fmt.Printf("%20s - %20s - %18s - %5s - %v\n", client.ID, client.NodeType, client.ReplicasetName, runningBackup, time.Unix(client.LastSeen, 0))
98120
}
99121
}
100122

101-
func processCliArgs(args []string) (string, *cliOptions, error) {
123+
func startBackup(apiClient pbapi.ApiClient, opts *cliOptions) error {
124+
msg := &pbapi.RunBackupParams{
125+
CompressionType: pbapi.CompressionType_NO_COMPRESSION,
126+
Cypher: pbapi.Cypher_NO_CYPHER,
127+
}
102128

129+
switch *opts.backupType {
130+
case "logical":
131+
msg.BackupType = pbapi.BackupType_LOGICAL
132+
case "hot":
133+
msg.BackupType = pbapi.BackupType_LOGICAL
134+
}
135+
136+
switch *opts.destinationType {
137+
case "logical":
138+
msg.DestinationType = pbapi.DestinationType_FILE
139+
case "aws":
140+
msg.DestinationType = pbapi.DestinationType_AWS
141+
}
142+
143+
switch *opts.compressionAlgorithm {
144+
case "gzip":
145+
msg.CompressionType = pbapi.CompressionType_GZIP
146+
}
147+
148+
switch *opts.encryptionAlgorithm {
149+
}
150+
151+
_, err := apiClient.RunBackup(context.Background(), msg)
152+
if err != nil {
153+
return err
154+
}
155+
156+
return nil
157+
}
158+
159+
func processCliArgs(args []string) (string, *cliOptions, error) {
103160
app := kingpin.New("mongodb-backup-admin", "MongoDB backup admin")
161+
listClientsCmd := app.Command("list-agents", "List all agents connected to the server")
162+
startBackupCmd := app.Command("start-backup", "Start a backup")
163+
104164
opts := &cliOptions{
105-
clientID: kingpin.Flag("client-id", "Client ID").Required().String(),
106-
tls: kingpin.Flag("tls", "Connection uses TLS if true, else plain TCP").Default("false").Bool(),
107-
caFile: kingpin.Flag("ca-file", "The file containning the CA root cert file").String(),
108-
serverAddr: kingpin.Flag("server-addr", "The server address in the format of host:port").Default("127.0.0.1:10001").String(),
109-
listClients: kingpin.Command("list-agents", "List all agents connected to the server"),
165+
tls: app.Flag("tls", "Connection uses TLS if true, else plain TCP").Default("false").Bool(),
166+
caFile: app.Flag("ca-file", "The file containning the CA root cert file").String(),
167+
serverAddr: app.Flag("server-addr", "The server address in the format of host:port").Default("127.0.0.1:10001").String(),
168+
169+
listClients: listClientsCmd,
170+
171+
startBackup: startBackupCmd,
172+
backupType: startBackupCmd.Flag("backup-type", "Backup type").Enum("logical", "hot"),
173+
destinationType: startBackupCmd.Flag("destination-type", "Backup destination type").Enum("file", "aws"),
174+
compressionAlgorithm: startBackupCmd.Flag("compression-algorithm", "Compression algorithm used for the backup").String(),
175+
encryptionAlgorithm: startBackupCmd.Flag("encryption-algorithm", "Encryption algorithm used for the backup").String(),
110176
}
111177

112178
cmd, err := app.Parse(args)

cli/mongodb-backup-agent/main.go

Lines changed: 86 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,25 +3,34 @@ package main
33
import (
44
"context"
55
"fmt"
6+
"io/ioutil"
67
"log"
78
"math/rand"
89
"os"
910
"os/signal"
11+
"strconv"
1012
"strings"
13+
"syscall"
1114

1215
"github.com/alecthomas/kingpin"
1316
"github.com/globalsign/mgo"
1417
"github.com/percona/mongodb-backup/grpc/client"
18+
"github.com/pkg/errors"
19+
"github.com/sirupsen/logrus"
1520
"google.golang.org/grpc"
1621
"google.golang.org/grpc/credentials"
1722
)
1823

1924
type cliOptios struct {
2025
app *kingpin.Application
2126
dsn *string
27+
pidFile *string
2228
serverAddress *string
29+
backupDir *string
2330
tls *bool
2431
caFile *string
32+
debug *bool
33+
quiet *bool
2534

2635
// MongoDB connection options
2736
mongodbConnOptions struct {
@@ -48,43 +57,64 @@ type cliOptios struct {
4857
}
4958

5059
func main() {
60+
log := logrus.New()
5161
opts, err := processCliArgs()
5262
if err != nil {
5363
log.Fatalf("Cannot parse command line arguments: %s", err)
5464
}
5565

66+
log.SetLevel(logrus.InfoLevel)
67+
if *opts.quiet {
68+
log.SetLevel(logrus.ErrorLevel)
69+
}
70+
if *opts.debug {
71+
log.SetLevel(logrus.DebugLevel)
72+
}
73+
log.SetLevel(logrus.DebugLevel)
74+
5675
grpcOpts := getgRPCOptions(opts)
5776
clientID := fmt.Sprintf("ABC%04d", rand.Int63n(10000))
58-
log.Printf("Using Client ID: %s", clientID)
77+
log.Infof("Using Client ID: %s", clientID)
5978

6079
// Connect to the mongodb-backup gRPC server
6180
conn, err := grpc.Dial(*opts.serverAddress, grpcOpts...)
6281
if err != nil {
63-
log.Fatalf("fail to dial: %v", err)
82+
log.Fatalf("Fail to connect to the gRPC server at %q: %v", *opts.serverAddress, err)
6483
}
6584
defer conn.Close()
66-
log.Printf("Connected to the gRPC server at %s", *opts.serverAddress)
85+
log.Infof("Connected to the gRPC server at %s", *opts.serverAddress)
6786

6887
// Connect to the MongoDB instance
6988
var di *mgo.DialInfo
7089
if *opts.dsn == "" {
7190
di = &mgo.DialInfo{
72-
Addrs: []string{opts.mongodbConnOptions.Host + ":" + opts.mongodbConnOptions.Port},
91+
Addrs: []string{opts.mongodbConnOptions.Host + ":" + opts.mongodbConnOptions.Port},
92+
Username: opts.mongodbConnOptions.User,
93+
Password: opts.mongodbConnOptions.Password,
94+
ReplicaSetName: opts.mongodbConnOptions.ReplicasetName,
95+
FailFast: true,
96+
Source: "admin",
7397
}
7498
} else {
7599
di, err = mgo.ParseURL(*opts.dsn)
100+
di.FailFast = true
76101
if err != nil {
77-
log.Fatalf("Cannot parse MongoDB dsn %q, %s", *opts.dsn, err)
102+
log.Fatalf("Cannot parse MongoDB DSN %q, %s", *opts.dsn, err)
78103
}
79104
}
105+
80106
mdbSession, err := mgo.DialWithInfo(di)
81107
if err != nil {
82-
log.Fatalf("Cannot connect to MongoDB on %s: %s", di.Addrs[0], err)
108+
log.Fatalf("Cannot connect to MongoDB at %s: %s", di.Addrs[0], err)
83109
}
84110
defer mdbSession.Close()
85-
log.Printf("Connected to MongoDB at %s", di.Addrs[0])
111+
log.Infof("Connected to MongoDB at %s", di.Addrs[0])
112+
113+
client, err := client.NewClient(context.Background(), *opts.backupDir, opts.mongodbConnOptions, opts.mongodbSslOptions, conn, log)
114+
if err != nil {
115+
log.Fatal(err)
116+
}
86117

87-
client, err := client.NewClient(context.Background(), opts.mongodbConnOptions, opts.mongodbSslOptions, conn)
88118
c := make(chan os.Signal, 1)
89119
signal.Notify(c, os.Interrupt)
90120

@@ -97,22 +127,34 @@ func processCliArgs() (*cliOptios, error) {
97127
opts := &cliOptios{
98128
app: app,
99129
dsn: app.Flag("dsn", "MongoDB connection string").String(),
100-
serverAddress: app.Flag("server-address", "MongoDB backup server address").String(),
130+
serverAddress: app.Flag("server-address", "MongoDB backup server address").Default("127.0.0.1:10000").String(),
131+
backupDir: app.Flag("backup-dir", "Directory where to store the backups").Default("/tmp").String(),
101132
tls: app.Flag("tls", "Use TLS").Bool(),
133+
pidFile: app.Flag("pid-file", "pid file").String(),
102134
caFile: app.Flag("ca-file", "CA file").String(),
135+
debug: app.Flag("debug", "Enable debug log level").Bool(),
136+
quiet: app.Flag("quiet", "Quiet mode. Log only errors").Bool(),
103137
}
104138

105-
app.Flag("mongodb-host", "MongoDB host").StringVar(&opts.mongodbConnOptions.Host)
106-
app.Flag("mongodb-port", "MongoDB port").StringVar(&opts.mongodbConnOptions.Port)
107-
app.Flag("mongodb-user", "MongoDB username").StringVar(&opts.mongodbConnOptions.User)
108-
app.Flag("mongodb-password", "MongoDB password").StringVar(&opts.mongodbConnOptions.Password)
109-
app.Flag("replicaset", "Replicaset name").StringVar(&opts.mongodbConnOptions.ReplicasetName)
139+
//TODO: Remove defaults. These values are only here to test during development
140+
// These params should be Required()
141+
app.Flag("mongodb-host", "MongoDB host").Default("127.0.0.1").StringVar(&opts.mongodbConnOptions.Host)
142+
app.Flag("mongodb-port", "MongoDB port").Default(os.Getenv("TEST_MONGODB_S1_PRIMARY_PORT")).StringVar(&opts.mongodbConnOptions.Port)
143+
app.Flag("mongodb-user", "MongoDB username").Default(os.Getenv("TEST_MONGODB_USERNAME")).StringVar(&opts.mongodbConnOptions.User)
144+
app.Flag("mongodb-password", "MongoDB password").Default(os.Getenv("TEST_MONGODB_PASSWORD")).StringVar(&opts.mongodbConnOptions.Password)
145+
app.Flag("replicaset", "Replicaset name").Default(os.Getenv("TEST_MONGODB_S1_RS")).StringVar(&opts.mongodbConnOptions.ReplicasetName)
110146

111147
_, err := app.Parse(os.Args[1:])
112148
if err != nil {
113149
return nil, err
114150
}
115151

152+
if *opts.pidFile != "" {
153+
if err := writePidFile(*opts.pidFile); err != nil {
154+
return nil, errors.Wrapf(err, "cannot write pid file %q", *opts.pidFile)
155+
}
156+
}
157+
116158
if *opts.dsn != "" {
117159
di, err := mgo.ParseURL(*opts.dsn)
118160
if err != nil {
@@ -128,6 +170,15 @@ func processCliArgs() (*cliOptios, error) {
128170
opts.mongodbConnOptions.Password = di.Password
129171
opts.mongodbConnOptions.ReplicasetName = di.ReplicaSetName
130172
}
173+
174+
fi, err := os.Stat(*opts.backupDir)
175+
if err != nil {
176+
return nil, errors.Wrap(err, "invalid backup destination dir")
177+
}
178+
if !fi.IsDir() {
179+
return nil, fmt.Errorf("%q is not a directory", *opts.backupDir)
180+
}
181+
131182
return opts, nil
132183
}
133184

@@ -144,3 +195,24 @@ func getgRPCOptions(opts *cliOptios) []grpc.DialOption {
144195
}
145196
return grpcOpts
146197
}
198+
199+
// Write a pid file, but first make sure it doesn't exist with a running pid.
200+
func writePidFile(pidFile string) error {
201+
// Read in the pid file as a slice of bytes.
202+
if piddata, err := ioutil.ReadFile(pidFile); err == nil {
203+
// Convert the file contents to an integer.
204+
if pid, err := strconv.Atoi(string(piddata)); err == nil {
205+
// Look for the pid in the process list.
206+
if process, err := os.FindProcess(pid); err == nil {
207+
// Send the process a signal zero kill.
208+
if err := process.Signal(syscall.Signal(0)); err == nil {
209+
// We only get an error if the pid isn't running, or it's not ours.
210+
return fmt.Errorf("pid already running: %d", pid)
211+
}
212+
}
213+
}
214+
}
215+
// If we get here, then the pidfile didn't exist,
216+
// or the pid in it doesn't belong to the user running this app.
217+
return ioutil.WriteFile(pidFile, []byte(fmt.Sprintf("%d", os.Getpid())), 0664)
218+
}

0 commit comments

Comments
 (0)