Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cli/pbm-agent/run-agents.sh
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@ run_agents() {
echo " --backup-dir=${backupdir} \\"
echo " --pid-file=${pidfile} &> ${logfile} &"

./pbm-agent --mongodb-user=${TEST_MONGODB_USERNAME} \
./pbm-agent --mongodb-username=${TEST_MONGODB_USERNAME} \
--mongodb-password=${TEST_MONGODB_PASSWORD} \
--mongodb-host=${TEST_MONGODB_HOST} \
--mongodb-port=${port} \
--replicaset=${replicaset} \
--mongodb-replicaset=${replicaset} \
--backup-dir=${backupdir} \
--pid-file=${pidfile} &> ${logfile} &
pid=$!
Expand Down
23 changes: 15 additions & 8 deletions cli/pbm-coordinator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,18 @@ type cliOptions struct {
TLSCertFile string `yaml:"tls_cert_file"`
TLSKeyFile string `yaml:"tls_key_file"`
EnableClientsLogging bool `yaml:"enable_clients_logging"`
ClientsRefreshSecs int `yaml:"clients_refresh_secs"`
ShutdownTimeout int `yaml:"shutdown_timeout"`
}

const (
defaultGrpcPort = 10000
defaultAPIPort = 10001
defaultShutdownTimeout = 5 // Seconds
defaultClientsLogging = true
defaultDebugMode = false
defaultWorkDir = "~/percona-backup-mongodb"
defaultGrpcPort = 10000
defaultAPIPort = 10001
defaultClientsRefreshSecs = 60 // Seconds
defaultShutdownTimeout = 5 // Seconds
defaultClientsLogging = true
defaultDebugMode = false
defaultWorkDir = "~/percona-backup-mongodb"
)

var (
Expand Down Expand Up @@ -118,9 +120,9 @@ func main() {
var messagesServer *server.MessagesServer
grpcServer := grpc.NewServer(grpcOpts...)
if opts.EnableClientsLogging {
messagesServer = server.NewMessagesServerWithClientLogging(opts.WorkDir, log)
messagesServer = server.NewMessagesServerWithClientLogging(opts.WorkDir, opts.ClientsRefreshSecs, log)
} else {
messagesServer = server.NewMessagesServer(opts.WorkDir, log)
messagesServer = server.NewMessagesServer(opts.WorkDir, opts.ClientsRefreshSecs, log)
}
pb.RegisterMessagesServer(grpcServer, messagesServer)

Expand Down Expand Up @@ -194,6 +196,7 @@ func processCliParams() (*cliOptions, error) {
app.Flag("grpc-port", "Listening port for gRPC client connections").IntVar(&opts.GrpcPort)
app.Flag("api-bindip", "Bind IP for API client connections").StringVar(&opts.APIBindIP)
app.Flag("api-port", "Listening port for API client connections").IntVar(&opts.APIPort)
app.Flag("clients-refresh-secs", "Frequency in seconds to refresh state of clients").IntVar(&opts.ClientsRefreshSecs)
app.Flag("enable-clients-logging", "Enable showing logs coming from agents on the server side").BoolVar(&opts.EnableClientsLogging)
app.Flag("shutdown-timeout", "Server shutdown timeout").IntVar(&opts.ShutdownTimeout)
//
Expand All @@ -212,6 +215,7 @@ func processCliParams() (*cliOptions, error) {
ShutdownTimeout: defaultShutdownTimeout,
Debug: defaultDebugMode,
WorkDir: defaultWorkDir,
ClientsRefreshSecs: defaultClientsRefreshSecs,
EnableClientsLogging: defaultClientsLogging,
}
if opts.configFile != "" {
Expand Down Expand Up @@ -273,6 +277,9 @@ func mergeOptions(opts, yamlOpts *cliOptions) {
if opts.ShutdownTimeout != 0 {
yamlOpts.ShutdownTimeout = opts.ShutdownTimeout
}
if opts.ClientsRefreshSecs != 0 {
yamlOpts.ClientsRefreshSecs = opts.ClientsRefreshSecs
}
if opts.EnableClientsLogging != false {
yamlOpts.EnableClientsLogging = opts.EnableClientsLogging
}
Expand Down
3 changes: 3 additions & 0 deletions grpc/server/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,10 +279,13 @@ func (c *Client) ping() error {
}
pongMsg := msg.GetPongMsg()
c.statusLock.Lock()
c.LastSeen = time.Now()
c.NodeType = pongMsg.GetNodeType()
c.ReplicasetUUID = pongMsg.GetReplicaSetUuid()
c.ReplicasetVersion = pongMsg.GetReplicaSetVersion()
c.isTailing = pongMsg.GetIsTailing()
c.isPrimary = pongMsg.GetIsPrimary()
c.isSecondary = pongMsg.GetIsSecondary()
c.lastTailedTimestamp = pongMsg.GetLastTailedTimestamp()
c.statusLock.Unlock()

Expand Down
72 changes: 49 additions & 23 deletions grpc/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,13 @@ type MessagesServer struct {
lock *sync.Mutex
clients map[string]*Client
// Current backup status
replicasRunningBackup map[string]bool // Key is ReplicasetUUID
lastOplogTs int64 // Timestamp in Unix format
backupRunning bool
oplogBackupRunning bool
restoreRunning bool
err error
clientsRefreshInterval time.Duration
replicasRunningBackup map[string]bool // Key is ReplicasetUUID
lastOplogTs int64 // Timestamp in Unix format
backupRunning bool
oplogBackupRunning bool
restoreRunning bool
err error
//
workDir string
clientLoggingEnabled bool
Expand All @@ -51,18 +52,18 @@ type restoreSource struct {
Port string
}

func NewMessagesServer(workDir string, logger *logrus.Logger) *MessagesServer {
messagesServer := newMessagesServer(workDir, logger)
func NewMessagesServer(workDir string, clientsRefreshSecs int, logger *logrus.Logger) *MessagesServer {
messagesServer := newMessagesServer(workDir, clientsRefreshSecs, logger)
return messagesServer
}

func NewMessagesServerWithClientLogging(workDir string, logger *logrus.Logger) *MessagesServer {
messagesServer := newMessagesServer(workDir, logger)
func NewMessagesServerWithClientLogging(workDir string, clientsRefreshSecs int, logger *logrus.Logger) *MessagesServer {
messagesServer := newMessagesServer(workDir, clientsRefreshSecs, logger)
messagesServer.clientLoggingEnabled = true
return messagesServer
}

func newMessagesServer(workDir string, logger *logrus.Logger) *MessagesServer {
func newMessagesServer(workDir string, clientsRefreshSecs int, logger *logrus.Logger) *MessagesServer {
if logger == nil {
logger = logrus.New()
logger.SetLevel(logrus.StandardLogger().Level)
Expand All @@ -77,22 +78,43 @@ func newMessagesServer(workDir string, logger *logrus.Logger) *MessagesServer {
}

messagesServer := &MessagesServer{
lock: &sync.Mutex{},
clients: make(map[string]*Client),
clientDisconnetedChan: make(chan string),
stopChan: make(chan struct{}),
clientsLogChan: make(chan *pb.LogEntry, logBufferSize),
dbBackupFinishChan: bfc,
oplogBackupFinishChan: ofc,
restoreFinishChan: rbf,
replicasRunningBackup: make(map[string]bool),
workDir: workDir,
logger: logger,
}
lock: &sync.Mutex{},
clients: make(map[string]*Client),
clientDisconnetedChan: make(chan string),
clientsRefreshInterval: time.Duration(clientsRefreshSecs) * time.Second,
stopChan: make(chan struct{}),
clientsLogChan: make(chan *pb.LogEntry, logBufferSize),
dbBackupFinishChan: bfc,
oplogBackupFinishChan: ofc,
restoreFinishChan: rbf,
replicasRunningBackup: make(map[string]bool),
workDir: workDir,
logger: logger,
}

go messagesServer.refreshClientsScheduler()

return messagesServer
}

func (s *MessagesServer) refreshClientsScheduler() {
s.logger.Debugf("Starting clients background refresher with interval: %s", s.clientsRefreshInterval)
ticker := time.NewTicker(s.clientsRefreshInterval)
for {
select {
case <-s.stopChan:
s.logger.Debug("Stopping clients background refresher")
ticker.Stop()
return
case <-ticker.C:
err := s.RefreshClients()
if err != nil {
s.logger.Errorf(err.Error())
}
}
}
}

func (s *MessagesServer) BackupSourceNameByReplicaset() (map[string]string, error) {
s.lock.Lock()
defer s.lock.Unlock()
Expand Down Expand Up @@ -388,6 +410,10 @@ func (s *MessagesServer) StartBackup(opts *pb.StartBackup) error {

s.lastBackupMetadata = NewBackupMetadata(opts)

if err := s.RefreshClients(); err != nil {
return errors.Wrapf(err, "cannot refresh clients state for backup")
}

clients, err := s.BackupSourceByReplicaset()
if err != nil {
return errors.Wrapf(err, "Cannot start backup. Cannot find backup source for replicas")
Expand Down
2 changes: 1 addition & 1 deletion internal/testutils/grpc/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func NewGrpcDaemon(ctx context.Context, workDir string, t *testing.T, logger *lo
d.ctx, d.cancelFunc = context.WithCancel(ctx)
// This is the sever/agents gRPC server
d.grpcServer4Clients = grpc.NewServer(opts...)
d.MessagesServer = server.NewMessagesServer(workDir, logger)
d.MessagesServer = server.NewMessagesServer(workDir, 60, logger)
pb.RegisterMessagesServer(d.grpcServer4Clients, d.MessagesServer)

d.wg.Add(1)
Expand Down