Skip to content

Commit

Permalink
PBM-714 fix: properly show backup status in case of no progress (#595)
Browse files Browse the repository at this point in the history
* PBM-714 fix: properly show backup status in case of no progress

If a backup makes no progress (its status is neither `done` nor `error`) and
there have been no `hb` updates for a while (StaleFrameSec), it should be…
PBM-714 fix: properly show backup status in case of no progress

If a backup makes no progress (its status is neither `done` nor `error`) and
there have been no `hb` updates for a while (StaleFrameSec), it should be
properly marked in `pbm status`. Before this commit, for example, a backup that
made no election progress and was stuck in `starting` with empty replsets was
marked as "Backup has no data for the config server or sole replicaset" by
`bcpsMatchCluster`. Moreover, only `done` backups should be processed by
`bcpsMatchCluster`.

Also, added more debug logging.

* PBM-714: add log flag to show opid in the output

* PBM-714: send backup heartbeats during election
  • Loading branch information
dAdAbird authored Sep 1, 2021
1 parent a27e3c0 commit 22fd155
Show file tree
Hide file tree
Showing 9 changed files with 49 additions and 18 deletions.
14 changes: 7 additions & 7 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (a *Agent) Delete(d pbm.DeleteBackupCmd, opid pbm.OPID, ep pbm.Epoch) {
Epoch: &epts,
}, pbm.LockOpCollection)

got, err := a.aquireLock(lock)
got, err := a.aquireLock(lock, l)
if err != nil {
l.Error("acquire lock: %v", err)
return
Expand Down Expand Up @@ -198,7 +198,7 @@ func (a *Agent) DeletePITR(d pbm.DeletePITRCmd, opid pbm.OPID, ep pbm.Epoch) {
Epoch: &epts,
}, pbm.LockOpCollection)

got, err := a.aquireLock(lock)
got, err := a.aquireLock(lock, l)
if err != nil {
l.Error("acquire lock: %v", err)
return
Expand Down Expand Up @@ -257,7 +257,7 @@ func (a *Agent) ResyncStorage(opid pbm.OPID, ep pbm.Epoch) {
Epoch: &epts,
})

got, err := a.aquireLock(lock)
got, err := a.aquireLock(lock, l)
if err != nil {
l.Error("acquiring lock: %v", err)
return
Expand Down Expand Up @@ -293,19 +293,19 @@ func (a *Agent) ResyncStorage(opid pbm.OPID, ep pbm.Epoch) {

// aquireLock tries to acquire the lock. If there is a stale lock,
// it tries to mark the op that held the lock (backup, [pitr]restore) as failed.
func (a *Agent) aquireLock(l *pbm.Lock) (got bool, err error) {
func (a *Agent) aquireLock(l *pbm.Lock, lg *log.Event) (got bool, err error) {
got, err = l.Acquire()
if err == nil {
return got, nil
}

switch err.(type) {
case pbm.ErrDuplicateOp, pbm.ErrConcurrentOp:
a.log.Debug("", "", l.OPID, *l.Epoch, "get lock: %v", err)
lg.Debug("get lock: %v", err)
return false, nil
case pbm.ErrWasStaleLock:
lk := err.(pbm.ErrWasStaleLock).Lock
a.log.Debug("", "", l.OPID, *l.Epoch, "stale lock: %v", lk)
lg.Debug("stale lock: %v", lk)
var fn func(opid string) error
switch lk.Type {
case pbm.CmdBackup:
Expand All @@ -317,7 +317,7 @@ func (a *Agent) aquireLock(l *pbm.Lock) (got bool, err error) {
}
merr := fn(lk.OPID)
if merr != nil {
a.log.Warning("", "", "", *l.Epoch, "failed to mark stale op '%s' as failed: %v", lk.OPID, merr)
lg.Warning("failed to mark stale op '%s' as failed: %v", lk.OPID, merr)
}
return l.Acquire()
default:
Expand Down
4 changes: 2 additions & 2 deletions agent/pitr.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (a *Agent) pitr() (err error) {
Epoch: &epts,
})

got, err := a.aquireLock(lock)
got, err := a.aquireLock(lock, l)
if err != nil {
return errors.Wrap(err, "acquiring lock")
}
Expand Down Expand Up @@ -258,7 +258,7 @@ func (a *Agent) PITRestore(r pbm.PITRestoreCmd, opid pbm.OPID, ep pbm.Epoch) {
Epoch: &epts,
})

got, err := a.aquireLock(lock)
got, err := a.aquireLock(lock, l)
if err != nil {
l.Error("acquiring lock: %v", err)
return
Expand Down
13 changes: 10 additions & 3 deletions agent/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func (a *Agent) Backup(cmd pbm.BackupCmd, opid pbm.OPID, ep pbm.Epoch) {
l.Error("init meta: %v", err)
return
}
l.Debug("init backup meta")
nodes, err := a.pbm.BcpNodesPriority()
if err != nil {
l.Error("get nodes priority: %v", err)
Expand Down Expand Up @@ -164,7 +165,7 @@ func (a *Agent) Backup(cmd pbm.BackupCmd, opid pbm.OPID, ep pbm.Epoch) {
Epoch: &epts,
})

got, err := a.aquireLock(lock)
got, err := a.aquireLock(lock, l)
if err != nil {
l.Error("acquiring lock: %v", err)
return
Expand Down Expand Up @@ -207,7 +208,7 @@ func (a *Agent) Backup(cmd pbm.BackupCmd, opid pbm.OPID, ep pbm.Epoch) {
const renominationFrame = 5 * time.Second

func (a *Agent) nominateRS(bcp, rs string, nodes [][]string, l *log.Event) error {
l.Debug("nomination %s: %v", rs, nodes)
l.Debug("nomination list for %s: %v", rs, nodes)
err := a.pbm.SetRSNomination(bcp, rs)
if err != nil {
return errors.Wrap(err, "set nomination meta")
Expand All @@ -227,6 +228,12 @@ func (a *Agent) nominateRS(bcp, rs string, nodes [][]string, l *log.Event) error
if err != nil {
return errors.Wrap(err, "set nominees")
}
l.Debug("nomination %s, set candidates %v", rs, n)

err = a.pbm.BackupHB(bcp)
if err != nil {
l.Warning("send heartbeat: %v", err)
}

time.Sleep(renominationFrame)
}
Expand Down Expand Up @@ -288,7 +295,7 @@ func (a *Agent) Restore(r pbm.RestoreCmd, opid pbm.OPID, ep pbm.Epoch) {
Epoch: &epts,
})

got, err := a.aquireLock(lock)
got, err := a.aquireLock(lock, l)
if err != nil {
l.Error("acquiring lock: %v", err)
return
Expand Down
4 changes: 4 additions & 0 deletions cli/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ func bcpsMatchCluster(bcps []pbm.BackupMeta, shards []pbm.Shard, confsrv string)
}

func bcpMatchCluster(bcp *pbm.BackupMeta, shards map[string]struct{}, confsrv string, nomatch *[]string) {
if bcp.Status != pbm.StatusDone {
return
}

hasconfsrv := false
for _, rs := range bcp.Replsets {
if _, ok := shards[rs.Name]; !ok {
Expand Down
3 changes: 3 additions & 0 deletions cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type logsOpts struct {
severity string
event string
opid string
extr bool
}

func Main() {
Expand Down Expand Up @@ -102,6 +103,7 @@ func Main() {
logsCmd.Flag("severity", "Severity level D, I, W, E or F, low to high. Choosing one includes higher levels too.").Short('s').Default("I").EnumVar(&logs.severity, "D", "I", "W", "E", "F")
logsCmd.Flag("event", "Event in format backup[/2020-10-06T11:45:14Z]. Events: backup, restore, cancelBackup, resyncBcpList, pitr, pitrestore, delete").Short('e').StringVar(&logs.event)
logsCmd.Flag("opid", "Operation ID").Short('i').StringVar(&logs.opid)
logsCmd.Flag("extra", "Show extra data in text format").Hidden().Short('x').BoolVar(&logs.extr)

statusCmd := pbmCmd.Command("status", "Show PBM status")
statusSection := statusCmd.Flag("sections", "Sections of status to display <cluster>/<pitr>/<running>/<backups>.").Short('s').Enums("cluster", "pitr", "running", "backups")
Expand Down Expand Up @@ -263,6 +265,7 @@ func runLogs(cn *pbm.PBM, l *logsOpts) (fmt.Stringer, error) {
}

o.ShowNode = r.Node == ""
o.Extr = l.extr

// reverse list
for i := len(o.Data)/2 - 1; i >= 0; i-- {
Expand Down
10 changes: 10 additions & 0 deletions cli/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,11 @@ func getStorageStat(cn *pbm.PBM) (fmt.Stringer, error) {
return s, errors.Wrap(err, "get storage")
}

now, err := cn.ClusterTime()
if err != nil {
return nil, errors.Wrap(err, "get cluster time")
}

for _, bcp := range bcps {
snpsht := snapshotStat{
Name: bcp.Name,
Expand All @@ -548,6 +553,11 @@ func getStorageStat(cn *pbm.PBM) (fmt.Stringer, error) {
snpsht.Size = sz
case pbm.StatusError:
snpsht.Err = bcp.Error
default:
if bcp.Hb.T+pbm.StaleFrameSec < now.T {
snpsht.Err = fmt.Sprintf("Backup stuck at `%v` stage, last beat ts: %d", bcp.Status, bcp.Hb.T)
snpsht.Status = pbm.StatusError
}
}
s.Snapshot = append(s.Snapshot, snpsht)
}
Expand Down
2 changes: 1 addition & 1 deletion pbm/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (b *Backup) run(ctx context.Context, bcp pbm.BackupCmd, opid pbm.OPID, l *p

err = b.setClusterFirstWrite(bcp.Name)
if err != nil {
return errors.Wrap(err, "set cluster last write ts")
return errors.Wrap(err, "set cluster first write ts")
}
}

Expand Down
12 changes: 8 additions & 4 deletions pbm/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,16 @@ func tsUTC(ts int64) string {
}

func (e *Entry) String() (s string) {
return e.string(tsLocal, false)
return e.string(tsLocal, false, false)
}

func (e *Entry) StringNode() (s string) {
return e.string(tsLocal, true)
return e.string(tsLocal, true, false)
}

type tsformatf func(ts int64) string

func (e *Entry) string(f tsformatf, showNode bool) (s string) {
func (e *Entry) string(f tsformatf, showNode, extr bool) (s string) {
node := ""
if showNode {
node = " [" + e.RS + "/" + e.Node + "]"
Expand All @@ -78,6 +78,9 @@ func (e *Entry) string(f tsformatf, showNode bool) (s string) {
if e.ObjName != "" {
id = append(id, e.ObjName)
}
if extr {
id = append(id, e.OPID)
}
s = fmt.Sprintf("%s %s%s [%s] %s", f(e.TS), e.Severity, node, strings.Join(id, "/"), e.Msg)
} else {
s = fmt.Sprintf("%s %s%s %s", f(e.TS), e.Severity, node, e.Msg)
Expand Down Expand Up @@ -252,6 +255,7 @@ type LogRequest struct {
type Entries struct {
Data []Entry `json:"data"`
ShowNode bool `json:"-"`
Extr bool `json:"-"`
}

func (e Entries) MarshalJSON() ([]byte, error) {
Expand All @@ -260,7 +264,7 @@ func (e Entries) MarshalJSON() ([]byte, error) {

func (e Entries) String() (s string) {
for _, entry := range e.Data {
s += entry.string(tsUTC, e.ShowNode) + "\n"
s += entry.string(tsUTC, e.ShowNode, e.Extr) + "\n"
}

return s
Expand Down
5 changes: 4 additions & 1 deletion pbm/rsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func (p *PBM) ResyncStorage(l *log.Event) error {
if err != nil {
return errors.Wrap(err, "get a backups list from the storage")
}
l.Debug("got backups list: %v", len(bcps))

err = p.moveCollection(BcpCollection, BcpOldCollection)
if err != nil {
Expand All @@ -49,6 +50,8 @@ func (p *PBM) ResyncStorage(l *log.Event) error {

var ins []interface{}
for _, b := range bcps {
l.Debug("bcp: %v", b.Name)

d, err := stg.SourceReader(b.Name)
if err != nil {
return errors.Wrapf(err, "read meta for %v", b.Name)
Expand All @@ -58,7 +61,7 @@ func (p *PBM) ResyncStorage(l *log.Event) error {
err = json.NewDecoder(d).Decode(&v)
d.Close()
if err != nil {
return errors.Wrap(err, "unmarshal backup meta")
return errors.Wrapf(err, "unmarshal backup meta [%s]", b.Name)
}
err = checkBackupFiles(&v, stg)
if err != nil {
Expand Down

0 comments on commit 22fd155

Please sign in to comment.