Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions internal/cluster/balancer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package cluster

import (
"errors"

"github.com/globalsign/mgo"
"github.com/globalsign/mgo/bson"
"github.com/percona/mongodb-backup/mdbstructs"
)

// The amount of time in milliseconds to wait for a balancer
// command to run
var BalancerCmdTimeoutMs = 60000

// runBalancerCommand is a helper for running a
// balancerStart/balancerStop server command
func runBalancerCommand(session *mgo.Session, balancerCommand string) error {
okResp := mdbstructs.OkResponse{}
err := session.Run(bson.D{
{balancerCommand, "1"},
{"maxTimeMS", BalancerCmdTimeoutMs},
}, &okResp)
if err != nil {
return err
} else if okResp.Ok != 1 {
return errors.New("got failed response from server")
}
return nil
}

// GetBalancerStatus returns a struct representing the result of
// the MongoDB 'balancerStatus' command. This command will only
// succeed on a session to a mongos process (as of MongoDB 3.6)
func GetBalancerStatus(session *mgo.Session) (*mdbstructs.BalancerStatus, error) {
balancerStatus := mdbstructs.BalancerStatus{}
err := session.Run(bson.D{{"balancerStatus", "1"}}, &balancerStatus)
return &balancerStatus, err
}

// IsBalancerEnabled returns a boolean reflecting if the balancer
// is enabled
func IsBalancerEnabled(status *mdbstructs.BalancerStatus) bool {
return status.Mode == mdbstructs.BalancerModeFull
}

// IsBalancerRunning returns a boolean reflecting if the balancer
// is currently running
func IsBalancerRunning(status *mdbstructs.BalancerStatus) bool {
return status.InBalancerRound
}

// StopBalancer performs a 'balancerStop' server command on
// the provided session
func StopBalancer(session *mgo.Session) error {
return runBalancerCommand(session, "balancerStop")
}

// StartBalancer performs a 'balancerStart' server command on
// the provided session
func StartBalancer(session *mgo.Session) error {
return runBalancerCommand(session, "balancerStart")
}
125 changes: 125 additions & 0 deletions internal/cluster/balancer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package cluster

import (
"fmt"
"os"
"testing"
"time"

"github.com/globalsign/mgo"
"github.com/percona/mongodb-backup/internal/testutils"
)

// Make sure the balancer is started before the tests, then run tests
func TestMain(m *testing.M) {
session, err := mgo.DialWithInfo(testutils.MongosDialInfo())
if err != nil {
fmt.Printf("Could not connect to mongos: %v", err.Error())
os.Exit(1)
}

err = StartBalancer(session)
if err != nil {
fmt.Printf("Failed to run .StartBalancer(): %v", err.Error())
session.Close()
os.Exit(1)
}
session.Close()

os.Exit(m.Run())
}

func TestGetBalancerStatus(t *testing.T) {
session, err := mgo.DialWithInfo(testutils.MongosDialInfo())
if err != nil {
t.Fatalf("Could not connect to mongos: %v", err.Error())
}
defer session.Close()

status, err := GetBalancerStatus(session)
if err != nil {
t.Fatalf("Failed to run .GetBalancerStatus(): %v", err.Error())
}
if status == nil || status.Ok != 1 {
t.Fatal("Got unexpected result from .GetBalancerStatus()")
}
}

func TestIsBalancerEnabled(t *testing.T) {
session, err := mgo.DialWithInfo(testutils.MongosDialInfo())
if err != nil {
t.Fatalf("Could not connect to mongos: %v", err.Error())
}
defer session.Close()

status, err := GetBalancerStatus(session)
if err != nil {
t.Fatalf("Failed to run .GetBalancerStatus(): %v", err.Error())
}

if !IsBalancerEnabled(status) {
t.Fatal(".IsBalancerEnabled() should return true")
}
}

func TestStopBalancer(t *testing.T) {
session, err := mgo.DialWithInfo(testutils.MongosDialInfo())
if err != nil {
t.Fatalf("Could not connect to mongos: %v", err.Error())
}
defer session.Close()

err = StopBalancer(session)
if err != nil {
t.Fatalf("Failed to run .StopBalancer(): %v", err.Error())
}

status, err := GetBalancerStatus(session)
if err != nil {
t.Fatalf("Failed to run .GetBalancerStatus(): %v", err.Error())
}

if IsBalancerEnabled(status) {
t.Fatal(".IsBalancerEnabled() should return false after .StopBalancer()")
}

// sometimes the balancer doesn't stop right away
tries := 1
maxTries := 60
for tries < maxTries {
status, err := GetBalancerStatus(session)
if err != nil {
t.Fatalf("Failed to run .GetBalancerStatus(): %v", err.Error())
}
if !IsBalancerRunning(status) {
break
}
time.Sleep(time.Second)
tries++
}
if tries >= maxTries {
t.Fatal("The balancer did not stop running")
}
}

func TestStartBalancer(t *testing.T) {
session, err := mgo.DialWithInfo(testutils.MongosDialInfo())
if err != nil {
t.Fatalf("Could not connect to mongos: %v", err.Error())
}
defer session.Close()

err = StartBalancer(session)
if err != nil {
t.Fatalf("Failed to run .StartBalancer(): %v", err.Error())
}

status, err := GetBalancerStatus(session)
if err != nil {
t.Fatalf("Failed to run .GetBalancerStatus(): %v", err.Error())
}

if !IsBalancerEnabled(status) {
t.Fatal(".IsBalancerEnabled() should return true after .StartBalancer()")
}
}
41 changes: 0 additions & 41 deletions internal/cluster/cluster.go

This file was deleted.

95 changes: 0 additions & 95 deletions internal/cluster/cluster_test.go

This file was deleted.

7 changes: 7 additions & 0 deletions internal/cluster/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package cluster

type Config struct {
Username string
Password string
AuthDB string
}
8 changes: 4 additions & 4 deletions internal/cluster/is_master.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,15 @@ func IsConfigServer(isMaster *mdbstructs.IsMaster) bool {
// determine if a node is a mongod with the 'shardsvr'
// cluster role.
//
func IsShardsvr(isMaster *mdbstructs.IsMaster) bool {
func IsShardServer(isMaster *mdbstructs.IsMaster) bool {
return IsReplset(isMaster) && isMaster.ConfigServerState != nil
}

// We are connected to a Sharded Cluster if the seed host
// is a valid mongos or config server.
// The isMaster struct is from a Sharded Cluster if the seed host
// is a valid mongos, config server or shard server.
//
func IsShardedCluster(isMaster *mdbstructs.IsMaster) bool {
if IsConfigServer(isMaster) || IsMongos(isMaster) {
if IsConfigServer(isMaster) || IsMongos(isMaster) || IsShardServer(isMaster) {
return true
}
return false
Expand Down
20 changes: 7 additions & 13 deletions internal/cluster/is_master_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,27 +144,21 @@ func TestIsShardedCluster(t *testing.T) {
}
}

func TestIsShardsvr(t *testing.T) {
if IsShardsvr(&mdbstructs.IsMaster{
func TestIsShardServer(t *testing.T) {
if IsShardServer(&mdbstructs.IsMaster{
IsMaster: true,
SetName: "test",
}) {
t.Fatal(".IsShardsvr() should be false")
t.Fatal(".IsShardServer() should be false")
}
ts, _ := bson.NewMongoTimestamp(time.Now(), 0)
if IsShardsvr(&mdbstructs.IsMaster{
if IsShardServer(&mdbstructs.IsMaster{
IsMaster: true,
Msg: "dbgrid",
ConfigServerState: &mdbstructs.ConfigServerState{
OpTime: &mdbstructs.OpTime{
Ts: ts,
Term: int64(1),
},
},
}) {
t.Fatal(".IsShardsvr() should be false")
t.Fatal(".IsShardServer() should be false")
}
if !IsShardsvr(&mdbstructs.IsMaster{
if !IsShardServer(&mdbstructs.IsMaster{
IsMaster: true,
SetName: "test",
ConfigServerState: &mdbstructs.ConfigServerState{
Expand All @@ -174,6 +168,6 @@ func TestIsShardsvr(t *testing.T) {
},
},
}) {
t.Fatal(".IsShardsvr() should be true")
t.Fatal(".IsShardServer() should be true")
}
}
Loading