Skip to content

Allow for hot-reloading yaml config pipelines #2236

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 206 additions & 0 deletions pkg/provisioning/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,212 @@ func NewService(
}
}

// parseYamlInput parses a YAML input (either a file path or a YAML string) and returns all pipeline configurations.
func (s *Service) parseYamlInput(ctx context.Context, yamlInput string) ([]config.Pipeline, string, error) {
// First, determine if the input is a file path or a YAML string
isFilePath := false
if !strings.Contains(yamlInput, "\n") && len(yamlInput) < 1024 {
if _, err := os.Stat(yamlInput); err == nil {
isFilePath = true
}
}

var source string
var pipelineConfigs []config.Pipeline

// Parse the YAML input
if isFilePath {
s.logger.Debug(ctx).
Str("file_path", yamlInput).
Msg("parsing pipeline configurations from file")

// Parse the file and get all pipeline configurations
configs, err := s.parsePipelineConfigFile(ctx, yamlInput)
if err != nil {
return nil, "", cerrors.Errorf("failed to parse file %q: %w", yamlInput, err)
}

if len(configs) == 0 {
return nil, "", cerrors.New("no pipelines found in the YAML file")
}

// Use all pipelines from the file
pipelineConfigs = configs
source = yamlInput
} else {
s.logger.Debug(ctx).
Msg("parsing pipeline configurations from YAML string")

reader := strings.NewReader(yamlInput)
configs, err := s.parser.Parse(ctx, reader)
if err != nil {
return nil, "", cerrors.Errorf("failed to parse YAML string: %w", err)
}

if len(configs) == 0 {
return nil, "", cerrors.New("no pipelines found in the YAML string")
}

// Use all pipelines from the string
pipelineConfigs = configs
source = "<yaml-string>"
}

// Log the number of pipelines found
s.logger.Debug(ctx).
Int("pipeline_count", len(pipelineConfigs)).
Msg("found pipelines in YAML")

return pipelineConfigs, source, nil
}

// UpsertYamlResult represents the result of processing a YAML input with multiple pipelines
type UpsertYamlResult struct {
PipelineIDs []string `json:"pipelineIDs"` // IDs of all successfully processed pipelines
}

// UpsertYaml parses a YAML input (either a file path or a YAML string) and creates or updates pipelines.
// If a pipeline doesn't exist, it will be created. If it does exist, it will be updated.
// This can be used to reload pipeline configurations without restarting Conduit.
// The function processes all pipeline definitions found in the YAML input.
// Returns a list of all successfully processed pipeline IDs, or an error if parsing failed.
func (s *Service) UpsertYaml(ctx context.Context, yamlInput string) (*UpsertYamlResult, error) {
// Parse the YAML input
pipelineConfigs, source, err := s.parseYamlInput(ctx, yamlInput)
if err != nil {
return nil, err
}

// Initialize the result
result := &UpsertYamlResult{
PipelineIDs: make([]string, 0, len(pipelineConfigs)),
}

// Process all pipelines
var errs []error

s.logger.Info(ctx).
Int("pipeline_count", len(pipelineConfigs)).
Msg("processing pipelines from YAML")

for i, pipelineConfig := range pipelineConfigs {
// Process the pipeline
pipelineID, err := s.reloadPipeline(ctx, pipelineConfig, source)
if err != nil {
errs = append(errs, cerrors.Errorf("failed to process pipeline %d (%s): %w", i, pipelineConfig.ID, err))
} else {
// Add successfully processed pipeline ID to the result
result.PipelineIDs = append(result.PipelineIDs, pipelineID)
}
}

// If there were any errors, return them along with the partial result
if len(errs) > 0 {
return result, cerrors.Join(errs...)
}

return result, nil
}

// reloadPipeline is a helper function that handles the common logic for reloading a pipeline
// from either a file or a YAML string. It will create a new pipeline if it doesn't exist,
// or update an existing one if it does.
func (s *Service) reloadPipeline(ctx context.Context, pipelineConfig config.Pipeline, source string) (string, error) {
// Extract the pipeline ID from the config
pipelineID := pipelineConfig.ID

// Check if pipeline already exists
pipelineInstance, err := s.pipelineService.Get(ctx, pipelineID)
if err != nil {
if cerrors.Is(err, pipeline.ErrInstanceNotFound) {
// Pipeline doesn't exist, create it
return s.upsertPipeline(ctx, nil, pipelineConfig, source)
} else {
return "", cerrors.Errorf("error getting pipeline instance with ID %q: %w", pipelineID, err)
}
} else {
// Pipeline exists, update it
return s.upsertPipeline(ctx, pipelineInstance, pipelineConfig, source)
}
}

// upsertPipeline creates a new pipeline or updates an existing one with the given configuration.
// If pipelineInstance is nil, a new pipeline will be created. Otherwise, the existing pipeline will be updated.
func (s *Service) upsertPipeline(ctx context.Context, pipelineInstance *pipeline.Instance, pipelineConfig config.Pipeline, source string) (string, error) {
pipelineID := pipelineConfig.ID

// Check if we're creating a new pipeline or updating an existing one
isCreate := pipelineInstance == nil

if isCreate {
// Creating a new pipeline
s.logger.Info(ctx).
Str("pipeline_id", pipelineID).
Msg("pipeline doesn't exist, creating new pipeline")
} else {
// Updating an existing pipeline
// Check if the pipeline was provisioned by config
if pipelineInstance.ProvisionedBy != pipeline.ProvisionTypeConfig {
return "", cerrors.Errorf("pipeline with ID %q was not provisioned by config: %w", pipelineID, ErrNotProvisionedByConfig)
}

// Check if pipeline is running and stop it if needed
pipelineWasRunning := pipelineInstance.GetStatus() == pipeline.StatusRunning
if pipelineWasRunning {
s.logger.Debug(ctx).
Str("pipeline_id", pipelineID).
Msg("stopping pipeline before updating configuration")

err := s.lifecycleService.Stop(ctx, pipelineID, false)
if err != nil {
// Ignore the error if the pipeline is not running
if !strings.Contains(err.Error(), "pipeline not running") {
return "", cerrors.Errorf("could not stop pipeline %q before updating: %w", pipelineID, err)
}
s.logger.Debug(ctx).
Str("pipeline_id", pipelineID).
Msg("pipeline was not running, continuing with update")
}
}

// Delete the existing pipeline
s.logger.Debug(ctx).
Str("pipeline_id", pipelineID).
Msg("deleting existing pipeline before recreating")

// Use s.Delete to properly clean up the pipeline
err := s.Delete(ctx, pipelineID)
if err != nil {
return "", cerrors.Errorf("could not delete existing pipeline %q: %w", pipelineID, err)
}
}

// Provision the pipeline with the configuration
s.logger.Debug(ctx).
Str("pipeline_id", pipelineID).
Msg("provisioning pipeline")

err := s.provisionPipeline(ctx, pipelineConfig)
if err != nil {
return "", cerrors.Errorf("pipeline %q, error while provisioning: %w", pipelineID, err)
}

// Log success message based on whether we created or updated the pipeline
if isCreate {
s.logger.Info(ctx).
Str("pipeline_id", pipelineID).
Str("source", source).
Msg("pipeline created successfully")
} else {
s.logger.Info(ctx).
Str("pipeline_id", pipelineID).
Str("source", source).
Msg("pipeline configuration updated successfully")
}

return pipelineID, nil
}

// Init provision pipelines defined in pipelinePath directory. should initialize pipeline service
// before calling this function, and all pipelines should be stopped.
func (s *Service) Init(ctx context.Context) error {
Expand Down
Loading