Skip to content

Commit

Permalink
feat: support runtime configurations in workspace
Browse files Browse the repository at this point in the history
  • Loading branch information
liu-hm19 committed Jul 10, 2024
1 parent db7f6f1 commit 9671d5e
Show file tree
Hide file tree
Showing 18 changed files with 225 additions and 38 deletions.
7 changes: 6 additions & 1 deletion pkg/apis/api.kusion.io/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ type Workspace struct {
// SecretStore represents a secure external location for storing secrets.
SecretStore *SecretStore `yaml:"secretStore,omitempty" json:"secretStore,omitempty"`

// Context contains workspace-level configurations, such as topologies, server endpoints, metadata, etc.
// Context contains workspace-level configurations, such as runtimes, topologies, and metadata, etc.
Context GenericConfig `yaml:"context,omitempty" json:"context,omitempty"`
}

Expand Down Expand Up @@ -475,6 +475,9 @@ const (
EnvAwsSecretAccessKey = "AWS_SECRET_ACCESS_KEY"
EnvAwsDefaultRegion = "AWS_DEFAULT_REGION"
EnvAwsRegion = "AWS_REGION"
EnvAlicloudAccessKey = "ALICLOUD_ACCESS_KEY"
EnvAlicloudSecretKey = "ALICLOUD_SECRET_KEY"
EnvAlicloudRegion = "ALICLOUD_REGION"
)

// BackendConfigs contains the configuration of multiple backends and the current backend.
Expand Down Expand Up @@ -861,6 +864,8 @@ type Spec struct {
Resources Resources `yaml:"resources" json:"resources"`
// SecretSore represents a external secret store location for storing secrets.
SecretStore *SecretStore `yaml:"secretStore" json:"secretStore"`
// Context contains workspace-level configurations, such as runtimes, topologies, and metadata, etc.
Context GenericConfig `yaml:"context,omitempty" json:"context,omitempty"`
}

// State is a record of an operation's result. It is a mapping between resources in KCL and the actual
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/apply/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -713,7 +713,7 @@ func Watch(
watchErrCh <- *err
}()
// Init the runtimes according to the resource types.
runtimes, s := runtimeinit.Runtimes(toBeWatched)
runtimes, s := runtimeinit.Runtimes(*rel.Spec)
if v1.IsErr(s) {
panic(fmt.Errorf("failed to init runtimes: %s", s.String()))
}
Expand Down
4 changes: 1 addition & 3 deletions pkg/engine/operation/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,7 @@ func (ao *ApplyOperation) Apply(req *ApplyRequest) (rsp *ApplyResponse, s v1.Sta
stateResourceIndex[k] = v
}

resources := req.Release.Spec.Resources
resources = append(resources, priorState.Resources...)
runtimesMap, s := runtimeinit.Runtimes(resources)
runtimesMap, s := runtimeinit.Runtimes(*req.Release.Spec)
if v1.IsErr(s) {
return nil, s
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/engine/operation/apply_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func TestApplyOperation_Apply(t *testing.T) {
return nil
}).Build()
mockey.Mock(runtimeinit.Runtimes).To(func(
resources apiv1.Resources,
spec apiv1.Spec,
) (map[apiv1.Type]runtime.Runtime, v1.Status) {
return map[apiv1.Type]runtime.Runtime{runtime.Kubernetes: &kubernetes.KubernetesRuntime{}}, nil
}).Build()
Expand Down
2 changes: 1 addition & 1 deletion pkg/engine/operation/destory.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (do *DestroyOperation) Destroy(req *DestroyRequest) (rsp *DestroyResponse,

// only destroy resources we have recorded
resources := priorState.Resources
runtimesMap, s := runtimeinit.Runtimes(resources)
runtimesMap, s := runtimeinit.Runtimes(*req.Release.Spec)
if v1.IsErr(s) {
return nil, s
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/engine/operation/graph/resource_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (rn *ResourceNode) replaceK8sSecretRefs(o *models.Operation) v1.Status {
continue
}

externalSecretRef, err := parseExternalSecretDataRef(ref)
externalSecretRef, err := ParseExternalSecretDataRef(ref)
if err != nil {
return v1.NewErrorStatus(err)
}
Expand Down Expand Up @@ -513,8 +513,8 @@ func ReplaceRef(
return result, v, nil
}

// parseExternalSecretDataRef knows how to parse the remote data ref string, returns the corresponding ExternalSecretRef object.
func parseExternalSecretDataRef(dataRefStr string) (*apiv1.ExternalSecretRef, error) {
// ParseExternalSecretDataRef knows how to parse the remote data ref string, returns the corresponding ExternalSecretRef object.
func ParseExternalSecretDataRef(dataRefStr string) (*apiv1.ExternalSecretRef, error) {
uri, err := url.Parse(dataRefStr)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/engine/operation/graph/resource_node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ func TestParseExternalSecretDataRef(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := parseExternalSecretDataRef(tt.dataRefStr)
got, err := ParseExternalSecretDataRef(tt.dataRefStr)
if (err != nil) != tt.wantErr {
t.Errorf("parseExternalSecretDataRef() error = %v, wantErr %v", err, tt.wantErr)
return
Expand Down
6 changes: 2 additions & 4 deletions pkg/engine/operation/preview.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,7 @@ func (po *PreviewOperation) Preview(req *PreviewRequest) (rsp *PreviewResponse,
priorState := req.State

// Kusion is a multi-runtime system. We initialize runtimes dynamically by resource types
resources := req.Spec.Resources
resources = append(resources, priorState.Resources...)
runtimesMap, s := runtimeinit.Runtimes(resources)
runtimesMap, s := runtimeinit.Runtimes(*req.Spec)
if v1.IsErr(s) {
return nil, s
}
Expand All @@ -75,7 +73,7 @@ func (po *PreviewOperation) Preview(req *PreviewRequest) (rsp *PreviewResponse,
priorStateResourceIndex = priorState.Resources.Index()
ag, s = newApplyGraph(req.Spec, priorState)
case models.DestroyPreview:
resources = req.Spec.Resources
resources := req.Spec.Resources
priorStateResourceIndex = resources.Index()
ag, s = newDestroyGraph(resources)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/engine/operation/preview_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func TestPreviewOperation_Preview(t *testing.T) {
}

mockey.Mock(runtimeinit.Runtimes).To(func(
resources apiv1.Resources,
spec apiv1.Spec,
) (map[apiv1.Type]runtime.Runtime, v1.Status) {
return map[apiv1.Type]runtime.Runtime{runtime.Kubernetes: &fakePreviewRuntime{}}, nil
}).Build()
Expand Down
2 changes: 1 addition & 1 deletion pkg/engine/operation/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (wo *WatchOperation) Watch(req *WatchRequest) error {

// init runtimes
resources := req.Spec.Resources
runtimes, s := runtimeinit.Runtimes(resources)
runtimes, s := runtimeinit.Runtimes(*req.Spec)
if v1.IsErr(s) {
return errors.New(s.Message())
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/engine/operation/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func TestWatchOperation_Watch(t *testing.T) {
},
}
mockey.Mock(runtimeinit.Runtimes).To(func(
resources apiv1.Resources,
spec apiv1.Spec,
) (map[apiv1.Type]runtime.Runtime, v1.Status) {
return map[apiv1.Type]runtime.Runtime{runtime.Kubernetes: fooRuntime}, nil
}).Build()
Expand Down
2 changes: 1 addition & 1 deletion pkg/engine/release/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func CreateDestroyRelease(storage Storage, project, stack, workspace string) (*v

resources := make([]v1.Resource, len(lastRelease.State.Resources))
copy(resources, lastRelease.State.Resources)
spec := &v1.Spec{Resources: resources}
spec := &v1.Spec{Resources: resources, SecretStore: lastRelease.Spec.SecretStore, Context: lastRelease.Spec.Context}
// if no resource managed, set phase to Succeeded directly.
phase := v1.ReleasePhasePreviewing
if len(resources) == 0 {
Expand Down
73 changes: 70 additions & 3 deletions pkg/engine/runtime/init/init.go
Original file line number Diff line number Diff line change
@@ -1,26 +1,45 @@
package init

import (
"context"
"errors"
"fmt"
"reflect"
"strings"

apiv1 "kusionstack.io/kusion/pkg/apis/api.kusion.io/v1"
v1 "kusionstack.io/kusion/pkg/apis/status/v1"
"kusionstack.io/kusion/pkg/engine/operation/graph"
"kusionstack.io/kusion/pkg/engine/runtime"
"kusionstack.io/kusion/pkg/engine/runtime/kubernetes"
"kusionstack.io/kusion/pkg/engine/runtime/kubernetes/kubeops"
"kusionstack.io/kusion/pkg/engine/runtime/terraform"
"kusionstack.io/kusion/pkg/secrets"
"kusionstack.io/kusion/pkg/workspace"
)

var SupportRuntimes = map[apiv1.Type]InitFn{
runtime.Kubernetes: kubernetes.NewKubernetesRuntime,
runtime.Terraform: terraform.NewTerraformRuntime,
}

var contextKeys = []string{
kubeops.KubeConfigContentKey,
apiv1.EnvAwsAccessKeyID,
apiv1.EnvAwsSecretAccessKey,
apiv1.EnvAlicloudAccessKey,
apiv1.EnvAlicloudSecretKey,
}

// InitFn runtime init func
type InitFn func(resource *apiv1.Resource) (runtime.Runtime, error)
type InitFn func(spec apiv1.Spec) (runtime.Runtime, error)

func Runtimes(resources apiv1.Resources) (map[apiv1.Type]runtime.Runtime, v1.Status) {
func Runtimes(spec apiv1.Spec) (map[apiv1.Type]runtime.Runtime, v1.Status) {
// Parse the secret ref in the Context of Spec.
if err := parseContextSecretRef(&spec); err != nil {
return nil, v1.NewErrorStatus(err)
}
resources := spec.Resources
runtimesMap := map[apiv1.Type]runtime.Runtime{}
if resources == nil {
return runtimesMap, nil
Expand All @@ -32,7 +51,7 @@ func Runtimes(resources apiv1.Resources) (map[apiv1.Type]runtime.Runtime, v1.Sta
for _, resource := range resources {
rt := resource.Type
if runtimesMap[rt] == nil {
r, err := SupportRuntimes[rt](&resource)
r, err := SupportRuntimes[rt](spec)
if err != nil {
return nil, v1.NewErrorStatus(fmt.Errorf("init %s runtime failed. %w", rt, err))
}
Expand Down Expand Up @@ -65,3 +84,51 @@ func validResources(resources apiv1.Resources) v1.Status {
}
return nil
}

// parseContextSecretRef parses the external secret ref of the credentials
// in the Context of Spec.
func parseContextSecretRef(spec *apiv1.Spec) error {
// Copy the Context of Spec.
parsedContext := apiv1.GenericConfig{}
for k, v := range spec.Context {
parsedContext[k] = v
}

// Retrieve the context with the specified keys from spec and parse the external secret ref.
for _, key := range contextKeys {
contextStr, err := workspace.GetStringFromGenericConfig(spec.Context, key)
if err != nil {
return err
}

if contextStr != "" {
// Replace the secret store ref.
if strings.HasPrefix(contextStr, graph.SecretRefPrefix) {
externalSecretRef, err := graph.ParseExternalSecretDataRef(contextStr)
if err != nil {
return err
}

provider, exist := secrets.GetProvider(spec.SecretStore.Provider)
if !exist {
return errors.New("no matched secret store found, please check workspace yaml")
}
secretStore, err := provider.NewSecretStore(spec.SecretStore)
if err != nil {
return err
}
secretData, err := secretStore.GetSecret(context.Background(), *externalSecretRef)
if err != nil {
return err
}

parsedContext[key] = string(secretData)
}
}
}

// Reset the Context with the parsed values.
spec.Context = parsedContext

return nil
}
5 changes: 5 additions & 0 deletions pkg/engine/runtime/kubernetes/kubeops/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ const (
RecommendedKubeConfigFileName = "config"
)

const (
KubeConfigPathKey = "kubeconfig_path"
KubeConfigContentKey = "kubeconfig_content"
)

var (
RecommendedConfigDir = filepath.Join(homedir.HomeDir(), RecommendedHomeDir)
RecommendedKubeConfigFile = filepath.Join(RecommendedConfigDir, RecommendedKubeConfigFileName)
Expand Down
52 changes: 46 additions & 6 deletions pkg/engine/runtime/kubernetes/kubernetes_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"kusionstack.io/kusion/pkg/engine/runtime/kubernetes/kubeops"
"kusionstack.io/kusion/pkg/log"
jsonutil "kusionstack.io/kusion/pkg/util/json"
"kusionstack.io/kusion/pkg/workspace"
)

var _ runtime.Runtime = (*KubernetesRuntime)(nil)
Expand All @@ -45,8 +46,8 @@ type KubernetesRuntime struct {
}

// NewKubernetesRuntime create a new KubernetesRuntime
func NewKubernetesRuntime(resource *apiv1.Resource) (runtime.Runtime, error) {
client, mapper, err := getKubernetesClient(resource)
func NewKubernetesRuntime(spec apiv1.Spec) (runtime.Runtime, error) {
client, mapper, err := getKubernetesClient(spec)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -394,11 +395,50 @@ func (k *KubernetesRuntime) Watch(ctx context.Context, request *runtime.WatchReq
}

// getKubernetesClient get kubernetes client
func getKubernetesClient(resource *apiv1.Resource) (dynamic.Interface, meta.RESTMapper, error) {
func getKubernetesClient(spec apiv1.Spec) (dynamic.Interface, meta.RESTMapper, error) {
// build config
cfg, err := clientcmd.BuildConfigFromFlags("", kubeops.GetKubeConfig(resource))
if err != nil {
return nil, nil, err
var err error
var cfg *rest.Config

if spec.Context != nil {
kubeConfigPath, err := workspace.GetStringFromGenericConfig(spec.Context, kubeops.KubeConfigPathKey)
if err != nil {
return nil, nil, err
}
kubeConfigContent, err := workspace.GetStringFromGenericConfig(spec.Context, kubeops.KubeConfigContentKey)
if err != nil {
return nil, nil, err
}
if kubeConfigContent != "" {
clientCfg, err := clientcmd.NewClientConfigFromBytes([]byte(kubeConfigContent))
if err != nil {
return nil, nil, err
}

cfg, err = clientCfg.ClientConfig()
if err != nil {
return nil, nil, err
}
} else {
cfg, err = clientcmd.BuildConfigFromFlags("", kubeConfigPath)
if err != nil {
return nil, nil, err
}
}
} else {
var kubeConfigFromRes string
for _, res := range spec.Resources {
if res.Type == apiv1.Kubernetes {
kubeConfigFromRes = kubeops.GetKubeConfig(&res)
}
if kubeConfigFromRes != "" {
break
}
}
cfg, err = clientcmd.BuildConfigFromFlags("", kubeConfigFromRes)
if err != nil {
return nil, nil, err
}
}

// DynamicRESTMapper can discover resource types at runtime dynamically
Expand Down
14 changes: 8 additions & 6 deletions pkg/engine/runtime/terraform/terraform_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@ var _ runtime.Runtime = &Runtime{}
var tfEvents = cache.New(cache.NoExpiration, cache.NoExpiration)

type Runtime struct {
mutex *sync.Mutex
mutex *sync.Mutex
context apiv1.GenericConfig
}

func NewTerraformRuntime(_ *apiv1.Resource) (runtime.Runtime, error) {
func NewTerraformRuntime(spec apiv1.Spec) (runtime.Runtime, error) {
TFRuntime := &Runtime{
mutex: &sync.Mutex{},
mutex: &sync.Mutex{},
context: spec.Context,
}
return TFRuntime, nil
}
Expand All @@ -42,7 +44,7 @@ func (t *Runtime) Apply(ctx context.Context, request *runtime.ApplyRequest) *run
stackPath := request.Stack.Path
key := plan.ResourceKey()
tfCacheDir := buildTFCacheDir(stackPath, key)
ws := tfops.NewWorkSpace(plan, stackPath, tfCacheDir, t.mutex)
ws := tfops.NewWorkSpace(plan, stackPath, tfCacheDir, t.mutex, t.context)

if err := ws.WriteHCL(); err != nil {
return &runtime.ApplyResponse{Resource: nil, Status: v1.NewErrorStatus(err)}
Expand Down Expand Up @@ -199,7 +201,7 @@ func (t *Runtime) Read(ctx context.Context, request *runtime.ReadRequest) *runti
stackPath := request.Stack.Path
tfCacheDir := buildTFCacheDir(stackPath, planResource.ResourceKey())

ws := tfops.NewWorkSpace(planResource, stackPath, tfCacheDir, t.mutex)
ws := tfops.NewWorkSpace(planResource, stackPath, tfCacheDir, t.mutex, t.context)
if err := ws.WriteHCL(); err != nil {
return &runtime.ReadResponse{Resource: nil, Status: v1.NewErrorStatus(err)}
}
Expand Down Expand Up @@ -282,7 +284,7 @@ func (t *Runtime) Delete(ctx context.Context, request *runtime.DeleteRequest) (r
stackPath := request.Stack.Path
tfCacheDir := buildTFCacheDir(stackPath, request.Resource.ResourceKey())

ws := tfops.NewWorkSpace(request.Resource, stackPath, tfCacheDir, t.mutex)
ws := tfops.NewWorkSpace(request.Resource, stackPath, tfCacheDir, t.mutex, t.context)
if err := ws.Destroy(ctx); err != nil {
return &runtime.DeleteResponse{Status: v1.NewErrorStatus(err)}
}
Expand Down
Loading

0 comments on commit 9671d5e

Please sign in to comment.