From 25902b8fad496bae74dcbdacba27b1bf79791bb1 Mon Sep 17 00:00:00 2001 From: Sergei Egorov Date: Tue, 24 May 2016 13:54:39 +0300 Subject: [PATCH] Composite source (#33) --- main.go | 82 +++++++++++++------------ sources/composite.go | 47 ++++++++++++++ config.go => sources/config.go | 32 ++++------ spec/integration/sources/composite.bats | 63 +++++++++++++++++++ spec/integration/sources/http.bats | 4 +- spec/integration/sources/unknown.bats | 4 +- 6 files changed, 169 insertions(+), 63 deletions(-) create mode 100644 sources/composite.go rename config.go => sources/config.go (58%) create mode 100644 spec/integration/sources/composite.bats diff --git a/main.go b/main.go index d8522bf..f690cb0 100644 --- a/main.go +++ b/main.go @@ -1,11 +1,12 @@ package main import ( + "encoding/json" "fmt" . "github.com/ahmetalpbalkan/go-linq" "github.com/op/go-logging" "github.com/zeroturnaround/configo/exec" - "github.com/zeroturnaround/configo/flatmap" + "github.com/zeroturnaround/configo/sources" "os" "strconv" "strings" @@ -18,11 +19,9 @@ type env struct { value string } -type sourceContext struct { - priority int - value string - loader Source - partialConfig map[string]interface{} +type sourceWithPriority struct { + priority int + value string } var log = logging.MustGetLogger("configo") @@ -81,54 +80,59 @@ func main() { } func resolveAll(environ []string) error { - count, err := fromEnviron(environ). + rawTSources, err := fromEnviron(environ). Where(func(kv T) (bool, error) { return strings.HasPrefix(kv.(env).key, envVariablePrefix), nil }). Select(func(kv T) (T, error) { priority, err := strconv.Atoi(strings.TrimLeft(kv.(env).key, envVariablePrefix)) if err != nil { return nil, err } - return sourceContext{priority, kv.(env).value, nil, nil}, nil + return sourceWithPriority{priority, kv.(env).value}, nil }). - OrderBy(func(a T, b T) bool { return a.(sourceContext).priority <= b.(sourceContext).priority }). - Select(func(context T) (T, error) { - loader, err := GetSource(context.(sourceContext).value) - - if err != nil { - return nil, fmt.Errorf("Failed to parse source #%d: %s", context.(sourceContext).priority, err) - } - return sourceContext{context.(sourceContext).priority, context.(sourceContext).value, loader, nil}, nil + OrderBy(func(a T, b T) bool { return a.(sourceWithPriority).priority <= b.(sourceWithPriority).priority }). + Select(func(it T) (T, error) { + sourceBytes := []byte(it.(sourceWithPriority).value) + rawSource := make(map[string]interface{}) + err := json.Unmarshal(sourceBytes, &rawSource) + return rawSource, err }). - // Resolve in parallel because some sources might use IO and will take some time - AsParallel().AsOrdered(). - Select(func(context T) (T, error) { - result, err := context.(sourceContext).loader.Get() + Results() - if err != nil { - return nil, fmt.Errorf("Failed to resolve source #%d: %s", context.(sourceContext).priority, err) - } + if err != nil { + return err + } - return sourceContext{context.(sourceContext).priority, context.(sourceContext).value, context.(sourceContext).loader, result}, nil - }). - AsSequential(). - CountBy(func(context T) (bool, error) { - for key, value := range flatmap.Flatten(context.(sourceContext).partialConfig) { - log.Infof("Source #%d: Setting %s to %v", context.(sourceContext).priority, key, value) - os.Setenv(key, fmt.Sprintf("%v", value)) - } - return true, nil - }) + rawSources := make([]map[string]interface{}, len(rawTSources)) + + for k, v := range rawTSources { + rawSources[k] = v.(map[string]interface{}) + } + + if len(rawSources) == 0 { + log.Warning("No sources provided") + return nil + } + + loader := sources.CompositeSource{rawSources} + + resultEnv, err := loader.Get() if err != nil { return err } - if count == 0 { - log.Warning("No sources provided") - } else { - if log.IsEnabledFor(logging.DEBUG) { - log.Debugf("Environment variables after resolve:\n\t%s", strings.Join(os.Environ(), "\n\t")) - } + if len(resultEnv) == 0 { + log.Info("No new env variables were added.") + return nil + } + + for key, value := range resultEnv { + log.Infof("Setting %s to %v", key, value) + os.Setenv(key, fmt.Sprintf("%v", value)) + } + + if log.IsEnabledFor(logging.DEBUG) { + log.Debugf("Environment variables after resolve:\n\t%s", strings.Join(os.Environ(), "\n\t")) } return nil diff --git a/sources/composite.go b/sources/composite.go new file mode 100644 index 0000000..51de285 --- /dev/null +++ b/sources/composite.go @@ -0,0 +1,47 @@ +package sources + +import ( + "fmt" + . "github.com/ahmetalpbalkan/go-linq" + "github.com/zeroturnaround/configo/flatmap" +) + +type CompositeSource struct { + Sources []map[string]interface{} `json:"sources"` +} + +func (compositeSource *CompositeSource) Get() (map[string]interface{}, error) { + + resultEnv := make(map[string]interface{}) + + _, err := From(compositeSource.Sources). + Select(func(rawSource T) (T, error) { + loader, err := GetSource(rawSource.(map[string]interface{})) + + if err != nil { + return nil, fmt.Errorf("Failed to parse source: %s", err) + } + + return loader, nil + }). + // Resolve in parallel because some sources might use IO and will take some time + AsParallel().AsOrdered(). + Select(func(loader T) (T, error) { + result, err := loader.(Source).Get() + + if err != nil { + return nil, fmt.Errorf("Failed to resolve source: %s", err) + } + + return result, nil + }). + AsSequential(). + CountBy(func(partialConfig T) (bool, error) { + for key, value := range flatmap.Flatten(partialConfig.(map[string]interface{})) { + resultEnv[key] = value + } + return true, nil + }) + + return resultEnv, err +} diff --git a/config.go b/sources/config.go similarity index 58% rename from config.go rename to sources/config.go index 716c2be..7943ede 100644 --- a/config.go +++ b/sources/config.go @@ -1,10 +1,8 @@ -package main +package sources import ( - "encoding/json" "fmt" "github.com/mitchellh/mapstructure" - "github.com/zeroturnaround/configo/sources" "reflect" ) @@ -16,27 +14,21 @@ type Source interface { } var configMappings = map[string]reflect.Type{ - "consul": reflect.TypeOf(sources.ConsulSource{}), - "dynamodb": reflect.TypeOf(sources.DynamoDBSource{}), - "etcd": reflect.TypeOf(sources.EtcdSource{}), - "file": reflect.TypeOf(sources.FileSource{}), - "http": reflect.TypeOf(sources.HTTPSource{}), - "redis": reflect.TypeOf(sources.RedisSource{}), - "shell": reflect.TypeOf(sources.ShellSource{}), - "vault": reflect.TypeOf(sources.VaultSource{}), + "composite": reflect.TypeOf(CompositeSource{}), + "consul": reflect.TypeOf(ConsulSource{}), + "dynamodb": reflect.TypeOf(DynamoDBSource{}), + "etcd": reflect.TypeOf(EtcdSource{}), + "file": reflect.TypeOf(FileSource{}), + "http": reflect.TypeOf(HTTPSource{}), + "redis": reflect.TypeOf(RedisSource{}), + "shell": reflect.TypeOf(ShellSource{}), + "vault": reflect.TypeOf(VaultSource{}), } -// GetSource resolves source by string (in JSON format). +// GetSource resolves source from a map. // Source must contain at least one property with name "type", which will be // used to select proper source implementation. -func GetSource(source string) (Source, error) { - rawSource := make(map[string]interface{}) - - sourceBytes := []byte(source) - if err := json.Unmarshal(sourceBytes, &rawSource); err != nil { - return nil, err - } - +func GetSource(rawSource map[string]interface{}) (Source, error) { sourceType, found := configMappings[rawSource["type"].(string)] if !found { return nil, fmt.Errorf("Failed to find source type: %s", rawSource["type"]) diff --git a/spec/integration/sources/composite.bats b/spec/integration/sources/composite.bats new file mode 100644 index 0000000..bd9014d --- /dev/null +++ b/spec/integration/sources/composite.bats @@ -0,0 +1,63 @@ +#!/usr/bin/env bats + +load ../test_helper + +@test "sources: composite works" { + run_container <