Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 37 additions & 4 deletions cmd/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"log"
"os"
"os/signal"
"syscall"

"github.com/Servflow/servflow/config"
"github.com/Servflow/servflow/internal/storage"
Expand All @@ -25,8 +26,8 @@ type ValidationError struct {
Error error
}

func RunServer(cfg *config.Config) error {
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
func RunServer(cfg *config.Config, watch bool) error {
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGHUP)
defer stop()

storageClient, err := storage.GetClient()
Expand All @@ -39,10 +40,30 @@ func RunServer(cfg *config.Config) error {
return err
}

if err := eng.Start(); err != nil {
if err := eng.Start(watch); err != nil {
return err
}

// Handle SIGHUP for manual reload
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGHUP)

go func() {
for {
select {
case <-sigChan:
log.Println("SIGHUP received, reloading configs...")
if err := eng.Reload(); err != nil {
log.Printf("Reload failed: %v", err)
} else {
log.Println("Reload successful")
}
case <-ctx.Done():
return
}
}
}()

select {
case <-eng.DoneChan():
return nil
Expand Down Expand Up @@ -127,6 +148,13 @@ func CreateApp() *cli.App {
Usage: "Path to integrations configuration folder",
Required: false,
},
&cli.BoolFlag{
Name: "watch",
Aliases: []string{"w"},
Usage: "Enable hot reload - watch for config file changes",
Value: false,
Required: false,
},
},
Action: func(c *cli.Context) error {
if err := godotenv.Load(); err != nil {
Expand All @@ -153,10 +181,15 @@ func CreateApp() *cli.App {
return cli.Exit("Config folder for APIs must be specified either via environment variable SERVFLOW_CONFIGFOLDERS_APIS or as the first argument to 'run' command", 1)
}

watchEnabled := c.Bool("watch")

log.Printf("Starting ServFlow with config folders - APIs: %s, Integrations: %s",
cfg.ConfigFolder, cfg.IntegrationsFile)
if watchEnabled {
log.Printf("Hot reload enabled - watching for config changes")
}

return RunServer(&cfg)
return RunServer(&cfg, watchEnabled)
},
},
{
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ require (
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/ebitengine/purego v0.8.2 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fsnotify/fsnotify v1.9.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k=
github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
Expand Down
1 change: 1 addition & 0 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ func (a *Session) startLoop(ctx context.Context) chan agentOutput {

// process content output
for _, c := range r.Content {
logger.Debug("got response", zap.Any("response", c.Text))
a.addToMessages(logger, ContentMessage{
Message: Message{Type: MessageTypeText},
Role: RoleTypeAssistant,
Expand Down
4 changes: 2 additions & 2 deletions pkg/definitions/apiconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,14 @@ func (o *ResponseObject) ToProto() *proto.ResponseObject {
return &resp
}

type DatasourceConfig struct {
type IntegrationConfig struct {
ID string `json:"id" yaml:"id"`
Config json.RawMessage `json:"config" yaml:"-"`
NewConfig map[string]interface{} `yaml:"config"`
Type string `json:"type" yaml:"type"`
}

func (d *DatasourceConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
func (d *IntegrationConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
var tmp struct {
Type string `yaml:"type"`
NewConfig map[string]interface{} `yaml:"config"`
Expand Down
237 changes: 237 additions & 0 deletions pkg/engine/configmanager/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
package configmanager

import (
"fmt"
"net/http"
"os"
"path/filepath"
"sync"

"github.com/Servflow/servflow/internal/logging"
"github.com/Servflow/servflow/pkg/definitions"
"github.com/Servflow/servflow/pkg/engine/yamlloader"
"go.uber.org/zap"
"gopkg.in/yaml.v3"
)

// HandlerCreator is an interface for creating HTTP handlers from API configs
type HandlerCreator interface {
CreateHandler(*apiconfig.APIConfig) (http.Handler, error)
}

// ConfigManager manages API configurations and their handlers with thread-safe updates
type ConfigManager struct {
sync.RWMutex
handlers map[string]http.Handler // configID → handler
fileToConfig map[string]string // filepath → configID
configs map[string]*apiconfig.APIConfig
yamlLoader *yamlloader.YAMLLoader
creator HandlerCreator
}

// New creates a new ConfigManager
func New(yamlLoader *yamlloader.YAMLLoader, creator HandlerCreator) *ConfigManager {
return &ConfigManager{
handlers: make(map[string]http.Handler),
fileToConfig: make(map[string]string),
configs: make(map[string]*apiconfig.APIConfig),
yamlLoader: yamlLoader,
creator: creator,
}
}

// LoadAllConfigs loads all configs from a folder
func (cm *ConfigManager) LoadAllConfigs(configFolder string) ([]*apiconfig.APIConfig, error) {
logger := logging.GetLogger()
logger.Info("Loading all configs from folder", zap.String("folder", configFolder))

configs, err := cm.yamlLoader.FetchAPIConfigs(false)
if err != nil {
return nil, fmt.Errorf("failed to fetch API configs: %w", err)
}

cm.Lock()
defer cm.Unlock()

for _, config := range configs {
handler, err := cm.creator.CreateHandler(config)
if err != nil {
logger.Error("Failed to create handler for config",
zap.String("configID", config.ID),
zap.Error(err))
continue
}

cm.handlers[config.ID] = handler
cm.configs[config.ID] = config
logger.Info("Loaded config", zap.String("configID", config.ID))
}

return configs, nil
}

// LoadConfig loads a single config file
func (cm *ConfigManager) LoadConfig(filePath string) error {
logger := logging.GetLogger()
logger.Info("Loading config from file", zap.String("file", filePath))

config, err := cm.loadAndValidateConfig(filePath)
if err != nil {
logger.Error("Failed to load config", zap.String("file", filePath), zap.Error(err))
return err
}

handler, err := cm.creator.CreateHandler(config)
if err != nil {
logger.Error("Failed to create handler", zap.String("configID", config.ID), zap.Error(err))
return err
}

cm.Lock()
defer cm.Unlock()

cm.handlers[config.ID] = handler
cm.configs[config.ID] = config
cm.fileToConfig[filePath] = config.ID

logger.Info("Config loaded successfully", zap.String("configID", config.ID), zap.String("file", filePath))
return nil
}

// ReloadConfig reloads a specific config file
func (cm *ConfigManager) ReloadConfig(filePath string) error {
logger := logging.GetLogger()
logger.Info("Reloading config from file", zap.String("file", filePath))

config, err := cm.loadAndValidateConfig(filePath)
if err != nil {
logger.Error("Failed to reload config, keeping previous version",
zap.String("file", filePath),
zap.Error(err))
return err
}

handler, err := cm.creator.CreateHandler(config)
if err != nil {
logger.Error("Failed to create handler, keeping previous version",
zap.String("configID", config.ID),
zap.Error(err))
return err
}

cm.Lock()
defer cm.Unlock()

cm.handlers[config.ID] = handler
cm.configs[config.ID] = config
cm.fileToConfig[filePath] = config.ID

logger.Info("Config reloaded successfully", zap.String("configID", config.ID), zap.String("file", filePath))
return nil
}

// RemoveConfig removes a config
func (cm *ConfigManager) RemoveConfig(filePath string) error {
logger := logging.GetLogger()

cm.Lock()
defer cm.Unlock()

configID, ok := cm.fileToConfig[filePath]
if !ok {
logger.Warn("Config file not tracked", zap.String("file", filePath))
return nil
}

delete(cm.handlers, configID)
delete(cm.configs, configID)
delete(cm.fileToConfig, filePath)

logger.Info("Config removed", zap.String("configID", configID), zap.String("file", filePath))
return nil
}

// GetHandler retrieves a handler by config ID (thread-safe)
func (cm *ConfigManager) GetHandler(configID string) http.Handler {
cm.RLock()
defer cm.RUnlock()
return cm.handlers[configID]
}

// GetConfig retrieves a config by ID (thread-safe)
func (cm *ConfigManager) GetConfig(configID string) *apiconfig.APIConfig {
cm.RLock()
defer cm.RUnlock()
return cm.configs[configID]
}

// ReloadAllConfigs reloads all currently loaded configs
func (cm *ConfigManager) ReloadAllConfigs() error {
logger := logging.GetLogger()
logger.Info("Reloading all configs")

cm.RLock()
files := make([]string, 0, len(cm.fileToConfig))
for file := range cm.fileToConfig {
files = append(files, file)
}
cm.RUnlock()

var errors []error
for _, file := range files {
if err := cm.ReloadConfig(file); err != nil {
errors = append(errors, err)
}
}

if len(errors) > 0 {
logger.Error("Some configs failed to reload", zap.Int("count", len(errors)))
return fmt.Errorf("%d config(s) failed to reload", len(errors))
}

logger.Info("All configs reloaded successfully")
return nil
}

// RegisterFile registers a file path to config ID mapping
func (cm *ConfigManager) RegisterFile(filePath string, configID string) {
cm.Lock()
defer cm.Unlock()
cm.fileToConfig[filePath] = configID
}

// loadAndValidateConfig loads and validates a single config file
func (cm *ConfigManager) loadAndValidateConfig(filePath string) (*apiconfig.APIConfig, error) {
absPath, err := filepath.Abs(filePath)
if err != nil {
return nil, fmt.Errorf("failed to get absolute path: %w", err)
}

data, err := os.ReadFile(absPath)
if err != nil {
return nil, fmt.Errorf("failed to read file: %w", err)
}

var config apiconfig.APIConfig
if err := yaml.Unmarshal(data, &config); err != nil {
return nil, fmt.Errorf("failed to parse YAML: %w", err)
}

if err := config.Validate(); err != nil {
return nil, fmt.Errorf("config validation failed: %w", err)
}

return &config, nil
}

// RegisterHandlerForTest registers a handler for testing purposes
// This is a test helper method that should only be used in tests
func (cm *ConfigManager) RegisterHandlerForTest(configID string, handler http.Handler, config *apiconfig.APIConfig) {
cm.Lock()
defer cm.Unlock()

cm.handlers[configID] = handler
if config != nil {
cm.configs[configID] = config
}
}
Loading