Skip to content

Commit

Permalink
devstack: support node joining existing networks and config passing (#…
Browse files Browse the repository at this point in the history
…4827)

This PR refactors devstack to support two key features:

1. Allow compute nodes to join an existing orchestrator:
- Added --computes flag (alias for --compute-nodes) to specify number of
compute nodes
- Added --orchestrators flag (alias for --requester-nodes) to specify
number of orchestrator nodes
- Added --hybrids flag (alias for --hybrid-nodes) to specify hybrid
nodes
- When no orchestrator nodes are specified and orchestrator address is
provided via -c flag,
devstack will run compute-only nodes that connect to the external
orchestrator

2. Use test configuration as base:
   - Devstack now uses NewTestConfig() as base configuration
- All configuration can be overridden using -c flags (same as bacalhau
serve)
   - Node-specific settings are layered on top of base configuration
   - Maintains backward compatibility with existing devstack flags

This allows for:
```bash
# Run orchestrator node
bacalhau devstack --orchestrators 1

# Run compute nodes connecting to existing orchestrator
bacalhau devstack --computes 3 -c Compute.Orchestrators=127.0.0.1:4222

# Run both with custom config
bacalhau devstack --computes 3 --orchestrators 1 -c Compute.AllowListedLocalPaths=/tmp

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->
## Summary by CodeRabbit

- **New Features**
	- Updated CLI command-line flags for devstack configuration with more intuitive naming.
	- Enhanced configuration setup process with more flexible option handling.
	- Introduced a new package for organizing related functionalities.

- **Refactor**
	- Simplified devstack node configuration terminology.
	- Improved configuration management in devstack and utility functions.
	- Streamlined node setup logic in devstack configuration.

- **Chores**
	- Updated method signatures to support more dynamic configuration options.
	- Maintained backward compatibility with existing flags.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
  • Loading branch information
wdbaruni authored Jan 26, 2025
1 parent ab21916 commit e5db120
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 75 deletions.
115 changes: 77 additions & 38 deletions cmd/cli/devstack/devstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ import (
"path/filepath"
"strconv"

"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/spf13/viper"

"github.com/bacalhau-project/bacalhau/cmd/util/flags/configflags"
"github.com/bacalhau-project/bacalhau/pkg/config"
"github.com/bacalhau-project/bacalhau/pkg/config/types"
"github.com/bacalhau-project/bacalhau/pkg/config_legacy"
"github.com/bacalhau-project/bacalhau/pkg/logger"
"github.com/bacalhau-project/bacalhau/webui"
Expand Down Expand Up @@ -47,22 +47,24 @@ var (
)

type options struct {
NumberOfHybridNodes int // Number of nodes to start in the cluster
NumberOfRequesterOnlyNodes int // Number of nodes to start in the cluster
NumberOfComputeOnlyNodes int // Number of nodes to start in the cluster
NumberOfBadComputeActors int // Number of compute nodes to be bad actors
CPUProfilingFile string
MemoryProfilingFile string
BasePath string
WebUIListen string
// Node counts
ComputeNodes int // Number of compute nodes to run
OrchestratorNodes int // Number of orchestrator nodes to run
HybridNodes int // Number of hybrid nodes to run
BadComputeNodes int // Number of bad compute nodes

// Other options
CPUProfilingFile string
MemoryProfilingFile string
BasePath string
}

func (o *options) devstackOptions() []devstack.ConfigOption {
opts := []devstack.ConfigOption{
devstack.WithNumberOfHybridNodes(o.NumberOfHybridNodes),
devstack.WithNumberOfRequesterOnlyNodes(o.NumberOfRequesterOnlyNodes),
devstack.WithNumberOfComputeOnlyNodes(o.NumberOfComputeOnlyNodes),
devstack.WithNumberOfBadComputeActors(o.NumberOfBadComputeActors),
devstack.WithNumberOfHybridNodes(o.HybridNodes),
devstack.WithNumberOfRequesterOnlyNodes(o.OrchestratorNodes),
devstack.WithNumberOfComputeOnlyNodes(o.ComputeNodes),
devstack.WithNumberOfBadComputeActors(o.BadComputeNodes),
devstack.WithCPUProfilingFile(o.CPUProfilingFile),
devstack.WithMemoryProfilingFile(o.MemoryProfilingFile),
devstack.WithBasePath(o.BasePath),
Expand All @@ -72,13 +74,11 @@ func (o *options) devstackOptions() []devstack.ConfigOption {

func newOptions() *options {
return &options{
NumberOfRequesterOnlyNodes: 1,
NumberOfComputeOnlyNodes: 3,
NumberOfBadComputeActors: 0,
CPUProfilingFile: "",
MemoryProfilingFile: "",
BasePath: "",
WebUIListen: config.Default.WebUI.Listen,
OrchestratorNodes: 1,
ComputeNodes: 3,
CPUProfilingFile: "",
MemoryProfilingFile: "",
BasePath: "",
}
}

Expand All @@ -102,12 +102,23 @@ func NewCmd() *cobra.Command {
return configflags.BindFlags(viper.GetViper(), devstackFlags)
},
RunE: func(cmd *cobra.Command, _ []string) error {
// TODO: a hack to force debug logging for devstack
// until I figure out why flags and env vars are not working
logger.ConfigureLogging(logger.LogModeDefault, zerolog.DebugLevel)
defaultConfig, err := config.NewTestConfig()
if err != nil {
return fmt.Errorf("failed to create default config: %w", err)
}
cfg, err := util.SetupConfig(cmd, config.WithDefault(defaultConfig))
if err != nil {
return fmt.Errorf("failed to setup config: %w", err)
}

if err = logger.ParseAndConfigureLogging(cfg.Logging.Mode, cfg.Logging.Level); err != nil {
return fmt.Errorf("failed to configure logging: %w", err)
}

// TODO this should be a part of the config.
telemetry.SetupFromEnvs()
return runDevstack(cmd, ODs)

return runDevstack(cmd, ODs, cfg)
},
}

Expand All @@ -116,25 +127,48 @@ func NewCmd() *cobra.Command {
}

devstackCmd.PersistentFlags().IntVar(
&ODs.NumberOfHybridNodes, "hybrid-nodes", ODs.NumberOfHybridNodes,
`How many hybrid (requester and compute) nodes should be started in the cluster`,
&ODs.ComputeNodes, "computes", ODs.ComputeNodes,
`Number of compute-only nodes to run`,
)
devstackCmd.PersistentFlags().IntVar(
&ODs.NumberOfRequesterOnlyNodes, "requester-nodes", ODs.NumberOfRequesterOnlyNodes,
`How many requester only nodes should be started in the cluster`,
&ODs.OrchestratorNodes, "orchestrators", ODs.OrchestratorNodes,
`Number of orchestrator-only nodes to run`,
)
devstackCmd.PersistentFlags().IntVar(
&ODs.NumberOfComputeOnlyNodes, "compute-nodes", ODs.NumberOfComputeOnlyNodes,
`How many compute only nodes should be started in the cluster`,
&ODs.HybridNodes, "hybrids", ODs.HybridNodes,
`Number of hybrid nodes (both compute and orchestrator) to run`,
)
devstackCmd.PersistentFlags().IntVar(
&ODs.NumberOfBadComputeActors, "bad-compute-actors", ODs.NumberOfBadComputeActors,
`How many compute nodes should be bad actors`,
&ODs.BadComputeNodes, "bad-computes", ODs.BadComputeNodes,
`Number of compute nodes that should be bad actors`,
)
devstackCmd.PersistentFlags().StringVar(
&ODs.WebUIListen, "webui-address", ODs.WebUIListen,
`Listen address for the web UI server`,

// Old style flags - hidden from help but still functional
oldFlags := devstackCmd.PersistentFlags()
oldFlags.IntVar(
&ODs.ComputeNodes, "compute-nodes", ODs.ComputeNodes,
`Number of compute-only nodes to run`,
)
_ = oldFlags.MarkHidden("compute-nodes")

oldFlags.IntVar(
&ODs.OrchestratorNodes, "requester-nodes", ODs.OrchestratorNodes,
`Number of orchestrator-only nodes to run`,
)
_ = oldFlags.MarkHidden("requester-nodes")

oldFlags.IntVar(
&ODs.HybridNodes, "hybrid-nodes", ODs.HybridNodes,
`Number of hybrid nodes (both compute and orchestrator) to run`,
)
_ = oldFlags.MarkHidden("hybrid-nodes")

oldFlags.IntVar(
&ODs.BadComputeNodes, "bad-compute-actors", ODs.BadComputeNodes,
`Number of compute nodes that should be bad actors`,
)
_ = oldFlags.MarkHidden("bad-compute-actors")

devstackCmd.PersistentFlags().StringVar(
&ODs.CPUProfilingFile, "cpu-profiling-file", ODs.CPUProfilingFile,
"File to save CPU profiling to",
Expand All @@ -147,16 +181,21 @@ func NewCmd() *cobra.Command {
&ODs.BasePath, "stack-repo", ODs.BasePath,
"Folder to act as the devstack configuration repo",
)

return devstackCmd
}

//nolint:gocyclo,funlen
func runDevstack(cmd *cobra.Command, ODs *options) error {
func runDevstack(cmd *cobra.Command, ODs *options, cfg types.Bacalhau) error {
ctx := cmd.Context()

cm := util.GetCleanupManager(ctx)
cm.RegisterCallback(telemetry.Cleanup)

// Create devstack options and merge with base config
opts := ODs.devstackOptions()
opts = append(opts, devstack.WithBacalhauConfigOverride(cfg))

config_legacy.DevstackSetShouldPrintInfo()

portFileName := filepath.Join(os.TempDir(), "bacalhau-devstack.port")
Expand Down Expand Up @@ -184,7 +223,7 @@ func runDevstack(cmd *cobra.Command, ODs *options) error {
defer os.RemoveAll(baseRepoPath)
}

stack, err := devstack.Setup(ctx, cm, ODs.devstackOptions()...)
stack, err := devstack.Setup(ctx, cm, opts...)
if err != nil {
return err
}
Expand All @@ -195,7 +234,7 @@ func runDevstack(cmd *cobra.Command, ODs *options) error {
if n.IsRequesterNode() {
webuiConfig := webui.Config{
APIEndpoint: n.APIServer.GetURI().String(),
Listen: ODs.WebUIListen,
Listen: cfg.WebUI.Listen,
}
webuiServer, err := webui.NewServer(webuiConfig)
if err != nil {
Expand Down
11 changes: 5 additions & 6 deletions cmd/util/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ func SetupRepo(cfg types.Bacalhau) (*repo.FsRepo, error) {
return r, nil
}

func SetupConfigType(cmd *cobra.Command) (*config.Config, error) {
var opts []config.Option
func SetupConfigType(cmd *cobra.Command, opts ...config.Option) (*config.Config, error) {
v := viper.GetViper()
// check if the user specified config files via the --config flag
configFiles := getConfigFiles(v)
Expand Down Expand Up @@ -85,16 +84,16 @@ func SetupConfigType(cmd *cobra.Command) (*config.Config, error) {
return cfg, nil
}

func SetupConfig(cmd *cobra.Command) (types.Bacalhau, error) {
cfg, err := SetupConfigType(cmd)
func SetupConfig(cmd *cobra.Command, opts ...config.Option) (types.Bacalhau, error) {
cfg, err := SetupConfigType(cmd, opts...)
if err != nil {
return types.Bacalhau{}, err
}
return UnmarshalBacalhauConfig(cfg)
}

func SetupConfigs(cmd *cobra.Command) (types.Bacalhau, *config.Config, error) {
cfg, err := SetupConfigType(cmd)
func SetupConfigs(cmd *cobra.Command, opts ...config.Option) (types.Bacalhau, *config.Config, error) {
cfg, err := SetupConfigType(cmd, opts...)
if err != nil {
return types.Bacalhau{}, nil, err
}
Expand Down
57 changes: 26 additions & 31 deletions pkg/devstack/devstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,16 @@ func Setup(

log.Ctx(ctx).Info().Object("Config", stackConfig).Msg("Starting Devstack")

cfg := stackConfig.BacalhauConfig

var nodes []*node.Node
orchestratorAddrs := make([]string, 0)
clusterPeersAddrs := make([]string, 0)

totalNodeCount := stackConfig.NumberOfHybridNodes + stackConfig.NumberOfRequesterOnlyNodes + stackConfig.NumberOfComputeOnlyNodes
requesterNodeCount := stackConfig.NumberOfHybridNodes + stackConfig.NumberOfRequesterOnlyNodes
computeNodeCount := stackConfig.NumberOfHybridNodes + stackConfig.NumberOfComputeOnlyNodes

if requesterNodeCount == 0 {
return nil, fmt.Errorf("at least one requester node is required")
cfg := stackConfig.BacalhauConfig

// if running with local orchestrator, we clear the orchestrator list from the config
if requesterNodeCount > 0 {
cfg.Compute.Orchestrators = []string{}
}

for i := 0; i < totalNodeCount; i++ {
Expand All @@ -75,41 +73,38 @@ func Setup(
isComputeNode := (totalNodeCount - i) <= computeNodeCount
log.Ctx(ctx).Debug().Msgf(`Creating Node #%d as {RequesterNode: %t, ComputeNode: %t}`, i+1, isRequesterNode, isComputeNode)

// ////////////////////////////////////
// Transport layer
// ////////////////////////////////////
if os.Getenv("PREDICTABLE_API_PORT") != "" {
cfg.Orchestrator.Port = 4222 + i
} else {
if cfg.Orchestrator.Port, err = network.GetFreePort(); err != nil {
return nil, errors.Wrap(err, "failed to get free port for nats server")
if isRequesterNode {
if os.Getenv("PREDICTABLE_API_PORT") != "" {
cfg.Orchestrator.Port = 4222 + i
} else {
if cfg.Orchestrator.Port, err = network.GetFreePort(); err != nil {
return nil, errors.Wrap(err, "failed to get free port for nats server")
}
}
}

if os.Getenv("PREDICTABLE_API_PORT") != "" {
cfg.Orchestrator.Cluster.Port = 6222 + i
} else {
if cfg.Orchestrator.Cluster.Port, err = network.GetFreePort(); err != nil {
return nil, errors.Wrap(err, "failed to get free port for nats cluster")
if os.Getenv("PREDICTABLE_API_PORT") != "" {
cfg.Orchestrator.Cluster.Port = 6222 + i
} else {
if cfg.Orchestrator.Cluster.Port, err = network.GetFreePort(); err != nil {
return nil, errors.Wrap(err, "failed to get free port for nats cluster")
}
}
}

// always override the default orchestrator address
// for the first orchestrator, this will be empty as it should be
// for the rest, it will be the address of the previous orchestrators
cfg.Compute.Orchestrators = orchestratorAddrs

if isRequesterNode {
cfg.Orchestrator.Cluster.Peers = clusterPeersAddrs
cfg.Orchestrator.Cluster.Name = "devstack"
orchestratorAddrs = append(orchestratorAddrs, fmt.Sprintf("127.0.0.1:%d", cfg.Orchestrator.Port))
if cfg.Orchestrator.Cluster.Name == "" {
cfg.Orchestrator.Cluster.Name = "devstack"
}
cfg.Compute.Orchestrators = append(cfg.Compute.Orchestrators, fmt.Sprintf("127.0.0.1:%d", cfg.Orchestrator.Port))
}

// ////////////////////////////////////
// port for API
// ////////////////////////////////////
if os.Getenv("PREDICTABLE_API_PORT") != "" {
cfg.API.Port = 1234 + i
// add one more if using an external orchestrator to avoid port conflict
if requesterNodeCount == 0 {
cfg.API.Port += 1
}
} else {
if cfg.API.Port, err = network.GetFreePort(); err != nil {
return nil, errors.Wrap(err, "failed to get free port for API server")
Expand Down
1 change: 1 addition & 0 deletions pkg/lib/ncl/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package ncl

0 comments on commit e5db120

Please sign in to comment.