Skip to content

Commit

Permalink
mcs: add a test for starting tso server first (tikv#6535)
Browse files Browse the repository at this point in the history
ref tikv#5895

Signed-off-by: Ryan Leung <rleungx@gmail.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
rleungx and ti-chi-bot[bot] committed Aug 2, 2023
1 parent 9ffc5ff commit f77c86b
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 1 deletion.
2 changes: 1 addition & 1 deletion pkg/mcs/resource_manager/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ func (s *Server) startServer() (err error) {
s.serviceRegister = discovery.NewServiceRegister(s.ctx, s.etcdClient, strconv.FormatUint(s.clusterID, 10),
utils.ResourceManagerServiceName, s.cfg.AdvertiseListenAddr, serializedEntry, discovery.DefaultLeaseInSeconds)
if err := s.serviceRegister.Register(); err != nil {
log.Error("failed to regiser the service", zap.String("service-name", utils.ResourceManagerServiceName), errs.ZapError(err))
log.Error("failed to register the service", zap.String("service-name", utils.ResourceManagerServiceName), errs.ZapError(err))
return err
}
atomic.StoreInt64(&s.isRunning, 1)
Expand Down
3 changes: 3 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,9 @@ func (s *Server) Run() error {
if err := s.startEtcd(s.ctx); err != nil {
return err
}
failpoint.Inject("delayStartServer", func() {
time.Sleep(2 * time.Second)
})
if err := s.startServer(s.ctx); err != nil {
return err
}
Expand Down
69 changes: 69 additions & 0 deletions tests/integrations/mcs/tso/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,22 @@
package tso

import (
"bytes"
"context"
"encoding/json"
"io"
"net/http"
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
tso "github.com/tikv/pd/pkg/mcs/tso/server"
apis "github.com/tikv/pd/pkg/mcs/tso/server/apis/v1"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/tests"
"github.com/tikv/pd/tests/integrations/mcs"
)
Expand Down Expand Up @@ -104,3 +109,67 @@ func mustGetKeyspaceGroupMembers(re *require.Assertions, server *tso.Server) map
re.NoError(json.Unmarshal(data, &resp))
return resp
}

func TestTSOServerStartFirst(t *testing.T) {
re := require.New(t)
re.NoError(failpoint.Enable("github.com/tikv/pd/server/delayStartServer", `return(true)`))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

apiCluster, err := tests.NewTestAPICluster(ctx, 1, func(conf *config.Config, serverName string) {
conf.Keyspace.PreAlloc = []string{"k1", "k2"}
})
defer apiCluster.Destroy()
re.NoError(err)
addr := apiCluster.GetConfig().GetClientURL()
ch := make(chan struct{})
defer close(ch)
clusterCh := make(chan *mcs.TestTSOCluster)
defer close(clusterCh)
go func() {
tsoCluster, err := mcs.NewTestTSOCluster(ctx, 2, addr)
re.NoError(err)
primary := tsoCluster.WaitForDefaultPrimaryServing(re)
re.NotNil(primary)
clusterCh <- tsoCluster
ch <- struct{}{}
}()
err = apiCluster.RunInitialServers()
re.NoError(err)
leaderName := apiCluster.WaitLeader()
pdLeaderServer := apiCluster.GetServer(leaderName)
re.NoError(pdLeaderServer.BootstrapCluster())
re.NoError(err)
tsoCluster := <-clusterCh
defer tsoCluster.Destroy()
<-ch

time.Sleep(time.Second * 1)
input := make(map[string]interface{})
input["new-id"] = 1
input["keyspaces"] = []uint32{2}
jsonBody, err := json.Marshal(input)
re.NoError(err)
httpReq, err := http.NewRequest(http.MethodPost, addr+"/pd/api/v2/tso/keyspace-groups/0/split", bytes.NewBuffer(jsonBody))
re.NoError(err)
httpResp, err := dialClient.Do(httpReq)
re.NoError(err)
defer httpResp.Body.Close()
re.Equal(http.StatusOK, httpResp.StatusCode)

httpReq, err = http.NewRequest(http.MethodGet, addr+"/pd/api/v2/tso/keyspace-groups/0", nil)
re.NoError(err)
httpResp, err = dialClient.Do(httpReq)
re.NoError(err)
data, err := io.ReadAll(httpResp.Body)
re.NoError(err)
defer httpResp.Body.Close()
re.Equal(http.StatusOK, httpResp.StatusCode)

var group endpoint.KeyspaceGroup
re.NoError(json.Unmarshal(data, &group))
re.Len(group.Keyspaces, 2)
re.Len(group.Members, 2)

re.NoError(failpoint.Disable("github.com/tikv/pd/server/delayStartServer"))
}

0 comments on commit f77c86b

Please sign in to comment.