Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 50 additions & 115 deletions components/playground/instance/tici.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ import (
"context"
"fmt"
"path/filepath"
"syscall"

"github.com/pingcap/tiup/pkg/tui/colorstr"
"github.com/pingcap/tiup/pkg/utils"
)

Expand All @@ -36,65 +34,50 @@ const (
// TiCIInstance represents a TiCI service instance (either MetaServer or WorkerNode)
type TiCIInstance struct {
instance
Process

// TiCI specific fields
pds []*PDInstance
dbs []*TiDBInstance
role TiCIRole // Instance role (meta or worker)

// Process - only one process per instance
process *process
}

var _ Instance = &TiCIInstance{}

// NewTiCIMetaInstance creates a TiCI MetaServer instance
func NewTiCIMetaInstance(shOpt SharedOptions, baseDir, host string, id int, pds []*PDInstance,
ticiBinaryDir, configDir string) *TiCIInstance {
return NewTiCIInstanceWithRole(shOpt, baseDir, host, id, pds, ticiBinaryDir, configDir, TiCIRoleMeta)
func NewTiCIMetaInstance(shOpt SharedOptions, binPath string, dir, host, configPath string, id int, pds []*PDInstance, dbs []*TiDBInstance) *TiCIInstance {
return NewTiCIInstanceWithRole(shOpt, binPath, dir, host, configPath, id, pds, dbs, TiCIRoleMeta)
}

// NewTiCIWorkerInstance creates a TiCI WorkerNode instance
func NewTiCIWorkerInstance(shOpt SharedOptions, baseDir, host string, id int, pds []*PDInstance,
ticiBinaryDir, configDir string) *TiCIInstance {
return NewTiCIInstanceWithRole(shOpt, baseDir, host, id, pds, ticiBinaryDir, configDir, TiCIRoleWorker)
func NewTiCIWorkerInstance(shOpt SharedOptions, binPath string, dir, host, configPath string, id int, pds []*PDInstance, dbs []*TiDBInstance) *TiCIInstance {
return NewTiCIInstanceWithRole(shOpt, binPath, dir, host, configPath, id, pds, dbs, TiCIRoleWorker)
}

// NewTiCIInstanceWithRole creates a TiCI instance with specified role
func NewTiCIInstanceWithRole(shOpt SharedOptions, baseDir, host string, id int, pds []*PDInstance,
ticiBinaryDir, configDir string, role TiCIRole) *TiCIInstance {
var componentSuffix string
func NewTiCIInstanceWithRole(shOpt SharedOptions, binPath string, dir, host, configPath string, id int, pds []*PDInstance, dbs []*TiDBInstance, role TiCIRole) *TiCIInstance {
var defaultPort, defaultStatusPort int
var configPath, binPath string
var configFilePath string

switch role {
case TiCIRoleMeta:
// MetaServer default port
componentSuffix = "meta"
defaultPort = 8500
defaultStatusPort = 8501
if configDir != "" {
configPath = filepath.Join(configDir, "test-meta.toml")
} else {
configPath = filepath.Join(ticiBinaryDir, "../../ci", "test-meta.toml")
if configPath != "" {
configFilePath = filepath.Join(configPath, "test-meta.toml")
}
binPath = filepath.Join(ticiBinaryDir, "meta_service_server")
case TiCIRoleWorker:
// WorkerNode default port
componentSuffix = "worker"
defaultPort = 8510
defaultStatusPort = 8511
if configDir != "" {
configPath = filepath.Join(configDir, "test-worker.toml")
} else {
configPath = filepath.Join(ticiBinaryDir, "../../ci", "test-worker.toml")
if configPath != "" {
configFilePath = filepath.Join(configPath, "test-worker.toml")
}
binPath = filepath.Join(ticiBinaryDir, "worker_node_server")
default:
panic("invalid TiCI role")
}

dir := filepath.Join(baseDir, fmt.Sprintf("tici-%s-%d", componentSuffix, id))

tici := &TiCIInstance{
instance: instance{
BinPath: binPath,
Expand All @@ -103,69 +86,50 @@ func NewTiCIInstanceWithRole(shOpt SharedOptions, baseDir, host string, id int,
Host: host,
Port: utils.MustGetFreePort(host, defaultPort, shOpt.PortOffset),
StatusPort: utils.MustGetFreePort(host, defaultStatusPort, shOpt.PortOffset),
ConfigPath: configPath,
ConfigPath: configFilePath,
},
pds: pds,
dbs: dbs,
role: role,
}

return tici
}

// Start implements Instance interface - starts the appropriate process
func (t *TiCIInstance) Start(ctx context.Context) error {
if t.process != nil {
return fmt.Errorf("TiCI instance already started")
}

if err := utils.MkdirAll(t.Dir, 0755); err != nil {
return fmt.Errorf("failed to create directory %s: %v", t.Dir, err)
func (inst *TiCIInstance) Start(ctx context.Context) error {
configPath := filepath.Join(inst.Dir, fmt.Sprintf("%s.toml", inst.Component()))
if err := prepareConfig(
configPath,
inst.ConfigPath,
inst.getConfig(),
); err != nil {
return err
}

return t.startInstance(ctx)
}

func (t *TiCIInstance) startInstance(ctx context.Context) error {
args := []string{}

// Set the config path
args = append(args, fmt.Sprintf("--config=%s", t.ConfigPath))

t.process = &process{cmd: PrepareCommand(ctx, t.BinPath, args, nil, t.Dir)}

// Set up logging
logIfErr(t.process.SetOutputFile(t.LogFile()))

return t.process.Start()
}

// Wait implements Instance interface
func (t *TiCIInstance) Wait() error {
if t.process != nil {
return t.process.Wait()
args := []string{
fmt.Sprintf("--config=%s", configPath),
}
return nil
}
inst.Process = &process{cmd: PrepareCommand(ctx, inst.BinPath, args, nil, inst.Dir)}

// Pid implements Instance interface
func (t *TiCIInstance) Pid() int {
if t.process != nil && t.process.Cmd() != nil && t.process.Cmd().Process != nil {
return t.process.Cmd().Process.Pid
}
return 0
logIfErr(inst.Process.SetOutputFile(inst.LogFile()))
return inst.Process.Start()
}

// Uptime implements Instance interface
func (t *TiCIInstance) Uptime() string {
if t.process != nil {
return t.process.Uptime()
func (inst *TiCIInstance) getConfig() map[string]any {
switch inst.role {
case TiCIRoleMeta:
return inst.getMetaConfig()
case TiCIRoleWorker:
return inst.getWorkerConfig()
default:
return nil // Should not happen
}
return "N/A"
}

// Component implements Instance interface
func (t *TiCIInstance) Component() string {
switch t.role {
// Component implements Process interface
func (inst *TiCIInstance) Component() string {
switch inst.role {
case TiCIRoleMeta:
return "tici-meta"
case TiCIRoleWorker:
Expand All @@ -175,53 +139,24 @@ func (t *TiCIInstance) Component() string {
}
}

// LogFile implements Instance interface
func (t *TiCIInstance) LogFile() string {
switch t.role {
// LogFile implements Process interface
func (inst *TiCIInstance) LogFile() string {
switch inst.role {
case TiCIRoleMeta:
return filepath.Join(t.Dir, "tici-meta.log")
return filepath.Join(inst.Dir, "tici-meta.log")
case TiCIRoleWorker:
return filepath.Join(t.Dir, "tici-worker.log")
return filepath.Join(inst.Dir, "tici-worker.log")
default:
return filepath.Join(t.Dir, "tici.log")
return filepath.Join(inst.Dir, "tici.log")
}
}

// Cmd returns the process command
func (t *TiCIInstance) Cmd() any {
if t.process != nil {
return t.process.Cmd()
}
return nil
}

// MetaAddr returns the MetaServer address (only valid for MetaServer instances)
func (t *TiCIInstance) MetaAddr() string {
if t.role == TiCIRoleMeta {
return utils.JoinHostPort(AdvertiseHost(t.Host), t.Port)
}
return ""
// Addr returns the address for connecting to the TiCI instance.
func (inst *TiCIInstance) Addr() string {
return utils.JoinHostPort(AdvertiseHost(inst.Host), inst.Port)
}

// WorkerAddr returns the WorkerNode address (only valid for WorkerNode instances)
func (t *TiCIInstance) WorkerAddr() string {
if t.role == TiCIRoleWorker {
return utils.JoinHostPort(AdvertiseHost(t.Host), t.Port)
}
return ""
}

// PrepareBinary is a no-op for TiCI since it uses external binaries
func (t *TiCIInstance) PrepareBinary(component, name string, version utils.Version) error {
// TiCI uses external binaries, no preparation needed
// But we output the startup message to match other components
_, _ = colorstr.Printf("[dark_gray]Start %s instance: %s[reset]\n", component, t.BinPath)
return nil
}

// Terminate terminates the process gracefully
func (t *TiCIInstance) Terminate(sig syscall.Signal) {
if t.process != nil && t.process.Cmd() != nil && t.process.Cmd().Process != nil {
_ = syscall.Kill(t.process.Cmd().Process.Pid, sig)
}
// StatusAddr returns the status address for the TiCI instance.
func (inst *TiCIInstance) StatusAddr() string {
return utils.JoinHostPort(AdvertiseHost(inst.Host), inst.StatusPort)
}
66 changes: 66 additions & 0 deletions components/playground/instance/tici_meta_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright 2025 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE_2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package instance

import (
"fmt"
)

func warpAddr(addr string) string {
return fmt.Sprintf("http://%s", addr)
}

func (inst *TiCIInstance) getMetaConfig() map[string]any {
config := make(map[string]any)
tidbServers := make([]string, 0, len(inst.dbs))
for _, db := range inst.dbs {
tidbServers = append(tidbServers, db.DSN())
}
config["tidb_servers"] = tidbServers
config["pd_addr"] = warpAddr(inst.pds[0].Addr())
config["cert_path"] = ""
config["addr"] = warpAddr(inst.Addr())
config["advertise_addr"] = warpAddr(inst.Addr())
config["status_addr"] = warpAddr(inst.StatusAddr())
config["advertise_status_addr"] = warpAddr(inst.StatusAddr())

// reader pool config
config["reader_pool.ttl_seconds"] = 9
config["reader_pool.cleanup_interval_seconds"] = 1
config["reader_pool.scheduling_strategy"] = "round_robin"

// S3 config
// TODO: make it configurable
endpoint, ak, sk, bucket, prefix := GetDefaultTiCIMetaS3Config()
config["s3.endpoint"] = endpoint
config["s3.region"] = "us_east_1"
config["s3.access_key"] = ak
config["s3.secret_key"] = sk
config["s3.use_path_style"] = false
config["s3.bucket"] = bucket
config["s3.prefix"] = prefix

// shard config
config["shard.compaction_fragments"] = 10
config["shard.compaction_datafiles"] = 300
config["shard.max_size"] = "1024MB"
config["shard.split_threshold"] = 0.75

return config
}

// GetDefaultTiCIMetaS3Config returns the default S3 configuration for TiCI Meta Service
func GetDefaultTiCIMetaS3Config() (string, string, string, string, string) {
return "http://localhost:9000", "minioadmin", "minioadmin", "logbucket", "storage_test"
}
41 changes: 41 additions & 0 deletions components/playground/instance/tici_worker_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright 2025 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE_2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package instance

func (inst *TiCIInstance) getWorkerConfig() map[string]any {
config := make(map[string]any)

// S3 config
// TODO: make it configurable
config["s3.endpoint"] = "http://localhost:9000"
config["s3.region"] = "us_east_1"
config["s3.access_key"] = "minioadmin"
config["s3.secret_key"] = "minioadmin"
config["s3.use_path_style"] = true

// fragment writer config
config["frag_writer.local_data_path"] = "fragments"
config["frag_writer.index_num_threads"] = 4
config["frag_writer.index_mem_budget"] = "255MB"
config["frag_writer.index_flush_interval"] = "5s"
config["frag_writer.index_flush_size_limit"] = "5MB"

// server config
config["server.grpc_address"] = warpAddr(inst.Addr())
config["server.status_addr"] = warpAddr(inst.StatusAddr())
config["server.heartbeat_interval"] = "3s"
config["server.pd_addr"] = warpAddr(inst.pds[0].Addr())

return config
}
5 changes: 5 additions & 0 deletions components/playground/instance/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,8 @@ func (inst *TiDBInstance) LogFile() string {
func (inst *TiDBInstance) Addr() string {
return utils.JoinHostPort(AdvertiseHost(inst.Host), inst.Port)
}

// DSN returns the Data Source Name for connecting to TiDB.
func (inst *TiDBInstance) DSN() string {
return fmt.Sprintf("mysql://root@%s", inst.Addr())
}
Loading