Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 64 additions & 32 deletions internal/cluster/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,50 @@ import (
)

// The amount of time in milliseconds to wait for a balancer
// command to run
// command to run. 60sec is the same default as mongo shell
var BalancerCmdTimeoutMs = 60000

type Balancer struct {
session *mgo.Session
wasEnabled bool
}

func NewBalancer(session *mgo.Session) (*Balancer, error) {
var err error
b := &Balancer{session: session}
b.wasEnabled, err = b.IsEnabled()
return b, err
}

// RestoreState ensures the balancer is restored to its original state
func (b *Balancer) RestoreState() error {
isEnabled, err := b.IsEnabled()
if err != nil {
return err
}
if b.wasEnabled && !isEnabled {
return b.Start()
}
return nil
}

// getStatus returns a struct representing the result of the
// MongoDB 'balancerStatus' command. This command will only
// succeed on a session to a mongos process (as of MongoDB 3.6)
//
// https://docs.mongodb.com/manual/reference/command/balancerStatus/
//
func (b *Balancer) getStatus() (*mdbstructs.BalancerStatus, error) {
status := mdbstructs.BalancerStatus{}
err := b.session.Run(bson.D{{"balancerStatus", "1"}}, &status)
return &status, err
}

// runBalancerCommand is a helper for running a
// balancerStart/balancerStop server command
func runBalancerCommand(session *mgo.Session, balancerCommand string) error {
func (b *Balancer) runBalancerCommand(balancerCommand string) error {
okResp := mdbstructs.OkResponse{}
err := session.Run(bson.D{
err := b.session.Run(bson.D{
{balancerCommand, "1"},
{"maxTimeMS", BalancerCmdTimeoutMs},
}, &okResp)
Expand All @@ -29,61 +65,57 @@ func runBalancerCommand(session *mgo.Session, balancerCommand string) error {
return nil
}

// GetBalancerStatus returns a struct representing the result of
// the MongoDB 'balancerStatus' command. This command will only
// succeed on a session to a mongos process (as of MongoDB 3.6)
//
// https://docs.mongodb.com/manual/reference/command/balancerStatus/
//
func GetBalancerStatus(session *mgo.Session) (*mdbstructs.BalancerStatus, error) {
balancerStatus := mdbstructs.BalancerStatus{}
err := session.Run(bson.D{{"balancerStatus", "1"}}, &balancerStatus)
return &balancerStatus, err
}

// IsBalancerEnabled returns a boolean reflecting if the balancer
// IsEnabled returns a boolean reflecting if the balancer
// is enabled
func IsBalancerEnabled(status *mdbstructs.BalancerStatus) bool {
return status.Mode == mdbstructs.BalancerModeFull
func (b *Balancer) IsEnabled() (bool, error) {
status, err := b.getStatus()
if err != nil {
return false, err
}
return status.Mode == mdbstructs.BalancerModeFull, nil
}

// IsBalancerRunning returns a boolean reflecting if the balancer
// IsRunning returns a boolean reflecting if the balancer
// is currently running
func IsBalancerRunning(status *mdbstructs.BalancerStatus) bool {
return status.InBalancerRound
func (b *Balancer) IsRunning() (bool, error) {
status, err := b.getStatus()
if err != nil {
return false, err
}
return status.InBalancerRound, nil
}

// StopBalancer performs a 'balancerStop' server command on
// Stop performs a 'balancerStop' server command on
// the provided session
//
// https://docs.mongodb.com/manual/reference/command/balancerStop/
//
func StopBalancer(session *mgo.Session) error {
return runBalancerCommand(session, "balancerStop")
func (b *Balancer) Stop() error {
return b.runBalancerCommand("balancerStop")
}

// StartBalancer performs a 'balancerStart' server command on
// Start performs a 'balancerStart' server command on
// the provided session
//
// https://docs.mongodb.com/manual/reference/command/balancerStart/
//
func StartBalancer(session *mgo.Session) error {
return runBalancerCommand(session, "balancerStart")
func (b *Balancer) Start() error {
return b.runBalancerCommand("balancerStart")
}

// StopBalancerAndWait performs a StopBalancer and then waits for
// StopAndWait performs a Stop and then waits for
// the balancer to stop running any balancer operations
func StopBalancerAndWait(session *mgo.Session, retries int, retryInterval time.Duration) error {
err := StopBalancer(session)
func (b *Balancer) StopAndWait(retries int, retryInterval time.Duration) error {
err := b.Stop()
if err != nil {
return err
}
var tries int
for tries < retries {
status, err := GetBalancerStatus(session)
isRunning, err := b.IsRunning()
if err != nil {
return err
} else if !IsBalancerRunning(status) {
} else if !isRunning {
return nil
}
tries++
Expand Down
141 changes: 105 additions & 36 deletions internal/cluster/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,15 @@ func TestMain(m *testing.M) {
os.Exit(1)
}

err = StartBalancer(session)
b, err := NewBalancer(session)
if err != nil {
fmt.Printf("Failed to run .StartBalancer(): %v", err.Error())
fmt.Printf("Could not run .NewBalancer(): %v", err.Error())
os.Exit(1)
}

err = b.Start()
if err != nil {
fmt.Printf("Failed to run .Start(): %v", err.Error())
session.Close()
os.Exit(1)
}
Expand All @@ -29,69 +35,80 @@ func TestMain(m *testing.M) {
os.Exit(m.Run())
}

func TestGetBalancerStatus(t *testing.T) {
func TestBalancerGetStatus(t *testing.T) {
session, err := mgo.DialWithInfo(testutils.MongosDialInfo())
if err != nil {
t.Fatalf("Could not connect to mongos: %v", err.Error())
}
defer session.Close()

status, err := GetBalancerStatus(session)
b, err := NewBalancer(session)
if err != nil {
t.Fatalf("Could not run .NewBalancer(): %v", err.Error())
}

status, err := b.getStatus()
if err != nil {
t.Fatalf("Failed to run .GetBalancerStatus(): %v", err.Error())
t.Fatalf("Failed to run .getStatus(): %v", err.Error())
}
if status == nil || status.Ok != 1 {
t.Fatal("Got unexpected result from .GetBalancerStatus()")
t.Fatal("Got unexpected result from .getStatus()")
}
}

func TestIsBalancerEnabled(t *testing.T) {
func TestBalancerIsEnabled(t *testing.T) {
session, err := mgo.DialWithInfo(testutils.MongosDialInfo())
if err != nil {
t.Fatalf("Could not connect to mongos: %v", err.Error())
}
defer session.Close()

status, err := GetBalancerStatus(session)
b, err := NewBalancer(session)
if err != nil {
t.Fatalf("Failed to run .GetBalancerStatus(): %v", err.Error())
t.Fatalf("Could not run .NewBalancer(): %v", err.Error())
}

if !IsBalancerEnabled(status) {
t.Fatal(".IsBalancerEnabled() should return true")
isEnabled, err := b.IsEnabled()
if err != nil {
t.Fatalf("Failed to run .IsEnabled(): %v", err.Error())
} else if !isEnabled {
t.Fatal(".IsEnabled() should return true")
}
}

func TestStopBalancer(t *testing.T) {
func TestBalancerStop(t *testing.T) {
session, err := mgo.DialWithInfo(testutils.MongosDialInfo())
if err != nil {
t.Fatalf("Could not connect to mongos: %v", err.Error())
}
defer session.Close()

err = StopBalancer(session)
b, err := NewBalancer(session)
if err != nil {
t.Fatalf("Failed to run .StopBalancer(): %v", err.Error())
t.Fatalf("Could not run .NewBalancer(): %v", err.Error())
}

status, err := GetBalancerStatus(session)
err = b.Stop()
if err != nil {
t.Fatalf("Failed to run .GetBalancerStatus(): %v", err.Error())
t.Fatalf("Failed to run .Stop(): %v", err.Error())
}

if IsBalancerEnabled(status) {
t.Fatal(".IsBalancerEnabled() should return false after .StopBalancer()")
isEnabled, err := b.IsEnabled()
if err != nil {
t.Fatalf(".IsEnabled() returned an error: %v", err.Error())
} else if isEnabled {
t.Fatal(".IsEnabled() should return false after .Stop()")
}

// sometimes the balancer doesn't stop right away
tries := 1
maxTries := 60
for tries < maxTries {
status, err := GetBalancerStatus(session)
isRunning, err := b.IsRunning()
if err != nil {
t.Fatalf("Failed to run .GetBalancerStatus(): %v", err.Error())
t.Fatalf("Failed to run .IsRunning(): %v", err.Error())
}
if !IsBalancerRunning(status) {
if !isRunning {
break
}
time.Sleep(time.Second)
Expand All @@ -102,51 +119,103 @@ func TestStopBalancer(t *testing.T) {
}
}

func TestStopBalancerAndWait(t *testing.T) {
func TestBalancerStopAndWait(t *testing.T) {
session, err := mgo.DialWithInfo(testutils.MongosDialInfo())
if err != nil {
t.Fatalf("Could not connect to mongos: %v", err.Error())
}
defer session.Close()

err = StartBalancer(session)
b, err := NewBalancer(session)
if err != nil {
t.Fatalf("Failed to run .StartBalancer(): %v", err.Error())
t.Fatalf("Could not run .NewBalancer(): %v", err.Error())
}

err = StopBalancerAndWait(session, 10, time.Second)
err = b.Start()
if err != nil {
t.Fatalf("Failed to run .StopBalancerAndWait(): %v", err.Error())
t.Fatalf("Failed to run .Start(): %v", err.Error())
}

status, err := GetBalancerStatus(session)
err = b.StopAndWait(10, time.Second)
if err != nil {
t.Fatalf("Failed to run .GetBalancerStatus(): %v", err.Error())
t.Fatalf("Failed to run .StopAndWait(): %v", err.Error())
}

if IsBalancerRunning(status) || IsBalancerEnabled(status) {
isEnabled, err := b.IsEnabled()
if err != nil {
t.Fatalf("Failed to run .IsEnabled(): %v", err.Error())
}

isRunning, err := b.IsRunning()
if err != nil {
t.Fatalf("Failed to run .IsRunning(): %v", err.Error())
}

if isRunning || isEnabled {
t.Fatal("The balancer did not stop running")
}
}

func TestStartBalancer(t *testing.T) {
func TestBalancerStart(t *testing.T) {
session, err := mgo.DialWithInfo(testutils.MongosDialInfo())
if err != nil {
t.Fatalf("Could not connect to mongos: %v", err.Error())
}
defer session.Close()

err = StartBalancer(session)
b, err := NewBalancer(session)
if err != nil {
t.Fatalf("Failed to run .StartBalancer(): %v", err.Error())
t.Fatalf("Could not run .NewBalancer(): %v", err.Error())
}

status, err := GetBalancerStatus(session)
err = b.Start()
if err != nil {
t.Fatalf("Failed to run .GetBalancerStatus(): %v", err.Error())
t.Fatalf("Failed to run .Start(): %v", err.Error())
}

if !IsBalancerEnabled(status) {
t.Fatal(".IsBalancerEnabled() should return true after .StartBalancer()")
isEnabled, err := b.IsEnabled()
if err != nil {
t.Fatalf("Failed to run .IsEnabled(): %v", err.Error())
} else if !isEnabled {
t.Fatal(".IsEnabled() should return true after .Start()")
}
}

func TestBalancerRestoreState(t *testing.T) {
session, err := mgo.DialWithInfo(testutils.MongosDialInfo())
if err != nil {
t.Fatalf("Could not connect to mongos: %v", err.Error())
}
defer session.Close()

// start the balancer before the test
b, err := NewBalancer(session)
if err != nil {
t.Fatalf("Could not run .NewBalancer(): %v", err.Error())
}
err = b.Start()
if err != nil {
t.Fatalf("Failed to run .Start(): %v", err.Error())
}

// create a new Balancer struct to test .RestoreState() after .Stop()
b2, err := NewBalancer(session)
if !b2.wasEnabled {
t.Fatal("Balancer .wasEnabled bool should be true")
}
err = b2.Stop()
if err != nil {
t.Fatalf("Failed to run .Stop(): %v", err.Error())
}
err = b2.RestoreState()
if err != nil {
t.Fatalf("Failed to run .RestoreState(): %v", err.Error())
}

isEnabled, err := b.IsEnabled()
if err != nil {
t.Fatalf("Failed to run .IsEnabled(): %v", err.Error())
} else if !isEnabled {
t.Fatal(".IsEnabled() should return true after .Start()")
}
}
Loading