Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ test-race:
ifeq ($(GO_TEST_CODECOV), true)
GOCACHE=$(GOCACHE) go test -v -race -coverprofile=$(GO_TEST_COVER_PROFILE) -covermode=atomic $(GO_TEST_PATH)
else
GOCACHE=$(GOCACHE) go test -v -race $(GO_TEST_PATH)
GOCACHE=$(GOCACHE) go test -v -race -covermode=atomic $(GO_TEST_PATH)
endif

test:
Expand All @@ -42,7 +42,7 @@ test:
TEST_MONGODB_CONFIGSVR1_PORT=$(TEST_MONGODB_CONFIGSVR1_PORT) \
TEST_MONGODB_MONGOS_PORT=$(TEST_MONGODB_MONGOS_PORT) \
GOCACHE=$(GOCACHE) \
go test -v $(GO_TEST_PATH)
go test -v -covermode=atomic $(GO_TEST_PATH)

test-cluster:
TEST_PSMDB_VERSION=$(TEST_PSMDB_VERSION) \
Expand Down
31 changes: 31 additions & 0 deletions internal/cluster/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cluster

import (
"errors"
"time"

"github.com/globalsign/mgo"
"github.com/globalsign/mgo/bson"
Expand Down Expand Up @@ -31,6 +32,9 @@ func runBalancerCommand(session *mgo.Session, balancerCommand string) error {
// GetBalancerStatus returns a struct representing the result of
// the MongoDB 'balancerStatus' command. This command will only
// succeed on a session to a mongos process (as of MongoDB 3.6)
//
// https://docs.mongodb.com/manual/reference/command/balancerStatus/
//
func GetBalancerStatus(session *mgo.Session) (*mdbstructs.BalancerStatus, error) {
balancerStatus := mdbstructs.BalancerStatus{}
err := session.Run(bson.D{{"balancerStatus", "1"}}, &balancerStatus)
Expand All @@ -51,12 +55,39 @@ func IsBalancerRunning(status *mdbstructs.BalancerStatus) bool {

// StopBalancer performs a 'balancerStop' server command on
// the provided session
//
// https://docs.mongodb.com/manual/reference/command/balancerStop/
//
func StopBalancer(session *mgo.Session) error {
return runBalancerCommand(session, "balancerStop")
}

// StartBalancer performs a 'balancerStart' server command on
// the provided session
//
// https://docs.mongodb.com/manual/reference/command/balancerStart/
//
func StartBalancer(session *mgo.Session) error {
return runBalancerCommand(session, "balancerStart")
}

// StopBalancerAndWait performs a StopBalancer and then waits for
// the balancer to stop running any balancer operations
func StopBalancerAndWait(session *mgo.Session, retries int, retryInterval time.Duration) error {
err := StopBalancer(session)
if err != nil {
return err
}
var tries int
for tries < retries {
status, err := GetBalancerStatus(session)
if err != nil {
return err
} else if !IsBalancerRunning(status) {
return nil
}
tries++
time.Sleep(retryInterval)
}
return errors.New("balancer did not stop")
}
27 changes: 27 additions & 0 deletions internal/cluster/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,33 @@ func TestStopBalancer(t *testing.T) {
}
}

func TestStopBalancerAndWait(t *testing.T) {
session, err := mgo.DialWithInfo(testutils.MongosDialInfo())
if err != nil {
t.Fatalf("Could not connect to mongos: %v", err.Error())
}
defer session.Close()

err = StartBalancer(session)
if err != nil {
t.Fatalf("Failed to run .StartBalancer(): %v", err.Error())
}

err = StopBalancerAndWait(session, 10, time.Second)
if err != nil {
t.Fatalf("Failed to run .StopBalancerAndWait(): %v", err.Error())
}

status, err := GetBalancerStatus(session)
if err != nil {
t.Fatalf("Failed to run .GetBalancerStatus(): %v", err.Error())
}

if IsBalancerRunning(status) || IsBalancerEnabled(status) {
t.Fatal("The balancer did not stop running")
}
}

func TestStartBalancer(t *testing.T) {
session, err := mgo.DialWithInfo(testutils.MongosDialInfo())
if err != nil {
Expand Down
39 changes: 39 additions & 0 deletions internal/cluster/cluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package cluster

import (
"github.com/globalsign/mgo"
"github.com/globalsign/mgo/bson"
"github.com/percona/mongodb-backup/mdbstructs"
)

// GetShardingState returns a struct reflecting the output of the
// 'shardingState' server command. This command should be ran on a
// shard mongod
//
// https://docs.mongodb.com/manual/reference/command/shardingState/
//
func GetShardingState(session *mgo.Session) (*mdbstructs.ShardingState, error) {
shardingState := mdbstructs.ShardingState{}
err := session.Run(bson.D{{"shardingState", "1"}}, &shardingState)
return &shardingState, err
}

// GetClusterIDShard returns the cluster ID using the result of the
// 'balancerState' server command
func GetClusterIDShard(state *mdbstructs.ShardingState) *bson.ObjectId {
return &state.ClusterID
}

// GetClusterID returns the cluster ID using the 'config.version'
// collection. This will only succeeed on a mongos or config server,
// use .GetClusterIDShard instead on shard servers
func GetClusterID(session *mgo.Session) (*bson.ObjectId, error) {
configVersion := struct {
ClusterId bson.ObjectId `bson:"clusterId"`
}{}
err := session.DB(configDB).C("version").Find(bson.M{"_id": 1}).One(&configVersion)
if err != nil {
return nil, err
}
return &configVersion.ClusterId, nil
}
73 changes: 73 additions & 0 deletions internal/cluster/cluster_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package cluster

import (
"testing"

"github.com/globalsign/mgo"
"github.com/percona/mongodb-backup/internal/testutils"
//"github.com/percona/mongodb-backup/mdbstructs"
)

func TestGetShardingState(t *testing.T) {
session, err := mgo.DialWithInfo(testutils.PrimaryDialInfo())
if err != nil {
t.Fatalf("Failed to get primary session: %v", err.Error())
}
defer session.Close()

state, err := GetShardingState(session)
if err != nil {
t.Fatalf("Failed to run .GetShardingState(): %v", err.Error())
} else if state.Ok != 1 || !state.Enabled || state.ShardName != testutils.MongoDBReplsetName || state.ConfigServer == "" {
t.Fatal("Got unexpected output from .GetShardingState()")
}
}

func TestGetClusterID(t *testing.T) {
session, err := mgo.DialWithInfo(testutils.MongosDialInfo())
if err != nil {
t.Fatalf("Failed to get mongos session: %v", err.Error())
}
defer session.Close()

clusterId, err := GetClusterID(session)
if err != nil {
t.Fatalf("Failed to run .GetClusterID(): %v", err.Error())
} else if clusterId == nil {
t.Fatal(".GetClusterId() returned nil id")
}
}

func TestGetClusterIDShard(t *testing.T) {
session, err := mgo.DialWithInfo(testutils.PrimaryDialInfo())
if err != nil {
t.Fatalf("Failed to get primary session: %v", err.Error())
}
defer session.Close()

state, err := GetShardingState(session)
if err != nil {
t.Fatalf("Failed to run .GetShardingState(): %v", err.Error())
}

shardClusterId := GetClusterIDShard(state)
if shardClusterId == nil {
t.Fatal("Could not get cluster ID")
}

// check clusterId fetched from the shard/primary is same as mongos
mongosSession, err := mgo.DialWithInfo(testutils.MongosDialInfo())
if err != nil {
t.Fatalf("Failed to get mongos session: %v", err.Error())
}
defer mongosSession.Close()

mongosClusterId, err := GetClusterID(mongosSession)
if err != nil {
t.Fatalf("Failed to run .GetClusterID(): %v", err.Error())
}

if mongosClusterId.Hex() != shardClusterId.Hex() {
t.Fatal("Shard and mongos cluster IDs did not match")
}
}
7 changes: 0 additions & 7 deletions internal/cluster/config.go

This file was deleted.

4 changes: 0 additions & 4 deletions internal/cluster/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,6 @@ import (

var (
testSecondary2Host = testutils.MongoDBHost + ":" + testutils.MongoDBSecondary2Port
testClusterConfig = &Config{
Username: testutils.MongoDBUser,
Password: testutils.MongoDBPassword,
}
)

func loadBSONFile(file string, out interface{}) error {
Expand Down
59 changes: 24 additions & 35 deletions internal/cluster/replset.go
Original file line number Diff line number Diff line change
@@ -1,55 +1,44 @@
package cluster

import (
"time"

"github.com/globalsign/mgo"
"github.com/globalsign/mgo/bson"
"github.com/percona/mongodb-backup/mdbstructs"
)

var (
replsetReadPreference = mgo.PrimaryPreferred
)

//func HasReplsetTag(config *mdbstructs.ReplsetConfig, key, val string) bool {
//}

type Replset struct {
name string
addrs []string
config *Config
}

func NewReplset(config *Config, name string, addrs []string) *Replset {
return &Replset{
name: name,
addrs: addrs,
config: config,
}
}

func (r *Replset) GetReplsetSession() (*mgo.Session, error) {
session, err := mgo.DialWithInfo(&mgo.DialInfo{
Addrs: r.addrs,
Username: r.config.Username,
Password: r.config.Password,
ReplicaSetName: r.name,
Timeout: 10 * time.Second,
})
if err != nil || session.Ping() != nil {
return session, err
}
session.SetMode(replsetReadPreference, true)
return session, err
}

// GetConfig returns a struct representing the "replSetGetConfig" server
// command
//
// https://docs.mongodb.com/manual/reference/command/replSetGetConfig/
//
func GetConfig(session *mgo.Session) (*mdbstructs.ReplsetConfig, error) {
rsGetConfig := mdbstructs.ReplSetGetConfig{}
err := session.Run(bson.D{{"replSetGetConfig", "1"}}, &rsGetConfig)
return rsGetConfig.Config, err
}

// GetStatus returns a struct representing the "replSetGetStatus" server
// command
//
// https://docs.mongodb.com/manual/reference/command/replSetGetStatus/
//
func GetStatus(session *mgo.Session) (*mdbstructs.ReplsetStatus, error) {
status := mdbstructs.ReplsetStatus{}
err := session.Run(bson.D{{"replSetGetStatus", "1"}}, &status)
return &status, err
}

// GetReplsetID returns the replica set ID
func GetReplsetID(config *mdbstructs.ReplsetConfig) *bson.ObjectId {
return &config.Settings.ReplicaSetId
}

// GetBackupSource returns the the most appropriate replica set member
// to become the source of the backup. The chosen node should cause
// the least impact/risk possible during backup
func GetBackupSource(config *mdbstructs.ReplsetConfig, status *mdbstructs.ReplsetStatus) (*mdbstructs.ReplsetConfigMember, error) {
scorer, err := ScoreReplset(config, status, nil)
if err != nil {
Expand Down
11 changes: 2 additions & 9 deletions internal/cluster/replset_scorer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cluster
import (
"testing"

"github.com/globalsign/mgo"
"github.com/percona/mongodb-backup/internal/testutils"
"github.com/percona/mongodb-backup/mdbstructs"
)
Expand All @@ -14,15 +15,7 @@ const (
)

func TestScoreReplset(t *testing.T) {
rs := NewReplset(
testClusterConfig,
testutils.MongoDBReplsetName,
[]string{
testutils.MongoDBHost + ":" + testutils.MongoDBPrimaryPort,
},
)

session, err := rs.GetReplsetSession()
session, err := mgo.DialWithInfo(testutils.PrimaryDialInfo())
if err != nil {
t.Fatalf("Could not connect to replset: %v", err.Error())
}
Expand Down
8 changes: 0 additions & 8 deletions internal/cluster/replset_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"errors"
"time"

"github.com/globalsign/mgo"
"github.com/globalsign/mgo/bson"
"github.com/percona/mongodb-backup/mdbstructs"
)

Expand Down Expand Up @@ -90,9 +88,3 @@ func GetReplsetStatusPrimary(status *mdbstructs.ReplsetStatus) *mdbstructs.Repls
}
return nil
}

func GetStatus(session *mgo.Session) (*mdbstructs.ReplsetStatus, error) {
status := mdbstructs.ReplsetStatus{}
err := session.Run(bson.D{{"replSetGetStatus", "1"}}, &status)
return &status, err
}
Loading