Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ test:
TEST_MONGODB_USERNAME=$(TEST_MONGODB_USERNAME) \
TEST_MONGODB_PASSWORD=$(TEST_MONGODB_PASSWORD) \
TEST_MONGODB_RS=$(TEST_MONGODB_RS) \
TEST_MONGODB_CONFIGSVR_RS=$(TEST_MONGODB_CONFIGSVR_RS) \
TEST_MONGODB_PRIMARY_PORT=$(TEST_MONGODB_PRIMARY_PORT) \
TEST_MONGODB_SECONDARY1_PORT=$(TEST_MONGODB_SECONDARY1_PORT) \
TEST_MONGODB_SECONDARY2_PORT=$(TEST_MONGODB_SECONDARY2_PORT) \
TEST_MONGODB_CONFIGSVR_RS=$(TEST_MONGODB_CONFIGSVR_RS) \
TEST_MONGODB_CONFIGSVR1_PORT=$(TEST_MONGODB_CONFIGSVR1_PORT) \
TEST_MONGODB_MONGOS_PORT=$(TEST_MONGODB_MONGOS_PORT) \
GOCACHE=$(GOCACHE) \
Expand All @@ -52,10 +52,10 @@ test-cluster:
TEST_MONGODB_USERNAME=$(TEST_MONGODB_USERNAME) \
TEST_MONGODB_PASSWORD=$(TEST_MONGODB_PASSWORD) \
TEST_MONGODB_RS=$(TEST_MONGODB_RS) \
TEST_MONGODB_CONFIGSVR_RS=$(TEST_MONGODB_CONFIGSVR_RS) \
TEST_MONGODB_PRIMARY_PORT=$(TEST_MONGODB_PRIMARY_PORT) \
TEST_MONGODB_SECONDARY1_PORT=$(TEST_MONGODB_SECONDARY1_PORT) \
TEST_MONGODB_SECONDARY2_PORT=$(TEST_MONGODB_SECONDARY2_PORT) \
TEST_MONGODB_CONFIGSVR_RS=$(TEST_MONGODB_CONFIGSVR_RS) \
TEST_MONGODB_CONFIGSVR1_PORT=$(TEST_MONGODB_CONFIGSVR1_PORT) \
TEST_MONGODB_MONGOS_PORT=$(TEST_MONGODB_MONGOS_PORT) \
docker-compose up \
Expand All @@ -77,10 +77,10 @@ test-full: test-cluster-clean test-cluster
TEST_MONGODB_USERNAME=$(TEST_MONGODB_USERNAME) \
TEST_MONGODB_PASSWORD=$(TEST_MONGODB_PASSWORD) \
TEST_MONGODB_RS=$(TEST_MONGODB_RS) \
TEST_MONGODB_CONFIGSVR_RS=$(TEST_MONGODB_CONFIGSVR_RS) \
TEST_MONGODB_PRIMARY_PORT=$(TEST_MONGODB_PRIMARY_PORT) \
TEST_MONGODB_SECONDARY1_PORT=$(TEST_MONGODB_SECONDARY1_PORT) \
TEST_MONGODB_SECONDARY2_PORT=$(TEST_MONGODB_SECONDARY2_PORT) \
TEST_MONGODB_CONFIGSVR_RS=$(TEST_MONGODB_CONFIGSVR_RS) \
TEST_MONGODB_CONFIGSVR1_PORT=$(TEST_MONGODB_CONFIGSVR1_PORT) \
TEST_MONGODB_MONGOS_PORT=$(TEST_MONGODB_MONGOS_PORT) \
docker-compose up \
Expand Down
15 changes: 15 additions & 0 deletions internal/cluster/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,21 @@ func TestMain(m *testing.M) {
os.Exit(m.Run())
}

func TestNewBalancer(t *testing.T) {
session, err := mgo.DialWithInfo(testutils.MongosDialInfo())
if err != nil {
t.Fatalf("Could not connect to mongos: %v", err.Error())
}
defer session.Close()

b, err := NewBalancer(session)
if err != nil {
t.Fatalf("Could not run .NewBalancer(): %v", err.Error())
} else if b.session == nil {
t.Fatal("Got unexpected output from .NewBalancer()")
}
}

func TestBalancerGetStatus(t *testing.T) {
session, err := mgo.DialWithInfo(testutils.MongosDialInfo())
if err != nil {
Expand Down
18 changes: 11 additions & 7 deletions internal/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,26 @@ import (
"github.com/percona/mongodb-backup/mdbstructs"
)

// GetShardingState returns a struct reflecting the output of the
type ShardingState struct {
state *mdbstructs.ShardingState
}

// NewShardingState returns a struct reflecting the output of the
// 'shardingState' server command. This command should be ran on a
// shard mongod
//
// https://docs.mongodb.com/manual/reference/command/shardingState/
//
func GetShardingState(session *mgo.Session) (*mdbstructs.ShardingState, error) {
shardingState := mdbstructs.ShardingState{}
err := session.Run(bson.D{{"shardingState", "1"}}, &shardingState)
return &shardingState, err
func NewShardingState(session *mgo.Session) (*ShardingState, error) {
s := ShardingState{}
err := session.Run(bson.D{{"shardingState", "1"}}, &s.state)
return &s, err
}

// GetClusterIDShard returns the cluster ID using the result of the
// 'balancerState' server command
func GetClusterIDShard(state *mdbstructs.ShardingState) *bson.ObjectId {
return &state.ClusterID
func (s *ShardingState) ClusterID() *bson.ObjectId {
return &s.state.ClusterID
}

// GetClusterID returns the cluster ID using the 'config.version'
Expand Down
54 changes: 25 additions & 29 deletions internal/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,53 +5,34 @@ import (

"github.com/globalsign/mgo"
"github.com/percona/mongodb-backup/internal/testutils"
//"github.com/percona/mongodb-backup/mdbstructs"
)

func TestGetShardingState(t *testing.T) {
func TestNewShardingState(t *testing.T) {
session, err := mgo.DialWithInfo(testutils.PrimaryDialInfo())
if err != nil {
t.Fatalf("Failed to get primary session: %v", err.Error())
}
defer session.Close()

state, err := GetShardingState(session)
s, err := NewShardingState(session)
if err != nil {
t.Fatalf("Failed to run .GetShardingState(): %v", err.Error())
} else if state.Ok != 1 || !state.Enabled || state.ShardName != testutils.MongoDBReplsetName || state.ConfigServer == "" {
t.Fatal("Got unexpected output from .GetShardingState()")
t.Fatalf("Failed to run .NewShardingState(): %v", err.Error())
} else if s.state.Ok != 1 || !s.state.Enabled || s.state.ShardName != testutils.MongoDBReplsetName || s.state.ConfigServer == "" {
t.Fatal("Got unexpected output from .NewShardingState()")
}
}

func TestGetClusterID(t *testing.T) {
session, err := mgo.DialWithInfo(testutils.MongosDialInfo())
if err != nil {
t.Fatalf("Failed to get mongos session: %v", err.Error())
}
defer session.Close()

clusterId, err := GetClusterID(session)
if err != nil {
t.Fatalf("Failed to run .GetClusterID(): %v", err.Error())
} else if clusterId == nil {
t.Fatal(".GetClusterId() returned nil id")
}
}

func TestGetClusterIDShard(t *testing.T) {
func TestShardingStateClusterID(t *testing.T) {
session, err := mgo.DialWithInfo(testutils.PrimaryDialInfo())
if err != nil {
t.Fatalf("Failed to get primary session: %v", err.Error())
}
defer session.Close()

state, err := GetShardingState(session)
s, err := NewShardingState(session)
if err != nil {
t.Fatalf("Failed to run .GetShardingState(): %v", err.Error())
}

shardClusterId := GetClusterIDShard(state)
if shardClusterId == nil {
t.Fatalf("Failed to run .NewShardingState(): %v", err.Error())
} else if s.ClusterID() == nil {
t.Fatal("Could not get cluster ID")
}

Expand All @@ -67,7 +48,22 @@ func TestGetClusterIDShard(t *testing.T) {
t.Fatalf("Failed to run .GetClusterID(): %v", err.Error())
}

if mongosClusterId.Hex() != shardClusterId.Hex() {
if mongosClusterId.Hex() != s.ClusterID().Hex() {
t.Fatal("Shard and mongos cluster IDs did not match")
}
}

func TestGetClusterID(t *testing.T) {
session, err := mgo.DialWithInfo(testutils.MongosDialInfo())
if err != nil {
t.Fatalf("Failed to get mongos session: %v", err.Error())
}
defer session.Close()

clusterId, err := GetClusterID(session)
if err != nil {
t.Fatalf("Failed to run .GetClusterID(): %v", err.Error())
} else if clusterId == nil {
t.Fatal(".GetClusterId() returned nil id")
}
}
6 changes: 6 additions & 0 deletions internal/cluster/is_master_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,4 +144,10 @@ func TestIsMasterIsShardServer(t *testing.T) {
} else if !i.IsShardServer() {
t.Fatalf("Expected true from .IsShardServer()")
}

// replset-only node (should fail)
i.isMaster.ConfigServerState = nil
if i.IsShardServer() {
t.Fatalf("Expected false from .IsShardServer()")
}
}
2 changes: 1 addition & 1 deletion internal/cluster/replset.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (r *Replset) ID() *bson.ObjectId {
// the least impact/risk possible during backup
func (r *Replset) BackupSource() (*mdbstructs.ReplsetConfigMember, error) {
// todo: pass replset-tags instead of nil
scorer, err := r.score(nil)
scorer, err := r.scoreMembers(nil)
if err != nil {
return nil, err
}
Expand Down
18 changes: 9 additions & 9 deletions internal/cluster/replset_scorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (m *ReplsetScoringMember) MultiplyScore(multiplier float64, msg ReplsetScor
m.log = append(m.log, msg)
}

func minPrioritySecondary(members map[string]*ReplsetScoringMember) *ReplsetScoringMember {
func minPrioritySecondary(members []*ReplsetScoringMember) *ReplsetScoringMember {
var minPriority *ReplsetScoringMember
for _, member := range members {
if member.status.State != mdbstructs.ReplsetMemberStateSecondary || member.config.Priority < 1 {
Expand All @@ -79,7 +79,7 @@ func minPrioritySecondary(members map[string]*ReplsetScoringMember) *ReplsetScor
return minPriority
}

func minVotesSecondary(members map[string]*ReplsetScoringMember) *ReplsetScoringMember {
func minVotesSecondary(members []*ReplsetScoringMember) *ReplsetScoringMember {
var minVotes *ReplsetScoringMember
for _, member := range members {
if member.status.State != mdbstructs.ReplsetMemberStateSecondary || member.config.Votes < 1 {
Expand All @@ -92,8 +92,8 @@ func minVotesSecondary(members map[string]*ReplsetScoringMember) *ReplsetScoring
return minVotes
}

func (r *Replset) getReplsetScoringMembers() (map[string]*ReplsetScoringMember, error) {
members := map[string]*ReplsetScoringMember{}
func (r *Replset) getReplsetScoringMembers() ([]*ReplsetScoringMember, error) {
members := []*ReplsetScoringMember{}
for _, cnfMember := range r.config.Members {
var statusMember *mdbstructs.ReplsetStatusMember
for _, m := range r.status.Members {
Expand All @@ -105,21 +105,21 @@ func (r *Replset) getReplsetScoringMembers() (map[string]*ReplsetScoringMember,
if statusMember == nil {
return nil, errors.New("no status info")
}
members[cnfMember.Host] = &ReplsetScoringMember{
members = append(members, &ReplsetScoringMember{
config: cnfMember,
status: statusMember,
score: baseScore,
}
})
}
return members, nil
}

type ReplsetScorer struct {
replsetTags map[string]string
members map[string]*ReplsetScoringMember
members []*ReplsetScoringMember
}

func (r *Replset) score(replsetTags map[string]string) (*ReplsetScorer, error) {
func (r *Replset) scoreMembers(replsetTags map[string]string) (*ReplsetScorer, error) {
r.Lock()
defer r.Unlock()

Expand Down Expand Up @@ -203,7 +203,7 @@ func (r *Replset) score(replsetTags map[string]string) (*ReplsetScorer, error) {
return scorer, nil
}

func (s *ReplsetScorer) All() map[string]*ReplsetScoringMember {
func (s *ReplsetScorer) All() []*ReplsetScoringMember {
return s.members
}

Expand Down
8 changes: 4 additions & 4 deletions internal/cluster/replset_scorer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const (
testReplSetStatusSecondaryFile = "testdata/replSetGetStatus-secondary.bson"
)

func TestReplsetScore(t *testing.T) {
func TestReplsetScoreMembers(t *testing.T) {
session, err := mgo.DialWithInfo(testutils.PrimaryDialInfo())
if err != nil {
t.Fatalf("Could not connect to replset: %v", err.Error())
Expand All @@ -26,7 +26,7 @@ func TestReplsetScore(t *testing.T) {
t.Fatalf("Failed to run .NewReplset(): %v", err.Error())
}

scorer, err := r.score(nil)
scorer, err := r.scoreMembers(nil)
if err != nil {
t.Fatalf("Failed to run .ScoreReplset(): %v", err.Error())
} else if len(scorer.members) < 1 {
Expand All @@ -50,7 +50,7 @@ func TestReplsetScore(t *testing.T) {
if err != nil {
t.Fatalf("Failed to run .NewReplset(): %v", err.Error())
}
scorer, err = r.score(map[string]string{"role": "backup"})
scorer, err = r.scoreMembers(map[string]string{"role": "backup"})
if err != nil {
t.Fatalf("Failed to run .score(): %v", err.Error())
}
Expand Down Expand Up @@ -81,7 +81,7 @@ func TestReplsetScore(t *testing.T) {
t.Fatalf("Cannot load test file %v: %v", statusFile, err.Error())
}

scorer, err := r.score(nil)
scorer, err := r.scoreMembers(nil)
if err != nil {
t.Fatalf("Failed to run .score(): %v", err.Error())
}
Expand Down