Skip to content

Commit

Permalink
✨ Add compute-config rig-ops command to dry-run plugin configuration (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
MatiasFrank authored Sep 12, 2024
1 parent 50d800d commit 07f8597
Show file tree
Hide file tree
Showing 31 changed files with 511 additions and 38 deletions.
82 changes: 82 additions & 0 deletions cmd/rig-ops/cmd/plugins/computeConfig.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package plugins

import (
"context"
"os"

"connectrpc.com/connect"
"github.com/rigdev/rig-go-api/operator/api/v1/pipeline"
"github.com/rigdev/rig/cmd/common"
"github.com/rigdev/rig/cmd/rig-ops/cmd/base"
"github.com/rigdev/rig/pkg/api/v1alpha2"
"github.com/rigdev/rig/pkg/obj"
"github.com/spf13/cobra"
"gopkg.in/yaml.v3"
"sigs.k8s.io/controller-runtime/pkg/client"
)

func (c *Cmd) computeConfig(ctx context.Context, _ *cobra.Command, args []string) error {
cfg, err := base.GetOperatorConfig(ctx, c.OperatorClient, c.Scheme)
if err != nil {
return err
}

var spec string
var capsule v1alpha2.Capsule
if len(args) > 0 {
if err := c.K8s.Get(ctx, client.ObjectKey{
Namespace: args[0],
Name: args[1],
}, &capsule); err != nil {
return err
}
} else if specPath != "" {
bytes, err := os.ReadFile(specPath)
if err != nil {
return err
}
spec = string(bytes)
if err := obj.Decode([]byte(spec), &capsule); err != nil {
return err
}
} else {
capsuleList := v1alpha2.CapsuleList{}
if err := c.K8s.List(ctx, &capsuleList); err != nil {
return err
}
var choices [][]string
for _, c := range capsuleList.Items {
choices = append(choices, []string{c.Namespace, c.Name})
}
idx, err := c.Prompter.TableSelect(
"Choose a capsule", choices, []string{"Namespace", "Capsule"}, common.SelectEnableFilterOpt,
)
if err != nil {
return err
}
choice := choices[idx]
if err := c.K8s.Get(ctx, client.ObjectKey{
Namespace: choice[0],
Name: choice[1],
}, &capsule); err != nil {
return err
}
}

cfgBytes, err := yaml.Marshal(cfg)
if err != nil {
return err
}

resp, err := c.OperatorClient.Pipeline.DryRunPluginConfig(ctx, connect.NewRequest(&pipeline.DryRunPluginConfigRequest{
Namespace: capsule.Namespace,
Capsule: capsule.Name,
OperatorConfig: string(cfgBytes),
CapsuleSpec: spec,
}))
if err != nil {
return err
}

return common.FormatPrint(resp.Msg.GetSteps(), common.OutputTypeYAML)
}
16 changes: 16 additions & 0 deletions cmd/rig-ops/cmd/plugins/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,5 +120,21 @@ The dry run will be executed with the resulting list of plugins.`,
}
pluginsCmd.AddCommand(list)

computeConfig := &cobra.Command{
Use: "compute-config",
//nolint:lll
Short: "Given an operator config and a capsule spec, computes the configuration generated for each plugin.",
Args: func(_ *cobra.Command, args []string) error {
if len(args) != 0 && len(args) != 2 {
return errors.New("takes exactly 0 or 2 arguments")
}
return nil
},
RunE: cli.CtxWrap(cmd.computeConfig),
}
//nolint:lll
computeConfig.Flags().StringVar(&specPath, "spec", "", "If given, will read the capsule spec at the path instead of using the capsule spec of an existing capsule from the platform")
pluginsCmd.AddCommand(computeConfig)

parent.AddCommand(pluginsCmd)
}
60 changes: 46 additions & 14 deletions pkg/controller/plugin/external_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,12 @@ func (p *pluginExecutor) WatchObjectStatus(
return p.pluginClient.WatchObjectStatus(ctx, namespace, capsule, callback, p.id)
}

func (p *pluginExecutor) ComputeConfig(ctx context.Context, req pipeline.CapsuleRequest) (string, error) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
return p.pluginClient.ComputeConfig(ctx, req)
}

type rigOperatorPlugin struct {
plugin.NetRPCUnsupportedPlugin
logger hclog.Logger
Expand Down Expand Up @@ -239,23 +245,10 @@ func (m *pluginClient) Initialize(ctx context.Context, pluginConfig, tag string,
}

func (m *pluginClient) Run(ctx context.Context, req pipeline.CapsuleRequest, opts pipeline.Options) error {
reqServer := &requestServer{req: req}
capsuleBytes, err := obj.Encode(req.Capsule(), req.Scheme())
s, brokerID, err := m.setupGRPCServer(req)
if err != nil {
return err
}

c := make(chan *grpc.Server)
serverFunc := func(opts []grpc.ServerOption) *grpc.Server {
s := grpc.NewServer(opts...)
apiplugin.RegisterRequestServiceServer(s, reqServer)
c <- s
return s
}

brokerID := m.broker.NextId()
go m.broker.AcceptAndServe(brokerID, serverFunc)
s := <-c
defer s.Stop()

var additionalObjects [][]byte
Expand All @@ -267,6 +260,10 @@ func (m *pluginClient) Run(ctx context.Context, req pipeline.CapsuleRequest, opt
additionalObjects = append(additionalObjects, bs)
}

capsuleBytes, err := obj.Encode(req.Capsule(), req.Scheme())
if err != nil {
return err
}
_, err = m.client.RunCapsule(ctx, &apiplugin.RunCapsuleRequest{
RunServer: brokerID,
CapsuleObject: capsuleBytes,
Expand All @@ -276,6 +273,21 @@ func (m *pluginClient) Run(ctx context.Context, req pipeline.CapsuleRequest, opt
return err
}

func (m *pluginClient) setupGRPCServer(req pipeline.CapsuleRequest) (*grpc.Server, uint32, error) {
reqServer := &requestServer{req: req}
c := make(chan *grpc.Server)
serverFunc := func(opts []grpc.ServerOption) *grpc.Server {
s := grpc.NewServer(opts...)
apiplugin.RegisterRequestServiceServer(s, reqServer)
c <- s
return s
}
brokerID := m.broker.NextId()
go m.broker.AcceptAndServe(brokerID, serverFunc)
s := <-c
return s, brokerID, nil
}

func (m *pluginClient) WatchObjectStatus(
ctx context.Context,
namespace string,
Expand All @@ -301,6 +313,26 @@ func (m *pluginClient) WatchObjectStatus(
}
}

func (m *pluginClient) ComputeConfig(ctx context.Context, req pipeline.CapsuleRequest) (string, error) {
s, brokerID, err := m.setupGRPCServer(req)
if err != nil {
return "", err
}
defer s.Stop()

capsuleBytes, err := obj.Encode(req.Capsule(), req.Scheme())
if err != nil {
return "", err
}

resp, err := m.client.ComputeConfig(ctx, &apiplugin.ComputeConfigRequest{
RunServer: brokerID,
CapsuleObject: capsuleBytes,
})

return resp.GetConfig(), err
}

type requestServer struct {
apiplugin.UnimplementedRequestServiceServer

Expand Down
53 changes: 41 additions & 12 deletions pkg/controller/plugin/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@ package plugin

import (
"bytes"
"encoding/json"
"html/template"

"github.com/mitchellh/mapstructure"
"github.com/rigdev/rig/pkg/obj"
"github.com/rigdev/rig/pkg/pipeline"
"sigs.k8s.io/yaml"
)

type ParseStep[T any] func(config T, req pipeline.CapsuleRequest) (string, any, error)
type ParseStep[T any] func(config T, req pipeline.CapsuleRequest) (map[string]any, error)

// ParseCapsuleTemplatedConfig parses the given data as a Go template with
// the capsule as a templating context under '.capsule'
Expand All @@ -21,25 +23,30 @@ func ParseCapsuleTemplatedConfig[T any](data []byte, req pipeline.CapsuleRequest
// Using this, we parse the config at every execution of the plugin.
// If we get performance issues due to that we can try and optimize that.
func ParseTemplatedConfig[T any](data []byte, req pipeline.CapsuleRequest, steps ...ParseStep[T]) (T, error) {
if len(data) == 0 {
data = []byte("{}")
}
var config, empty T

values := map[string]any{}
for _, step := range steps {
name, obj, err := step(config, req)
m, err := step(config, req)
if err != nil {
return empty, err
}

result := map[string]interface{}{}
d, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{TagName: "json", Result: &result})
if err != nil {
return empty, err
for k, v := range m {
result := map[string]interface{}{}
d, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{TagName: "json", Result: &result})
if err != nil {
return empty, err
}
if err := d.Decode(v); err != nil {
return empty, err
}
values[k] = result
}

if err := d.Decode(obj); err != nil {
return empty, err
}
values[name] = result
t, err := template.New("config").Parse(string(data))
if err != nil {
return empty, err
Expand All @@ -56,10 +63,32 @@ func ParseTemplatedConfig[T any](data []byte, req pipeline.CapsuleRequest, steps
return config, nil
}

func CapsuleStep[T any](_ T, req pipeline.CapsuleRequest) (string, any, error) {
return "capsule", req.Capsule(), nil
func CapsuleStep[T any](_ T, req pipeline.CapsuleRequest) (map[string]any, error) {
c := req.Capsule()
extensions := map[string]any{}
for k, v := range c.Spec.Extensions {
vv := map[string]any{}
if err := json.Unmarshal(v, &vv); err != nil {
return nil, err
}
extensions[k] = vv
}
c.Spec.Extensions = nil
return map[string]any{
"capsule": c,
"capsuleExtensions": extensions,
}, nil
}

func LoadYAMLConfig(data []byte, out any) error {
return obj.Decode(data, out)
}

func ParseCapsuleTemplatedConfigToString[T any](data []byte, req pipeline.CapsuleRequest) (string, error) {
obj, err := ParseCapsuleTemplatedConfig[T](data, req)
if err != nil {
return "", err
}
bs, err := yaml.Marshal(obj)
return string(bs), err
}
57 changes: 57 additions & 0 deletions pkg/controller/plugin/helpers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package plugin

import (
"context"
"encoding/json"
"testing"

"github.com/go-logr/logr"
"github.com/rigdev/rig/pkg/api/v1alpha2"
"github.com/rigdev/rig/pkg/pipeline"
"github.com/rigdev/rig/pkg/scheme"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func Test_ParseCapsuleTemplatedConfig(t *testing.T) {
name, namespace := "name", "namespace"
vm := scheme.NewVersionMapperFromScheme(scheme.New())
p := pipeline.NewCapsulePipeline(nil, scheme.New(), vm, logr.FromContextOrDiscard(context.Background()))

req := pipeline.NewCapsuleRequest(p, &v1alpha2.Capsule{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
Spec: v1alpha2.CapsuleSpec{
Extensions: map[string]json.RawMessage{
"ext": json.RawMessage(`{"field": "value"}`),
},
Scale: v1alpha2.CapsuleScale{
Horizontal: v1alpha2.HorizontalScale{
Instances: v1alpha2.Instances{
Min: 69,
},
},
},
},
}, nil)

s := `hej: asdf
hej2: {{ .capsuleExtensions.ext.field }}
hej3: {{ .capsule.spec.scale.horizontal.instances.min }}`
conf, err := ParseCapsuleTemplatedConfig[config]([]byte(s), req)
require.NoError(t, err)

require.Equal(t, config{
Hej: "asdf",
Hej2: "value",
Hej3: 69,
}, conf)
}

type config struct {
Hej string `json:"hej"`
Hej2 string `json:"hej2"`
Hej3 int `json:"hej3"`
}
5 changes: 4 additions & 1 deletion pkg/controller/plugin/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,9 @@ func (m *Manager) GetPlugins() []Info {
return plugins
}

func (m *Manager) NewStep(execCtx ExecutionContext, step v1alpha1.Step, logger logr.Logger) (*Step, error) {
func (m *Manager) NewStep(
execCtx ExecutionContext, step v1alpha1.Step, logger logr.Logger, name string,
) (*Step, error) {
var err error
var ps []*pluginExecutor
defer func() {
Expand Down Expand Up @@ -274,6 +276,7 @@ func (m *Manager) NewStep(execCtx ExecutionContext, step v1alpha1.Step, logger l
logger: logger,
plugins: ps,
matcher: matcher,
name: name,
}, nil
}

Expand Down
Loading

0 comments on commit 07f8597

Please sign in to comment.