Skip to content
This repository was archived by the owner on Dec 11, 2023. It is now read-only.

Commit ad93456

Browse files
author
odacremolbap
committed
add broker service endpoint reconcile
1 parent a084b70 commit ad93456

File tree

8 files changed

+116
-26
lines changed

8 files changed

+116
-26
lines changed

config/200-clusterroles.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,12 @@ rules:
3434
- delete
3535
- patch
3636

37-
# Manage broker services and configuration as secrets
37+
# Manage broker services, endpoints and secrets (for configuration)
3838
- apiGroups:
3939
- ''
4040
resources:
4141
- services
42+
- endpoints
4243
- secrets
4344
verbs:
4445
- get

pkg/apis/eventing/v1alpha1/redisbroker_lifecycle.go

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,26 @@ import (
1818
// RedisBrokerBroker refers to the TriggerMesh Broker that manages events on top of Redis.
1919

2020
const (
21-
RedisBrokerConditionReady = apis.ConditionReady
22-
RedisBrokerRedisDeployment apis.ConditionType = "RedisDeploymentReady"
23-
RedisBrokerRedisService apis.ConditionType = "RedisServiceReady"
24-
RedisBrokerBrokerDeployment apis.ConditionType = "BrokerDeploymentReady"
25-
RedisBrokerBrokerService apis.ConditionType = "BrokerServiceReady"
26-
RedisBrokerConfigSecret apis.ConditionType = "BrokerConfigSecretReady"
27-
RedisBrokerConditionAddressable apis.ConditionType = "Addressable"
21+
RedisBrokerConditionReady = apis.ConditionReady
22+
RedisBrokerRedisDeployment apis.ConditionType = "RedisDeploymentReady"
23+
RedisBrokerRedisService apis.ConditionType = "RedisServiceReady"
24+
RedisBrokerRedisServiceConditionEndpointsReady apis.ConditionType = "RedisEndpointsReady"
25+
RedisBrokerBrokerDeployment apis.ConditionType = "BrokerDeploymentReady"
26+
RedisBrokerBrokerService apis.ConditionType = "BrokerServiceReady"
27+
28+
// RedisBrokerBrokerServiceConditionEndpointsReady has status True when a k8s Service Endpoints
29+
// are backed by at least one endpoint.
30+
RedisBrokerBrokerServiceConditionEndpointsReady apis.ConditionType = "BrokerEndpointsReady"
31+
RedisBrokerConfigSecret apis.ConditionType = "BrokerConfigSecretReady"
32+
RedisBrokerConditionAddressable apis.ConditionType = "Addressable"
2833
)
2934

3035
var redisBrokerCondSet = apis.NewLivingConditionSet(
3136
RedisBrokerRedisDeployment,
3237
RedisBrokerRedisService,
3338
RedisBrokerBrokerDeployment,
3439
RedisBrokerBrokerService,
40+
RedisBrokerBrokerServiceConditionEndpointsReady,
3541
RedisBrokerConfigSecret,
3642

3743
// TODO RedisBrokerConditionAddressable,
@@ -149,8 +155,8 @@ func (bs *RedisBrokerStatus) MarkRedisServiceReady() {
149155
redisBrokerCondSet.Manage(bs).MarkTrue(RedisBrokerRedisService)
150156
}
151157

152-
// Manage Redis broker state for both
153-
// Service and Deployment
158+
// Manage Redis broker state for
159+
// Deployment, Service and Endpoint
154160

155161
func (bs *RedisBrokerStatus) MarkBrokerDeploymentFailed(reason, messageFormat string, messageA ...interface{}) {
156162
redisBrokerCondSet.Manage(bs).MarkFalse(RedisBrokerBrokerDeployment, reason, messageFormat, messageA...)
@@ -188,3 +194,15 @@ func (bs *RedisBrokerStatus) MarkBrokerServiceUnknown(reason, messageFormat stri
188194
func (bs *RedisBrokerStatus) MarkBrokerServiceReady() {
189195
redisBrokerCondSet.Manage(bs).MarkTrue(RedisBrokerBrokerService)
190196
}
197+
198+
func (bs *RedisBrokerStatus) MarkBrokerEndpointsFailed(reason, messageFormat string, messageA ...interface{}) {
199+
redisBrokerCondSet.Manage(bs).MarkFalse(RedisBrokerBrokerServiceConditionEndpointsReady, reason, messageFormat, messageA...)
200+
}
201+
202+
func (bs *RedisBrokerStatus) MarkBrokerEndpointsUnknown(reason, messageFormat string, messageA ...interface{}) {
203+
redisBrokerCondSet.Manage(bs).MarkUnknown(RedisBrokerBrokerServiceConditionEndpointsReady, reason, messageFormat, messageA...)
204+
}
205+
206+
func (bs *RedisBrokerStatus) MarkBrokerEndpointsTrue() {
207+
redisBrokerCondSet.Manage(bs).MarkTrue(RedisBrokerBrokerServiceConditionEndpointsReady)
208+
}

pkg/reconciler/events.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ const (
2828

2929
ReasonFailedTriggerList = "FailedTriggerList"
3030
ReasonFailedConfigSerialize = "FailedConfigSerialize"
31+
32+
ReasonUnavailableEndpoints = "UnavailableEndpoints"
33+
ReasonFailedEndpointsGet = "FailedEndpointsGet"
3134
// ReasonServiceUpdate = "UpdateService"
3235
// ReasonFailedServiceGet = "FailedServiceGet"
3336
// ReasonFailedServiceCreate = "FailedServiceCreate"

pkg/reconciler/redisbroker/controller.go

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,14 @@ import (
88

99
"github.com/kelseyhightower/envconfig"
1010
"go.uber.org/zap"
11+
12+
corev1 "k8s.io/api/core/v1"
1113
apierrs "k8s.io/apimachinery/pkg/api/errors"
1214
"k8s.io/apimachinery/pkg/types"
1315
"k8s.io/client-go/tools/cache"
1416
kubeclient "knative.dev/pkg/client/injection/kube/client"
1517
"knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment"
18+
endpointsinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/endpoints"
1619
"knative.dev/pkg/client/injection/kube/informers/core/v1/secret"
1720
"knative.dev/pkg/client/injection/kube/informers/core/v1/service"
1821
"knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount"
@@ -52,6 +55,7 @@ func NewController(
5255
secretInformer := secret.Get(ctx)
5356
deploymentInformer := deployment.Get(ctx)
5457
serviceInformer := service.Get(ctx)
58+
endpointsInformer := endpointsinformer.Get(ctx)
5559
serviceAccountInformer := serviceaccount.Get(ctx)
5660
// TODO rolebinding
5761
_ = rolebinding.Get(ctx)
@@ -60,7 +64,13 @@ func NewController(
6064
kubeClientSet: kubeclient.Get(ctx),
6165
secretReconciler: newSecretReconciler(ctx, secretInformer.Lister(), trgInformer.Lister()),
6266
redisReconciler: newRedisReconciler(ctx, deploymentInformer.Lister(), serviceInformer.Lister(), env.RedisImage),
63-
brokerReconciler: newBrokerReconciler(ctx, deploymentInformer.Lister(), serviceInformer.Lister(), env.BrokerImage),
67+
brokerReconciler: brokerReconciler{
68+
client: kubeclient.Get(ctx),
69+
deploymentLister: deploymentInformer.Lister(),
70+
serviceLister: serviceInformer.Lister(),
71+
endpointsLister: endpointsInformer.Lister(),
72+
image: env.BrokerImage,
73+
},
6474
}
6575

6676
impl := rbreconciler.NewImpl(ctx, r)
@@ -81,6 +91,32 @@ func NewController(
8191
FilterFunc: controller.FilterController(rb),
8292
Handler: controller.HandleAll(impl.EnqueueControllerOf),
8393
})
94+
endpointsInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
95+
FilterFunc: func(obj interface{}) bool {
96+
ep, ok := obj.(*corev1.Endpoints)
97+
if !ok || ep.Labels != nil || ep.Labels[appAnnotation] == appAnnotationValue {
98+
return false
99+
}
100+
101+
return true
102+
},
103+
Handler: controller.HandleAll(func(obj interface{}) {
104+
ep, ok := obj.(*corev1.Endpoints)
105+
if !ok {
106+
return
107+
}
108+
109+
svc, err := serviceInformer.Lister().Services(ep.Namespace).Get(ep.Name)
110+
if err != nil {
111+
// no matter the error, if we cannot retrieve the service we cannot
112+
// read the owner and enqueue the key.
113+
return
114+
}
115+
116+
impl.EnqueueControllerOf(svc)
117+
}),
118+
})
119+
84120
serviceAccountInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
85121
FilterFunc: controller.FilterController(rb),
86122
Handler: controller.HandleAll(impl.EnqueueControllerOf),

pkg/reconciler/redisbroker/reconcile_broker.go

Lines changed: 40 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"path"
77

88
"go.uber.org/zap"
9+
910
appsv1 "k8s.io/api/apps/v1"
1011
corev1 "k8s.io/api/core/v1"
1112
apierrs "k8s.io/apimachinery/pkg/api/errors"
@@ -14,7 +15,7 @@ import (
1415
"k8s.io/client-go/kubernetes"
1516
appsv1listers "k8s.io/client-go/listers/apps/v1"
1617
corev1listers "k8s.io/client-go/listers/core/v1"
17-
k8sclient "knative.dev/pkg/client/injection/kube/client"
18+
"knative.dev/eventing/pkg/apis/duck"
1819
"knative.dev/pkg/logging"
1920
pkgreconciler "knative.dev/pkg/reconciler"
2021

@@ -39,18 +40,10 @@ type brokerReconciler struct {
3940
client kubernetes.Interface
4041
deploymentLister appsv1listers.DeploymentLister
4142
serviceLister corev1listers.ServiceLister
43+
endpointsLister corev1listers.EndpointsLister
4244
image string
4345
}
4446

45-
func newBrokerReconciler(ctx context.Context, deploymentLister appsv1listers.DeploymentLister, serviceLister corev1listers.ServiceLister, image string) brokerReconciler {
46-
return brokerReconciler{
47-
client: k8sclient.Get(ctx),
48-
deploymentLister: deploymentLister,
49-
serviceLister: serviceLister,
50-
image: image,
51-
}
52-
}
53-
5447
func (r *brokerReconciler) reconcile(ctx context.Context, rb *eventingv1alpha1.RedisBroker, redis *corev1.Service, secret *corev1.Secret) (*appsv1.Deployment, *corev1.Service, error) {
5548
d, err := r.reconcileDeployment(ctx, rb, redis, secret)
5649
if err != nil {
@@ -62,6 +55,12 @@ func (r *brokerReconciler) reconcile(ctx context.Context, rb *eventingv1alpha1.R
6255
return d, nil, err
6356
}
6457

58+
// update endpoint statuses
59+
_, err = r.reconcileEndpoints(ctx, svc, rb)
60+
if err != nil {
61+
return d, nil, err
62+
}
63+
6564
return d, svc, nil
6665
}
6766

@@ -76,7 +75,7 @@ func buildBrokerDeployment(rb *eventingv1alpha1.RedisBroker, redis *corev1.Servi
7675

7776
return resources.NewDeployment(rb.Namespace, rb.Name+"-"+brokerResourceSuffix,
7877
resources.DeploymentWithMetaOptions(
79-
resources.MetaAddLabel("app", appAnnotationValue),
78+
resources.MetaAddLabel(appAnnotation, appAnnotationValue),
8079
resources.MetaAddLabel("component", brokerResourceSuffix),
8180
resources.MetaAddLabel(resourceNameAnnotation, rb.Name+"-"+brokerResourceSuffix),
8281
resources.MetaAddOwner(rb, rb.GetGroupVersionKind())),
@@ -145,7 +144,7 @@ func (r *brokerReconciler) reconcileDeployment(ctx context.Context, rb *eventing
145144
func buildBrokerService(rb *eventingv1alpha1.RedisBroker) *corev1.Service {
146145
return resources.NewService(rb.Namespace, rb.Name+"-"+brokerResourceSuffix,
147146
resources.ServiceWithMetaOptions(
148-
resources.MetaAddLabel("app", appAnnotationValue),
147+
resources.MetaAddLabel(appAnnotation, appAnnotationValue),
149148
resources.MetaAddLabel("component", brokerResourceSuffix),
150149
resources.MetaAddLabel(resourceNameAnnotation, rb.Name+"-"+brokerResourceSuffix),
151150
resources.MetaAddOwner(rb, rb.GetGroupVersionKind())),
@@ -159,6 +158,7 @@ func (r *brokerReconciler) reconcileService(ctx context.Context, rb *eventingv1a
159158
current, err := r.serviceLister.Services(desired.Namespace).Get(desired.Name)
160159
switch {
161160
case err == nil:
161+
// Set Status
162162
// Compare current object with desired, update if needed.
163163
if !semantic.Semantic.DeepEqual(desired, current) {
164164
desired.Status = current.Status
@@ -202,3 +202,31 @@ func (r *brokerReconciler) reconcileService(ctx context.Context, rb *eventingv1a
202202

203203
return current, nil
204204
}
205+
206+
func (r *brokerReconciler) reconcileEndpoints(ctx context.Context, service *corev1.Service, rb *eventingv1alpha1.RedisBroker) (*corev1.Endpoints, error) {
207+
ep, err := r.endpointsLister.Endpoints(service.Namespace).Get(service.Name)
208+
switch {
209+
case err == nil:
210+
if duck.EndpointsAreAvailable(ep) {
211+
rb.Status.MarkBrokerEndpointsTrue()
212+
return ep, nil
213+
}
214+
215+
rb.Status.MarkBrokerEndpointsFailed(reconciler.ReasonUnavailableEndpoints, "Endpoints for broker service are not available")
216+
return nil, pkgreconciler.NewEvent(corev1.EventTypeWarning, reconciler.ReasonUnavailableEndpoints,
217+
"Endpoints for broker service are not available %s",
218+
types.NamespacedName{Namespace: ep.Namespace, Name: ep.Name})
219+
220+
case apierrs.IsNotFound(err):
221+
rb.Status.MarkBrokerEndpointsFailed(reconciler.ReasonUnavailableEndpoints, "Endpoints for broker service do not exist")
222+
return nil, pkgreconciler.NewEvent(corev1.EventTypeWarning, reconciler.ReasonUnavailableEndpoints,
223+
"Endpoints for broker service do not exist %s",
224+
types.NamespacedName{Namespace: ep.Namespace, Name: ep.Name})
225+
}
226+
227+
fullname := types.NamespacedName{Namespace: ep.Namespace, Name: ep.Name}
228+
rb.Status.MarkBrokerEndpointsFailed(reconciler.ReasonFailedEndpointsGet, "Could not retrieve endpoints for broker service")
229+
logging.FromContext(ctx).Error("Unable to get the service endpoints", zap.String("endpoint", fullname.String()), zap.Error(err))
230+
return nil, pkgreconciler.NewEvent(corev1.EventTypeWarning, reconciler.ReasonFailedEndpointsGet,
231+
"Failed to get broker service ednpoints %s: %w", fullname, err)
232+
}

pkg/reconciler/redisbroker/reconcile_redis.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ type redisReconciler struct {
3030
client kubernetes.Interface
3131
deploymentLister appsv1listers.DeploymentLister
3232
serviceLister corev1listers.ServiceLister
33+
endpointsLister corev1listers.EndpointsLister
3334
image string
3435
}
3536

@@ -53,13 +54,15 @@ func (r *redisReconciler) reconcile(ctx context.Context, rb *eventingv1alpha1.Re
5354
return d, nil, err
5455
}
5556

57+
// update endpoint statuses
58+
5659
return d, svc, nil
5760
}
5861

5962
func buildRedisDeployment(rb *eventingv1alpha1.RedisBroker, image string) *appsv1.Deployment {
6063
return resources.NewDeployment(rb.Namespace, rb.Name+"-"+redisResourceSuffix,
6164
resources.DeploymentWithMetaOptions(
62-
resources.MetaAddLabel("app", appAnnotationValue),
65+
resources.MetaAddLabel(appAnnotation, appAnnotationValue),
6366
resources.MetaAddLabel("component", redisResourceSuffix),
6467
resources.MetaAddLabel(resourceNameAnnotation, rb.Name+"-"+redisResourceSuffix),
6568
resources.MetaAddOwner(rb, rb.GetGroupVersionKind())),
@@ -124,7 +127,7 @@ func (r *redisReconciler) reconcileDeployment(ctx context.Context, rb *eventingv
124127
func buildRedisService(rb *eventingv1alpha1.RedisBroker) *corev1.Service {
125128
return resources.NewService(rb.Namespace, rb.Name+"-"+redisResourceSuffix,
126129
resources.ServiceWithMetaOptions(
127-
resources.MetaAddLabel("app", appAnnotationValue),
130+
resources.MetaAddLabel(appAnnotation, appAnnotationValue),
128131
resources.MetaAddLabel("component", redisResourceSuffix),
129132
resources.MetaAddLabel(resourceNameAnnotation, rb.Name+"-"+redisResourceSuffix),
130133
resources.MetaAddOwner(rb, rb.GetGroupVersionKind())),

pkg/reconciler/redisbroker/reconcile_secret.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ func (r *secretReconciler) buildConfigSecret(ctx context.Context, rb *eventingv1
165165

166166
return resources.NewSecret(rb.Namespace, rb.Name,
167167
resources.SecretWithMetaOptions(
168-
resources.MetaAddLabel("app", redisResourceSuffix),
168+
resources.MetaAddLabel(appAnnotation, redisResourceSuffix),
169169
resources.MetaAddLabel(resourceNameAnnotation, rb.Name+"-"+"redisbroker-config"),
170170
resources.MetaAddOwner(rb, rb.GetGroupVersionKind())),
171171
resources.SecretSetData(configSecretKey, b)), nil

pkg/reconciler/redisbroker/reconciler.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
)
1717

1818
const (
19+
appAnnotation = "app"
1920
appAnnotationValue = "redisbroker"
2021
resourceNameAnnotation = "eventing.triggermesh.io/name"
2122
)

0 commit comments

Comments
 (0)