Merge 32412a3 into 4e49a53
nexustar authored Jun 5, 2023
2 parents 4e49a53 + 32412a3 commit b3d32dc
Showing 11 changed files with 46 additions and 40 deletions.
3 changes: 2 additions & 1 deletion components/dm/command/scale_in.go
@@ -27,6 +27,7 @@ import (
"github.com/pingcap/tiup/pkg/cluster/spec"
"github.com/pingcap/tiup/pkg/cluster/task"
"github.com/pingcap/tiup/pkg/set"
"github.com/pingcap/tiup/pkg/utils"
"github.com/spf13/cobra"
)

@@ -126,7 +127,7 @@ func ScaleInDMCluster(
var dmMasterEndpoint []string
for _, instance := range (&dm.DMMasterComponent{Topology: topo}).Instances() {
if !deletedNodes.Exist(instance.ID()) {
- dmMasterEndpoint = append(dmMasterEndpoint, operator.Addr(instance))
+ dmMasterEndpoint = append(dmMasterEndpoint, utils.JoinHostPort(instance.GetManageHost(), instance.GetPort()))
}
}

2 changes: 1 addition & 1 deletion components/playground/playground.go
@@ -861,7 +861,7 @@ func (p *Playground) bootCluster(ctx context.Context, env *environment.Environme
return fmt.Errorf("TiDB cluster doesn't support disaggregated mode in version %s", options.Version)
}
if !tidbver.TiFlashPlaygroundNewStartMode(options.Version) {
- // For simplicitly, currently we only implemented disagg mode when TiFlash can run without config.
+ // For simplicity, currently we only implemented disagg mode when TiFlash can run without config.
return fmt.Errorf("TiUP playground only supports disaggregated mode for TiDB cluster >= v7.1.0 (or nightly)")
}

2 changes: 1 addition & 1 deletion go.mod
@@ -29,6 +29,7 @@ require (
github.com/joomcode/errorx v1.1.0
github.com/juju/ansiterm v1.0.0
github.com/mattn/go-runewidth v0.0.14
+ github.com/minio/minio-go/v7 v7.0.52
github.com/otiai10/copy v1.9.0
github.com/pingcap/check v0.0.0-20211026125417-57bd13f7b5f0
github.com/pingcap/errors v0.11.5-0.20201126102027-b0a155152ca3
@@ -89,7 +90,6 @@ require (
github.com/mattn/go-isatty v0.0.18 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/minio/md5-simd v1.1.2 // indirect
- github.com/minio/minio-go/v7 v7.0.52 // indirect
github.com/minio/sha256-simd v1.0.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
7 changes: 1 addition & 6 deletions pkg/cluster/manager/display.go
@@ -809,13 +809,8 @@ func (m *Manager) DisplayDashboardInfo(clusterName string, timeout time.Duration
return err
}

- pdEndpoints := make([]string, 0)
- for _, pd := range metadata.Topology.PDServers {
- pdEndpoints = append(pdEndpoints, utils.JoinHostPort(pd.Host, pd.ClientPort))
- }
-
ctx := context.WithValue(context.Background(), logprinter.ContextKeyLogger, m.logger)
- pdAPI := api.NewPDClient(ctx, pdEndpoints, timeout, tlsCfg)
+ pdAPI := api.NewPDClient(ctx, metadata.Topology.GetPDListWithManageHost(), timeout, tlsCfg)
dashboardAddr, err := pdAPI.GetDashboardAddress()
if err != nil {
return fmt.Errorf("failed to retrieve TiDB Dashboard instance from PD: %s", err)
2 changes: 1 addition & 1 deletion pkg/cluster/operation/scale_in.go
@@ -452,7 +452,7 @@ func scaleInCDC(
deferInstances := make([]spec.Instance, 0, 1)
for _, instance := range instances {
address := instance.(*spec.CDCInstance).GetAddr()
- client := api.NewCDCOpenAPIClient(ctx, []string{address}, 5*time.Second, tlsCfg)
+ client := api.NewCDCOpenAPIClient(ctx, []string{utils.JoinHostPort(instance.GetManageHost(), instance.GetPort())}, 5*time.Second, tlsCfg)

capture, err := client.GetCaptureByAddr(address)
if err != nil {
2 changes: 1 addition & 1 deletion pkg/cluster/operation/upgrade.go
@@ -143,7 +143,7 @@ func Upgrade(

// during the upgrade process, endpoint addresses should not change, so only new the client once.
if cdcOpenAPIClient == nil {
- cdcOpenAPIClient = api.NewCDCOpenAPIClient(ctx, topo.(*spec.Specification).GetCDCList(), 5*time.Second, tlsCfg)
+ cdcOpenAPIClient = api.NewCDCOpenAPIClient(ctx, topo.(*spec.Specification).GetCDCListWithManageHost(), 5*time.Second, tlsCfg)
}

capture, err := cdcOpenAPIClient.GetCaptureByAddr(address)
6 changes: 3 additions & 3 deletions pkg/cluster/spec/cdc.go
@@ -190,7 +190,7 @@ func (i *CDCInstance) InitConfig(
}
cfg := &scripts.CDCScript{
Addr: utils.JoinHostPort(i.GetListenHost(), spec.Port),
- AdvertiseAddr: i.GetAddr(),
+ AdvertiseAddr: utils.JoinHostPort(i.GetHost(), i.GetPort()),
PD: strings.Join(pds, ","),
GCTTL: spec.GCTTL,
TZ: spec.TZ,
@@ -261,7 +261,7 @@ func (i *CDCInstance) PreRestart(ctx context.Context, topo Topology, apiTimeoutS
}

start := time.Now()
- client := api.NewCDCOpenAPIClient(ctx, topo.(*Specification).GetCDCList(), 5*time.Second, tlsCfg)
+ client := api.NewCDCOpenAPIClient(ctx, topo.(*Specification).GetCDCListWithManageHost(), 5*time.Second, tlsCfg)
if err := client.Healthy(); err != nil {
logger.Debugf("cdc pre-restart skipped, the cluster unhealthy, trigger hard restart, "+
"addr: %s, err: %+v, elapsed: %+v", address, err, time.Since(start))
@@ -328,7 +328,7 @@ func (i *CDCInstance) PostRestart(ctx context.Context, topo Topology, tlsCfg *tl
start := time.Now()
address := i.GetAddr()

- client := api.NewCDCOpenAPIClient(ctx, []string{address}, 5*time.Second, tlsCfg)
+ client := api.NewCDCOpenAPIClient(ctx, []string{utils.JoinHostPort(i.GetManageHost(), i.GetPort())}, 5*time.Second, tlsCfg)
err := client.IsCaptureAlive()
if err != nil {
logger.Debugf("cdc post-restart finished, get capture status failed, addr: %s, err: %+v, elapsed: %+v", address, err, time.Since(start))
16 changes: 12 additions & 4 deletions pkg/cluster/spec/pd.go
@@ -61,7 +61,7 @@ func (s *PDSpec) Status(ctx context.Context, timeout time.Duration, tlsCfg *tls.
timeout = statusQueryTimeout
}

- addr := utils.JoinHostPort(s.Host, s.ClientPort)
+ addr := utils.JoinHostPort(s.GetManageHost(), s.ClientPort)
pc := api.NewPDClient(ctx, []string{addr}, timeout, tlsCfg)

// check health
@@ -101,6 +101,14 @@ func (s *PDSpec) GetMainPort() int {
return s.ClientPort
}

+ // GetManageHost returns the manage host of the instance
+ func (s *PDSpec) GetManageHost() string {
+ if s.ManageHost != "" {
+ return s.ManageHost
+ }
+ return s.Host
+ }

// IsImported returns if the node is imported from TiDB-Ansible
func (s *PDSpec) IsImported() bool {
return s.Imported
@@ -372,7 +380,7 @@ func (i *PDInstance) IsLeader(ctx context.Context, topo Topology, apiTimeoutSeco
if !ok {
panic("topo should be type of tidb topology")
}
- pdClient := api.NewPDClient(ctx, tidbTopo.GetPDList(), time.Second*5, tlsCfg)
+ pdClient := api.NewPDClient(ctx, tidbTopo.GetPDListWithManageHost(), time.Second*5, tlsCfg)

return i.checkLeader(pdClient)
}
@@ -397,7 +405,7 @@ func (i *PDInstance) PreRestart(ctx context.Context, topo Topology, apiTimeoutSe
if !ok {
panic("topo should be type of tidb topology")
}
- pdClient := api.NewPDClient(ctx, tidbTopo.GetPDList(), time.Second*5, tlsCfg)
+ pdClient := api.NewPDClient(ctx, tidbTopo.GetPDListWithManageHost(), time.Second*5, tlsCfg)

isLeader, err := i.checkLeader(pdClient)
if err != nil {
@@ -422,7 +430,7 @@ func (i *PDInstance) PostRestart(ctx context.Context, topo Topology, tlsCfg *tls
Delay: time.Second,
Timeout: 120 * time.Second,
}
- currentPDAddrs := []string{utils.JoinHostPort(i.Host, i.Port)}
+ currentPDAddrs := []string{utils.JoinHostPort(i.GetManageHost(), i.Port)}
pdClient := api.NewPDClient(ctx, currentPDAddrs, 5*time.Second, tlsCfg)

if err := utils.Retry(pdClient.CheckHealth, timeoutOpt); err != nil {
21 changes: 18 additions & 3 deletions pkg/cluster/spec/spec.go
@@ -413,11 +413,26 @@ func (s *Specification) GetPDList() []string {
return pdList
}

- // GetCDCList returns a list of CDC API hosts of the current cluster
- func (s *Specification) GetCDCList() []string {
+ // GetPDListWithManageHost returns a list of PD API hosts of the current cluster
+ func (s *Specification) GetPDListWithManageHost() []string {
+ var pdList []string
+
+ for _, pd := range s.PDServers {
+ pdList = append(pdList, utils.JoinHostPort(pd.GetManageHost(), pd.ClientPort))
+ }
+
+ return pdList
+ }
+
+ // GetCDCListWithManageHost returns a list of CDC API hosts of the current cluster
+ func (s *Specification) GetCDCListWithManageHost() []string {
var result []string
for _, server := range s.CDCServers {
- result = append(result, utils.JoinHostPort(server.Host, server.Port))
+ host := server.Host
+ if server.ManageHost != "" {
+ host = server.ManageHost
+ }
+ result = append(result, utils.JoinHostPort(host, server.Port))
}
return result
}
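(As an aside: the two helpers added above both implement the same rule: use the dedicated management address when one is configured, fall back to the service host otherwise, then join it with the component's API port. The standalone Go sketch below illustrates that rule with simplified, hypothetical types; pdSpec, manageHost, and pdListWithManageHost are illustrative stand-ins, not the real spec package API.)

// Illustrative sketch only, not part of this commit.
package main

import (
	"fmt"
	"net"
	"strconv"
)

// pdSpec is a simplified stand-in for spec.PDSpec.
type pdSpec struct {
	Host       string // service address advertised to the cluster
	ManageHost string // optional management address; empty means "use Host"
	ClientPort int
}

// manageHost mirrors the fallback in PDSpec.GetManageHost above.
func (s pdSpec) manageHost() string {
	if s.ManageHost != "" {
		return s.ManageHost
	}
	return s.Host
}

// pdListWithManageHost mirrors Specification.GetPDListWithManageHost above.
func pdListWithManageHost(pds []pdSpec) []string {
	var list []string
	for _, pd := range pds {
		list = append(list, net.JoinHostPort(pd.manageHost(), strconv.Itoa(pd.ClientPort)))
	}
	return list
}

func main() {
	pds := []pdSpec{
		{Host: "10.0.1.1", ClientPort: 2379},                             // no manage host: falls back to Host
		{Host: "10.0.1.2", ManageHost: "192.168.10.2", ClientPort: 2379}, // manage host configured
	}
	fmt.Println(pdListWithManageHost(pds)) // [10.0.1.1:2379 192.168.10.2:2379]
}

Callers in the hunks above, such as DisplayDashboardInfo and TiFlashInstance.PrepareStart, then pass the resulting list directly to api.NewPDClient.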
12 changes: 2 additions & 10 deletions pkg/cluster/spec/tiflash.go
@@ -533,7 +533,7 @@ server_configs:
utils.JoinHostPort(spec.Host, spec.FlashServicePort),
i.GetListenHost(),
spec.StatusPort,
- strings.Join(i.getEndpoints(i.topo), ","),
+ strings.Join(i.topo.(*Specification).GetPDList(), ","),
paths.Deploy,
fmt.Sprintf("%s/tmp", paths.Data[0]),
deprecatedUsersConfig,
@@ -837,14 +837,6 @@ type replicateConfig struct {
EnablePlacementRules string `json:"enable-placement-rules"`
}

- func (i *TiFlashInstance) getEndpoints(topo Topology) []string {
- var endpoints []string
- for _, pd := range topo.(*Specification).PDServers {
- endpoints = append(endpoints, utils.JoinHostPort(pd.Host, pd.ClientPort))
- }
- return endpoints
- }

// PrepareStart checks TiFlash requirements before starting
func (i *TiFlashInstance) PrepareStart(ctx context.Context, tlsCfg *tls.Config) error {
// set enable-placement-rules to true via PDClient
@@ -867,7 +859,7 @@ func (i *TiFlashInstance) PrepareStart(ctx context.Context, tlsCfg *tls.Config)
topo = i.topo
}

- endpoints := i.getEndpoints(topo)
+ endpoints := topo.(*Specification).GetPDListWithManageHost()
pdClient := api.NewPDClient(ctx, endpoints, 10*time.Second, tlsCfg)
return pdClient.UpdateReplicateConfig(bytes.NewBuffer(enablePlacementRules))
}
13 changes: 4 additions & 9 deletions pkg/cluster/spec/tikv_cdc.go
@@ -226,11 +226,6 @@ func (i *TiKVCDCInstance) setTLSConfig(ctx context.Context, enableTLS bool, conf

var _ RollingUpdateInstance = &TiKVCDCInstance{}

- // GetAddr return the address of this TiKV-CDC instance
- func (i *TiKVCDCInstance) GetAddr() string {
- return utils.JoinHostPort(i.GetHost(), i.GetPort())
- }

// PreRestart implements RollingUpdateInstance interface.
// All errors are ignored, to trigger hard restart.
func (i *TiKVCDCInstance) PreRestart(ctx context.Context, topo Topology, apiTimeoutSeconds int, tlsCfg *tls.Config) error {
@@ -244,15 +239,15 @@ func (i *TiKVCDCInstance) PreRestart(ctx context.Context, topo Topology, apiTime
panic("logger not found")
}

- address := i.GetAddr()
+ address := utils.JoinHostPort(i.GetHost(), i.GetPort())
// cdc rolling upgrade strategy only works if there are more than 2 captures
if len(tidbTopo.TiKVCDCServers) <= 1 {
logger.Debugf("tikv-cdc pre-restart skipped, only one capture in the topology, addr: %s", address)
return nil
}

start := time.Now()
- client := api.NewTiKVCDCOpenAPIClient(ctx, []string{address}, 5*time.Second, tlsCfg)
+ client := api.NewTiKVCDCOpenAPIClient(ctx, []string{utils.JoinHostPort(i.GetManageHost(), i.GetPort())}, 5*time.Second, tlsCfg)
captures, err := client.GetAllCaptures()
if err != nil {
logger.Debugf("tikv-cdc pre-restart skipped, cannot get all captures, trigger hard restart, addr: %s, elapsed: %+v", address, time.Since(start))
@@ -303,9 +298,9 @@ func (i *TiKVCDCInstance) PostRestart(ctx context.Context, topo Topology, tlsCfg
}

start := time.Now()
- address := i.GetAddr()
+ address := utils.JoinHostPort(i.GetHost(), i.GetPort())

- client := api.NewTiKVCDCOpenAPIClient(ctx, []string{address}, 5*time.Second, tlsCfg)
+ client := api.NewTiKVCDCOpenAPIClient(ctx, []string{utils.JoinHostPort(i.GetManageHost(), i.GetPort())}, 5*time.Second, tlsCfg)
err := client.IsCaptureAlive()
if err != nil {
logger.Debugf("tikv-cdc post-restart finished, get capture status failed, addr: %s, err: %+v, elapsed: %+v", address, err, time.Since(start))