Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions components/playground/instance/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func prepareConfig(outputConfigPath string, userConfigPath string, preDefinedCon
return err
}

userConfig, err := unmarshalConfig(userConfigPath)
userConfig, err := UnmarshalConfig(userConfigPath)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -194,7 +194,8 @@ func prepareConfig(outputConfigPath string, userConfigPath string, preDefinedCon
return enc.Encode(spec.MergeConfig(preDefinedConfig, userConfig))
}

func unmarshalConfig(path string) (map[string]any, error) {
// UnmarshalConfig reads a TOML config file and returns the parsed content as a map.
func UnmarshalConfig(path string) (map[string]any, error) {
if path == "" {
return nil, nil
}
Expand Down
88 changes: 31 additions & 57 deletions components/playground/instance/tici.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,8 @@ type TiCIInstance struct {
instance

// TiCI specific fields
pds []*PDInstance
ticiDir string // TiCI project directory
configDir string // Configuration directory
role TiCIRole // Instance role (meta or worker)
pds []*PDInstance
role TiCIRole // Instance role (meta or worker)

// Process - only one process per instance
process *process
Expand All @@ -51,33 +49,46 @@ var _ Instance = &TiCIInstance{}

// NewTiCIMetaInstance creates a TiCI MetaServer instance
func NewTiCIMetaInstance(shOpt SharedOptions, baseDir, host string, id int, pds []*PDInstance,
ticiDir, configDir string) *TiCIInstance {
return NewTiCIInstanceWithRole(shOpt, baseDir, host, id, pds, ticiDir, configDir, TiCIRoleMeta)
ticiBinaryDir, configDir string) *TiCIInstance {
return NewTiCIInstanceWithRole(shOpt, baseDir, host, id, pds, ticiBinaryDir, configDir, TiCIRoleMeta)
}

// NewTiCIWorkerInstance creates a TiCI WorkerNode instance
func NewTiCIWorkerInstance(shOpt SharedOptions, baseDir, host string, id int, pds []*PDInstance,
ticiDir, configDir string) *TiCIInstance {
return NewTiCIInstanceWithRole(shOpt, baseDir, host, id, pds, ticiDir, configDir, TiCIRoleWorker)
ticiBinaryDir, configDir string) *TiCIInstance {
return NewTiCIInstanceWithRole(shOpt, baseDir, host, id, pds, ticiBinaryDir, configDir, TiCIRoleWorker)
}

// NewTiCIInstanceWithRole creates a TiCI instance with specified role
func NewTiCIInstanceWithRole(shOpt SharedOptions, baseDir, host string, id int, pds []*PDInstance,
ticiDir, configDir string, role TiCIRole) *TiCIInstance {
ticiBinaryDir, configDir string, role TiCIRole) *TiCIInstance {
var componentSuffix string
var defaultPort, defaultStatusPort int
var configPath, binPath string

switch role {
case TiCIRoleMeta:
// MetaServer default port
componentSuffix = "meta"
defaultPort = 8500
defaultStatusPort = 8501
if configDir != "" {
configPath = filepath.Join(configDir, "test-meta.toml")
} else {
configPath = filepath.Join(ticiBinaryDir, "../../ci", "test-meta.toml")
}
binPath = filepath.Join(ticiBinaryDir, "meta_service_server")
case TiCIRoleWorker:
// WorkerNode default port
componentSuffix = "worker"
defaultPort = 8510
defaultStatusPort = 8511
if configDir != "" {
configPath = filepath.Join(configDir, "test-worker.toml")
} else {
configPath = filepath.Join(ticiBinaryDir, "../../ci", "test-worker.toml")
}
binPath = filepath.Join(ticiBinaryDir, "worker_node_server")
default:
panic("invalid TiCI role")
}
Expand All @@ -86,18 +97,16 @@ func NewTiCIInstanceWithRole(shOpt SharedOptions, baseDir, host string, id int,

tici := &TiCIInstance{
instance: instance{
BinPath: ticiDir, // TiCI project directory
BinPath: binPath,
ID: id,
Dir: dir,
Host: host,
Port: utils.MustGetFreePort(host, defaultPort, shOpt.PortOffset),
StatusPort: utils.MustGetFreePort(host, defaultStatusPort, shOpt.PortOffset),
ConfigPath: configDir,
ConfigPath: configPath,
},
pds: pds,
ticiDir: ticiDir,
configDir: configDir,
role: role,
pds: pds,
role: role,
}

return tici
Expand All @@ -113,54 +122,19 @@ func (t *TiCIInstance) Start(ctx context.Context) error {
return fmt.Errorf("failed to create directory %s: %v", t.Dir, err)
}

switch t.role {
case TiCIRoleMeta:
return t.startMetaServer(ctx)
case TiCIRoleWorker:
return t.startWorkerNode(ctx)
default:
return fmt.Errorf("invalid TiCI role: %v", t.role)
}
}

func (t *TiCIInstance) startMetaServer(ctx context.Context) error {
args := []string{}

// Use default or provided config path
metaConfigPath := filepath.Join(t.ticiDir, "ci", "test-meta.toml")
if t.configDir != "" {
metaConfigPath = filepath.Join(t.configDir, "test-meta.toml")
}
args = append(args, fmt.Sprintf("--config=%s", metaConfigPath))

// MetaServer binary path
metaBinPath := filepath.Join(t.ticiDir, "meta_service_server")
t.process = &process{cmd: PrepareCommand(ctx, metaBinPath, args, nil, t.ticiDir)}

// Set up logging
logFile := filepath.Join(t.Dir, "tici-meta.log")
logIfErr(t.process.SetOutputFile(logFile))

return t.process.Start()
return t.startInstance(ctx)
}

func (t *TiCIInstance) startWorkerNode(ctx context.Context) error {
func (t *TiCIInstance) startInstance(ctx context.Context) error {
args := []string{}

// Use default or provided config path
workerConfigPath := filepath.Join(t.ticiDir, "ci", "test-worker.toml")
if t.configDir != "" {
workerConfigPath = filepath.Join(t.configDir, "test-worker.toml")
}
args = append(args, fmt.Sprintf("--config=%s", workerConfigPath))
// Set the config path
args = append(args, fmt.Sprintf("--config=%s", t.ConfigPath))

// WorkerNode binary path
workerBinPath := filepath.Join(t.ticiDir, "worker_node_server")
t.process = &process{cmd: PrepareCommand(ctx, workerBinPath, args, nil, t.ticiDir)}
t.process = &process{cmd: PrepareCommand(ctx, t.BinPath, args, nil, t.Dir)}

// Set up logging
logFile := filepath.Join(t.Dir, "tici-worker.log")
logIfErr(t.process.SetOutputFile(logFile))
logIfErr(t.process.SetOutputFile(t.LogFile()))

return t.process.Start()
}
Expand Down Expand Up @@ -241,7 +215,7 @@ func (t *TiCIInstance) WorkerAddr() string {
func (t *TiCIInstance) PrepareBinary(component, name string, version utils.Version) error {
// TiCI uses external binaries, no preparation needed
// But we output the startup message to match other components
_, _ = colorstr.Printf("[dark_gray]Start %s instance: %s[reset]\n", component, t.ticiDir)
_, _ = colorstr.Printf("[dark_gray]Start %s instance: %s[reset]\n", component, t.BinPath)
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion components/playground/instance/tiflash.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (inst *TiFlashInstance) Start(ctx context.Context) error {
{"flash.proxy.data-dir", filepath.Join(inst.Dir, "proxy_data")},
{"flash.proxy.log-file", filepath.Join(inst.Dir, "tiflash_tikv.log")},
}
userConfig, err := unmarshalConfig(configPath)
userConfig, err := UnmarshalConfig(configPath)
if err != nil {
return errors.Trace(err)
}
Expand Down
4 changes: 2 additions & 2 deletions components/playground/instance/tiflash_pre7.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,13 +154,13 @@ func (inst *TiFlashInstance) checkConfigOld(deployDir, clusterManagerPath string
return
}

cfg, err := unmarshalConfig(inst.ConfigPath)
cfg, err := UnmarshalConfig(inst.ConfigPath)
if err != nil {
return errors.Trace(err)
}
proxyPath := getTiFlashProxyConfigPathOld(cfg)
if proxyPath != "" {
proxyCfg, err := unmarshalConfig(proxyPath)
proxyCfg, err := UnmarshalConfig(proxyPath)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion components/playground/instance/tiproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (c *TiProxy) Start(ctx context.Context) error {
return err
}

userConfig, err := unmarshalConfig(c.ConfigPath)
userConfig, err := UnmarshalConfig(c.ConfigPath)
if err != nil {
return err
}
Expand Down
25 changes: 17 additions & 8 deletions components/playground/playground.go
Original file line number Diff line number Diff line change
Expand Up @@ -895,13 +895,11 @@ func (p *Playground) addInstance(componentID string, pdRole instance.PDRole, tif
ins = inst
p.tikvCdcs = append(p.tikvCdcs, inst)
case "tici-meta":
inst := instance.NewTiCIInstanceWithRole(p.bootOptions.ShOpt, p.dataDir, host, id, p.pds,
cfg.BinPath, cfg.ConfigPath, instance.TiCIRoleMeta)
inst := instance.NewTiCIMetaInstance(p.bootOptions.ShOpt, p.dataDir, host, id, p.pds, cfg.BinPath, cfg.ConfigPath)
ins = inst
p.ticis = append(p.ticis, inst)
case "tici-worker":
inst := instance.NewTiCIInstanceWithRole(p.bootOptions.ShOpt, p.dataDir, host, id, p.pds,
cfg.BinPath, cfg.ConfigPath, instance.TiCIRoleWorker)
inst := instance.NewTiCIWorkerInstance(p.bootOptions.ShOpt, p.dataDir, host, id, p.pds, cfg.BinPath, cfg.ConfigPath)
ins = inst
p.ticis = append(p.ticis, inst)
case spec.ComponentPump:
Expand Down Expand Up @@ -1328,11 +1326,22 @@ func (p *Playground) bootCluster(ctx context.Context, env *environment.Environme
}

// Wait for TiCDC to be ready
time.Sleep(8 * time.Second)
time.Sleep(15 * time.Second)

// Create changefeed
fmt.Println("Creating changefeed...")
if err := p.createChangefeed(); err != nil {
ticiMetaConfigPath := filepath.Join(options.TiCIMeta.BinPath, "../../ci", "test-meta.toml")
if options.TiCIMeta.ConfigPath != "" {
ticiMetaConfigPath = filepath.Join(options.TiCIMeta.ConfigPath, "test-meta.toml")
}
// read the configuration file
ticiMetaConfig, err := instance.UnmarshalConfig(ticiMetaConfigPath)
if err != nil {
return err
}
bucket := ticiMetaConfig["s3"].(map[string]any)["bucket"].(string)
prefix := ticiMetaConfig["s3"].(map[string]any)["prefix"].(string)
if err := p.createChangefeed(bucket, prefix); err != nil {
fmt.Println(color.RedString("Failed to create changefeed: %s", err))
} else {
fmt.Println("Changefeed created successfully.")
Expand Down Expand Up @@ -1902,7 +1911,7 @@ func parseMysqlVersion(versionOutput string) (vMaj int, vMin int, vPatch int, er
}

// createChangefeed creates a changefeed using tiup cdc cli
func (p *Playground) createChangefeed() error {
func (p *Playground) createChangefeed(bucket, prefix string) error {
if len(p.ticdcs) == 0 {
return fmt.Errorf("no TiCDC instances available")
}
Expand All @@ -1911,7 +1920,7 @@ func (p *Playground) createChangefeed() error {
cdcAddr := fmt.Sprintf("http://%s", p.ticdcs[0].Addr())

// Prepare changefeed creation command
sinkURI := "s3://logbucket/storage_test?protocol=canal-json&access-key=minioadmin&secret-access-key=minioadmin&endpoint=http://127.0.0.1:9000&enable-tidb-extension=true&output-row-key=true"
sinkURI := fmt.Sprintf("s3://%s/%s/cdc?protocol=canal-json&access-key=minioadmin&secret-access-key=minioadmin&endpoint=http://127.0.0.1:9000&enable-tidb-extension=true&output-row-key=true", bucket, prefix)

cmd := exec.Command("tiup", "cdc", "cli", "changefeed", "create",
fmt.Sprintf("--server=%s", cdcAddr),
Expand Down
2 changes: 1 addition & 1 deletion pkg/tui/tui.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func PrintTable(rows [][]string, header bool) {
Header: text.FormatDefault,
},
Options: table.Options{
SeparateColumns: true,
SeparateColumns: true,
},
})
t.Render()
Expand Down