Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 0 additions & 22 deletions .env

This file was deleted.

42 changes: 28 additions & 14 deletions cli/mongodb-backup-admin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"
"os"
"os/signal"
"sort"
"time"

Expand Down Expand Up @@ -71,6 +72,8 @@ func main() {
grpcOpts = append(grpcOpts, grpc.WithInsecure())
}

ctx, cancel := context.WithCancel(context.Background())

conn, err = grpc.Dial(*opts.serverAddr, grpcOpts...)
if err != nil {
log.Fatalf("fail to dial: %v", err)
Expand All @@ -82,16 +85,24 @@ func main() {
log.Fatalf("Cannot connect to the API: %s", err)
}

c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)

go func() {
<-c
cancel()
}()

switch cmd {
case "list-agents":
clients, err := connectedAgents(conn)
clients, err := connectedAgents(ctx, conn)
if err != nil {
log.Errorf("Cannot get the list of connected agents: %s", err)
break
}
printConnectedAgents(clients)
case "list-backups":
md, err := getAvailableBackups(conn)
md, err := getAvailableBackups(ctx, conn)
if err != nil {
log.Errorf("Cannot get the list of available backups: %s", err)
break
Expand All @@ -102,25 +113,28 @@ func main() {
}
fmt.Println("No backups found")
case "backup":
err := startBackup(apiClient, opts)
err := startBackup(ctx, apiClient, opts)
if err != nil {
log.Fatal(err)
log.Fatalf("Cannot send the StartBackup command to the gRPC server: %s", err)
}
case "restore":
fmt.Println("restoring")
err := restoreBackup(apiClient, opts)
err := restoreBackup(ctx, apiClient, opts)
if err != nil {
log.Fatal(err)
log.Fatalf("Cannot send the RestoreBackup command to the gRPC server: %s", err)
}
default:
log.Fatalf("Unknown command %q", cmd)
}

cancel()
}

func connectedAgents(conn *grpc.ClientConn) ([]*pbapi.Client, error) {
func connectedAgents(ctx context.Context, conn *grpc.ClientConn) ([]*pbapi.Client, error) {
apiClient := pbapi.NewApiClient(conn)
stream, err := apiClient.GetClients(context.Background(), &pbapi.Empty{})
stream, err := apiClient.GetClients(ctx, &pbapi.Empty{})
if err != nil {
return nil, err
}
Expand All @@ -139,9 +153,9 @@ func connectedAgents(conn *grpc.ClientConn) ([]*pbapi.Client, error) {
return clients, nil
}

func getAvailableBackups(conn *grpc.ClientConn) (map[string]*pb.BackupMetadata, error) {
func getAvailableBackups(ctx context.Context, conn *grpc.ClientConn) (map[string]*pb.BackupMetadata, error) {
apiClient := pbapi.NewApiClient(conn)
stream, err := apiClient.BackupsMetadata(context.Background(), &pbapi.Empty{})
stream, err := apiClient.BackupsMetadata(ctx, &pbapi.Empty{})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -175,7 +189,7 @@ func listAvailableBackups() (backups []string) {
defer conn.Close()
}

mds, err := getAvailableBackups(conn)
mds, err := getAvailableBackups(context.TODO(), conn)
if err != nil {
return
}
Expand Down Expand Up @@ -210,7 +224,7 @@ func printConnectedAgents(clients []*pbapi.Client) {
}
}

func startBackup(apiClient pbapi.ApiClient, opts *cliOptions) error {
func startBackup(ctx context.Context, apiClient pbapi.ApiClient, opts *cliOptions) error {
msg := &pbapi.RunBackupParams{
CompressionType: pbapi.CompressionType_NO_COMPRESSION,
Cypher: pbapi.Cypher_NO_CYPHER,
Expand Down Expand Up @@ -239,21 +253,21 @@ func startBackup(apiClient pbapi.ApiClient, opts *cliOptions) error {
switch *opts.encryptionAlgorithm {
}

_, err := apiClient.RunBackup(context.Background(), msg)
_, err := apiClient.RunBackup(ctx, msg)
if err != nil {
return err
}

return nil
}

func restoreBackup(apiClient pbapi.ApiClient, opts *cliOptions) error {
func restoreBackup(ctx context.Context, apiClient pbapi.ApiClient, opts *cliOptions) error {
msg := &pbapi.RunRestoreParams{
MetadataFile: *opts.restoreMetadataFile,
SkipUsersAndRoles: *opts.restoreSkipUsersAndRoles,
}

_, err := apiClient.RunRestore(context.Background(), msg)
_, err := apiClient.RunRestore(ctx, msg)
if err != nil {
return err
}
Expand All @@ -265,7 +279,7 @@ func processCliArgs(args []string) (string, *cliOptions, error) {
app := kingpin.New("mongodb-backup-admin", "MongoDB backup admin")
listClientsCmd := app.Command("list-agents", "List all agents connected to the server")
listBackupsCmd := app.Command("list-backups", "List all backups (metadata files) stored in the server working directory")
backupCmd := app.Command("start-backup", "Start a backup")
backupCmd := app.Command("backup", "Start a backup")
restoreCmd := app.Command("restore", "Restore a backup given a metadata file name")

opts := &cliOptions{
Expand Down
32 changes: 20 additions & 12 deletions cli/mongodb-backupd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,17 @@ import (
)

type cliOptions struct {
app *kingpin.Application
cmd string
tls *bool
workDir *string
certFile *string
keyFile *string
grpcPort *int
apiPort *int
shutdownTimeout *int
debug *bool
app *kingpin.Application
cmd string
tls *bool
workDir *string
certFile *string
keyFile *string
grpcPort *int
apiPort *int
shutdownTimeout *int
debug *bool
enableClientsLogging *bool
}

var (
Expand All @@ -54,6 +55,8 @@ func processCliParams() (*cliOptions, error) {
apiPort: app.Flag("api-port", "Listening por for API client connecions").Default(defaultAPIPort).Int(),
shutdownTimeout: app.Flag("shutdown-timeout", "Server shutdown timeout").Default("3").Int(),
debug: app.Flag("debug", "Enable debug log level").Bool(),
enableClientsLogging: app.Flag("enable-clients-logging", "Enable showing logs comming from agents on the server side").
Default("true").Bool(),
}

opts.cmd, err = app.Parse(os.Args[1:])
Expand Down Expand Up @@ -98,8 +101,13 @@ func main() {
stopChan := make(chan interface{})
wg := &sync.WaitGroup{}

var messagesServer *server.MessagesServer
grpcServer := grpc.NewServer(grpcOpts...)
messagesServer := server.NewMessagesServer(*opts.workDir, log)
if *opts.enableClientsLogging {
messagesServer = server.NewMessagesServerWithClientLogging(*opts.workDir, log)
} else {
messagesServer = server.NewMessagesServer(*opts.workDir, log)
}
pb.RegisterMessagesServer(grpcServer, messagesServer)

wg.Add(1)
Expand All @@ -118,7 +126,7 @@ func main() {
signal.Notify(c, os.Interrupt)

<-c
log.Infof("Stop signal received. Stopping the agent")
log.Infof("Stop signal received. Stopping the server")
close(stopChan)
wg.Wait()
}
Expand Down
23 changes: 17 additions & 6 deletions grpc/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/percona/mongodb-backup/bsonfile"
"github.com/percona/mongodb-backup/internal/backup/dumper"
"github.com/percona/mongodb-backup/internal/cluster"
"github.com/percona/mongodb-backup/internal/loghook"
"github.com/percona/mongodb-backup/internal/oplog"
"github.com/percona/mongodb-backup/internal/restore"
pb "github.com/percona/mongodb-backup/proto/messages"
Expand Down Expand Up @@ -51,6 +52,7 @@ type Client struct {
//
streamLock *sync.Mutex
stream pb.Messages_MessagesChatClient
logStream pb.Messages_LoggingClient
}

type ConnectionOptions struct {
Expand Down Expand Up @@ -200,7 +202,18 @@ func (c *Client) connect() {
c.lock.Lock()
c.running = true
c.lock.Unlock()
return

// Hook the gRPC logging stream into logrus.
// By doing this, all regular logrus calls will also send the log entries to the gRPC server
logStream, err := c.grpcClient.Logging(c.ctx)
if err != nil {
log.Errorf("Cannot start gRPC logging stream: %s", err)
return
}
c.logStream = logStream
logrusHook := loghook.NewGrpcLogging(c.id, c.logStream)
c.logger.AddHook(logrusHook)
return // remember we are in a reconnect for loop and we need to exit it
}
}

Expand Down Expand Up @@ -273,8 +286,7 @@ func (c *Client) processIncommingServerMessages() {

//var response *pb.ClientMessage

c.logger.Debugf("Client %s -> incoming message: %+v", c.nodeName, msg)
c.logger.Infof("Client %s -> incoming message: %+v", c.nodeName, msg)
c.logger.Debugf("Incoming message: %+v", msg)
switch msg.Payload.(type) {
case *pb.ServerMessage_GetStatusMsg:
c.processStatus()
Expand Down Expand Up @@ -415,8 +427,7 @@ func (c *Client) sendRestoreComplete(err error) error {
}

func (c *Client) processStartBackup(msg *pb.StartBackup) {
c.logger.Infof("%s: Received StartBackup command", c.nodeName)
c.logger.Debugf("%s: Received start backup command: %+v", c.nodeName, *msg)
c.logger.Info("Received StartBackup command")

c.lock.Lock()
defer c.lock.Unlock()
Expand Down Expand Up @@ -461,7 +472,7 @@ func (c *Client) processStartBalancer() (*pb.ClientMessage, error) {
if err := balancer.Start(); err != nil {
return nil, err
}
c.logger.Debugf("Balancer has been started by %s", c.nodeName)
c.logger.Debugf("Balancer has been started by me")

out := &pb.ClientMessage{
ClientID: c.id,
Expand Down
45 changes: 43 additions & 2 deletions grpc/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const (
EVENT_BACKUP_FINISH = iota
EVENT_OPLOG_FINISH
EVENT_RESTORE_FINISH
logBufferSize = 500
)

type MessagesServer struct {
Expand All @@ -35,15 +36,30 @@ type MessagesServer struct {
err error
//
workDir string
clientLoggingEnabled bool
lastBackupMetadata *BackupMetadata
clientDisconnetedChan chan string
dbBackupFinishChan chan interface{}
oplogBackupFinishChan chan interface{}
restoreFinishChan chan interface{}
clientsLogChan chan *pb.LogEntry
logger *logrus.Logger
}

func NewMessagesServer(workDir string, logger *logrus.Logger) *MessagesServer {
messagesServer := newMessagesServer(workDir, logger)
go messagesServer.handleClientDisconnection()
return messagesServer
}

func NewMessagesServerWithClientLogging(workDir string, logger *logrus.Logger) *MessagesServer {
messagesServer := newMessagesServer(workDir, logger)
messagesServer.clientLoggingEnabled = true
go messagesServer.handleClientDisconnection()
return messagesServer
}

func newMessagesServer(workDir string, logger *logrus.Logger) *MessagesServer {
if logger == nil {
logger = logrus.New()
logger.SetLevel(logrus.StandardLogger().Level)
Expand All @@ -62,6 +78,7 @@ func NewMessagesServer(workDir string, logger *logrus.Logger) *MessagesServer {
clients: make(map[string]*Client),
clientDisconnetedChan: make(chan string),
stopChan: make(chan struct{}),
clientsLogChan: make(chan *pb.LogEntry, logBufferSize),
dbBackupFinishChan: bfc,
oplogBackupFinishChan: ofc,
restoreFinishChan: rbf,
Expand All @@ -70,8 +87,6 @@ func NewMessagesServer(workDir string, logger *logrus.Logger) *MessagesServer {
logger: logger,
}

go messagesServer.handleClientDisconnection()

return messagesServer
}

Expand Down Expand Up @@ -455,6 +470,32 @@ func (s *MessagesServer) DBBackupFinished(ctx context.Context, msg *pb.DBBackupF
return &pb.Ack{}, nil
}

func (s *MessagesServer) Logging(stream pb.Messages_LoggingServer) error {
for {
msg, err := stream.Recv()
if err != nil {
return err
}
level := logrus.Level(msg.GetLevel())
logLine := fmt.Sprintf("-> Client: %s, %+v", msg.GetClientID(), strings.TrimSpace(msg.GetMessage()))
switch level {
case logrus.PanicLevel:
s.logger.Panicf(logLine)
case logrus.FatalLevel:
s.logger.Fatalf(logLine)
case logrus.ErrorLevel:
s.logger.Errorf(logLine)
case logrus.WarnLevel:
s.logger.Warnf(logLine)
case logrus.InfoLevel:
s.logger.Infof(logLine)
case logrus.DebugLevel:
s.logger.Debugf(logLine)
}
}
return nil
}

// MessagesChat is the method exposed by gRPC to stream messages between the server and agents
func (s *MessagesServer) MessagesChat(stream pb.Messages_MessagesChatServer) error {
msg, err := stream.Recv()
Expand Down
Loading