Skip to content

Commit

Permalink
♻️ Add Capsule as argument to WatchObjectStatus (#1306)
Browse files Browse the repository at this point in the history
  • Loading branch information
MatiasFrank authored Oct 30, 2024
1 parent c0a670c commit 1f7ab58
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 35 deletions.
4 changes: 2 additions & 2 deletions pkg/controller/capsule_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ func (r *CapsuleReconciler) initializeReconciler(ctx context.Context) {
}

for _, capsule := range capsules.Items {
r.ObjectStatusService.RegisterCapsule(capsule.GetNamespace(), capsule.GetName())
r.ObjectStatusService.RegisterCapsule(capsule.GetNamespace(), &capsule)
}

break
Expand Down Expand Up @@ -367,7 +367,7 @@ func (r *CapsuleReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
return ctrl.Result{}, nil
}

r.ObjectStatusService.RegisterCapsule(capsule.GetNamespace(), capsule.GetName())
r.ObjectStatusService.RegisterCapsule(capsule.GetNamespace(), capsule)

var options []pipeline.CapsuleRequestOption
if v, _ := strconv.ParseBool(capsule.Annotations[pipeline.AnnotationOverrideOwnership]); v {
Expand Down
14 changes: 6 additions & 8 deletions pkg/controller/plugin/external_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,10 @@ func (p *pluginExecutor) Run(ctx context.Context, req pipeline.CapsuleRequest, o

func (p *pluginExecutor) WatchObjectStatus(
ctx context.Context,
namespace string,
capsule string,
capsule *v1alpha2.Capsule,
callback pipeline.ObjectStatusCallback,
) error {
return p.pluginClient.WatchObjectStatus(ctx, namespace, capsule, callback, p.id)
return p.pluginClient.WatchObjectStatus(ctx, capsule, callback, p.id)
}

func (p *pluginExecutor) ComputeConfig(ctx context.Context, req pipeline.CapsuleRequest) (string, error) {
Expand Down Expand Up @@ -284,14 +283,13 @@ func (m *pluginClient) setupGRPCServer(req pipeline.CapsuleRequest) (*grpc.Serve

func (m *pluginClient) WatchObjectStatus(
ctx context.Context,
namespace string,
capsule string,
capsule *v1alpha2.Capsule,
callback pipeline.ObjectStatusCallback,
pluginID uuid.UUID,
) error {
c, err := m.client.WatchObjectStatus(ctx, &apiplugin.WatchObjectStatusRequest{
Namespace: namespace,
Capsule: capsule,
Namespace: capsule.GetNamespace(),
Capsule: capsule.GetName(),
})
if err != nil {
return err
Expand All @@ -303,7 +301,7 @@ func (m *pluginClient) WatchObjectStatus(
return err
}

callback.UpdateStatus(namespace, capsule, pluginID, res.GetChange())
callback.UpdateStatus(capsule.GetNamespace(), capsule.GetName(), pluginID, res.GetChange())
}
}

Expand Down
12 changes: 6 additions & 6 deletions pkg/controller/plugin/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/gobwas/glob"
"github.com/rigdev/rig-go-api/operator/api/v1/plugin"
"github.com/rigdev/rig/pkg/api/config/v1alpha1"
"github.com/rigdev/rig/pkg/api/v1alpha2"
"github.com/rigdev/rig/pkg/errors"
"github.com/rigdev/rig/pkg/pipeline"
"github.com/rigdev/rig/pkg/uuid"
Expand Down Expand Up @@ -57,14 +58,13 @@ func (s *Step) Apply(ctx context.Context, req pipeline.CapsuleRequest, opts pipe

func (s *Step) WatchObjectStatus(
ctx context.Context,
namespace string,
capsule string,
capsule *v1alpha2.Capsule,
callback pipeline.ObjectStatusCallback,
) error {
// TODO: We need annotations here.
if !s.matcher.Match(namespace, capsule, nil) {
if !s.matcher.Match(capsule.GetNamespace(), capsule.GetName(), capsule.GetAnnotations()) {
for _, p := range s.plugins {
callback.UpdateStatus(namespace, capsule, p.id, &plugin.ObjectStatusChange{
callback.UpdateStatus(capsule.GetNamespace(), capsule.GetName(), p.id, &plugin.ObjectStatusChange{
Change: &plugin.ObjectStatusChange_Checkpoint_{},
})
}
Expand All @@ -81,9 +81,9 @@ func (s *Step) WatchObjectStatus(
wg.Add(1)
go func(p *pluginExecutor) {
defer wg.Done()
err := p.WatchObjectStatus(ctx, namespace, capsule, callback)
err := p.WatchObjectStatus(ctx, capsule, callback)
if errors.IsUnimplemented(err) {
callback.UpdateStatus(namespace, capsule, p.id, &plugin.ObjectStatusChange{
callback.UpdateStatus(capsule.GetNamespace(), capsule.GetName(), p.id, &plugin.ObjectStatusChange{
Change: &plugin.ObjectStatusChange_Checkpoint_{},
})
} else if err != nil {
Expand Down
3 changes: 1 addition & 2 deletions pkg/controller/project_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,7 @@ func (s namespaceStep) Apply(_ context.Context, req pipeline.ProjectEnvironmentR

func (s namespaceStep) WatchObjectStatus(
_ context.Context,
_ string,
_ string,
_ *v1alpha2.Capsule,
_ pipeline.ObjectStatusCallback,
) error {
return nil
Expand Down
3 changes: 2 additions & 1 deletion pkg/pipeline/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

apiplugin "github.com/rigdev/rig-go-api/operator/api/v1/plugin"
"github.com/rigdev/rig/pkg/api/v1alpha2"
"github.com/rigdev/rig/pkg/uuid"
"sigs.k8s.io/controller-runtime/pkg/client"
)
Expand All @@ -19,7 +20,7 @@ type Options struct {
type Step[T Request] interface {
Name() string
Apply(ctx context.Context, req T, opts Options) error
WatchObjectStatus(ctx context.Context, namespace, capsule string, callback ObjectStatusCallback) error
WatchObjectStatus(ctx context.Context, capsule *v1alpha2.Capsule, callback ObjectStatusCallback) error
ComputeConfig(ctx context.Context, req T) StepConfigResult
PluginIDs() []uuid.UUID
}
26 changes: 14 additions & 12 deletions pkg/service/objectstatus/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
apipipeline "github.com/rigdev/rig-go-api/operator/api/v1/pipeline"
apiplugin "github.com/rigdev/rig-go-api/operator/api/v1/plugin"
"github.com/rigdev/rig/pkg/api/config/v1alpha1"
"github.com/rigdev/rig/pkg/api/v1alpha2"
"github.com/rigdev/rig/pkg/errors"
"github.com/rigdev/rig/pkg/pipeline"
svc_pipeline "github.com/rigdev/rig/pkg/service/pipeline"
Expand All @@ -25,7 +26,7 @@ type Service interface {
Watch(ctx context.Context, namespace string, c chan<- *apipipeline.ObjectStatusChange) error

CapsulesInitialized()
RegisterCapsule(namespace string, capsule string)
RegisterCapsule(namespace string, capsule *v1alpha2.Capsule)
UnregisterCapsule(namespace string, capsule string)

UpdateStatus(namespace string, capsule string, pluginID uuid.UUID, change *apiplugin.ObjectStatusChange)
Expand Down Expand Up @@ -56,25 +57,24 @@ type service struct {
initialized bool
}

func (s *service) runForCapsule(ctx context.Context, namespace string, c *capsuleCache) {
func (s *service) runForCapsule(ctx context.Context, c *capsuleCache) {
p := s.pipeline.GetDefaultPipeline()

for _, step := range p.Steps() {
for _, pluginID := range step.PluginIDs() {
c.plugins[pluginID] = false
}
go s.runStepForCapsule(ctx, namespace, c.capsule, step)
go s.runStepForCapsule(ctx, c.capsule, step)
}
}

func (s *service) runStepForCapsule(
ctx context.Context,
namespace string,
capsule string,
capsule *v1alpha2.Capsule,
step pipeline.Step[pipeline.CapsuleRequest],
) {
for {
if err := step.WatchObjectStatus(ctx, namespace, capsule, s); errors.IsCanceled(err) {
if err := step.WatchObjectStatus(ctx, capsule, s); errors.IsCanceled(err) {
return
} else if err != nil {
s.logger.Error(err, "error getting object status")
Expand Down Expand Up @@ -136,7 +136,7 @@ func (s *service) CapsulesInitialized() {
}
}

func (s *service) RegisterCapsule(namespace string, capsule string) {
func (s *service) RegisterCapsule(namespace string, capsule *v1alpha2.Capsule) {
s.lock.Lock()
defer s.lock.Unlock()

Expand All @@ -146,17 +146,19 @@ func (s *service) RegisterCapsule(namespace string, capsule string) {
s.capsules[namespace] = cs
}

if _, ok := cs[capsule]; !ok {
if _, ok := cs[capsule.GetName()]; ok {
cs[capsule.GetName()].capsule = capsule
} else {
ctx, cancel := context.WithCancel(context.Background())
c := &capsuleCache{
plugins: map[uuid.UUID]bool{},
capsule: capsule,
cancel: cancel,
objects: map[pipeline.ObjectKey]*objectCache{},
}
cs[capsule] = c
cs[capsule.GetName()] = c

s.runForCapsule(ctx, namespace, c)
s.runForCapsule(ctx, c)
}
}

Expand Down Expand Up @@ -274,7 +276,7 @@ type capsuleCache struct {
plugins map[uuid.UUID]bool

lock sync.RWMutex
capsule string
capsule *v1alpha2.Capsule
objects map[pipeline.ObjectKey]*objectCache
cancel context.CancelFunc
}
Expand All @@ -286,7 +288,7 @@ func (c *capsuleCache) getStatuses() []*apipipeline.ObjectStatusChange {
var res []*apipipeline.ObjectStatusChange
for _, object := range c.objects {
res = append(res, &apipipeline.ObjectStatusChange{
Capsule: c.capsule,
Capsule: c.capsule.GetName(),
Change: &apipipeline.ObjectStatusChange_Updated{
Updated: object.getStatus(),
},
Expand Down
11 changes: 7 additions & 4 deletions pkg/service/pipeline/capsule_extension_step.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"github.com/rigdev/rig/pkg/api/config/v1alpha1"
"github.com/rigdev/rig/pkg/api/v1alpha2"
"github.com/rigdev/rig/pkg/errors"
"github.com/rigdev/rig/pkg/pipeline"
"github.com/rigdev/rig/pkg/uuid"
Expand All @@ -30,10 +31,12 @@ func (s *CapsuleExtensionStep) Apply(ctx context.Context, req pipeline.CapsuleRe
}

func (s *CapsuleExtensionStep) WatchObjectStatus(
ctx context.Context, namespace, capsule string, callback pipeline.ObjectStatusCallback,
ctx context.Context, capsule *v1alpha2.Capsule, callback pipeline.ObjectStatusCallback,
) error {
// TODO: We want to opt out if not relevant for this capsule.
return s.step.WatchObjectStatus(ctx, namespace, capsule, callback)
if _, ok := capsule.Spec.Extensions[s.name]; !ok {
return nil
}
return s.step.WatchObjectStatus(ctx, capsule, callback)
}

func (s *CapsuleExtensionStep) PluginIDs() []uuid.UUID {
Expand Down Expand Up @@ -73,7 +76,7 @@ func (s *CapsuleExtensionValidationStep) Apply(
}

func (s *CapsuleExtensionValidationStep) WatchObjectStatus(
_ context.Context, _, _ string, _ pipeline.ObjectStatusCallback,
_ context.Context, _ *v1alpha2.Capsule, _ pipeline.ObjectStatusCallback,
) error {
return nil
}
Expand Down

0 comments on commit 1f7ab58

Please sign in to comment.