Skip to content
This repository was archived by the owner on Mar 24, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions cmd/rig-ops/cmd/plugins/computeConfig.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package plugins

import (
"context"
"os"

"connectrpc.com/connect"
"github.com/rigdev/rig-go-api/operator/api/v1/pipeline"
"github.com/rigdev/rig/cmd/common"
"github.com/rigdev/rig/cmd/rig-ops/cmd/base"
"github.com/rigdev/rig/pkg/api/v1alpha2"
"github.com/rigdev/rig/pkg/obj"
"github.com/spf13/cobra"
"gopkg.in/yaml.v3"
"sigs.k8s.io/controller-runtime/pkg/client"
)

func (c *Cmd) computeConfig(ctx context.Context, _ *cobra.Command, args []string) error {
cfg, err := base.GetOperatorConfig(ctx, c.OperatorClient, c.Scheme)
if err != nil {
return err
}

var spec string
var capsule v1alpha2.Capsule
if len(args) > 0 {
if err := c.K8s.Get(ctx, client.ObjectKey{
Namespace: args[0],
Name: args[1],
}, &capsule); err != nil {
return err
}
} else if specPath != "" {
bytes, err := os.ReadFile(specPath)
if err != nil {
return err
}
spec = string(bytes)
if err := obj.Decode([]byte(spec), &capsule); err != nil {
return err
}
} else {
capsuleList := v1alpha2.CapsuleList{}
if err := c.K8s.List(ctx, &capsuleList); err != nil {
return err
}
var choices [][]string
for _, c := range capsuleList.Items {
choices = append(choices, []string{c.Namespace, c.Name})
}
idx, err := c.Prompter.TableSelect(
"Choose a capsule", choices, []string{"Namespace", "Capsule"}, common.SelectEnableFilterOpt,
)
if err != nil {
return err
}
choice := choices[idx]
if err := c.K8s.Get(ctx, client.ObjectKey{
Namespace: choice[0],
Name: choice[1],
}, &capsule); err != nil {
return err
}
}

cfgBytes, err := yaml.Marshal(cfg)
if err != nil {
return err
}

resp, err := c.OperatorClient.Pipeline.DryRunPluginConfig(ctx, connect.NewRequest(&pipeline.DryRunPluginConfigRequest{
Namespace: capsule.Namespace,
Capsule: capsule.Name,
OperatorConfig: string(cfgBytes),
CapsuleSpec: spec,
}))
if err != nil {
return err
}

return common.FormatPrint(resp.Msg.GetSteps(), common.OutputTypeYAML)
}
16 changes: 16 additions & 0 deletions cmd/rig-ops/cmd/plugins/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,5 +120,21 @@ The dry run will be executed with the resulting list of plugins.`,
}
pluginsCmd.AddCommand(list)

computeConfig := &cobra.Command{
Use: "compute-config",
//nolint:lll
Short: "Given an operator config and a capsule spec, computes the configuration generated for each plugin.",
Args: func(_ *cobra.Command, args []string) error {
if len(args) != 0 && len(args) != 2 {
return errors.New("takes exactly 0 or 2 arguments")
}
return nil
},
RunE: cli.CtxWrap(cmd.computeConfig),
}
//nolint:lll
computeConfig.Flags().StringVar(&specPath, "spec", "", "If given, will read the capsule spec at the path instead of using the capsule spec of an existing capsule from the platform")
pluginsCmd.AddCommand(computeConfig)

parent.AddCommand(pluginsCmd)
}
60 changes: 46 additions & 14 deletions pkg/controller/plugin/external_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,12 @@ func (p *pluginExecutor) WatchObjectStatus(
return p.pluginClient.WatchObjectStatus(ctx, namespace, capsule, callback, p.id)
}

func (p *pluginExecutor) ComputeConfig(ctx context.Context, req pipeline.CapsuleRequest) (string, error) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
return p.pluginClient.ComputeConfig(ctx, req)
}

type rigOperatorPlugin struct {
plugin.NetRPCUnsupportedPlugin
logger hclog.Logger
Expand Down Expand Up @@ -239,23 +245,10 @@ func (m *pluginClient) Initialize(ctx context.Context, pluginConfig, tag string,
}

func (m *pluginClient) Run(ctx context.Context, req pipeline.CapsuleRequest, opts pipeline.Options) error {
reqServer := &requestServer{req: req}
capsuleBytes, err := obj.Encode(req.Capsule(), req.Scheme())
s, brokerID, err := m.setupGRPCServer(req)
if err != nil {
return err
}

c := make(chan *grpc.Server)
serverFunc := func(opts []grpc.ServerOption) *grpc.Server {
s := grpc.NewServer(opts...)
apiplugin.RegisterRequestServiceServer(s, reqServer)
c <- s
return s
}

brokerID := m.broker.NextId()
go m.broker.AcceptAndServe(brokerID, serverFunc)
s := <-c
defer s.Stop()

var additionalObjects [][]byte
Expand All @@ -267,6 +260,10 @@ func (m *pluginClient) Run(ctx context.Context, req pipeline.CapsuleRequest, opt
additionalObjects = append(additionalObjects, bs)
}

capsuleBytes, err := obj.Encode(req.Capsule(), req.Scheme())
if err != nil {
return err
}
_, err = m.client.RunCapsule(ctx, &apiplugin.RunCapsuleRequest{
RunServer: brokerID,
CapsuleObject: capsuleBytes,
Expand All @@ -276,6 +273,21 @@ func (m *pluginClient) Run(ctx context.Context, req pipeline.CapsuleRequest, opt
return err
}

func (m *pluginClient) setupGRPCServer(req pipeline.CapsuleRequest) (*grpc.Server, uint32, error) {
reqServer := &requestServer{req: req}
c := make(chan *grpc.Server)
serverFunc := func(opts []grpc.ServerOption) *grpc.Server {
s := grpc.NewServer(opts...)
apiplugin.RegisterRequestServiceServer(s, reqServer)
c <- s
return s
}
brokerID := m.broker.NextId()
go m.broker.AcceptAndServe(brokerID, serverFunc)
s := <-c
return s, brokerID, nil
}

func (m *pluginClient) WatchObjectStatus(
ctx context.Context,
namespace string,
Expand All @@ -301,6 +313,26 @@ func (m *pluginClient) WatchObjectStatus(
}
}

func (m *pluginClient) ComputeConfig(ctx context.Context, req pipeline.CapsuleRequest) (string, error) {
s, brokerID, err := m.setupGRPCServer(req)
if err != nil {
return "", err
}
defer s.Stop()

capsuleBytes, err := obj.Encode(req.Capsule(), req.Scheme())
if err != nil {
return "", err
}

resp, err := m.client.ComputeConfig(ctx, &apiplugin.ComputeConfigRequest{
RunServer: brokerID,
CapsuleObject: capsuleBytes,
})

return resp.GetConfig(), err
}

type requestServer struct {
apiplugin.UnimplementedRequestServiceServer

Expand Down
53 changes: 41 additions & 12 deletions pkg/controller/plugin/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@ package plugin

import (
"bytes"
"encoding/json"
"html/template"

"github.com/mitchellh/mapstructure"
"github.com/rigdev/rig/pkg/obj"
"github.com/rigdev/rig/pkg/pipeline"
"sigs.k8s.io/yaml"
)

type ParseStep[T any] func(config T, req pipeline.CapsuleRequest) (string, any, error)
type ParseStep[T any] func(config T, req pipeline.CapsuleRequest) (map[string]any, error)

// ParseCapsuleTemplatedConfig parses the given data as a Go template with
// the capsule as a templating context under '.capsule'
Expand All @@ -21,25 +23,30 @@ func ParseCapsuleTemplatedConfig[T any](data []byte, req pipeline.CapsuleRequest
// Using this, we parse the config at every execution of the plugin.
// If we get performance issues due to that we can try and optimize that.
func ParseTemplatedConfig[T any](data []byte, req pipeline.CapsuleRequest, steps ...ParseStep[T]) (T, error) {
if len(data) == 0 {
data = []byte("{}")
}
var config, empty T

values := map[string]any{}
for _, step := range steps {
name, obj, err := step(config, req)
m, err := step(config, req)
if err != nil {
return empty, err
}

result := map[string]interface{}{}
d, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{TagName: "json", Result: &result})
if err != nil {
return empty, err
for k, v := range m {
result := map[string]interface{}{}
d, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{TagName: "json", Result: &result})
if err != nil {
return empty, err
}
if err := d.Decode(v); err != nil {
return empty, err
}
values[k] = result
}

if err := d.Decode(obj); err != nil {
return empty, err
}
values[name] = result
t, err := template.New("config").Parse(string(data))
if err != nil {
return empty, err
Expand All @@ -56,10 +63,32 @@ func ParseTemplatedConfig[T any](data []byte, req pipeline.CapsuleRequest, steps
return config, nil
}

func CapsuleStep[T any](_ T, req pipeline.CapsuleRequest) (string, any, error) {
return "capsule", req.Capsule(), nil
func CapsuleStep[T any](_ T, req pipeline.CapsuleRequest) (map[string]any, error) {
c := req.Capsule()
extensions := map[string]any{}
for k, v := range c.Spec.Extensions {
vv := map[string]any{}
if err := json.Unmarshal(v, &vv); err != nil {
return nil, err
}
extensions[k] = vv
}
c.Spec.Extensions = nil
return map[string]any{
"capsule": c,
"capsuleExtensions": extensions,
}, nil
}

func LoadYAMLConfig(data []byte, out any) error {
return obj.Decode(data, out)
}

func ParseCapsuleTemplatedConfigToString[T any](data []byte, req pipeline.CapsuleRequest) (string, error) {
obj, err := ParseCapsuleTemplatedConfig[T](data, req)
if err != nil {
return "", err
}
bs, err := yaml.Marshal(obj)
return string(bs), err
}
57 changes: 57 additions & 0 deletions pkg/controller/plugin/helpers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package plugin

import (
"context"
"encoding/json"
"testing"

"github.com/go-logr/logr"
"github.com/rigdev/rig/pkg/api/v1alpha2"
"github.com/rigdev/rig/pkg/pipeline"
"github.com/rigdev/rig/pkg/scheme"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func Test_ParseCapsuleTemplatedConfig(t *testing.T) {
name, namespace := "name", "namespace"
vm := scheme.NewVersionMapperFromScheme(scheme.New())
p := pipeline.NewCapsulePipeline(nil, scheme.New(), vm, logr.FromContextOrDiscard(context.Background()))

req := pipeline.NewCapsuleRequest(p, &v1alpha2.Capsule{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
Spec: v1alpha2.CapsuleSpec{
Extensions: map[string]json.RawMessage{
"ext": json.RawMessage(`{"field": "value"}`),
},
Scale: v1alpha2.CapsuleScale{
Horizontal: v1alpha2.HorizontalScale{
Instances: v1alpha2.Instances{
Min: 69,
},
},
},
},
}, nil)

s := `hej: asdf
hej2: {{ .capsuleExtensions.ext.field }}
hej3: {{ .capsule.spec.scale.horizontal.instances.min }}`
conf, err := ParseCapsuleTemplatedConfig[config]([]byte(s), req)
require.NoError(t, err)

require.Equal(t, config{
Hej: "asdf",
Hej2: "value",
Hej3: 69,
}, conf)
}

type config struct {
Hej string `json:"hej"`
Hej2 string `json:"hej2"`
Hej3 int `json:"hej3"`
}
5 changes: 4 additions & 1 deletion pkg/controller/plugin/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,9 @@ func (m *Manager) GetPlugins() []Info {
return plugins
}

func (m *Manager) NewStep(execCtx ExecutionContext, step v1alpha1.Step, logger logr.Logger) (*Step, error) {
func (m *Manager) NewStep(
execCtx ExecutionContext, step v1alpha1.Step, logger logr.Logger, name string,
) (*Step, error) {
var err error
var ps []*pluginExecutor
defer func() {
Expand Down Expand Up @@ -274,6 +276,7 @@ func (m *Manager) NewStep(execCtx ExecutionContext, step v1alpha1.Step, logger l
logger: logger,
plugins: ps,
matcher: matcher,
name: name,
}, nil
}

Expand Down
Loading