Skip to content
This repository was archived by the owner on Dec 11, 2023. It is now read-only.

Commit 90b63de

Browse files
author
odacremolbap
committed
reconcile redis services
1 parent ef83f01 commit 90b63de

File tree

9 files changed

+889
-36
lines changed

9 files changed

+889
-36
lines changed

pkg/apis/eventing/v1alpha1/redisbroker_lifecycle.go

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@
44
package v1alpha1
55

66
import (
7+
"context"
78
"sync"
89

10+
appsv1 "k8s.io/api/apps/v1"
11+
corev1 "k8s.io/api/core/v1"
912
"k8s.io/apimachinery/pkg/runtime/schema"
10-
1113
"knative.dev/pkg/apis"
1214
duckv1 "knative.dev/pkg/apis/duck/v1"
1315
)
@@ -97,6 +99,39 @@ func (bs *RedisBrokerStatus) InitializeConditions() {
9799
bs.GetConditionSet().Manage(bs).InitializeConditions()
98100
}
99101

100-
func (bs *RedisBrokerStatus) MarkRedisBrokerFailed(reason, messageFormat string, messageA ...interface{}) {
102+
func (bs *RedisBrokerStatus) MarkRedisDeploymentFailed(reason, messageFormat string, messageA ...interface{}) {
101103
redisBrokerCondSet.Manage(bs).MarkFalse(RedisBrokerRedisDeployment, reason, messageFormat, messageA...)
102104
}
105+
106+
func (bs *RedisBrokerStatus) MarkRedisDeploymentUnknown(reason, messageFormat string, messageA ...interface{}) {
107+
redisBrokerCondSet.Manage(bs).MarkUnknown(RedisBrokerRedisDeployment, reason, messageFormat, messageA...)
108+
}
109+
110+
func (bs *RedisBrokerStatus) PropagateRedisDeploymentAvailability(ctx context.Context, ds *appsv1.DeploymentStatus) {
111+
for _, cond := range ds.Conditions {
112+
113+
if cond.Type == appsv1.DeploymentAvailable {
114+
switch cond.Status {
115+
case corev1.ConditionTrue:
116+
redisBrokerCondSet.Manage(bs).MarkTrue(RedisBrokerRedisDeployment)
117+
case corev1.ConditionFalse:
118+
bs.MarkRedisDeploymentFailed("RedisDeploymentFalse", "The status of Redis Deployment is False: %s : %s", cond.Reason, cond.Message)
119+
default:
120+
// expected corev1.ConditionUnknown
121+
bs.MarkRedisDeploymentUnknown("RedisDeploymentUnknown", "The status of Redis Deployment is Unknown: %s : %s", cond.Reason, cond.Message)
122+
}
123+
}
124+
}
125+
}
126+
127+
func (bs *RedisBrokerStatus) MarkRedisServiceFailed(reason, messageFormat string, messageA ...interface{}) {
128+
redisBrokerCondSet.Manage(bs).MarkFalse(RedisBrokerRedisService, reason, messageFormat, messageA...)
129+
}
130+
131+
func (bs *RedisBrokerStatus) MarkRedisServiceUnknown(reason, messageFormat string, messageA ...interface{}) {
132+
redisBrokerCondSet.Manage(bs).MarkUnknown(RedisBrokerRedisService, reason, messageFormat, messageA...)
133+
}
134+
135+
func (bs *RedisBrokerStatus) MarkRedisServiceReady() {
136+
redisBrokerCondSet.Manage(bs).MarkTrue(RedisBrokerRedisService)
137+
}

pkg/reconciler/events.go

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,16 @@ const (
1515
// ReasonFailedRBACUpdate = "FailedRBACUpdate"
1616

1717
ReasonDeploymentCreate = "CreateDeployment"
18-
ReasonDeploymentGet = "GetDeployment"
1918
ReasonDeploymentUpdate = "UpdateDeployment"
19+
ReasonFailedDeploymentGet = "FailedDeploymentGet"
2020
ReasonFailedDeploymentCreate = "FailedDeploymentCreate"
2121
ReasonFailedDeploymentUpdate = "FailedDeploymentUpdate"
2222

23-
// // ReasonServiceCreate indicates that an Service object was successfully created.
24-
// ReasonServiceCreate = "CreateService"
25-
// // ReasonServiceUpdate indicates that an Service object was successfully updated.
26-
// ReasonServiceUpdate = "UpdateService"
27-
// // ReasonFailedServiceCreate indicates that the creation of an Service object failed.
28-
// ReasonFailedServiceCreate = "FailedServiceCreate"
29-
// // ReasonFailedServiceUpdate indicates that the update of an Service object failed.
30-
// ReasonFailedServiceUpdate = "FailedServiceUpdate"
23+
ReasonServiceCreate = "CreateService"
24+
ReasonServiceUpdate = "UpdateService"
25+
ReasonFailedServiceGet = "FailedServiceGet"
26+
ReasonFailedServiceCreate = "FailedServiceCreate"
27+
ReasonFailedServiceUpdate = "FailedServiceUpdate"
3128

3229
// // ReasonBadSinkURI indicates that the URI of a sink can't be determined.
3330
// ReasonBadSinkURI = "BadSinkURI"

pkg/reconciler/redisbroker/controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ func NewController(
3535

3636
r := &Reconciler{
3737
kubeClientSet: kubeclient.Get(ctx),
38-
redisReconciler: newRedisReconciler(ctx, deploymentInformer.Lister()),
38+
redisReconciler: newRedisReconciler(ctx, deploymentInformer.Lister(), serviceInformer.Lister()),
3939
serviceLister: serviceInformer.Lister(),
4040
serviceAccountLister: serviceAccountInformer.Lister(),
4141
roleBindingLister: roleBindingInformer.Lister(),

pkg/reconciler/redisbroker/reconcile_redis.go

Lines changed: 124 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -7,55 +7,47 @@ import (
77
appsv1 "k8s.io/api/apps/v1"
88
corev1 "k8s.io/api/core/v1"
99
apierrs "k8s.io/apimachinery/pkg/api/errors"
10+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1011
"k8s.io/apimachinery/pkg/types"
1112
"k8s.io/client-go/kubernetes"
1213
appsv1listers "k8s.io/client-go/listers/apps/v1"
14+
corev1listers "k8s.io/client-go/listers/core/v1"
1315
k8sclient "knative.dev/pkg/client/injection/kube/client"
1416
"knative.dev/pkg/logging"
1517
pkgreconciler "knative.dev/pkg/reconciler"
1618

1719
eventingv1alpha1 "github.com/triggermesh/triggermesh-core/pkg/apis/eventing/v1alpha1"
1820
"github.com/triggermesh/triggermesh-core/pkg/reconciler"
1921
"github.com/triggermesh/triggermesh-core/pkg/reconciler/resources"
22+
"github.com/triggermesh/triggermesh-core/pkg/reconciler/semantic"
2023
)
2124

2225
type redisReconciler struct {
2326
client kubernetes.Interface
2427
deploymentLister appsv1listers.DeploymentLister
28+
serviceLister corev1listers.ServiceLister
2529
}
2630

27-
func newRedisReconciler(ctx context.Context, deploymentLister appsv1listers.DeploymentLister) redisReconciler {
31+
func newRedisReconciler(ctx context.Context, deploymentLister appsv1listers.DeploymentLister, serviceLister corev1listers.ServiceLister) redisReconciler {
2832
return redisReconciler{
2933
client: k8sclient.Get(ctx),
3034
deploymentLister: deploymentLister,
35+
serviceLister: serviceLister,
3136
}
3237
}
3338

34-
func (r *redisReconciler) Reconcile(ctx context.Context, rb *eventingv1alpha1.RedisBroker) (*appsv1.Deployment, error) {
35-
36-
desired := buildRedisDeployment(rb)
37-
current, err := r.deploymentLister.Deployments(desired.Namespace).Get(desired.Name)
38-
switch {
39-
case err == nil:
40-
// TODO compare
41-
// TODO if equal return
42-
// continue
43-
case apierrs.IsNotFound(err):
44-
// continue
45-
default:
46-
fullname := types.NamespacedName{Namespace: desired.Namespace, Name: desired.Name}
47-
logging.FromContext(ctx).Error("Unable to get the deployment", zap.String("deployment", fullname.String()), zap.Error(err))
48-
rb.Status.MarkRedisBrokerFailed("DeploymentGetFailed", "Failed to get Redis deployment")
49-
50-
return nil, pkgreconciler.NewEvent(corev1.EventTypeWarning, reconciler.ReasonDeploymentGet,
51-
"Failed to get Redis deployment %s: %w", fullname, err)
39+
func (r *redisReconciler) reconcile(ctx context.Context, rb *eventingv1alpha1.RedisBroker) (*appsv1.Deployment, *corev1.Service, error) {
40+
d, err := r.reconcileDeployment(ctx, rb)
41+
if err != nil {
42+
return nil, nil, err
5243
}
5344

54-
// Create
55-
56-
// TODO Update statuses
45+
svc, err := r.reconcileService(ctx, rb)
46+
if err != nil {
47+
return d, nil, err
48+
}
5749

58-
return current, nil
50+
return d, svc, nil
5951
}
6052

6153
func buildRedisDeployment(rb *eventingv1alpha1.RedisBroker) *appsv1.Deployment {
@@ -72,3 +64,112 @@ func buildRedisDeployment(rb *eventingv1alpha1.RedisBroker) *appsv1.Deployment {
7264
resources.ContainerAddEnvFromValue("REDIS_ARGS", "--appendonly yes"),
7365
resources.ContainerAddPort("redis", 6379)))))
7466
}
67+
68+
func (r *redisReconciler) reconcileDeployment(ctx context.Context, rb *eventingv1alpha1.RedisBroker) (*appsv1.Deployment, error) {
69+
desired := buildRedisDeployment(rb)
70+
current, err := r.deploymentLister.Deployments(desired.Namespace).Get(desired.Name)
71+
switch {
72+
case err == nil:
73+
// Compare current object with desired, update if needed.
74+
if !semantic.Semantic.DeepEqual(desired, current) {
75+
desired.Status = current.Status
76+
desired.ResourceVersion = current.ResourceVersion
77+
78+
current, err = r.client.AppsV1().Deployments(desired.Namespace).Update(ctx, desired, metav1.UpdateOptions{})
79+
if err != nil {
80+
fullname := types.NamespacedName{Namespace: desired.Namespace, Name: desired.Name}
81+
logging.FromContext(ctx).Error("Unable to update the deployment", zap.String("deployment", fullname.String()), zap.Error(err))
82+
rb.Status.MarkRedisDeploymentFailed(reconciler.ReasonFailedDeploymentUpdate, "Failed to update Redis deployment")
83+
84+
return nil, pkgreconciler.NewEvent(corev1.EventTypeWarning, reconciler.ReasonFailedDeploymentUpdate,
85+
"Failed to get Redis deployment %s: %w", fullname, err)
86+
}
87+
}
88+
89+
case !apierrs.IsNotFound(err):
90+
// An error ocurred retrieving current deployment.
91+
fullname := types.NamespacedName{Namespace: desired.Namespace, Name: desired.Name}
92+
logging.FromContext(ctx).Error("Unable to get the deployment", zap.String("deployment", fullname.String()), zap.Error(err))
93+
rb.Status.MarkRedisDeploymentFailed(reconciler.ReasonFailedDeploymentGet, "Failed to get Redis deployment")
94+
95+
return nil, pkgreconciler.NewEvent(corev1.EventTypeWarning, reconciler.ReasonFailedDeploymentGet,
96+
"Failed to get Redis deployment %s: %w", fullname, err)
97+
98+
default:
99+
// The deployment has not been found, create it.
100+
current, err = r.client.AppsV1().Deployments(desired.Namespace).Create(ctx, desired, metav1.CreateOptions{})
101+
if err != nil {
102+
fullname := types.NamespacedName{Namespace: desired.Namespace, Name: desired.Name}
103+
logging.FromContext(ctx).Error("Unable to create the deployment", zap.String("deployment", fullname.String()), zap.Error(err))
104+
rb.Status.MarkRedisDeploymentFailed(reconciler.ReasonFailedDeploymentCreate, "Failed to create Redis deployment")
105+
106+
return nil, pkgreconciler.NewEvent(corev1.EventTypeWarning, reconciler.ReasonFailedDeploymentCreate,
107+
"Failed to create Redis deployment %s: %w", fullname, err)
108+
}
109+
}
110+
111+
// Update status based on deployment
112+
rb.Status.PropagateRedisDeploymentAvailability(ctx, &current.Status)
113+
114+
return current, nil
115+
}
116+
117+
func buildRedisService(rb *eventingv1alpha1.RedisBroker) *corev1.Service {
118+
return resources.NewService(rb.Namespace, rb.Name+"-redis-server",
119+
resources.ServiceWithMetaOptions(
120+
resources.MetaAddLabel("app", "redis-server"),
121+
resources.MetaAddLabel("eventing.triggermesh.io/redis-name", rb.Name+"-redis-server"),
122+
resources.MetaAddOwner(rb, rb.GetGroupVersionKind())),
123+
resources.ServiceSetType(corev1.ServiceTypeClusterIP),
124+
resources.ServiceAddSelectorLabel("eventing.triggermesh.io/redis-name", rb.Name+"-redis-server"),
125+
resources.ServiceAddPort("redis", 6379, 6379))
126+
}
127+
128+
func (r *redisReconciler) reconcileService(ctx context.Context, rb *eventingv1alpha1.RedisBroker) (*corev1.Service, error) {
129+
desired := buildRedisService(rb)
130+
current, err := r.serviceLister.Services(desired.Namespace).Get(desired.Name)
131+
switch {
132+
case err == nil:
133+
// Compare current object with desired, update if needed.
134+
if !semantic.Semantic.DeepEqual(desired, current) {
135+
desired.Status = current.Status
136+
desired.ResourceVersion = current.ResourceVersion
137+
138+
current, err = r.client.CoreV1().Services(desired.Namespace).Update(ctx, desired, metav1.UpdateOptions{})
139+
if err != nil {
140+
fullname := types.NamespacedName{Namespace: desired.Namespace, Name: desired.Name}
141+
logging.FromContext(ctx).Error("Unable to update the service", zap.String("service", fullname.String()), zap.Error(err))
142+
rb.Status.MarkRedisServiceFailed(reconciler.ReasonFailedServiceUpdate, "Failed to update Redis service")
143+
144+
return nil, pkgreconciler.NewEvent(corev1.EventTypeWarning, reconciler.ReasonFailedServiceUpdate,
145+
"Failed to get Redis service %s: %w", fullname, err)
146+
}
147+
}
148+
149+
case !apierrs.IsNotFound(err):
150+
// An error ocurred retrieving current object.
151+
fullname := types.NamespacedName{Namespace: desired.Namespace, Name: desired.Name}
152+
logging.FromContext(ctx).Error("Unable to get the service", zap.String("service", fullname.String()), zap.Error(err))
153+
rb.Status.MarkRedisServiceFailed(reconciler.ReasonFailedServiceGet, "Failed to get Redis service")
154+
155+
return nil, pkgreconciler.NewEvent(corev1.EventTypeWarning, reconciler.ReasonFailedServiceGet,
156+
"Failed to get Redis service %s: %w", fullname, err)
157+
158+
default:
159+
// The object has not been found, create it.
160+
current, err = r.client.CoreV1().Services(desired.Namespace).Create(ctx, desired, metav1.CreateOptions{})
161+
if err != nil {
162+
fullname := types.NamespacedName{Namespace: desired.Namespace, Name: desired.Name}
163+
logging.FromContext(ctx).Error("Unable to create the service", zap.String("service", fullname.String()), zap.Error(err))
164+
rb.Status.MarkRedisServiceFailed(reconciler.ReasonFailedServiceCreate, "Failed to create Redis service")
165+
166+
return nil, pkgreconciler.NewEvent(corev1.EventTypeWarning, reconciler.ReasonFailedServiceCreate,
167+
"Failed to create Redis service %s: %w", fullname, err)
168+
}
169+
}
170+
171+
// Service exists and is up to date.
172+
rb.Status.MarkRedisServiceReady()
173+
174+
return current, nil
175+
}

pkg/reconciler/redisbroker/reconciler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, rb *eventingv1alpha1.Red
3333
// Iterate triggers and create secret
3434

3535
// Make sure the Redis deployment exists and propagate the status to the Channel
36-
_, err := r.redisReconciler.Reconcile(ctx, rb)
36+
_, _, err := r.redisReconciler.reconcile(ctx, rb)
3737
if err != nil {
3838
return err
3939
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
// Copyright 2022 TriggerMesh Inc.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package resources
5+
6+
import (
7+
corev1 "k8s.io/api/core/v1"
8+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
9+
"k8s.io/apimachinery/pkg/util/intstr"
10+
)
11+
12+
type ServiceOption func(*corev1.Service)
13+
14+
func NewService(namespace, name string, opts ...ServiceOption) *corev1.Service {
15+
meta := NewMeta(namespace, name)
16+
d := &corev1.Service{
17+
TypeMeta: metav1.TypeMeta{
18+
Kind: "Service",
19+
APIVersion: corev1.SchemeGroupVersion.String(),
20+
},
21+
ObjectMeta: *meta,
22+
}
23+
24+
for _, opt := range opts {
25+
opt(d)
26+
}
27+
28+
return d
29+
}
30+
31+
func ServiceWithMetaOptions(opts ...MetaOption) ServiceOption {
32+
return func(s *corev1.Service) {
33+
for _, opt := range opts {
34+
opt(&s.ObjectMeta)
35+
}
36+
}
37+
}
38+
39+
func ServiceAddSelectorLabel(key, value string) ServiceOption {
40+
return func(s *corev1.Service) {
41+
if s.Spec.Selector == nil {
42+
s.Spec.Selector = make(map[string]string, 1)
43+
}
44+
45+
s.Spec.Selector[key] = value
46+
}
47+
}
48+
49+
func ServiceAddPort(name string, port int32, targetPort int) ServiceOption {
50+
return func(s *corev1.Service) {
51+
if s.Spec.Ports == nil {
52+
s.Spec.Ports = make([]corev1.ServicePort, 0, 1)
53+
}
54+
55+
s.Spec.Ports = append(s.Spec.Ports, corev1.ServicePort{
56+
Name: name,
57+
Port: port,
58+
TargetPort: intstr.FromInt(targetPort),
59+
})
60+
}
61+
}
62+
63+
func ServiceSetType(st corev1.ServiceType) ServiceOption {
64+
return func(s *corev1.Service) {
65+
s.Spec.Type = st
66+
}
67+
}

0 commit comments

Comments
 (0)