Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions cli/pbm-agent/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@ import (
)

func TestDefaults(t *testing.T) {
opts, err := processCliArgs([]string{})
opts, err := processCliArgs([]string{"--backup-dir", os.TempDir()})
if err != nil {
t.Errorf("Unexpected error: %s", err)
t.Fatalf("Unexpected error: %s", err)
}
opts.app = nil
wantOpts := &cliOptions{
ServerAddress: defaultServerAddress,
BackupDir: os.TempDir(),
MongodbConnOptions: client.ConnectionOptions{
Host: defaultMongoDBHost,
Port: defaultMongoDBPort,
Expand Down Expand Up @@ -52,13 +53,14 @@ func TestOverrideDefaultsFromCommandLine(t *testing.T) {

func TestOverrideDefaultsFromEnv(t *testing.T) {
os.Setenv("PBM_AGENT_SERVER_ADDRESS", "localhost:12345")
opts, err := processCliArgs([]string{})
opts, err := processCliArgs([]string{"--backup-dir", os.TempDir()})
if err != nil {
t.Fatalf("Unexpected error: %s", err)
}
opts.app = nil
wantOpts := &cliOptions{
ServerAddress: "localhost:12345",
BackupDir: os.TempDir(),
MongodbConnOptions: client.ConnectionOptions{
Host: defaultMongoDBHost,
Port: defaultMongoDBPort,
Expand Down Expand Up @@ -150,13 +152,14 @@ func TestCommandLineArgsPrecedenceOverEnvVars(t *testing.T) {
os.Setenv("PBM_AGENT_MONGODB_PORT", "12346")
defer os.Setenv("PBM_AGENT_MONGODB_PORT", "")

opts, err := processCliArgs([]string{"--mongodb-port", "12345"})
opts, err := processCliArgs([]string{"--backup-dir", os.TempDir(), "--mongodb-port", "12345"})
if err != nil {
t.Fatalf("Unexpected error: %s", err)
}
opts.app = nil
wantOpts := &cliOptions{
ServerAddress: "localhost:12345",
BackupDir: os.TempDir(),
MongodbConnOptions: client.ConnectionOptions{
Host: defaultMongoDBHost,
Port: "12345",
Expand All @@ -177,6 +180,7 @@ func TestCommandLineArgsPrecedenceOverConfig(t *testing.T) {
defer os.Remove(tmpfile.Name())
wantOpts := &cliOptions{
ServerAddress: "localhost:12345",
BackupDir: os.TempDir(),
MongodbConnOptions: client.ConnectionOptions{
Host: defaultMongoDBHost,
Port: "12346",
Expand All @@ -193,7 +197,7 @@ func TestCommandLineArgsPrecedenceOverConfig(t *testing.T) {

wantOpts.MongodbConnOptions.Port = "12345"

opts, err := processCliArgs([]string{"--config-file", tmpfile.Name(), "--mongodb-port", "12345"})
opts, err := processCliArgs([]string{"--backup-dir", os.TempDir(), "--config-file", tmpfile.Name(), "--mongodb-port", "12345"})
if err != nil {
t.Fatalf("Unexpected error: %s", err)
}
Expand Down
5 changes: 2 additions & 3 deletions cli/pbmctl/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,12 @@ func TestListAgents(t *testing.T) {
tmpDir := path.Join(os.TempDir(), "dump_test")
defer os.RemoveAll(tmpDir) // Clean up after testing.
l := log.New()
d, err := testGrpc.NewGrpcDaemon(context.Background(), tmpDir, t, l)
d, err := testGrpc.NewDaemon(context.Background(), tmpDir, t, l)
if err != nil {
t.Fatalf("cannot start a new gRPC daemon/clients group: %s", err)
}

serverAddr := "127.0.0.1:" + testGrpc.TEST_GRPC_API_PORT
serverAddr := "127.0.0.1:" + testGrpc.TestGrpcAPIPort
conn, err := getApiConn(&cliOptions{ServerAddress: serverAddr})
if err != nil {
t.Fatalf("Cannot connect to the API: %s", err)
Expand Down Expand Up @@ -476,7 +476,6 @@ func TestCommandLineArgsPrecedenceOverConfig(t *testing.T) {
}

func nilOpts(opts *cliOptions) {
opts.app = nil
opts.backup = nil
opts.restore = nil
opts.list = nil
Expand Down
10 changes: 5 additions & 5 deletions internal/cluster/node_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,27 @@ import "github.com/globalsign/mgo"

const (
NodeTypeUndefined = iota
NodeTypeMongodShardSvr
NodeTypeMongod
NodeTypeMongodReplicaset
NodeTypeMongodShardSvr
NodeTypeMongodConfigSvr
NodeTypeMongos
NodeTypeMongod
)

func getNodeType(session *mgo.Session) (int, error) {
isMaster, err := NewIsMaster(session)
if err != nil {
return NodeTypeUndefined, err
}
if isMaster.IsConfigServer() {
return NodeTypeMongodConfigSvr, nil
}
if isMaster.IsShardServer() {
return NodeTypeMongodShardSvr, nil
}
if isMaster.IsReplset() {
return NodeTypeMongodReplicaset, nil
}
if isMaster.IsConfigServer() {
return NodeTypeMongodConfigSvr, nil
}
if isMaster.IsMongos() {
return NodeTypeMongos, nil
}
Expand Down
6 changes: 3 additions & 3 deletions internal/testutils/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,15 +147,15 @@ func newDialInfo(di *mgo.DialInfo) (*mgo.DialInfo, error) {
return dialInfo, nil
}

func DialInfoForPort(t *testing.T, rs, port string) *mgo.DialInfo {
func DialInfoForPort(rs, port string) (*mgo.DialInfo, error) {
di, err := dialInfo([]string{
MongoDBHost + ":" + port},
rs,
)
if err != nil {
t.Fatalf(".DialInfoForPort() failed: %v", err.Error())
return nil, errors.Wrap(err, ".DialInfoForPort() failed")
}
return di
return di, nil
}

func PrimaryDialInfo(t *testing.T, rs string) *mgo.DialInfo {
Expand Down
97 changes: 60 additions & 37 deletions internal/testutils/grpc/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,17 @@ import (
)

const (
TEST_GRPC_MESSAGES_PORT = "10000"
TEST_GRPC_API_PORT = "10001"
TestGrpcMessagesPort = "10000"
TestGrpcAPIPort = "10001"
)

var grpcServerShutdownTimeout = 30

type GrpcDaemon struct {
type Daemon struct {
grpcServer4Api *grpc.Server
grpcServer4Clients *grpc.Server
MessagesServer *server.MessagesServer
ApiServer *api.ApiServer
APIServer *api.ApiServer
msgListener net.Listener
apiListener net.Listener
wg *sync.WaitGroup
Expand All @@ -41,9 +41,16 @@ type GrpcDaemon struct {
logger *logrus.Logger
lock *sync.Mutex
clients []*client.Client
workDir string
clientConn *grpc.ClientConn
}

func NewGrpcDaemon(ctx context.Context, workDir string, t *testing.T, logger *logrus.Logger) (*GrpcDaemon, error) {
type PortRs struct {
Port string
Rs string
}

func NewDaemon(ctx context.Context, workDir string, t *testing.T, logger *logrus.Logger) (*Daemon, error) {
if logger == nil {
logger = &logrus.Logger{
Out: os.Stderr,
Expand All @@ -58,18 +65,19 @@ func NewGrpcDaemon(ctx context.Context, workDir string, t *testing.T, logger *lo
logger.Out = logrus.StandardLogger().Out
}
var opts []grpc.ServerOption
d := &GrpcDaemon{
d := &Daemon{
clients: make([]*client.Client, 0),
wg: &sync.WaitGroup{},
logger: logger,
lock: &sync.Mutex{},
workDir: workDir,
}
var err error

// Start the grpc server
d.msgListener, err = net.Listen("tcp", fmt.Sprintf("localhost:%s", TEST_GRPC_MESSAGES_PORT))
d.msgListener, err = net.Listen("tcp", fmt.Sprintf("localhost:%s", TestGrpcMessagesPort))
if err != nil {
return nil, fmt.Errorf("cannot listen on port %s for the gRPC messages server, %s", TEST_GRPC_MESSAGES_PORT, err)
return nil, fmt.Errorf("cannot listen on port %s for the gRPC messages server, %s", TestGrpcMessagesPort, err)
}

d.ctx, d.cancelFunc = context.WithCancel(ctx)
Expand All @@ -83,75 +91,89 @@ func NewGrpcDaemon(ctx context.Context, workDir string, t *testing.T, logger *lo
d.runAgentsGRPCServer(d.ctx, d.grpcServer4Clients, d.msgListener, grpcServerShutdownTimeout, d.wg)

//
d.apiListener, err = net.Listen("tcp", fmt.Sprintf("localhost:%s", TEST_GRPC_API_PORT))
d.apiListener, err = net.Listen("tcp", fmt.Sprintf("localhost:%s", TestGrpcAPIPort))
if err != nil {
return nil, fmt.Errorf("cannot listen on port %s for the gRPC API server, %s", TEST_GRPC_API_PORT, err)
return nil, fmt.Errorf("cannot listen on port %s for the gRPC API server, %s", TestGrpcAPIPort, err)
}

// This is the server gRPC API
d.grpcServer4Api = grpc.NewServer(opts...)
d.ApiServer = api.NewApiServer(d.MessagesServer)
pbapi.RegisterApiServer(d.grpcServer4Api, d.ApiServer)
d.APIServer = api.NewApiServer(d.MessagesServer)
pbapi.RegisterApiServer(d.grpcServer4Api, d.APIServer)

d.wg.Add(1)
logger.Printf("Starting API gRPC server. Listening on %s", d.apiListener.Addr().String())
d.runAgentsGRPCServer(d.ctx, d.grpcServer4Api, d.apiListener, grpcServerShutdownTimeout, d.wg)

clientOpts := []grpc.DialOption{grpc.WithInsecure()}

clientServerAddr := fmt.Sprintf("127.0.0.1:%s", TEST_GRPC_MESSAGES_PORT)
clientConn, err := grpc.Dial(clientServerAddr, clientOpts...)
clientServerAddr := fmt.Sprintf("127.0.0.1:%s", TestGrpcMessagesPort)
d.clientConn, err = grpc.Dial(clientServerAddr, clientOpts...)
if err != nil {
return nil, fmt.Errorf("cannot dail gRPC address %s: %v", clientServerAddr, err)
}

ports := []string{testutils.MongoDBShard1PrimaryPort, testutils.MongoDBShard1Secondary1Port, testutils.MongoDBShard1Secondary2Port,
testutils.MongoDBShard2PrimaryPort, testutils.MongoDBShard2Secondary1Port, testutils.MongoDBShard2Secondary2Port,
testutils.MongoDBConfigsvr1Port, // testutils.MongoDBConfigsvr2Port, testutils.MongoDBConfigsvr3Port,
testutils.MongoDBMongosPort}
repls := []string{testutils.MongoDBShard1ReplsetName, testutils.MongoDBShard1ReplsetName, testutils.MongoDBShard1ReplsetName,
testutils.MongoDBShard2ReplsetName, testutils.MongoDBShard2ReplsetName, testutils.MongoDBShard2ReplsetName,
testutils.MongoDBConfigsvrReplsetName, // testutils.MongoDBConfigsvrReplsetName, testutils.MongoDBConfigsvrReplsetName,
""}

for i, port := range ports {
di := testutils.DialInfoForPort(t, repls[i], port)
return d, nil
}

func (d *Daemon) StartAgents(portRsList []PortRs) error {
for i, portRs := range portRsList {
di, err := testutils.DialInfoForPort(portRs.Rs, portRs.Port)
if err != nil {
return err
}
session, err := mgo.DialWithInfo(di)
logger.Infof("Connecting agent #%d to: %s\n", i, di.Addrs[0])
d.logger.Infof("Connecting agent #%d to: %s\n", i, di.Addrs[0])
if err != nil {
return nil, fmt.Errorf("cannot create a new agent; cannot connect to the MongoDB server %q: %s", di.Addrs[0], err)
return fmt.Errorf("cannot create a new agent; cannot connect to the MongoDB server %q: %s", di.Addrs[0], err)
}
session.SetMode(mgo.Eventual, true)

agentID := fmt.Sprintf("PMB-%03d", i)

dbConnOpts := client.ConnectionOptions{
Host: testutils.MongoDBHost,
Port: port,
Port: portRs.Port,
User: di.Username,
Password: di.Password,
ReplicasetName: di.ReplicaSetName,
}

client, err := client.NewClient(d.ctx, workDir, dbConnOpts, client.SSLOptions{}, clientConn, logger)
client, err := client.NewClient(d.ctx, d.workDir, dbConnOpts, client.SSLOptions{}, d.clientConn, d.logger)
if err != nil {
return nil, fmt.Errorf("Cannot create an agent instance %s: %s", agentID, err)
return fmt.Errorf("Cannot create an agent instance %s: %s", agentID, err)
}
d.clients = append(d.clients, client)
}
return nil
}

return d, nil
func (d *Daemon) StartAllAgents() error {
portRsList := []PortRs{
{Port: testutils.MongoDBShard1PrimaryPort, Rs: testutils.MongoDBShard1ReplsetName},
{Port: testutils.MongoDBShard1Secondary1Port, Rs: testutils.MongoDBShard1ReplsetName},
{Port: testutils.MongoDBShard1Secondary2Port, Rs: testutils.MongoDBShard1ReplsetName},

{Port: testutils.MongoDBShard2PrimaryPort, Rs: testutils.MongoDBShard2ReplsetName},
{Port: testutils.MongoDBShard2Secondary1Port, Rs: testutils.MongoDBShard2ReplsetName},
{Port: testutils.MongoDBShard2Secondary2Port, Rs: testutils.MongoDBShard2ReplsetName},

{Port: testutils.MongoDBConfigsvr1Port, Rs: testutils.MongoDBConfigsvrReplsetName},

{Port: testutils.MongoDBMongosPort, Rs: ""},
}
return d.StartAgents(portRsList)
}

func (d *GrpcDaemon) APIClient() *grpc.Server {
func (d *Daemon) APIClient() *grpc.Server {
return d.grpcServer4Api
}

func (d *GrpcDaemon) MessagesClient() *grpc.Server {
func (d *Daemon) MessagesClient() *grpc.Server {
return d.grpcServer4Clients
}

func (d *GrpcDaemon) Stop() {
func (d *Daemon) Stop() {
d.lock.Lock()
defer d.lock.Unlock()

Expand All @@ -167,17 +189,18 @@ func (d *GrpcDaemon) Stop() {
d.apiListener.Close()
}

func (d *GrpcDaemon) ClientsCount() int {
func (d *Daemon) ClientsCount() int {
d.lock.Lock()
defer d.lock.Unlock()
return len(d.clients)
}

func (d *GrpcDaemon) Clients() []*client.Client {
func (d *Daemon) Clients() []*client.Client {
return d.clients
}

func (d *GrpcDaemon) runAgentsGRPCServer(ctx context.Context, grpcServer *grpc.Server, lis net.Listener, shutdownTimeout int, wg *sync.WaitGroup) {
func (d *Daemon) runAgentsGRPCServer(ctx context.Context, grpcServer *grpc.Server, lis net.Listener,
shutdownTimeout int, wg *sync.WaitGroup) {
go func() {
err := grpcServer.Serve(lis)
if err != nil {
Expand Down
7 changes: 4 additions & 3 deletions tests/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,11 @@ func TestApiWithDaemon(t *testing.T) {
log.Printf("Using %s as the temporary directory", tmpDir)
defer os.RemoveAll(tmpDir) // Clean up after testing.

d, err := testGrpc.NewGrpcDaemon(context.Background(), tmpDir, t, nil)
d, err := testGrpc.NewDaemon(context.Background(), tmpDir, t, nil)
if err != nil {
t.Fatalf("cannot start a new gRPC daemon/clients group: %s", err)
}
d.StartAllAgents()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error return value of d.StartAllAgents is not checked (from errcheck)

defer d.Stop()

msg := &pbapi.RunBackupParams{
Expand All @@ -70,7 +71,7 @@ func TestApiWithDaemon(t *testing.T) {
Description: "test backup",
}

_, err = d.ApiServer.RunBackup(context.Background(), msg)
_, err = d.APIServer.RunBackup(context.Background(), msg)
if err != nil {
t.Fatalf("Cannot start backup from API: %s", err)
}
Expand Down Expand Up @@ -140,7 +141,7 @@ func TestApiWithDaemon(t *testing.T) {

stream := newMockBackupsMetadataStream()

err = d.ApiServer.BackupsMetadata(&pbapi.BackupsMetadataParams{}, stream)
err = d.APIServer.BackupsMetadata(&pbapi.BackupsMetadataParams{}, stream)
if err != nil {
t.Errorf("Cannot get backups metadata: %s", err)
}
Expand Down
Loading