diff --git a/cluster/provisioning.go b/cluster/provisioning.go index 8120bcbda..88a3c2b20 100644 --- a/cluster/provisioning.go +++ b/cluster/provisioning.go @@ -1,11 +1,14 @@ package cluster import ( - "bytes" + "errors" "os" "os/exec" "os/user" "strconv" + "time" + + "github.com/tanji/replication-manager/misc" ) func (cluster *Cluster) RejoinMysqldump(source *ServerMonitor, dest *ServerMonitor) error { @@ -32,6 +35,7 @@ func (cluster *Cluster) RejoinMysqldump(source *ServerMonitor, dest *ServerMonit func (cluster *Cluster) InitClusterSemiSync() error { for k, server := range cluster.servers { + cluster.LogPrintf("INFO : Starting Server %s", cluster.cfgGroup+strconv.Itoa(k)) cluster.initMariaDB(server, cluster.cfgGroup+strconv.Itoa(k), "semisync.cnf") } @@ -49,33 +53,63 @@ func (cluster *Cluster) ShutdownClusterSemiSync() error { func (cluster *Cluster) initMariaDB(server *ServerMonitor, name string, conf string) error { path := cluster.conf.HttpRoot + "/tests/" + name os.RemoveAll(path) - if _, err := os.Stat(path); os.IsNotExist(err) { - os.MkdirAll(path, 0711) + usr, err := user.Current() + if err != nil { cluster.LogPrintf("ERRROR : %s", err) return err } - usr, err := user.Current() + + err = misc.CopyDir(cluster.conf.HttpRoot+"/tests/data", path) if err != nil { cluster.LogPrintf("ERRROR : %s", err) return err } - installDB := exec.Command(cluster.conf.MariaDBBinaryPath+"/scripts/mysql_install_db", "--datadir="+path, "--user="+usr.Username) + /* + if _, err := os.Stat(path); os.IsNotExist(err) { + os.MkdirAll(path, 0711) - var outrun bytes.Buffer - installDB.Stdout = &outrun + } else { + cluster.LogPrintf("ERRROR : %s", err) + return err + } + installDB := exec.Command(cluster.conf.MariaDBBinaryPath+"/scripts/mysql_install_db", "--datadir="+path, "--user="+usr.Username) + cluster.LogPrintf("INFO : %s", installDB.Path) + var outrun bytes.Buffer + installDB.Stdout = &outrun - cmdrunErr := installDB.Run() - if cmdrunErr != nil { - cluster.LogPrintf("ERRROR : %s", cmdrunErr) - return cmdrunErr - } - cluster.LogPrintf("PROVISIONING : %s", outrun.String()) + cmdrunErr := installDB.Run() + if cmdrunErr != nil { + cluster.LogPrintf("ERRROR : %s", cmdrunErr) + return cmdrunErr + } + cluster.LogPrintf("PROVISIONING : %s", outrun.String()) + */ + + mariadbdCmd := exec.Command(cluster.conf.MariaDBBinaryPath+"/mysqld", "--defaults-file="+path+"/../etc/"+conf, "--port="+server.Port, "--server-id="+server.Port, "--datadir="+path, "--port="+server.Port, "--user="+usr.Username, "--pid="+path+"/"+name+".pid") - mariadbdCmd := exec.Command(cluster.conf.MariaDBBinaryPath+"/mysqld", "--defaults-file="+path+"../"+conf, "--port="+server.Port, "--server-id="+server.Port, "--datadir="+path, "--port="+server.Port, "--user="+cluster.dbUser, "--user="+usr.Username) - mariadbdCmd.Process.Kill() cluster.LogPrintf("%s %s", mariadbdCmd.Path, mariadbdCmd.Args) go mariadbdCmd.Run() server.Process = mariadbdCmd.Process + + var err2 error + exitloop := 0 + for exitloop < 30 { + time.Sleep(time.Millisecond * 2000) + cluster.LogPrint("Waiting startup ..") + _, err2 = os.Stat(path + "/" + name + ".pid") + if err2 == nil { + exitloop = 30 + } + exitloop++ + + } + if exitloop < 30 { + cluster.LogPrintf("MariaDB started.", err) + } else { + cluster.LogPrintf("MariaDB start timeout.", err) + return errors.New("Failed to start") + } + return nil } @@ -83,3 +117,7 @@ func (cluster *Cluster) killMariaDB(server *ServerMonitor) error { server.Process.Kill() return nil } + +func (cluster *Cluster) StartAllNodes() error { + return nil +} diff --git a/cluster/regressiontest.go b/cluster/regressiontest.go index 4be18327d..14a1d106d 100644 --- a/cluster/regressiontest.go +++ b/cluster/regressiontest.go @@ -174,6 +174,11 @@ func (cluster *Cluster) testFailoverReplAllDelayInteractive() bool { } func (cluster *Cluster) testFailoverReplAllDelayAuto() bool { + cluster.LogPrintf("TESTING : InitClusterSemiSync") + cluster.InitClusterSemiSync() + cluster.Bootstrap() + cluster.wait_failover_end() + cluster.ShutdownClusterSemiSync() return false } @@ -861,115 +866,142 @@ func (cluster *Cluster) getTestResultLabel(res bool) string { } } -func (cluster *Cluster) RunAllTests() bool { - +func (cluster *Cluster) RunAllTests(test string) bool { var allTests = map[string]string{} - cluster.cleanall = true - cluster.Bootstrap() - cluster.wait_failover_end() + ret := true var res bool - - res = cluster.testSwitchOverLongTransactionNoRplCheckNoSemiSync() - allTests["1 Switchover Concurrent Long Transaction "] = cluster.getTestResultLabel(res) - if res == false { - ret = res + cluster.LogPrintf("TESTING : %s", test) + if test == "testFailoverReplAllDelayAuto" || test == "ALL" { + res = cluster.testFailoverReplAllDelayAuto() + allTests["1 Failover all slaves delay "] = cluster.getTestResultLabel(res) + if res == false { + ret = res + } } - res = cluster.testSwitchOverLongQueryNoRplCheckNoSemiSync() - allTests["1 Switchover Concurrent Long Query "] = cluster.getTestResultLabel(res) - if res == false { - ret = res + cluster.cleanall = true + cluster.Bootstrap() + cluster.wait_failover_end() + if test == "testSwitchOverLongTransactionNoRplCheckNoSemiSync" || test == "ALL" { + res = cluster.testSwitchOverLongTransactionNoRplCheckNoSemiSync() + allTests["1 Switchover Concurrent Long Transaction "] = cluster.getTestResultLabel(res) + if res == false { + ret = res + } } - res = cluster.testSwitchOverNoReadOnlyNoRplCheck() - allTests["1 Switchover "] = cluster.getTestResultLabel(res) - if res == false { - ret = res + if test == "testSwitchOverLongQueryNoRplCheckNoSemiSync" || test == "ALL" { + res = cluster.testSwitchOverLongQueryNoRplCheckNoSemiSync() + allTests["1 Switchover Concurrent Long Query "] = cluster.getTestResultLabel(res) + if res == false { + ret = res + } } - - res = cluster.testSwitchOverReadOnlyNoRplCheck() - allTests["1 Switchover "] = cluster.getTestResultLabel(res) - if res == false { - ret = res + if test == "testSwitchOverNoReadOnlyNoRplCheck" || test == "ALL" { + res = cluster.testSwitchOverNoReadOnlyNoRplCheck() + allTests["1 Switchover "] = cluster.getTestResultLabel(res) + if res == false { + ret = res + } } - - res = cluster.testSwitchOver2TimesReplicationOkNoSemiSyncNoRplCheck() - allTests["2 Switchover Replication Ok <2 threads benchmark> "] = cluster.getTestResultLabel(res) - if res == false { - ret = res + if test == "testSwitchOverReadOnlyNoRplCheck" || test == "ALL" { + res = cluster.testSwitchOverReadOnlyNoRplCheck() + allTests["1 Switchover "] = cluster.getTestResultLabel(res) + if res == false { + ret = res + } } - - res = cluster.testSwitchOver2TimesReplicationOkSemiSyncNoRplCheck() - allTests["2 Switchover Replication Ok <2 threads benchmark> "] = cluster.getTestResultLabel(res) - if res == false { - ret = res + if test == "testSwitchOver2TimesReplicationOkNoSemiSyncNoRplCheck" || test == "ALL" { + res = cluster.testSwitchOver2TimesReplicationOkNoSemiSyncNoRplCheck() + allTests["2 Switchover Replication Ok <2 threads benchmark> "] = cluster.getTestResultLabel(res) + if res == false { + ret = res + } } - - res = cluster.testSwitchOverBackPreferedMasterNoRplCheckSemiSync() - allTests["2 Switchover Back Prefered Master "] = cluster.getTestResultLabel(res) - if res == false { - ret = res + if test == "testSwitchOver2TimesReplicationOkSemiSyncNoRplCheck" || test == "ALL" { + res = cluster.testSwitchOver2TimesReplicationOkSemiSyncNoRplCheck() + allTests["2 Switchover Replication Ok <2 threads benchmark> "] = cluster.getTestResultLabel(res) + if res == false { + ret = res + } } - - res = cluster.testSwitchOverAllSlavesStopRplCheckNoSemiSync() - allTests["Can't Switchover All Slaves Stop "] = cluster.getTestResultLabel(res) - if res == false { - ret = res + if test == "testSwitchOverBackPreferedMasterNoRplCheckSemiSync" || test == "ALL" { + res = cluster.testSwitchOverBackPreferedMasterNoRplCheckSemiSync() + allTests["2 Switchover Back Prefered Master "] = cluster.getTestResultLabel(res) + if res == false { + ret = res + } } - - res = cluster.testSwitchOverAllSlavesStopNoSemiSyncNoRplCheck() - allTests["Can Switchover All Slaves Stop "] = cluster.getTestResultLabel(res) - if res == false { - ret = res + if test == "testSwitchOverAllSlavesStopRplCheckNoSemiSync" || test == "ALL" { + res = cluster.testSwitchOverAllSlavesStopRplCheckNoSemiSync() + allTests["Can't Switchover All Slaves Stop "] = cluster.getTestResultLabel(res) + if res == false { + ret = res + } } - - res = cluster.testSwitchOverAllSlavesDelayRplCheckNoSemiSync() - allTests["Can't Switchover All Slaves Delay "] = cluster.getTestResultLabel(res) - if res == false { - ret = res + if test == "testSwitchOverAllSlavesStopNoSemiSyncNoRplCheck" || test == "ALL" { + res = cluster.testSwitchOverAllSlavesStopNoSemiSyncNoRplCheck() + allTests["Can Switchover All Slaves Stop "] = cluster.getTestResultLabel(res) + if res == false { + ret = res + } } - - res = cluster.testSwitchOverAllSlavesDelayNoRplChecksNoSemiSync() - allTests["Can Switchover All Slaves Delay "] = cluster.getTestResultLabel(res) - if res == false { - ret = res + if test == "testSwitchOverAllSlavesDelayRplCheckNoSemiSync" || test == "ALL" { + res = cluster.testSwitchOverAllSlavesDelayRplCheckNoSemiSync() + allTests["Can't Switchover All Slaves Delay "] = cluster.getTestResultLabel(res) + if res == false { + ret = res + } } - - res = cluster.testSlaReplAllSlavesStopNoSemiSync() - allTests["SLA Decrease Can't Switchover All Slaves Stop "] = cluster.getTestResultLabel(res) - if res == false { - ret = res + if test == "testSwitchOverAllSlavesDelayNoRplChecksNoSemiSync" || test == "ALL" { + res = cluster.testSwitchOverAllSlavesDelayNoRplChecksNoSemiSync() + allTests["Can Switchover All Slaves Delay "] = cluster.getTestResultLabel(res) + if res == false { + ret = res + } } - - res = cluster.testFailOverNoRplChecksNoSemiSync() - allTests["1 Failover "] = cluster.getTestResultLabel(res) - if res == false { - ret = res + if test == "testSlaReplAllSlavesStopNoSemiSync" || test == "ALL" { + res = cluster.testSlaReplAllSlavesStopNoSemiSync() + allTests["SLA Decrease Can't Switchover All Slaves Stop "] = cluster.getTestResultLabel(res) + if res == false { + ret = res + } } - - res = cluster.testFailOverAllSlavesDelayNoRplChecksNoSemiSync() - allTests["1 Failover All Slave Delay "] = cluster.getTestResultLabel(res) - if res == false { - ret = res + if test == "testFailOverNoRplChecksNoSemiSync" || test == "ALL" { + res = cluster.testFailOverNoRplChecksNoSemiSync() + allTests["1 Failover "] = cluster.getTestResultLabel(res) + if res == false { + ret = res + } } - - res = cluster.testFailOverAllSlavesDelayRplChecksNoSemiSync() - allTests["1 Failover All Slave Delay "] = cluster.getTestResultLabel(res) - if res == false { - ret = res + if test == "testFailOverAllSlavesDelayNoRplChecksNoSemiSync" || test == "ALL" { + res = cluster.testFailOverAllSlavesDelayNoRplChecksNoSemiSync() + allTests["1 Failover All Slave Delay "] = cluster.getTestResultLabel(res) + if res == false { + ret = res + } } - - res = cluster.testNumberFailOverLimitReach() - allTests["1 Failover Number of Failover Reach "] = cluster.getTestResultLabel(res) - if res == false { - ret = res + if test == "testFailOverAllSlavesDelayRplChecksNoSemiSync" || test == "ALL" { + res = cluster.testFailOverAllSlavesDelayRplChecksNoSemiSync() + allTests["1 Failover All Slave Delay "] = cluster.getTestResultLabel(res) + if res == false { + ret = res + } } - - res = cluster.testFailOverTimeNotReach() - allTests["1 Failover Before Time Limit "] = cluster.getTestResultLabel(res) - if res == false { - ret = res + if test == "testNumberFailOverLimitReach" || test == "ALL" { + res = cluster.testNumberFailOverLimitReach() + allTests["1 Failover Number of Failover Reach "] = cluster.getTestResultLabel(res) + if res == false { + ret = res + } + } + if test == "testFailOverTimeNotReach" || test == "ALL" { + res = cluster.testFailOverTimeNotReach() + allTests["1 Failover Before Time Limit "] = cluster.getTestResultLabel(res) + if res == false { + ret = res + } } keys := make([]string, 0, len(allTests)) @@ -1054,7 +1086,20 @@ func (cluster *Cluster) RunSysbench() error { return nil } -func (cluster *Cluster) StartAllNodes() error { - +func (cluster *Cluster) DelayAllSlaves() error { + for _, s := range cluster.slaves { + dbhelper.StopSlave(s.Conn) + } + result, err := dbhelper.WriteConcurrent2(cluster.master.DSN, 10) + if err != nil { + cluster.LogPrintf("BENCH : %s %s", err.Error(), result) + } + dbhelper.InjectLongTrx(cluster.master.Conn, 10) + time.Sleep(10 * time.Second) + for _, s := range cluster.slaves { + dbhelper.StartSlave(s.Conn) + } return nil } + +func diff --git a/httpserver.go b/httpserver.go index 20bd9db62..aab3519b1 100644 --- a/httpserver.go +++ b/httpserver.go @@ -160,6 +160,7 @@ func httpserver() { router.HandleFunc("/logout", g.LogoutHandler).Methods("POST") router.HandleFunc("/servers", handlerServers) router.HandleFunc("/setcluster", handlerSetCluster) + router.HandleFunc("/runonetest", handlerSetOneTest) router.HandleFunc("/master", handlerMaster) router.HandleFunc("/log", handlerLog) router.HandleFunc("/switchover", handlerSwitchover) @@ -189,6 +190,7 @@ func httpserver() { http.HandleFunc("/stats", handlerStats) http.HandleFunc("/servers", handlerServers) http.HandleFunc("/setcluster", handlerSetCluster) + http.HandleFunc("/runonetest", handlerSetOneTest) http.HandleFunc("/master", handlerMaster) http.HandleFunc("/log", handlerLog) http.HandleFunc("/switchover", handlerSwitchover) @@ -227,6 +229,10 @@ func handlerSetCluster(w http.ResponseWriter, r *http.Request) { } } +func handlerSetOneTest(w http.ResponseWriter, r *http.Request) { + currentCluster.RunAllTests(r.URL.Query().Get("test")) +} + func handlerApp(w http.ResponseWriter, r *http.Request) { http.ServeFile(w, r, confs[cfgGroup].HttpRoot+"/app.html") } @@ -276,7 +282,7 @@ func handlerSettings(w http.ResponseWriter, r *http.Request) { s.UptimeSemiSync = currentCluster.GetStateMachine().GetUptimeSemiSync() s.Test = fmt.Sprintf("%v", currentCluster.GetConf().Test) s.Heartbeat = fmt.Sprintf("%v", currentCluster.GetConf().Heartbeat) - s.Status = fmt.Sprintf("%v", currentCluster.GetRunStatus()) + s.Status = fmt.Sprintf("%v", runStatus) s.ConfGroup = fmt.Sprintf("%s", cfgGroup) s.MonitoringTicker = fmt.Sprintf("%d", currentCluster.GetConf().MonitoringTicker) s.FailResetTime = fmt.Sprintf("%d", currentCluster.GetConf().FailResetTime) @@ -423,7 +429,7 @@ func handlerBootstrap(w http.ResponseWriter, r *http.Request) { func handlerTests(w http.ResponseWriter, r *http.Request) { w.Header().Set("Access-Control-Allow-Origin", "*") - err := currentCluster.RunAllTests() + err := currentCluster.RunAllTests("ALL") if err == false { currentCluster.LogPrint("ERROR: Some tests failed") } diff --git a/misc/misc.go b/misc/misc.go index 85bd61738..89eca383c 100644 --- a/misc/misc.go +++ b/misc/misc.go @@ -10,8 +10,12 @@ package misc import ( "fmt" + "io" + "io/ioutil" "log" "net" + "os" + "path/filepath" "strconv" "strings" ) @@ -90,3 +94,105 @@ func Contains(s []string, e string) bool { } return false } + +// CopyFile copies the contents of the file named src to the file named +// by dst. The file will be created if it does not already exist. If the +// destination file exists, all it's contents will be replaced by the contents +// of the source file. The file mode will be copied from the source and +// the copied data is synced/flushed to stable storage. +func CopyFile(src, dst string) (err error) { + in, err := os.Open(src) + if err != nil { + return + } + defer in.Close() + + out, err := os.Create(dst) + if err != nil { + return + } + defer func() { + if e := out.Close(); e != nil { + err = e + } + }() + + _, err = io.Copy(out, in) + if err != nil { + return + } + + err = out.Sync() + if err != nil { + return + } + + si, err := os.Stat(src) + if err != nil { + return + } + err = os.Chmod(dst, si.Mode()) + if err != nil { + return + } + + return +} + +// CopyDir recursively copies a directory tree, attempting to preserve permissions. +// Source directory must exist, destination directory must *not* exist. +// Symlinks are ignored and skipped. +func CopyDir(src string, dst string) (err error) { + src = filepath.Clean(src) + dst = filepath.Clean(dst) + + si, err := os.Stat(src) + if err != nil { + return err + } + if !si.IsDir() { + return fmt.Errorf("source is not a directory") + } + + _, err = os.Stat(dst) + if err != nil && !os.IsNotExist(err) { + return + } + if err == nil { + return fmt.Errorf("destination already exists") + } + + err = os.MkdirAll(dst, si.Mode()) + if err != nil { + return + } + + entries, err := ioutil.ReadDir(src) + if err != nil { + return + } + + for _, entry := range entries { + srcPath := filepath.Join(src, entry.Name()) + dstPath := filepath.Join(dst, entry.Name()) + + if entry.IsDir() { + err = CopyDir(srcPath, dstPath) + if err != nil { + return + } + } else { + // Skip symlinks. + if entry.Mode()&os.ModeSymlink != 0 { + continue + } + + err = CopyFile(srcPath, dstPath) + if err != nil { + return + } + } + } + + return +} diff --git a/repmgr.go b/repmgr.go index 61aa4bd0b..d250492de 100644 --- a/repmgr.go +++ b/repmgr.go @@ -30,7 +30,7 @@ var ( termlength int runUUID string repmgrHostname string - + runStatus string swChan = make(chan bool) exitMsg string exit bool @@ -40,7 +40,7 @@ var ( func init() { runUUID = uuid.NewV4().String() - // runStatus = "A" + runStatus = "A" // conf := confs[cfgGroup] var errLog = mysql.Logger(log.New(ioutil.Discard, "", 0)) mysql.SetLogger(errLog)