From 914366c6e90bd3082760701fe0fc20d164ca154d Mon Sep 17 00:00:00 2001 From: George MacRorie Date: Sat, 22 Jul 2023 14:21:59 +0100 Subject: [PATCH] feat: add inmem fs and simple builtin controller --- pkg/api/core/resource.go | 8 +- pkg/api/server.go | 21 ++++- pkg/api/server_test.go | 81 +++++++++++++++++ {internal => pkg}/containers/option.go | 0 pkg/controller/fs.go | 16 +++- pkg/controllers/simple/simple.go | 118 +++++++++++++++++++++++++ pkg/encoding/decoding.go | 26 ++++++ pkg/encoding/encoding.go | 59 +++++++++++++ pkg/encoding/json.go | 27 ++++++ pkg/fs/mem/store.go | 74 ++++++++++++++++ 10 files changed, 423 insertions(+), 7 deletions(-) create mode 100644 pkg/api/server_test.go rename {internal => pkg}/containers/option.go (100%) create mode 100644 pkg/controllers/simple/simple.go create mode 100644 pkg/encoding/decoding.go create mode 100644 pkg/encoding/encoding.go create mode 100644 pkg/encoding/json.go create mode 100644 pkg/fs/mem/store.go diff --git a/pkg/api/core/resource.go b/pkg/api/core/resource.go index c06f73d..c9bf935 100644 --- a/pkg/api/core/resource.go +++ b/pkg/api/core/resource.go @@ -2,8 +2,6 @@ package core import ( "encoding/json" - - "github.com/xeipuuv/gojsonschema" ) // ResourceDefinition represents a definition of a particular resource Kind and its versions @@ -22,9 +20,9 @@ type Names struct { } type ResourceDefinitionSpec struct { - Group string `json:"group"` - Controller ResourceDefinitionController `json:"controller"` - Versions map[string]*gojsonschema.Schema `json:"schema"` + Group string `json:"group"` + Controller ResourceDefinitionController `json:"controller"` + Versions map[string]json.RawMessage `json:"schema,omitempty"` } type ResourceDefinitionController struct { diff --git a/pkg/api/server.go b/pkg/api/server.go index b3ef4e9..8aea1b7 100644 --- a/pkg/api/server.go +++ b/pkg/api/server.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "net/http" + "path" "sort" "sync" @@ -36,6 +37,7 @@ type FilesystemStore interface { // Controller is the core controller interface for handling interactions with a // single resource type. type Controller interface { + Definition() *core.ResourceDefinition Get(context.Context, *controller.GetRequest) (*core.Resource, error) List(context.Context, *controller.ListRequest) ([]*core.Resource, error) Put(context.Context, *controller.PutRequest) error @@ -69,6 +71,7 @@ func NewServer(fs FilesystemStore) (*Server, error) { return s, nil } +// ServeHTTP delegates to the underlying chi.Mux router. func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { s.mu.RLock() defer s.mu.RUnlock() @@ -76,10 +79,23 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { s.mux.ServeHTTP(w, r) } -func (s *Server) RegisterController(source string, def *core.ResourceDefinition, cntl Controller) { +func (s *Server) addDefinition(source string, gvk string, def *core.ResourceDefinition) { + src, ok := s.sources[source] + if !ok { + src = map[string]*core.ResourceDefinition{} + s.sources[source] = src + } + + src[gvk] = def +} + +// RegisterController adds a new controller and definition for a particular source to the server. +// This potentially will happen dynamically in the future, so it is guarded with a write lock. +func (s *Server) RegisterController(source string, cntl Controller) { s.mu.Lock() defer s.mu.Unlock() + def := cntl.Definition() for version := range def.Spec.Versions { var ( version = version @@ -87,6 +103,9 @@ func (s *Server) RegisterController(source string, def *core.ResourceDefinition, named = prefix + "/{name}" ) + // update sources map + s.addDefinition(source, path.Join(def.Spec.Group, version, def.Names.Kind), def) + // list kind s.mux.Get(prefix, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if err := s.fs.View(r.Context(), source, s.rev, func(f controller.FSConfig) error { diff --git a/pkg/api/server_test.go b/pkg/api/server_test.go new file mode 100644 index 0000000..8316eaf --- /dev/null +++ b/pkg/api/server_test.go @@ -0,0 +1,81 @@ +package api_test + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.flipt.io/cup/pkg/api" + "go.flipt.io/cup/pkg/api/core" + "go.flipt.io/cup/pkg/controllers/simple" + "go.flipt.io/cup/pkg/fs/mem" +) + +var testDef = &core.ResourceDefinition{ + APIVersion: "cup.flipt.io/v1alpha1", + Kind: "ResourceDefinition", + Metadata: core.Metadata{ + Name: "resources.test.cup.flipt.io", + }, + Names: core.Names{ + Kind: "Resource", + Singular: "resource", + Plural: "resources", + }, + Spec: core.ResourceDefinitionSpec{ + Group: "test.cup.flipt.io", + Controller: core.ResourceDefinitionController{}, + Versions: map[string]json.RawMessage{ + "v1alpha1": []byte("null"), + }, + }, +} + +func Test_Server_Source(t *testing.T) { + fss := mem.New() + server, err := api.NewServer(fss) + require.NoError(t, err) + + cntrl := simple.New(testDef) + server.RegisterController("cup", cntrl) + + srv := httptest.NewServer(server) + t.Cleanup(srv.Close) + + resp, err := http.Get(srv.URL + "/apis") + require.NoError(t, err) + + defer resp.Body.Close() + + var sources []string + require.NoError(t, json.NewDecoder(resp.Body).Decode(&sources)) + + assert.Equal(t, []string{"cup"}, sources) +} + +func Test_Server_SourceDefinitions(t *testing.T) { + fss := mem.New() + server, err := api.NewServer(fss) + require.NoError(t, err) + + cntrl := simple.New(testDef) + server.RegisterController("cup", cntrl) + + srv := httptest.NewServer(server) + t.Cleanup(srv.Close) + + resp, err := http.Get(srv.URL + "/apis/cup") + require.NoError(t, err) + + defer resp.Body.Close() + + var definitions map[string]*core.ResourceDefinition + require.NoError(t, json.NewDecoder(resp.Body).Decode(&definitions)) + + assert.Equal(t, map[string]*core.ResourceDefinition{ + "test.cup.flipt.io/v1alpha1/Resource": testDef, + }, definitions) +} diff --git a/internal/containers/option.go b/pkg/containers/option.go similarity index 100% rename from internal/containers/option.go rename to pkg/containers/option.go diff --git a/pkg/controller/fs.go b/pkg/controller/fs.go index 940efcc..4d306f1 100644 --- a/pkg/controller/fs.go +++ b/pkg/controller/fs.go @@ -1,6 +1,9 @@ package controller -import "io/fs" +import ( + "io/fs" + "os" +) // FSConfig encapsulates the configuration required to establish the root // directory of the wazero runtime when performing controller actions. @@ -21,3 +24,14 @@ func NewDirFSConfig(dir string) FSConfig { dir: &dir, } } + +// ToFS returns either the configured fs.FS implementation or it +// adapts the desired directory into an fs.FS using os.DirFS +// depending on how the config was configured +func (c *FSConfig) ToFS() fs.FS { + if c.dir != nil { + return os.DirFS(*c.dir) + } + + return c.fs +} diff --git a/pkg/controllers/simple/simple.go b/pkg/controllers/simple/simple.go new file mode 100644 index 0000000..4c5e9ae --- /dev/null +++ b/pkg/controllers/simple/simple.go @@ -0,0 +1,118 @@ +package simple + +import ( + "context" + "fmt" + "io" + "io/fs" + "path" + + "go.flipt.io/cup/pkg/api/core" + "go.flipt.io/cup/pkg/containers" + "go.flipt.io/cup/pkg/controller" + "go.flipt.io/cup/pkg/encoding" +) + +type ResourceEncoding interface { + Extension() string + NewEncoder(io.Writer) encoding.TypedEncoder[core.Resource] + NewDecoder(io.Reader) encoding.TypedDecoder[core.Resource] +} + +// Controller is mostly used for testing purposes (for now). +// It is a built-in controller implementation for cup. +// It simply organizes resources on the underlying filesystem by { namespace }/{ name } +// encoding them using the provided marshaller. +type Controller struct { + definition *core.ResourceDefinition + encoding ResourceEncoding +} + +// New constructs and configures a new *Controller. +// By default it uses a JSON encoding which can be overriden via WithResourceEncoding. +func New(def *core.ResourceDefinition, opts ...containers.Option[Controller]) *Controller { + controller := &Controller{ + definition: def, + encoding: encoding.NewJSONEncoding[core.Resource](), + } + + containers.ApplyAll(controller, opts...) + + return controller +} + +// WithResourceEncoding overrides the default resource encoding. +func WithResourceEncoding(e ResourceEncoding) containers.Option[Controller] { + return func(c *Controller) { + c.encoding = e + } +} + +// Definition returns the core resource definition handled by the Controller. +func (c *Controller) Definition() *core.ResourceDefinition { + return c.definition +} + +func (c *Controller) Get(_ context.Context, req *controller.GetRequest) (*core.Resource, error) { + fi, err := req.FSConfig.ToFS().Open(path.Join(req.Namespace, req.Name+"."+c.encoding.Extension())) + if err != nil { + return nil, fmt.Errorf("get: %w", err) + } + defer fi.Close() + + return c.encoding.NewDecoder(fi).Decode() +} + +// List finds all the resources on the provided FS in the folder { namespace } +// The result set is filtered by any specified labels. +func (c *Controller) List(_ context.Context, req *controller.ListRequest) (resources []*core.Resource, _ error) { + ffs := req.FSConfig.ToFS() + return resources, fs.WalkDir(req.FSConfig.ToFS(), req.Namespace, func(p string, d fs.DirEntry, err error) error { + if err != nil { + return err + } + + if d.IsDir() { + return fs.SkipDir + } + + if ext := path.Ext(p); ext == "" || ext[1:] != c.encoding.Extension() { + // skip files without expected extension + return nil + } + + fi, err := ffs.Open(p) + if err != nil { + return err + } + + defer fi.Close() + + resource, err := c.encoding.NewDecoder(fi).Decode() + if err != nil { + return err + } + + for _, kv := range req.Labels { + // skip adding resource if any of the specified labels + // do not match as expected + if v, ok := resource.Metadata.Labels[kv[0]]; !ok || v != kv[1] { + return nil + } + } + + resources = append(resources, resource) + + return nil + }) +} + +// Put for now is a silent noop as we dont have a writable filesystem abstraction +func (c *Controller) Put(_ context.Context, _ *controller.PutRequest) error { + return nil +} + +// Delete for now is a silent noop as we dont have a writable filesystem abstraction +func (c *Controller) Delete(_ context.Context, _ *controller.DeleteRequest) error { + return nil +} diff --git a/pkg/encoding/decoding.go b/pkg/encoding/decoding.go new file mode 100644 index 0000000..019d556 --- /dev/null +++ b/pkg/encoding/decoding.go @@ -0,0 +1,26 @@ +package encoding + +import "io" + +type AnyDecoder interface { + Decode(any) error +} + +type DecodeBuilder interface { + NewDecoder(io.Reader) AnyDecoder +} + +type Decoder[B DecodeBuilder, T any] struct { + decoder AnyDecoder +} + +func NewDecoder[B DecodeBuilder, T any](r io.Reader) Decoder[B, T] { + var b DecodeBuilder + + return Decoder[B, T]{decoder: b.NewDecoder(r)} +} + +func (d Decoder[B, T]) Decode() (*T, error) { + var t T + return &t, d.decoder.Decode(&t) +} diff --git a/pkg/encoding/encoding.go b/pkg/encoding/encoding.go new file mode 100644 index 0000000..3c8df61 --- /dev/null +++ b/pkg/encoding/encoding.go @@ -0,0 +1,59 @@ +package encoding + +import ( + "io" +) + +type AnyEncoder interface { + Encode(any) error +} + +type EncoderBuilder interface { + NewEncoder(io.Writer) AnyEncoder +} + +type Encoder[B EncoderBuilder, T any] struct { + encoder AnyEncoder +} + +func NewEncoder[B EncoderBuilder, T any](w io.Writer) Encoder[B, T] { + var b EncoderBuilder + + return Encoder[B, T]{encoder: b.NewEncoder(w)} +} + +func (e Encoder[B, T]) Encode(t *T) error { + return e.encoder.Encode(t) +} + +type AnyEncodingBuilder interface { + Extension() string + EncoderBuilder + DecodeBuilder +} + +type TypedEncoder[T any] interface { + Encode(*T) error +} + +type TypedDecoder[T any] interface { + Decode() (*T, error) +} + +type EncodingBuilder[B AnyEncodingBuilder, T any] struct { + b B + Encoder[B, T] + Decoder[B, T] +} + +func (e EncodingBuilder[B, T]) Extension() string { + return e.b.Extension() +} + +func (e EncodingBuilder[B, T]) NewEncoder(w io.Writer) TypedEncoder[T] { + return NewEncoder[B, T](w) +} + +func (e EncodingBuilder[B, T]) NewDecoder(r io.Reader) TypedDecoder[T] { + return NewDecoder[B, T](r) +} diff --git a/pkg/encoding/json.go b/pkg/encoding/json.go new file mode 100644 index 0000000..69b06e4 --- /dev/null +++ b/pkg/encoding/json.go @@ -0,0 +1,27 @@ +package encoding + +import ( + "encoding/json" + "io" +) + +var _ AnyEncodingBuilder = (*JSON)(nil) + +type JSON struct { + *json.Encoder + *json.Decoder +} + +func NewJSONEncoding[T any]() EncodingBuilder[JSON, T] { + return EncodingBuilder[JSON, T]{} +} + +func (j JSON) Extension() string { return "json" } + +func (j JSON) NewEncoder(w io.Writer) AnyEncoder { + return &JSON{Encoder: json.NewEncoder(w)} +} + +func (j JSON) NewDecoder(r io.Reader) AnyDecoder { + return &JSON{Decoder: json.NewDecoder(r)} +} diff --git a/pkg/fs/mem/store.go b/pkg/fs/mem/store.go new file mode 100644 index 0000000..af66df5 --- /dev/null +++ b/pkg/fs/mem/store.go @@ -0,0 +1,74 @@ +package mem + +import ( + "context" + "fmt" + "io/fs" + + "go.flipt.io/cup/pkg/api" + "go.flipt.io/cup/pkg/controller" +) + +// FilesystemStore is primarily used for testing. +// It approximates a real implementation using a set of fs.FS implementations. +// The implementations are indexed by source and revision internally. +// It does not truly support updates as of now as we have no way to virtualize +// a writeable FS for Wazero. It just supplies a read-only FS and the assumes +// no writes will be attempted. +// When the FS interface in Wazero is available we can change this behaviour. +type FilesystemStore struct { + store map[string]map[string]fs.FS +} + +// New constructs a new instance of FilesystemStore +func New() *FilesystemStore { + return &FilesystemStore{store: map[string]map[string]fs.FS{}} +} + +// AddFS registers a new fs.FS to be supplied on calls to View and Update +func (f *FilesystemStore) AddFS(source, revision string, ffs fs.FS) { + src, ok := f.store[source] + if !ok { + src = map[string]fs.FS{} + f.store[source] = src + } + + src[revision] = ffs +} + +// View invokes the provided function with an FSConfig which should enforce +// a read-only view for the requested source and revision +func (f *FilesystemStore) View(_ context.Context, source string, revision string, fn api.FSFunc) error { + fs, err := f.fs(source, revision) + if err != nil { + return fmt.Errorf("view: %w", err) + } + + return fn(controller.NewFSConfig(fs)) +} + +// Update invokes the provided function with an FSConfig which can be written to +// Any writes performed to the target during the execution of fn will be added, +// comitted, pushed and proposed for review on a target SCM +func (f *FilesystemStore) Update(_ context.Context, source string, revision string, fn api.FSFunc) (*api.Result, error) { + fs, err := f.fs(source, revision) + if err != nil { + return nil, fmt.Errorf("update: %w", err) + } + + return &api.Result{}, fn(controller.NewFSConfig(fs)) +} + +func (f *FilesystemStore) fs(source, revision string) (fs.FS, error) { + src, ok := f.store[source] + if !ok { + return nil, fmt.Errorf("source not found: %q", source) + } + + fs, ok := src[revision] + if !ok { + return nil, fmt.Errorf("revision not found: \"%s:%s\"", source, revision) + } + + return fs, nil +}