Skip to content
This repository has been archived by the owner on Aug 24, 2022. It is now read-only.

PMM-8673 Send pbm logs to pmm #289

Merged
merged 25 commits into from
Oct 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/percona/exporter_shared v0.7.3
github.com/percona/go-mysql v0.0.0-20200630114833-b77f37c0bfa2
github.com/percona/percona-toolkit v3.2.1+incompatible
github.com/percona/pmm v0.0.0-20210902153818-0f316cc1adf9
github.com/percona/pmm v0.0.0-20211005103724-a3a8d2dfd25d
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.5.1
github.com/prometheus/common v0.10.0
Expand All @@ -36,6 +36,7 @@ require (
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013
google.golang.org/grpc v1.32.0
google.golang.org/protobuf v1.25.0
gopkg.in/alecthomas/kingpin.v2 v2.2.6
gopkg.in/reform.v1 v1.5.0
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,8 @@ github.com/percona/go-mysql v0.0.0-20200630114833-b77f37c0bfa2 h1:0tQBti5FIrKfH3
github.com/percona/go-mysql v0.0.0-20200630114833-b77f37c0bfa2/go.mod h1:/SGLf9OMxlnK6jq4mkFiImBcJXXk5jwD+lDrwDaGXcw=
github.com/percona/percona-toolkit v3.2.1+incompatible h1:5jLvtZKcu9fDmaLRB8qA4bLR727t5iYyguHJJQTk9w0=
github.com/percona/percona-toolkit v3.2.1+incompatible/go.mod h1:netQWdWMaF1cnmwiIS+i5uyaqNXz46yNeM6HKkR6yeI=
github.com/percona/pmm v0.0.0-20210902153818-0f316cc1adf9 h1:Zx4JcrldiZPgyTOyLqxIjQyJx+9JDUP3zDNW7ziU8xY=
github.com/percona/pmm v0.0.0-20210902153818-0f316cc1adf9/go.mod h1:OmWayvQAavtvlzLkvpea5tAqaWGGNNyG+xj4MJUsNm4=
github.com/percona/pmm v0.0.0-20211005103724-a3a8d2dfd25d h1:l4FblfFN+4CV3D9DlLwqfW+Qf+FpAQe6NyyfOOpw+Cw=
github.com/percona/pmm v0.0.0-20211005103724-a3a8d2dfd25d/go.mod h1:OmWayvQAavtvlzLkvpea5tAqaWGGNNyG+xj4MJUsNm4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand Down
4 changes: 4 additions & 0 deletions jobs/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ import (
"github.com/percona/pmm/api/agentpb"
)

const (
maxLogsChunkSize = 50
)

// Send is interface for function that used by jobs to send messages back to pmm-server.
type Send func(payload agentpb.AgentResponsePayload)

Expand Down
223 changes: 86 additions & 137 deletions jobs/mongodb_backup_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,42 +16,38 @@
package jobs

import (
"bytes"
"context"
"io/ioutil"
"io"
"net"
"net/url"
"os"
"os/exec"
"regexp"
"strconv"
"sync/atomic"
"time"

"github.com/golang/protobuf/ptypes"
"github.com/percona/pmm/api/agentpb"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"gopkg.in/yaml.v3"
"google.golang.org/protobuf/types/known/timestamppb"
)

const (
pbmBin = "pbm"

cmdTimeout = time.Minute
resyncTimeout = 5 * time.Minute
statusCheckInterval = 5 * time.Second
logsCheckInterval = 3 * time.Second
waitForLogs = 2 * logsCheckInterval
)

// This regexp checks that there is no running pbm operations.
var noRunningPBMOperationsRE = regexp.MustCompile(`Currently running:\n=*\n\(none\)`)

// MongoDBBackupJob implements Job from MongoDB backup.
type MongoDBBackupJob struct {
id string
timeout time.Duration
l logrus.FieldLogger
name string
dbURL *url.URL
location BackupLocationConfig
id string
timeout time.Duration
l logrus.FieldLogger
name string
dbURL *url.URL
location BackupLocationConfig
logChunkID uint32
}

// NewMongoDBBackupJob creates new Job for MongoDB backup.
Expand Down Expand Up @@ -83,6 +79,8 @@ func (j *MongoDBBackupJob) Timeout() time.Duration {

// Run starts Job execution.
func (j *MongoDBBackupJob) Run(ctx context.Context, send Send) error {
defer j.sendLog(send, "", true)

if _, err := exec.LookPath(pbmBin); err != nil {
return errors.Wrapf(err, "lookpath: %s", pbmBin)
}
Expand All @@ -97,28 +95,42 @@ func (j *MongoDBBackupJob) Run(ctx context.Context, send Send) error {
}

rCtx, cancel := context.WithTimeout(ctx, resyncTimeout)
if err := waitForNoRunningPBMOperations(rCtx, j.l, j.dbURL); err != nil {
if err := waitForPBMState(rCtx, j.l, j.dbURL, pbmNoRunningOperations); err != nil {
cancel()
return errors.Wrap(err, "failed to wait pbm resync completion")
}
cancel()

if err := j.startBackup(ctx); err != nil {
pbmBackupOut, err := j.startBackup(ctx)
if err != nil {
j.sendLog(send, err.Error(), false)
return errors.Wrap(err, "failed to start backup")
}
streamCtx, streamCancel := context.WithCancel(ctx)
defer streamCancel()
go func() {
err := j.streamLogs(streamCtx, send, pbmBackupOut.Name)
if err != nil && err != io.EOF && err != context.Canceled {
j.l.Errorf("stream logs: %v", err)
}
}()

if err := waitForNoRunningPBMOperations(ctx, j.l, j.dbURL); err != nil {
if err := waitForPBMState(ctx, j.l, j.dbURL, pbmBackupFinished(pbmBackupOut.Name)); err != nil {
j.sendLog(send, err.Error(), false)
return errors.Wrap(err, "failed to wait backup completion")
}

send(&agentpb.JobResult{
JobId: j.id,
Timestamp: ptypes.TimestampNow(),
Timestamp: timestamppb.Now(),
Result: &agentpb.JobResult_MongodbBackup{
MongodbBackup: &agentpb.JobResult_MongoDBBackup{},
},
})

select {
case <-ctx.Done():
case <-time.After(waitForLogs):
artemgavrilov marked this conversation as resolved.
Show resolved Hide resolved
}
return nil
}

Expand Down Expand Up @@ -147,141 +159,78 @@ func createDBURL(dbConfig DBConnConfig) *url.URL {
}
}

func (j *MongoDBBackupJob) startBackup(ctx context.Context) error {
func (j *MongoDBBackupJob) startBackup(ctx context.Context) (*pbmBackup, error) {
j.l.Info("Starting backup.")
var result pbmBackup

nCtx, cancel := context.WithTimeout(ctx, cmdTimeout)
defer cancel()

output, err := exec.CommandContext(nCtx, pbmBin, "backup", "--mongodb-uri="+j.dbURL.String()).CombinedOutput() // #nosec G204

if err != nil {
return errors.Wrapf(err, "pbm backup error: %s", string(output))
}

return nil
}

func checkRunningPBMOperations(ctx context.Context, l logrus.FieldLogger, dbURL *url.URL) (bool, error) {
l.Debug("Checking running pbm operations.")

nCtx, cancel := context.WithTimeout(ctx, cmdTimeout)
defer cancel()

output, err := exec.CommandContext(nCtx, pbmBin, "status", "--mongodb-uri="+dbURL.String()).CombinedOutput() // #nosec G204
if err != nil {
return false, errors.Wrapf(err, "pbm status error: %s", string(output))
if err := execPBMCommand(ctx, j.dbURL, &result, "backup"); err != nil {
return nil, err
}

return noRunningPBMOperationsRE.Match(output), nil
return &result, nil
}

func waitForNoRunningPBMOperations(ctx context.Context, l logrus.FieldLogger, dbURL *url.URL) error {
l.Info("Waiting for pbm operations completion.")
func (j *MongoDBBackupJob) streamLogs(ctx context.Context, send Send, name string) error {
Dasio marked this conversation as resolved.
Show resolved Hide resolved
var (
err error
logs []pbmLogEntry
buffer bytes.Buffer
skip int
)
j.logChunkID = 0

ticker := time.NewTicker(statusCheckInterval)
ticker := time.NewTicker(logsCheckInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
done, err := checkRunningPBMOperations(ctx, l, dbURL)
logs, err = retrieveLogs(ctx, j.dbURL, "backup/"+name)
if err != nil {
return errors.Wrapf(err, "failed to check running operations")
return err
}

if done {
return nil
// @TODO Replace skip with proper paging after this is done https://jira.percona.com/browse/PBM-713
logs = logs[skip:]
skip += len(logs)
Dasio marked this conversation as resolved.
Show resolved Hide resolved
if len(logs) == 0 {
continue
}
from, to := 0, maxLogsChunkSize
for from < len(logs) {
if to > len(logs) {
to = len(logs)
}
buffer.Reset()
for i, log := range logs[from:to] {
_, err := buffer.WriteString(log.String())
if err != nil {
return err
}
if i != to-from-1 {
buffer.WriteRune('\n')
}
}
j.sendLog(send, buffer.String(), false)
from += maxLogsChunkSize
to += maxLogsChunkSize
}
case <-ctx.Done():
return ctx.Err()
}
}
}

func pbmSetupS3(ctx context.Context, l logrus.FieldLogger, dbURL *url.URL, prefix string, s3Config *S3LocationConfig, resync bool) error {
l.Info("Configuring S3 location.")
nCtx, cancel := context.WithTimeout(ctx, cmdTimeout)
defer cancel()

confFile, err := writePBMConfigFile(prefix, s3Config)
if err != nil {
return errors.WithStack(err)
}
defer os.Remove(confFile) //nolint:errcheck

output, err := exec.CommandContext( //nolint:gosec
nCtx,
pbmBin,
"config",
"--mongodb-uri="+dbURL.String(),
"--file="+confFile,
).CombinedOutput()

if err != nil {
return errors.Wrapf(err, "pbm config error: %s", string(output))
}

if resync {
nCtx, cancel := context.WithTimeout(ctx, cmdTimeout)
defer cancel()

output, err = exec.CommandContext( //nolint:gosec
nCtx,
pbmBin,
"config",
"--mongodb-uri="+dbURL.String(),
"--force-resync",
).CombinedOutput()

if err != nil {
return errors.Wrapf(err, "pbm config error: %s", string(output))
}
}

return nil
}

func writePBMConfigFile(prefix string, s3Config *S3LocationConfig) (string, error) {
tmp, err := ioutil.TempFile("", "pbm-config-*.yml")
if err != nil {
return "", errors.Wrap(err, "failed to create pbm configuration file")
}

var conf struct {
Storage struct {
Type string `yaml:"type"`
S3 struct {
Region string `yaml:"region"`
Bucket string `yaml:"bucket"`
Prefix string `yaml:"prefix"`
EndpointURL string `yaml:"endpointUrl"`
Credentials struct {
AccessKeyID string `yaml:"access-key-id"`
SecretAccessKey string `yaml:"secret-access-key"`
}
} `yaml:"s3"`
} `yaml:"storage"`
}

conf.Storage.Type = "s3"
conf.Storage.S3.EndpointURL = s3Config.Endpoint
conf.Storage.S3.Region = s3Config.BucketRegion
conf.Storage.S3.Bucket = s3Config.BucketName
conf.Storage.S3.Prefix = prefix
conf.Storage.S3.Credentials.AccessKeyID = s3Config.AccessKey
conf.Storage.S3.Credentials.SecretAccessKey = s3Config.SecretKey

bytes, err := yaml.Marshal(&conf)
if err != nil {
tmp.Close() //nolint:errcheck
return "", errors.Wrap(err, "failed to marshall pbm configuration")
}

if _, err := tmp.Write(bytes); err != nil {
tmp.Close() //nolint:errcheck
return "", errors.Wrap(err, "failed to write pbm configuration file")
}

return tmp.Name(), tmp.Close()
func (j *MongoDBBackupJob) sendLog(send Send, data string, done bool) {
send(&agentpb.JobProgress{
JobId: j.id,
Timestamp: timestamppb.Now(),
Result: &agentpb.JobProgress_Logs_{
Logs: &agentpb.JobProgress_Logs{
ChunkId: atomic.AddUint32(&j.logChunkID, 1) - 1,
Data: data,
Done: done,
},
},
})
}
Loading