122 changes: 57 additions & 65 deletions grpc/client/client.go
@@ -27,15 +27,14 @@ type flusher interface {
}

type Client struct {
clientID string
id string
ctx context.Context
replicasetName string
replicasetID string
nodeType pb.NodeType
nodeName string
clusterID string
backupDir string
doneChan chan struct{}
mongoDumper *dumper.Mongodump
oplogTailer *oplog.OplogTail
mdbSession *mgo.Session
@@ -44,8 +43,10 @@ type Client struct {
sslOpts SSLOptions
logger *logrus.Logger
//
lock *sync.Mutex
status pb.Status
lock *sync.Mutex
running bool
status pb.Status
//
streamLock *sync.Mutex
stream pb.Messages_MessagesChatClient
}
@@ -96,7 +97,6 @@ func NewClient(ctx context.Context, backupDir string, mdbConnOpts ConnectionOpti
// Safe
FailFast: true,
Direct: true,
Source: "admin",
}
mdbSession, err := mgo.DialWithInfo(di)

@@ -132,37 +132,8 @@ func NewClient(ctx context.Context, backupDir string, mdbConnOpts ConnectionOpti
return nil, errors.Wrap(err, "cannot connect to the gRPC server")
}

// m := &pb.ClientMessage{
// ClientID: nodeName,
// Type: pb.ClientMessage_REGISTER,
// Payload: &pb.ClientMessage_RegisterMsg{
// RegisterMsg: &pb.Register{
// NodeType: nodeType,
// NodeName: nodeName,
// ClusterID: clusterIDString,
// ReplicasetName: replicasetName,
// ReplicasetID: replicasetID,
// BackupDir: backupDir,
// },
// },
// }

// logger.Infof("Registering node ...")
// if err := stream.Send(m); err != nil {
// return nil, errors.Wrap(err, "Failed to send registration message")
// }

// response, err := stream.Recv()
// if err != nil {
// return nil, errors.Wrap(err, "Error while receiving the registration response from the server")
// }
// if response.Type != pb.ServerMessage_REGISTRATION_OK {
// return nil, fmt.Errorf("Invalid registration response type: %d", response.Type)
// }
// logger.Infof("Node registration OK.")

c := &Client{
clientID: nodeName,
id: nodeName,
ctx: ctx,
backupDir: backupDir,
replicasetName: replicasetName,
@@ -180,7 +151,7 @@ func NewClient(ctx context.Context, backupDir string, mdbConnOpts ConnectionOpti
sslOpts: mdbSSLOpts,
logger: logger,
lock: &sync.Mutex{},
doneChan: make(chan struct{}),
running: true,
// This lock is used to sync the access to the stream Send() method.
// For example, if the backup is running, we can receive a Ping request from
// the server but while we are sending the Ping response, the backup can finish
@@ -201,7 +172,21 @@ func NewClient(ctx context.Context, backupDir string, mdbConnOpts ConnectionOpti
return c, nil
}

func (c *Client) ID() string {
return c.id
}

func (c *Client) isRunning() bool {
c.lock.Lock()
defer c.lock.Unlock()
return c.running
}

func (c *Client) connect() {
if !c.isRunning() {
return
}

for {
log.Infof("Reconnecting with the gRPC server")
stream, err := c.grpcClient.MessagesChat(c.ctx)
@@ -210,11 +195,18 @@ func (c *Client) connect() {
continue
}
c.stream = stream
c.lock.Lock()
c.running = true
c.lock.Unlock()
return
}
}

func (c *Client) register() error {
if !c.isRunning() {
return fmt.Errorf("gRPC stream is closed. Cannot register the client (%s)", c.id)
}

m := &pb.ClientMessage{
ClientID: c.nodeName,
Type: pb.ClientMessage_REGISTER,
@@ -245,12 +237,16 @@ func (c *Client) register() error {
return nil
}

func (c *Client) Done() <-chan struct{} {
return c.doneChan
}
func (c *Client) Stop() error {
if !c.isRunning() {
return fmt.Errorf("Client is not running")
}

c.lock.Lock()
defer c.lock.Unlock()
c.running = false

func (c *Client) Stop() {
c.stream.CloseSend()
return c.stream.CloseSend()
}

func (c *Client) IsDBBackupRunning() bool {
@@ -314,7 +310,7 @@ func (c *Client) processPing() {
c.logger.Debug("Received Ping command")
msg := &pb.ClientMessage{
Type: pb.ClientMessage_PONG,
ClientID: c.clientID,
ClientID: c.id,
Payload: &pb.ClientMessage_PingMsg{PingMsg: &pb.Pong{Timestamp: time.Now().Unix()}},
}
if err := c.streamSend(msg); err != nil {
@@ -347,7 +343,6 @@ func (c *Client) processStopBalancer() (*pb.ClientMessage, error) {
}

func (c *Client) processStartBackup(msg *pb.StartBackup) {
fmt.Printf("Starting backup on client: %s\n", c.nodeName)
c.logger.Infof("%s: Received StartBackup command", c.nodeName)
c.logger.Debugf("%s: Received start backup command: %+v", c.nodeName, *msg)

@@ -381,7 +376,7 @@ func (c *Client) processStartBackup(msg *pb.StartBackup) {

response := &pb.ClientMessage{
Type: pb.ClientMessage_ACK,
ClientID: c.clientID,
ClientID: c.id,
Payload: &pb.ClientMessage_AckMsg{AckMsg: &pb.Ack{}},
}
if err = c.streamSend(response); err != nil {
@@ -392,7 +387,7 @@ func (c *Client) processStartBackup(msg *pb.StartBackup) {
func (c *Client) sendACK() {
response := &pb.ClientMessage{
Type: pb.ClientMessage_ACK,
ClientID: c.clientID,
ClientID: c.id,
Payload: &pb.ClientMessage_AckMsg{AckMsg: &pb.Ack{}},
}
if err := c.streamSend(response); err != nil {
@@ -403,7 +398,7 @@ func (c *Client) sendError(err error) {
func (c *Client) sendError(err error) {
response := &pb.ClientMessage{
Type: pb.ClientMessage_ERROR,
ClientID: c.clientID,
ClientID: c.id,
Payload: &pb.ClientMessage_ErrorMsg{ErrorMsg: &pb.Error{Message: err.Error()}},
}
if err := c.streamSend(response); err != nil {
@@ -420,7 +415,7 @@ func (c *Client) runOplogBackup(msg *pb.StartBackup, extension string) {
fw, err := os.Create(path.Join(c.backupDir, msg.GetDestinationName()+extension))
if err != nil {
finishMsg := &pb.OplogBackupFinishStatus{
ClientID: c.clientID,
ClientID: c.id,
OK: false,
Ts: time.Now().Unix(),
Error: fmt.Sprintf("Cannot create destination file: %s", err),
@@ -448,7 +443,7 @@ func (c *Client) runOplogBackup(msg *pb.StartBackup, extension string) {
if err != nil {
c.logger.Errorf("Cannot open the oplog tailer: %s", err)
finishMsg := &pb.OplogBackupFinishStatus{
ClientID: c.clientID,
ClientID: c.id,
OK: false,
Ts: time.Now().Unix(),
Error: fmt.Sprintf("Cannot open the oplog tailer: %s", err),
@@ -464,7 +459,7 @@ func (c *Client) runOplogBackup(msg *pb.StartBackup, extension string) {
c.setOplogBackupRunning(false)
c.logger.Errorf("Error while copying data from the oplog tailer: %s", err)
finishMsg := &pb.OplogBackupFinishStatus{
ClientID: c.clientID,
ClientID: c.id,
OK: false,
Ts: time.Now().Unix(),
Error: fmt.Sprintf("Cannot open the oplog tailer: %s", err),
@@ -493,7 +488,7 @@ func (c *Client) runOplogBackup(msg *pb.StartBackup, extension string) {
err := fmt.Errorf("Cannot flush/close oplog chained writer: %s", err)
c.logger.Error(err)
finishMsg := &pb.OplogBackupFinishStatus{
ClientID: c.clientID,
ClientID: c.id,
OK: false,
Ts: time.Now().Unix(),
Error: err.Error(),
@@ -505,7 +500,7 @@ func (c *Client) runOplogBackup(msg *pb.StartBackup, extension string) {

c.logger.Info("Oplog backup completed")
finishMsg := &pb.OplogBackupFinishStatus{
ClientID: c.clientID,
ClientID: c.id,
OK: true,
Ts: time.Now().Unix(),
Error: "",
@@ -597,7 +592,7 @@ func (c *Client) runDBBackup(msg *pb.StartBackup, extension string) {

func (c *Client) sendBackupError(errMsg string) {
finishMsg := &pb.DBBackupFinishStatus{
ClientID: c.clientID,
ClientID: c.id,
OK: false,
Ts: 0,
Error: errMsg,
@@ -615,7 +610,7 @@ func (c *Client) sendBackupFinishOK() {
if err != nil {
log.Errorf("cannot get LastWrite.OpTime.Ts from MongoDB: %s", err)
finishMsg := &pb.DBBackupFinishStatus{
ClientID: c.clientID,
ClientID: c.id,
OK: false,
Ts: 0,
Error: err.Error(),
@@ -625,7 +620,7 @@ func (c *Client) sendBackupFinishOK() {
}

finishMsg := &pb.DBBackupFinishStatus{
ClientID: c.clientID,
ClientID: c.id,
OK: true,
Ts: int64(ismaster.IsMasterDoc().LastWrite.OpTime.Ts),
Error: "",
@@ -651,24 +646,21 @@ func (c *Client) setOplogBackupRunning(status bool) {
}

func (c *Client) processStopOplogTail(msg *pb.StopOplogTail) {
fmt.Printf("Received StopOplogTail command for client: %s\n", c.clientID)
c.logger.Debugf("Received StopOplogTail command for client: %s", c.clientID)
c.logger.Debugf("Received StopOplogTail command for client: %s", c.id)
out := &pb.ClientMessage{
Type: pb.ClientMessage_ACK,
ClientID: c.clientID,
ClientID: c.id,
Payload: &pb.ClientMessage_AckMsg{AckMsg: &pb.Ack{}},
}
c.logger.Debugf("Sending ACK message to the gRPC server")
fmt.Printf("Sending ACK message to the gRPC server")
c.streamSend(out)
fmt.Println(">>>>> 1")

c.setOplogBackupRunning(false)

if err := c.oplogTailer.CloseAt(bson.MongoTimestamp(msg.GetTs())); err != nil {
c.logger.Errorf("Cannot stop the oplog tailer: %s", err)
finishMsg := &pb.OplogBackupFinishStatus{
ClientID: c.clientID,
ClientID: c.id,
OK: false,
Ts: time.Now().Unix(),
Error: fmt.Sprintf("Cannot close the oplog tailer: %s", err),
@@ -683,7 +675,7 @@ func (c *Client) processStopOplogTail(msg *pb.StopOplogTail) {
}

finishMsg := &pb.OplogBackupFinishStatus{
ClientID: c.clientID,
ClientID: c.id,
OK: true,
Ts: time.Now().Unix(),
Error: "",
@@ -702,7 +694,7 @@ func (c *Client) processStatus() {

msg := &pb.ClientMessage{
Type: pb.ClientMessage_STATUS,
ClientID: c.clientID,
ClientID: c.id,
Payload: &pb.ClientMessage_StatusMsg{
StatusMsg: &pb.Status{
RunningDBBackUp: c.status.RunningDBBackUp,
@@ -736,7 +728,7 @@ func (c *Client) processGetBackupSource() {
if err != nil {
msg := &pb.ClientMessage{
Type: pb.ClientMessage_BACKUP_SOURCE,
ClientID: c.clientID,
ClientID: c.id,
Payload: &pb.ClientMessage_BackupSourceMsg{BackupSourceMsg: &pb.BackupSource{SourceClient: c.nodeName}},
}
c.logger.Debugf("Sending GetBackupSource response to the RPC server: %+v", *msg)
@@ -749,7 +741,7 @@ func (c *Client) processGetBackupSource() {
c.logger.Errorf("Cannot get a backup source winner: %s", err)
msg := &pb.ClientMessage{
Type: pb.ClientMessage_ERROR,
ClientID: c.clientID,
ClientID: c.id,
Payload: &pb.ClientMessage_ErrorMsg{ErrorMsg: &pb.Error{Message: fmt.Sprintf("Cannot get backup source: %s", err)}},
}
c.logger.Debugf("Sending error response to the RPC server: %+v", *msg)
@@ -764,7 +756,7 @@ func (c *Client) processGetBackupSource() {

msg := &pb.ClientMessage{
Type: pb.ClientMessage_BACKUP_SOURCE,
ClientID: c.clientID,
ClientID: c.id,
Payload: &pb.ClientMessage_BackupSourceMsg{BackupSourceMsg: &pb.BackupSource{SourceClient: winner}},
}
c.logger.Debugf("%s -> Sending GetBackupSource response to the RPC server: %+v (winner: %q)", c.nodeName, *msg, winner)
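
A note on the streamLock comment in the constructor above: the pattern it describes can be sketched as a small send helper that serializes every writer on the shared gRPC stream. The snippet below is illustrative only and is not part of this diff; it assumes a streamSend method of this shape, and the helper actually used in the repository may differ.

func (c *Client) streamSend(msg *pb.ClientMessage) error {
	// grpc-go allows only one goroutine at a time to call Send on a given
	// stream, so every writer (Pong, ACK, backup-finish messages) must take
	// the lock before writing.
	c.streamLock.Lock()
	defer c.streamLock.Unlock()
	return c.stream.Send(msg)
}

With a guard like this, processPing and the backup goroutines can all call c.streamSend concurrently without interleaving writes on the stream.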