Skip to content
This repository was archived by the owner on Dec 11, 2023. It is now read-only.

Commit 9fcfb60

Browse files
author
odacremolbap
committed
support memorybrokers at triggers
1 parent 00c180f commit 9fcfb60

File tree

2 files changed

+79
-10
lines changed

2 files changed

+79
-10
lines changed

pkg/reconciler/trigger/controller.go

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,12 @@ import (
1212
"k8s.io/client-go/tools/cache"
1313
"knative.dev/pkg/configmap"
1414
"knative.dev/pkg/controller"
15+
"knative.dev/pkg/kmeta"
1516
"knative.dev/pkg/logging"
1617
"knative.dev/pkg/resolver"
1718

1819
eventingv1alpha1 "github.com/triggermesh/triggermesh-core/pkg/apis/eventing/v1alpha1"
20+
mbinformer "github.com/triggermesh/triggermesh-core/pkg/client/generated/injection/informers/eventing/v1alpha1/memorybroker"
1921
rbinformer "github.com/triggermesh/triggermesh-core/pkg/client/generated/injection/informers/eventing/v1alpha1/redisbroker"
2022
tginformer "github.com/triggermesh/triggermesh-core/pkg/client/generated/injection/informers/eventing/v1alpha1/trigger"
2123
tgreconciler "github.com/triggermesh/triggermesh-core/pkg/client/generated/injection/reconciler/eventing/v1alpha1/trigger"
@@ -29,9 +31,11 @@ func NewController(
2931
) *controller.Impl {
3032
tgInformer := tginformer.Get(ctx)
3133
rbInformer := rbinformer.Get(ctx)
34+
mbInformer := mbinformer.Get(ctx)
3235

3336
r := &Reconciler{
3437
rbLister: rbInformer.Lister(),
38+
mbLister: mbInformer.Lister(),
3539
}
3640

3741
impl := tgreconciler.NewImpl(ctx, r)
@@ -40,22 +44,29 @@ func NewController(
4044

4145
tgInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue))
4246

43-
// Filter Redis brokers that are referenced by triggers.
47+
// Filter brokers that are referenced by triggers.
4448
filterBroker := func(obj interface{}) bool {
45-
// TODO other brokers should be supported.
49+
// TODO duck
50+
var accessor kmeta.OwnerRefableAccessor
4651
rb, ok := obj.(*eventingv1alpha1.RedisBroker)
4752
if !ok {
48-
return false
53+
mb, ok := obj.(*eventingv1alpha1.MemoryBroker)
54+
if !ok {
55+
return false
56+
}
57+
accessor = kmeta.OwnerRefableAccessor(mb)
58+
} else {
59+
accessor = kmeta.OwnerRefableAccessor(rb)
4960
}
5061

51-
tgl, err := tgInformer.Lister().Triggers(rb.Namespace).List(labels.Everything())
62+
tgl, err := tgInformer.Lister().Triggers(accessor.GetNamespace()).List(labels.Everything())
5263
if err != nil {
5364
logging.FromContext(ctx).Error("Unable to list Triggers", zap.Error(err))
5465
return false
5566
}
5667

5768
for _, tg := range tgl {
58-
if tg.ReferencesBroker(rb) {
69+
if tg.ReferencesBroker(accessor) {
5970
return true
6071
}
6172
}
@@ -64,20 +75,27 @@ func NewController(
6475
}
6576

6677
enqueueFromBroker := func(obj interface{}) {
67-
// TODO check GVK if other brokers are supported.
78+
// TODO duck
79+
var accessor kmeta.OwnerRefableAccessor
6880
rb, ok := obj.(*eventingv1alpha1.RedisBroker)
6981
if !ok {
70-
return
82+
mb, ok := obj.(*eventingv1alpha1.MemoryBroker)
83+
if !ok {
84+
return
85+
}
86+
accessor = kmeta.OwnerRefableAccessor(mb)
87+
} else {
88+
accessor = kmeta.OwnerRefableAccessor(rb)
7189
}
7290

73-
tgl, err := tgInformer.Lister().Triggers(rb.Namespace).List(labels.Everything())
91+
tgl, err := tgInformer.Lister().Triggers(accessor.GetNamespace()).List(labels.Everything())
7492
if err != nil {
7593
logging.FromContext(ctx).Error("Unable to list Triggers", zap.Error(err))
7694
return
7795
}
7896

7997
for _, tg := range tgl {
80-
if tg.ReferencesBroker(rb) {
98+
if tg.ReferencesBroker(accessor) {
8199
impl.EnqueueKey(types.NamespacedName{
82100
Name: tg.Name,
83101
Namespace: tg.Namespace,
@@ -91,5 +109,10 @@ func NewController(
91109
Handler: controller.HandleAll(enqueueFromBroker),
92110
})
93111

112+
mbInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
113+
FilterFunc: filterBroker,
114+
Handler: controller.HandleAll(enqueueFromBroker),
115+
})
116+
94117
return impl
95118
}

pkg/reconciler/trigger/reconciler.go

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@ import (
2121
)
2222

2323
type Reconciler struct {
24+
// TODO duck brokers
2425
rbLister eventingv1alpha1listers.RedisBrokerLister
26+
mbLister eventingv1alpha1listers.MemoryBrokerLister
2527
uriResolver *resolver.URIResolver
2628
}
2729

@@ -40,7 +42,26 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, t *eventingv1alpha1.Trig
4042
}
4143

4244
func (r *Reconciler) resolveBroker(ctx context.Context, t *eventingv1alpha1.Trigger) pkgreconciler.Event {
43-
// TODO, use any broker, not RedisBrokers
45+
// TODO duck
46+
// TODO move to webhook
47+
if t.Spec.Broker.Group != "" && t.Spec.Broker.Group != eventingv1alpha1.SchemeGroupVersion.Group {
48+
return controller.NewPermanentError(fmt.Errorf("not supported Broker Group %q", t.Spec.Broker.Group))
49+
}
50+
51+
var rb *eventingv1alpha1.RedisBroker
52+
if t.Spec.Broker.Kind == rb.GetGroupVersionKind().Kind {
53+
return r.resolveRedisBroker(ctx, t)
54+
}
55+
56+
var mb *eventingv1alpha1.MemoryBroker
57+
if t.Spec.Broker.Kind != mb.GetGroupVersionKind().Kind {
58+
return controller.NewPermanentError(fmt.Errorf("not supported Broker Kind %q", t.Spec.Broker.Kind))
59+
}
60+
61+
return r.resolveMemoryBroker(ctx, t)
62+
}
63+
64+
func (r *Reconciler) resolveRedisBroker(ctx context.Context, t *eventingv1alpha1.Trigger) pkgreconciler.Event {
4465
rb, err := r.rbLister.RedisBrokers(t.Namespace).Get(t.Spec.Broker.Name)
4566
if err != nil {
4667
if apierrs.IsNotFound(err) {
@@ -65,6 +86,31 @@ func (r *Reconciler) resolveBroker(ctx context.Context, t *eventingv1alpha1.Trig
6586
return nil
6687
}
6788

89+
func (r *Reconciler) resolveMemoryBroker(ctx context.Context, t *eventingv1alpha1.Trigger) pkgreconciler.Event {
90+
mb, err := r.mbLister.MemoryBrokers(t.Namespace).Get(t.Spec.Broker.Name)
91+
if err != nil {
92+
if apierrs.IsNotFound(err) {
93+
logging.FromContext(ctx).Errorw(fmt.Sprintf("Trigger %s/%s references non existing broker %q", t.Namespace, t.Name, t.Spec.Broker.Name))
94+
t.Status.MarkBrokerFailed(common.ReasonBrokerDoesNotExist, "Broker %q does not exist", t.Spec.Broker.Name)
95+
// No need to requeue, we will be notified when if broker is created.
96+
return controller.NewPermanentError(err)
97+
}
98+
99+
t.Status.MarkBrokerFailed(common.ReasonFailedBrokerGet, "Failed to get broker %q : %s", t.Spec.Broker, err)
100+
return pkgreconciler.NewEvent(corev1.EventTypeWarning, common.ReasonFailedBrokerGet,
101+
"Failed to get broker for trigger %s/%s: %w", t.Namespace, t.Name, err)
102+
}
103+
104+
t.Status.PropagateBrokerCondition(mb.Status.GetTopLevelCondition())
105+
106+
// No need to requeue, we'll get requeued when broker changes status.
107+
if !mb.IsReady() {
108+
logging.FromContext(ctx).Errorw(fmt.Sprintf("Trigger %s/%s references non ready broker %q", t.Namespace, t.Name, t.Spec.Broker.Name))
109+
}
110+
111+
return nil
112+
}
113+
68114
func (r *Reconciler) resolveTarget(ctx context.Context, t *eventingv1alpha1.Trigger) pkgreconciler.Event {
69115
if t.Spec.Target.Ref != nil && t.Spec.Target.Ref.Namespace == "" {
70116
// To call URIFromDestinationV1(ctx context.Context, dest v1.Destination, parent interface{}), dest.Ref must have a Namespace

0 commit comments

Comments
 (0)