Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

49 changes: 25 additions & 24 deletions grpc/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,9 @@ func NewClient(inctx context.Context, in InputOptions) (*Client, error) {
Password: in.DbConnOptions.Password,
Source: in.DbConnOptions.AuthDB,
ReplicaSetName: in.DbConnOptions.ReplicasetName,
AppName: "percona/percona-backup-mongodb",
// ReadPreference *ReadPreference
// Safe Safe
FailFast: true,
Direct: true,
AppName: "percona-backup-mongodb",
FailFast: true,
Direct: true,
}

c := &Client{
Expand Down Expand Up @@ -947,7 +945,14 @@ func (c *Client) processStartBackup(msg *pb.StartBackup) {
return
}
log.Debugf("Starting DB backup")
go c.runDBBackup(msg, sess)
go func() {
err := c.runDBBackup(msg)
if err != nil {
c.sendDBBackupFinishError(fmt.Errorf("cannot check if S3 bucket %q exists: %s", c.backupDir, err))
return
}
c.sendBackupFinishOK()
}()
}

func (c *Client) processStartBalancer() (*pb.ClientMessage, error) {
Expand Down Expand Up @@ -1075,15 +1080,14 @@ func (c *Client) processStopOplogTail(msg *pb.StopOplogTail) {
}
}

func (c *Client) runDBBackup(msg *pb.StartBackup, sess *session.Session) {
func (c *Client) runDBBackup(msg *pb.StartBackup) error {
var err error
c.logger.Info("Starting DB backup")
stg, _ := c.storages.Get(msg.GetStorageName())

bw, err := writer.NewBackupWriter(stg, msg.GetDbBackupName(), msg.GetCompressionType(), msg.GetCypher())
if err != nil {
c.sendDBBackupFinishError(fmt.Errorf("cannot check if S3 bucket %q exists: %s", c.backupDir, err))
return
return err
}

mi := &dumper.MongodumpInput{
Expand All @@ -1100,25 +1104,23 @@ func (c *Client) runDBBackup(msg *pb.StartBackup, sess *session.Session) {

c.mongoDumper, err = dumper.NewMongodump(mi)
if err != nil {
c.logger.Fatalf("Cannot call mongodump: %s", err)
//TODO send error
return err
}

c.setDBBackupRunning(true)
c.mongoDumper.Start()
dumpErr := c.mongoDumper.Wait()
bw.Close()

c.setDBBackupRunning(false)
defer c.setDBBackupRunning(false)

if dumpErr != nil {
c.sendDBBackupFinishError(fmt.Errorf("backup was cancelled: %s", dumpErr))
c.logger.Info("DB dump cancelled")
return
c.mongoDumper.Start()
derr := c.mongoDumper.Wait()
if err = bw.Close(); err != nil {
return err
}
if derr != nil {
return derr
}

c.logger.Info("DB dump completed")
c.sendBackupFinishOK()
return nil
}

func (c *Client) runOplogBackup(msg *pb.StartBackup, sess *session.Session, oplogTailer io.Reader) {
Expand Down Expand Up @@ -1294,9 +1296,8 @@ func (c *Client) restoreDBDump(msg *pb.RestoreBackup) (err error) {
Password: c.connOpts.Password,
Gzip: false,
Oplog: false,
Threads: 1,
//Reader: readers[len(readers)-1],
Reader: rdr,
Threads: 10,
Reader: rdr,
// A real restore would be applied to a just created and empty instance and it should be
// configured to run without user authentication.
// For testing purposes, we can skip restoring users and roles.
Expand Down
171 changes: 169 additions & 2 deletions grpc/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import (
"reflect"
"testing"

"github.com/globalsign/mgo"
"github.com/globalsign/mgo/bson"
"github.com/kr/pretty"
"github.com/percona/percona-backup-mongodb/internal/testutils"
"github.com/percona/percona-backup-mongodb/internal/testutils/testenv"
pb "github.com/percona/percona-backup-mongodb/proto/messages"
log "github.com/sirupsen/logrus"
)

func TestValidateFilesystemStorage(t *testing.T) {
Expand Down Expand Up @@ -98,6 +100,149 @@ func TestValidateS3Storage(t *testing.T) {
}
}

// TestFsBackupAndRestore generates test data, runs a logical backup to the
// local-filesystem storage and then restores it, failing the test on any
// step error.
func TestFsBackupAndRestore(t *testing.T) {
	dbName := "test001"
	col1 := "col1"
	col2 := "col2"
	ndocs := 100000
	bulkSize := 5000

	input, err := buildInputParams()
	if err != nil {
		t.Fatalf("Cannot build agent's input params: %s", err)
	}

	c, err := NewClient(context.TODO(), input)
	if err != nil {
		t.Fatalf("Cannot create the client: %s", err)
	}
	c.dbConnect()

	session, err := mgo.DialWithInfo(testutils.PrimaryDialInfo(t, testutils.MongoDBShard1ReplsetName))
	if err != nil {
		// t.Fatalf (not log.Fatalf): log.Fatalf calls os.Exit and would skip
		// the testing framework's cleanup and reporting.
		t.Fatalf("Cannot connect to the DB: %s", err)
	}
	defer session.Close()

	session.SetMode(mgo.Strong, true)
	// Errors are ignored on purpose: the collections might not exist yet.
	session.DB(dbName).C(col1).DropCollection()
	session.DB(dbName).C(col2).DropCollection()

	generateDataToBackup(t, c.mdbSession, dbName, col1, ndocs, bulkSize)
	generateDataToBackup(t, c.mdbSession, dbName, col2, ndocs, bulkSize)

	msg := &pb.StartBackup{
		BackupType:      pb.BackupType_BACKUP_TYPE_LOGICAL,
		DbBackupName:    "0001.dump",
		OplogBackupName: "",
		CompressionType: pb.CompressionType_COMPRESSION_TYPE_NO_COMPRESSION,
		Cypher:          pb.Cypher_CYPHER_NO_CYPHER,
		OplogStartTime:  0,
		Description:     "test001",
		StorageName:     "local-filesystem",
	}

	err = c.runDBBackup(msg)
	if err != nil {
		t.Errorf("Cannot process backup to the local filesystem: %s", err)
	}

	rmsg := &pb.RestoreBackup{
		MongodbHost: "127.0.0.1",
		BackupType:  pb.BackupType_BACKUP_TYPE_LOGICAL,
		//SourceBucket
		DbSourceName:    "0001.dump",
		OplogSourceName: "",
		CompressionType: pb.CompressionType_COMPRESSION_TYPE_NO_COMPRESSION,
		Cypher:          pb.Cypher_CYPHER_NO_CYPHER,
		OplogStartTime:  0,
		SkipUsersAndRoles: true,
		Host:            "127.0.0.1",
		Port:            "17001",
		StorageName:     "local-filesystem",
	}

	err = c.restoreDBDump(rmsg)
	if err != nil {
		t.Errorf("Cannot process restore from the local filesystem: %s", err)
	}

	if err := testutils.CleanTempDirAndBucket(); err != nil {
		t.Errorf("Cannot clean up directory and bucket: %s", err)
	}
}

// TestS3sBackupAndRestore generates test data, runs a logical backup to the
// S3 storage and then restores it, failing the test on any step error.
func TestS3sBackupAndRestore(t *testing.T) {
	dbName := "test001"
	col1 := "col1"
	col2 := "col2"
	ndocs := 100000
	bulkSize := 5000

	input, err := buildInputParams()
	if err != nil {
		t.Fatalf("Cannot build agent's input params: %s", err)
	}

	c, err := NewClient(context.TODO(), input)
	if err != nil {
		t.Fatalf("Cannot create the client: %s", err)
	}
	c.dbConnect()

	session, err := mgo.DialWithInfo(testutils.PrimaryDialInfo(t, testutils.MongoDBShard1ReplsetName))
	if err != nil {
		// t.Fatalf (not log.Fatalf): log.Fatalf calls os.Exit and would skip
		// the testing framework's cleanup and reporting.
		t.Fatalf("Cannot connect to the DB: %s", err)
	}
	defer session.Close()

	session.SetMode(mgo.Strong, true)
	// Errors are ignored on purpose: the collections might not exist yet.
	session.DB(dbName).C(col1).DropCollection()
	session.DB(dbName).C(col2).DropCollection()

	generateDataToBackup(t, c.mdbSession, dbName, col1, ndocs, bulkSize)
	generateDataToBackup(t, c.mdbSession, dbName, col2, ndocs, bulkSize)

	msg := &pb.StartBackup{
		BackupType:      pb.BackupType_BACKUP_TYPE_LOGICAL,
		NamePrefix:      "backup_test_",
		DbBackupName:    "0001.dump",
		OplogBackupName: "",
		CompressionType: pb.CompressionType_COMPRESSION_TYPE_NO_COMPRESSION,
		Cypher:          pb.Cypher_CYPHER_NO_CYPHER,
		OplogStartTime:  0,
		Description:     "test001",
		StorageName:     "s3-us-west",
	}

	err = c.runDBBackup(msg)
	if err != nil {
		t.Errorf("Cannot process backup to S3: %s", err)
	}

	rmsg := &pb.RestoreBackup{
		MongodbHost: "127.0.0.1",
		BackupType:  pb.BackupType_BACKUP_TYPE_LOGICAL,
		//SourceBucket
		DbSourceName:    "0001.dump",
		OplogSourceName: "",
		CompressionType: pb.CompressionType_COMPRESSION_TYPE_NO_COMPRESSION,
		Cypher:          pb.Cypher_CYPHER_NO_CYPHER,
		OplogStartTime:  0,
		SkipUsersAndRoles: true,
		Host:            "127.0.0.1",
		Port:            "17001",
		StorageName:     "s3-us-west",
	}

	err = c.restoreDBDump(rmsg)
	if err != nil {
		t.Errorf("Cannot process restore from S3: %s", err)
	}

	if err := testutils.CleanTempDirAndBucket(); err != nil {
		t.Errorf("Cannot clean up directory and bucket: %s", err)
	}
}

func buildInputParams() (InputOptions, error) {
port := testutils.MongoDBShard1PrimaryPort
rs := testutils.MongoDBShard1ReplsetName
Expand All @@ -110,7 +255,7 @@ func buildInputParams() (InputOptions, error) {
storages := testutils.TestingStorages()

dbConnOpts := ConnectionOptions{
Host: testenv.MongoDBHost,
Host: testutils.MongoDBHost,
Port: port,
User: di.Username,
Password: di.Password,
Expand All @@ -127,3 +272,25 @@ func buildInputParams() (InputOptions, error) {

return input, nil
}

// generateDataToBackup (re)creates dbName.colName, indexes the "number" field
// and inserts ndocs documents {number: 1..ndocs} in batches of at most
// bulkSize. It fails the test on any insert or index error.
func generateDataToBackup(t *testing.T, session *mgo.Session, dbName string, colName string, ndocs, bulkSize int) {
	// Don't check for error because the collection might not exist.
	session.DB(dbName).C(colName).DropCollection()
	if err := session.DB(dbName).C(colName).EnsureIndexKey("number"); err != nil {
		t.Fatalf("Cannot create the index on the %q field: %s", "number", err)
	}
	session.Refresh()

	number := 0
	// Iterate by offset so a trailing partial batch is also inserted when
	// ndocs is not an exact multiple of bulkSize.
	for i := 0; i < ndocs; i += bulkSize {
		batch := bulkSize
		if ndocs-i < batch {
			batch = ndocs - i
		}
		docs := make([]interface{}, 0, batch)
		bulk := session.DB(dbName).C(colName).Bulk()
		for j := 0; j < batch; j++ {
			number++
			docs = append(docs, bson.M{"number": number})
		}
		bulk.Insert(docs...)
		if _, err := bulk.Run(); err != nil {
			t.Fatalf("Cannot insert data to back up: %s", err)
		}
	}
}
13 changes: 13 additions & 0 deletions internal/backup/dumper/dumper.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,21 @@ import (
"fmt"
"io"
"sync"
"time"

"github.com/mongodb/mongo-tools-common/log"
"github.com/mongodb/mongo-tools/common/options"
"github.com/mongodb/mongo-tools/common/progress"
"github.com/mongodb/mongo-tools/mongodump"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

const (
progressBarLength = 24
progressBarWaitTime = time.Second * 3
)

type MongodumpInput struct {
Archive string
Host string
Expand Down Expand Up @@ -146,6 +154,11 @@ func (md *Mongodump) Wait() error {
}

// dump runs mongodump with a console progress bar attached and delivers the
// run's result on the wait channel so Wait() can pick it up.
func (md *Mongodump) dump() {
	pm := progress.NewBarWriter(log.Writer(0), progressBarWaitTime, progressBarLength, false)
	md.mongodump.ProgressManager = pm

	pm.Start()
	defer pm.Stop()

	md.waitChan <- md.mongodump.Dump()
}
Expand Down
17 changes: 3 additions & 14 deletions internal/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,20 +89,9 @@ func MakeReader(name string, stg storage.Storage, compressionType pb.Compression
if err != nil {
return nil, errors.Wrapf(err, "Cannot read backup file %s from bucket %s", name, stg.S3.Bucket)
}
//pr, pw := io.Pipe()
//go func() {
// buf := make([]byte, 16*1024*1024)
// br.bytesRead, err = io.CopyBuffer(pw, result.Body, buf)
// if err := result.Body.Close(); err != nil {
// br.lastError = err
// }
// if err := pw.Close(); err != nil {
// log.Errorf(">>> cannot close pipe: %s", err)
// }
//}()
//br.readers = append(br.readers, pr)
// Since we are chaining readers, we don't want to let s3Svc.GetObject to close the Body, we
// have our own Close() method call so, here we need to wrap it with a NopCloser

// We don't want the body to get closed when the reader is depleted;
// otherwise it would be closed too soon.
br.readers = append(br.readers, ioutil.NopCloser(result.Body))
default:
return nil, fmt.Errorf("Don't know how to handle %q storage type", stg.Type)
Expand Down
Loading