Skip to content
This repository was archived by the owner on Dec 11, 2023. It is now read-only.

Commit 7124a7c

Browse files
author
odacremolbap
committed
enqueue triggers to broker reconciler
1 parent 10d852c commit 7124a7c

File tree

6 files changed

+369
-26
lines changed

6 files changed

+369
-26
lines changed

config/300-trigger.yaml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -179,9 +179,9 @@ spec:
179179
uri:
180180
description: URI can be an absolute URL(non-empty scheme and non-empty host) pointing to the target or a relative URI. Relative URIs will be resolved using the base URI retrieved from Ref.
181181
type: string
182-
required:
183-
- kind
184-
- name
182+
oneOf:
183+
- required: [ref]
184+
- required: [uri]
185185

186186
delivery:
187187
description: Delivery contains the delivery spec for this specific trigger.

pkg/reconciler/redisbroker/controller.go

Lines changed: 59 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ package redisbroker
66
import (
77
"context"
88

9+
"go.uber.org/zap"
10+
apierrs "k8s.io/apimachinery/pkg/api/errors"
11+
"k8s.io/apimachinery/pkg/types"
912
"k8s.io/client-go/tools/cache"
1013
kubeclient "knative.dev/pkg/client/injection/kube/client"
1114
"knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment"
@@ -14,9 +17,11 @@ import (
1417
"knative.dev/pkg/client/injection/kube/informers/rbac/v1/rolebinding"
1518
"knative.dev/pkg/configmap"
1619
"knative.dev/pkg/controller"
20+
"knative.dev/pkg/logging"
1721

1822
eventingv1alpha1 "github.com/triggermesh/triggermesh-core/pkg/apis/eventing/v1alpha1"
1923
rbinformer "github.com/triggermesh/triggermesh-core/pkg/client/generated/injection/informers/eventing/v1alpha1/redisbroker"
24+
trginformer "github.com/triggermesh/triggermesh-core/pkg/client/generated/injection/informers/eventing/v1alpha1/trigger"
2025
rbreconciler "github.com/triggermesh/triggermesh-core/pkg/client/generated/injection/reconciler/eventing/v1alpha1/redisbroker"
2126
)
2227

@@ -28,36 +33,81 @@ func NewController(
2833
) *controller.Impl {
2934

3035
rbInformer := rbinformer.Get(ctx)
36+
trgInformer := trginformer.Get(ctx)
3137
deploymentInformer := deployment.Get(ctx)
3238
serviceInformer := service.Get(ctx)
3339
serviceAccountInformer := serviceaccount.Get(ctx)
34-
roleBindingInformer := rolebinding.Get(ctx)
40+
_ = rolebinding.Get(ctx)
3541

3642
r := &Reconciler{
37-
kubeClientSet: kubeclient.Get(ctx),
38-
redisReconciler: newRedisReconciler(ctx, deploymentInformer.Lister(), serviceInformer.Lister()),
39-
serviceLister: serviceInformer.Lister(),
40-
serviceAccountLister: serviceAccountInformer.Lister(),
41-
roleBindingLister: roleBindingInformer.Lister(),
43+
kubeClientSet: kubeclient.Get(ctx),
44+
redisReconciler: newRedisReconciler(ctx, deploymentInformer.Lister(), serviceInformer.Lister()),
45+
brokerReconciler: newBrokerReconciler(ctx, deploymentInformer.Lister(), serviceInformer.Lister()),
4246
}
4347

4448
impl := rbreconciler.NewImpl(ctx, r)
49+
rb := &eventingv1alpha1.RedisBroker{}
50+
gvk := rb.GetGroupVersionKind()
4551

4652
rbInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue))
4753

4854
deploymentInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
49-
FilterFunc: controller.FilterController(&eventingv1alpha1.RedisBroker{}),
55+
FilterFunc: controller.FilterController(rb),
5056
Handler: controller.HandleAll(impl.EnqueueControllerOf),
5157
})
5258

5359
serviceInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
54-
FilterFunc: controller.FilterController(&eventingv1alpha1.RedisBroker{}),
60+
FilterFunc: controller.FilterController(rb),
5561
Handler: controller.HandleAll(impl.EnqueueControllerOf),
5662
})
5763
serviceAccountInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
58-
FilterFunc: controller.FilterController(&eventingv1alpha1.RedisBroker{}),
64+
FilterFunc: controller.FilterController(rb),
5965
Handler: controller.HandleAll(impl.EnqueueControllerOf),
6066
})
6167

68+
filterTriggerForRedisBroker := func(obj interface{}) bool {
69+
t, ok := obj.(*eventingv1alpha1.Trigger)
70+
if !ok {
71+
return false
72+
}
73+
74+
// TODO replace with defaulting when webhook is implemented
75+
if !(t.Spec.Broker.Group == gvk.Group || t.Spec.Broker.Group == "") ||
76+
t.Spec.Broker.Kind != gvk.Kind {
77+
return false
78+
}
79+
80+
// TODO replace with broker namespace when webhook defaulting is implemented
81+
_, err := rbInformer.Lister().RedisBrokers(t.Namespace).Get(t.Spec.Broker.Name)
82+
switch {
83+
case err == nil:
84+
return true
85+
case !apierrs.IsNotFound(err):
86+
logging.FromContext(ctx).Error("Unable to get Redis Broker", zap.Any("broker", t.Spec.Broker), zap.Error(err))
87+
}
88+
89+
return false
90+
91+
}
92+
enqueueFromTrigger := func(obj interface{}) {
93+
t, ok := obj.(*eventingv1alpha1.Trigger)
94+
if !ok {
95+
return
96+
}
97+
98+
impl.EnqueueKey(types.NamespacedName{
99+
Name: t.Spec.Broker.Name,
100+
Namespace: t.Namespace,
101+
})
102+
}
103+
104+
// Filter triggers for redisbroker
105+
// enqueue at the broker
106+
107+
trgInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
108+
FilterFunc: filterTriggerForRedisBroker,
109+
Handler: controller.HandleAll(enqueueFromTrigger),
110+
})
111+
62112
return impl
63113
}
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
package redisbroker
2+
3+
import (
4+
"context"
5+
6+
"go.uber.org/zap"
7+
appsv1 "k8s.io/api/apps/v1"
8+
corev1 "k8s.io/api/core/v1"
9+
apierrs "k8s.io/apimachinery/pkg/api/errors"
10+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
11+
"k8s.io/apimachinery/pkg/types"
12+
"k8s.io/client-go/kubernetes"
13+
appsv1listers "k8s.io/client-go/listers/apps/v1"
14+
corev1listers "k8s.io/client-go/listers/core/v1"
15+
k8sclient "knative.dev/pkg/client/injection/kube/client"
16+
"knative.dev/pkg/logging"
17+
pkgreconciler "knative.dev/pkg/reconciler"
18+
19+
eventingv1alpha1 "github.com/triggermesh/triggermesh-core/pkg/apis/eventing/v1alpha1"
20+
"github.com/triggermesh/triggermesh-core/pkg/reconciler"
21+
"github.com/triggermesh/triggermesh-core/pkg/reconciler/resources"
22+
"github.com/triggermesh/triggermesh-core/pkg/reconciler/semantic"
23+
)
24+
25+
type brokerReconciler struct {
26+
client kubernetes.Interface
27+
deploymentLister appsv1listers.DeploymentLister
28+
serviceLister corev1listers.ServiceLister
29+
}
30+
31+
func newBrokerReconciler(ctx context.Context, deploymentLister appsv1listers.DeploymentLister, serviceLister corev1listers.ServiceLister) brokerReconciler {
32+
return brokerReconciler{
33+
client: k8sclient.Get(ctx),
34+
deploymentLister: deploymentLister,
35+
serviceLister: serviceLister,
36+
}
37+
}
38+
39+
func (r *brokerReconciler) reconcile(ctx context.Context, rb *eventingv1alpha1.RedisBroker) (*appsv1.Deployment, *corev1.Service, error) {
40+
d, err := r.reconcileDeployment(ctx, rb)
41+
if err != nil {
42+
return nil, nil, err
43+
}
44+
45+
svc, err := r.reconcileService(ctx, rb)
46+
if err != nil {
47+
return d, nil, err
48+
}
49+
50+
return d, svc, nil
51+
}
52+
53+
func buildBrokerDeployment(rb *eventingv1alpha1.RedisBroker) *appsv1.Deployment {
54+
return resources.NewDeployment(rb.Namespace, rb.Name+"-redis-server",
55+
resources.DeploymentWithMetaOptions(
56+
resources.MetaAddLabel("app", "redis-server"),
57+
resources.MetaAddLabel("eventing.triggermesh.io/redis-name", rb.Name+"-redis-server"),
58+
resources.MetaAddOwner(rb, rb.GetGroupVersionKind())),
59+
resources.DeploymentAddSelectorForTemplate("eventing.triggermesh.io/redis-name", rb.Name+"-redis-server"),
60+
resources.DeploymentSetReplicas(1),
61+
resources.DeploymentWithTemplateOption(
62+
resources.PodSpecAddContainer(
63+
resources.NewContainer("redis", "redis/redis-stack-server:latest",
64+
resources.ContainerAddEnvFromValue("REDIS_ARGS", "--appendonly yes"),
65+
resources.ContainerAddPort("redis", 6379)))))
66+
}
67+
68+
func (r *brokerReconciler) reconcileDeployment(ctx context.Context, rb *eventingv1alpha1.RedisBroker) (*appsv1.Deployment, error) {
69+
desired := buildBrokerDeployment(rb)
70+
current, err := r.deploymentLister.Deployments(desired.Namespace).Get(desired.Name)
71+
switch {
72+
case err == nil:
73+
// Compare current object with desired, update if needed.
74+
if !semantic.Semantic.DeepEqual(desired, current) {
75+
desired.Status = current.Status
76+
desired.ResourceVersion = current.ResourceVersion
77+
78+
current, err = r.client.AppsV1().Deployments(desired.Namespace).Update(ctx, desired, metav1.UpdateOptions{})
79+
if err != nil {
80+
fullname := types.NamespacedName{Namespace: desired.Namespace, Name: desired.Name}
81+
logging.FromContext(ctx).Error("Unable to update the deployment", zap.String("deployment", fullname.String()), zap.Error(err))
82+
rb.Status.MarkRedisDeploymentFailed(reconciler.ReasonFailedDeploymentUpdate, "Failed to update Redis deployment")
83+
84+
return nil, pkgreconciler.NewEvent(corev1.EventTypeWarning, reconciler.ReasonFailedDeploymentUpdate,
85+
"Failed to get Redis deployment %s: %w", fullname, err)
86+
}
87+
}
88+
89+
case !apierrs.IsNotFound(err):
90+
// An error ocurred retrieving current deployment.
91+
fullname := types.NamespacedName{Namespace: desired.Namespace, Name: desired.Name}
92+
logging.FromContext(ctx).Error("Unable to get the deployment", zap.String("deployment", fullname.String()), zap.Error(err))
93+
rb.Status.MarkRedisDeploymentFailed(reconciler.ReasonFailedDeploymentGet, "Failed to get Redis deployment")
94+
95+
return nil, pkgreconciler.NewEvent(corev1.EventTypeWarning, reconciler.ReasonFailedDeploymentGet,
96+
"Failed to get Redis deployment %s: %w", fullname, err)
97+
98+
default:
99+
// The deployment has not been found, create it.
100+
current, err = r.client.AppsV1().Deployments(desired.Namespace).Create(ctx, desired, metav1.CreateOptions{})
101+
if err != nil {
102+
fullname := types.NamespacedName{Namespace: desired.Namespace, Name: desired.Name}
103+
logging.FromContext(ctx).Error("Unable to create the deployment", zap.String("deployment", fullname.String()), zap.Error(err))
104+
rb.Status.MarkRedisDeploymentFailed(reconciler.ReasonFailedDeploymentCreate, "Failed to create Redis deployment")
105+
106+
return nil, pkgreconciler.NewEvent(corev1.EventTypeWarning, reconciler.ReasonFailedDeploymentCreate,
107+
"Failed to create Redis deployment %s: %w", fullname, err)
108+
}
109+
}
110+
111+
// Update status based on deployment
112+
rb.Status.PropagateRedisDeploymentAvailability(ctx, &current.Status)
113+
114+
return current, nil
115+
}
116+
117+
func buildBrokerService(rb *eventingv1alpha1.RedisBroker) *corev1.Service {
118+
return resources.NewService(rb.Namespace, rb.Name+"-redis-server",
119+
resources.ServiceWithMetaOptions(
120+
resources.MetaAddLabel("app", "redis-server"),
121+
resources.MetaAddLabel("eventing.triggermesh.io/redis-name", rb.Name+"-redis-server"),
122+
resources.MetaAddOwner(rb, rb.GetGroupVersionKind())),
123+
resources.ServiceSetType(corev1.ServiceTypeClusterIP),
124+
resources.ServiceAddSelectorLabel("eventing.triggermesh.io/redis-name", rb.Name+"-redis-server"),
125+
resources.ServiceAddPort("redis", 6379, 6379))
126+
}
127+
128+
func (r *brokerReconciler) reconcileService(ctx context.Context, rb *eventingv1alpha1.RedisBroker) (*corev1.Service, error) {
129+
desired := buildBrokerService(rb)
130+
current, err := r.serviceLister.Services(desired.Namespace).Get(desired.Name)
131+
switch {
132+
case err == nil:
133+
// Compare current object with desired, update if needed.
134+
if !semantic.Semantic.DeepEqual(desired, current) {
135+
desired.Status = current.Status
136+
desired.ResourceVersion = current.ResourceVersion
137+
138+
current, err = r.client.CoreV1().Services(desired.Namespace).Update(ctx, desired, metav1.UpdateOptions{})
139+
if err != nil {
140+
fullname := types.NamespacedName{Namespace: desired.Namespace, Name: desired.Name}
141+
logging.FromContext(ctx).Error("Unable to update the service", zap.String("service", fullname.String()), zap.Error(err))
142+
rb.Status.MarkRedisServiceFailed(reconciler.ReasonFailedServiceUpdate, "Failed to update Redis service")
143+
144+
return nil, pkgreconciler.NewEvent(corev1.EventTypeWarning, reconciler.ReasonFailedServiceUpdate,
145+
"Failed to get Redis service %s: %w", fullname, err)
146+
}
147+
}
148+
149+
case !apierrs.IsNotFound(err):
150+
// An error ocurred retrieving current object.
151+
fullname := types.NamespacedName{Namespace: desired.Namespace, Name: desired.Name}
152+
logging.FromContext(ctx).Error("Unable to get the service", zap.String("service", fullname.String()), zap.Error(err))
153+
rb.Status.MarkRedisServiceFailed(reconciler.ReasonFailedServiceGet, "Failed to get Redis service")
154+
155+
return nil, pkgreconciler.NewEvent(corev1.EventTypeWarning, reconciler.ReasonFailedServiceGet,
156+
"Failed to get Redis service %s: %w", fullname, err)
157+
158+
default:
159+
// The object has not been found, create it.
160+
current, err = r.client.CoreV1().Services(desired.Namespace).Create(ctx, desired, metav1.CreateOptions{})
161+
if err != nil {
162+
fullname := types.NamespacedName{Namespace: desired.Namespace, Name: desired.Name}
163+
logging.FromContext(ctx).Error("Unable to create the service", zap.String("service", fullname.String()), zap.Error(err))
164+
rb.Status.MarkRedisServiceFailed(reconciler.ReasonFailedServiceCreate, "Failed to create Redis service")
165+
166+
return nil, pkgreconciler.NewEvent(corev1.EventTypeWarning, reconciler.ReasonFailedServiceCreate,
167+
"Failed to create Redis service %s: %w", fullname, err)
168+
}
169+
}
170+
171+
// Service exists and is up to date.
172+
rb.Status.MarkRedisServiceReady()
173+
174+
return current, nil
175+
}

pkg/reconciler/redisbroker/reconciler.go

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@ import (
88

99
"go.uber.org/zap"
1010
"k8s.io/client-go/kubernetes"
11-
corev1listers "k8s.io/client-go/listers/core/v1"
12-
rbacv1listers "k8s.io/client-go/listers/rbac/v1"
1311

1412
"knative.dev/pkg/logging"
1513
"knative.dev/pkg/reconciler"
@@ -18,31 +16,29 @@ import (
1816
)
1917

2018
type Reconciler struct {
21-
kubeClientSet kubernetes.Interface
22-
redisReconciler redisReconciler
23-
serviceLister corev1listers.ServiceLister
24-
serviceAccountLister corev1listers.ServiceAccountLister
25-
roleBindingLister rbacv1listers.RoleBindingLister
19+
kubeClientSet kubernetes.Interface
20+
redisReconciler redisReconciler
21+
brokerReconciler brokerReconciler
2622
}
2723

2824
func (r *Reconciler) ReconcileKind(ctx context.Context, rb *eventingv1alpha1.RedisBroker) reconciler.Event {
2925
logging.FromContext(ctx).Infow("Reconciling", zap.Any("Broker", *rb))
3026

3127
// Clean any dangling resources
3228

33-
// Iterate triggers and create secret
34-
35-
// Make sure the Redis deployment exists and propagate the status to the Channel
29+
// Make sure the Redis deployment and service exists.
3630
_, _, err := r.redisReconciler.reconcile(ctx, rb)
3731
if err != nil {
3832
return err
3933
}
4034

41-
// create service for redis
42-
43-
// create deployment for broker
35+
// Iterate triggers and create secret
4436

45-
// create service for broker
37+
// Make sure the Broker deployment for Redis exists and that it points to the Redis service.
38+
_, _, err = r.brokerReconciler.reconcile(ctx, rb)
39+
if err != nil {
40+
return err
41+
}
4642

4743
return nil
4844
}

pkg/reconciler/resources/secret.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
// Copyright 2022 TriggerMesh Inc.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package resources
5+
6+
import (
7+
corev1 "k8s.io/api/core/v1"
8+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
9+
)
10+
11+
type SecretOption func(*corev1.Secret)
12+
13+
func NewSecret(namespace, name string, opts ...SecretOption) *corev1.Secret {
14+
meta := NewMeta(namespace, name)
15+
s := &corev1.Secret{
16+
TypeMeta: metav1.TypeMeta{
17+
Kind: "Secret",
18+
APIVersion: corev1.SchemeGroupVersion.String(),
19+
},
20+
ObjectMeta: *meta,
21+
Type: corev1.SecretTypeOpaque,
22+
}
23+
24+
for _, opt := range opts {
25+
opt(s)
26+
}
27+
28+
return s
29+
}
30+
31+
func SecretWithMetaOptions(opts ...MetaOption) SecretOption {
32+
return func(s *corev1.Secret) {
33+
for _, opt := range opts {
34+
opt(&s.ObjectMeta)
35+
}
36+
}
37+
}
38+
39+
func SecretSetData(key string, value []byte) SecretOption {
40+
return func(s *corev1.Secret) {
41+
if s.Data == nil {
42+
s.Data = make(map[string][]byte)
43+
}
44+
s.Data[key] = value
45+
}
46+
}

0 commit comments

Comments
 (0)