Skip to content

Commit e6d006b

Browse files
committed
Implement events and metrics helpers from runtime
This commit loosely implements the events and metrics helpers that have been newly introduced to `fluxcd/pkg/runtime`, and heavily reduces code duplication. It is in preparation of a much bigger overhaul to implement the work pending in fluxcd/pkg#101. While implementing, I ran into little annoyances that likely should be addressed before the "official" `runtime` MINOR release: - Passing `nil` every time there isn't any metadata for an event quickly becomes cumbersome; we should look into an `EventWithMetadata` and/or `EventfWithMetadata`, or some other way to _optionally_ provide metadata without annoying the consumer. - There is an inconsistency in the method names of the metric helper, i.e. `RecordReadinessMetric` vs `RecordSuspend`. We either need to append or remove the `Metric` suffix on all recording methods. Signed-off-by: Hidde Beydals <hello@hidde.co>
1 parent be5d10e commit e6d006b

File tree

8 files changed

+124
-399
lines changed

8 files changed

+124
-399
lines changed

controllers/bucket_controller.go

Lines changed: 17 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -26,26 +26,21 @@ import (
2626
"strings"
2727
"time"
2828

29-
"github.com/go-logr/logr"
3029
"github.com/minio/minio-go/v7"
3130
"github.com/minio/minio-go/v7/pkg/credentials"
3231
"github.com/minio/minio-go/v7/pkg/s3utils"
3332
corev1 "k8s.io/api/core/v1"
3433
apimeta "k8s.io/apimachinery/pkg/api/meta"
35-
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
36-
"k8s.io/apimachinery/pkg/runtime"
3734
"k8s.io/apimachinery/pkg/types"
38-
kuberecorder "k8s.io/client-go/tools/record"
39-
"k8s.io/client-go/tools/reference"
4035
ctrl "sigs.k8s.io/controller-runtime"
4136
"sigs.k8s.io/controller-runtime/pkg/client"
4237
"sigs.k8s.io/controller-runtime/pkg/controller"
4338
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
4439
"sigs.k8s.io/controller-runtime/pkg/predicate"
4540

4641
"github.com/fluxcd/pkg/apis/meta"
42+
helper "github.com/fluxcd/pkg/runtime/controller"
4743
"github.com/fluxcd/pkg/runtime/events"
48-
"github.com/fluxcd/pkg/runtime/metrics"
4944
"github.com/fluxcd/pkg/runtime/predicates"
5045

5146
sourcev1 "github.com/fluxcd/source-controller/api/v1beta1"
@@ -60,11 +55,10 @@ import (
6055
// BucketReconciler reconciles a Bucket object
6156
type BucketReconciler struct {
6257
client.Client
63-
Scheme *runtime.Scheme
64-
Storage *Storage
65-
EventRecorder kuberecorder.EventRecorder
66-
ExternalEventRecorder *events.Recorder
67-
MetricsRecorder *metrics.Recorder
58+
helper.Events
59+
helper.Metrics
60+
61+
Storage *Storage
6862
}
6963

7064
type BucketReconcilerOptions struct {
@@ -93,7 +87,7 @@ func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
9387
}
9488

9589
// Record suspended status metric
96-
defer r.recordSuspension(ctx, bucket)
90+
defer r.Metrics.RecordSuspend(ctx, &bucket, bucket.Spec.Suspend)
9791

9892
// Add our finalizer if it does not exist
9993
if !controllerutil.ContainsFinalizer(&bucket, sourcev1.SourceFinalizer) {
@@ -116,13 +110,7 @@ func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
116110
}
117111

118112
// record reconciliation duration
119-
if r.MetricsRecorder != nil {
120-
objRef, err := reference.GetReference(r.Scheme, &bucket)
121-
if err != nil {
122-
return ctrl.Result{}, err
123-
}
124-
defer r.MetricsRecorder.RecordDuration(*objRef, start)
125-
}
113+
defer r.Metrics.RecordDuration(ctx, &bucket, start)
126114

127115
// set initial status
128116
if resetBucket, ok := r.resetStatus(bucket); ok {
@@ -131,7 +119,7 @@ func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
131119
log.Error(err, "unable to update status")
132120
return ctrl.Result{Requeue: true}, err
133121
}
134-
r.recordReadiness(ctx, bucket)
122+
r.Metrics.RecordReadinessMetric(ctx, &bucket)
135123
}
136124

137125
// record the value of the reconciliation request, if any
@@ -157,16 +145,18 @@ func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
157145

158146
// if reconciliation failed, record the failure and requeue immediately
159147
if reconcileErr != nil {
160-
r.event(ctx, reconciledBucket, events.EventSeverityError, reconcileErr.Error())
161-
r.recordReadiness(ctx, reconciledBucket)
148+
r.Events.Event(ctx, &reconciledBucket, nil, events.EventSeverityError, "ReconciliationFailed", reconcileErr.Error())
149+
r.Metrics.RecordReadinessMetric(ctx, &bucket)
162150
return ctrl.Result{Requeue: true}, reconcileErr
163151
}
164152

165153
// emit revision change event
166154
if bucket.Status.Artifact == nil || reconciledBucket.Status.Artifact.Revision != bucket.Status.Artifact.Revision {
167-
r.event(ctx, reconciledBucket, events.EventSeverityInfo, sourcev1.BucketReadyMessage(reconciledBucket))
155+
r.Events.Event(ctx, &reconciledBucket, map[string]string{
156+
"revision": reconciledBucket.GetArtifact().Revision,
157+
}, events.EventSeverityInfo, "NewRevision", sourcev1.BucketReadyMessage(reconciledBucket))
168158
}
169-
r.recordReadiness(ctx, reconciledBucket)
159+
r.Metrics.RecordReadinessMetric(ctx, &bucket)
170160

171161
log.Info(fmt.Sprintf("Reconciliation finished in %s, next run in %s",
172162
time.Now().Sub(start).String(),
@@ -297,14 +287,14 @@ func (r *BucketReconciler) reconcile(ctx context.Context, bucket sourcev1.Bucket
297287

298288
func (r *BucketReconciler) reconcileDelete(ctx context.Context, bucket sourcev1.Bucket) (ctrl.Result, error) {
299289
if err := r.gc(bucket); err != nil {
300-
r.event(ctx, bucket, events.EventSeverityError,
301-
fmt.Sprintf("garbage collection for deleted resource failed: %s", err.Error()))
290+
r.Events.Eventf(ctx, &bucket, nil, events.EventSeverityError, "GarbageCollectionFailed",
291+
"garbage collection for deleted resource failed: %s", err.Error())
302292
// Return the error so we retry the failed garbage collection
303293
return ctrl.Result{}, err
304294
}
305295

306296
// Record deleted status
307-
r.recordReadiness(ctx, bucket)
297+
r.Metrics.RecordReadinessMetric(ctx, &bucket)
308298

309299
// Remove our finalizer from the list and update it
310300
controllerutil.RemoveFinalizer(&bucket, sourcev1.SourceFinalizer)
@@ -413,65 +403,6 @@ func (r *BucketReconciler) gc(bucket sourcev1.Bucket) error {
413403
return nil
414404
}
415405

416-
// event emits a Kubernetes event and forwards the event to notification controller if configured
417-
func (r *BucketReconciler) event(ctx context.Context, bucket sourcev1.Bucket, severity, msg string) {
418-
log := logr.FromContext(ctx)
419-
if r.EventRecorder != nil {
420-
r.EventRecorder.Eventf(&bucket, "Normal", severity, msg)
421-
}
422-
if r.ExternalEventRecorder != nil {
423-
objRef, err := reference.GetReference(r.Scheme, &bucket)
424-
if err != nil {
425-
log.Error(err, "unable to send event")
426-
return
427-
}
428-
429-
if err := r.ExternalEventRecorder.Eventf(*objRef, nil, severity, severity, msg); err != nil {
430-
log.Error(err, "unable to send event")
431-
return
432-
}
433-
}
434-
}
435-
436-
func (r *BucketReconciler) recordReadiness(ctx context.Context, bucket sourcev1.Bucket) {
437-
log := logr.FromContext(ctx)
438-
if r.MetricsRecorder == nil {
439-
return
440-
}
441-
objRef, err := reference.GetReference(r.Scheme, &bucket)
442-
if err != nil {
443-
log.Error(err, "unable to record readiness metric")
444-
return
445-
}
446-
if rc := apimeta.FindStatusCondition(bucket.Status.Conditions, meta.ReadyCondition); rc != nil {
447-
r.MetricsRecorder.RecordCondition(*objRef, *rc, !bucket.DeletionTimestamp.IsZero())
448-
} else {
449-
r.MetricsRecorder.RecordCondition(*objRef, metav1.Condition{
450-
Type: meta.ReadyCondition,
451-
Status: metav1.ConditionUnknown,
452-
}, !bucket.DeletionTimestamp.IsZero())
453-
}
454-
}
455-
456-
func (r *BucketReconciler) recordSuspension(ctx context.Context, bucket sourcev1.Bucket) {
457-
if r.MetricsRecorder == nil {
458-
return
459-
}
460-
log := logr.FromContext(ctx)
461-
462-
objRef, err := reference.GetReference(r.Scheme, &bucket)
463-
if err != nil {
464-
log.Error(err, "unable to record suspended metric")
465-
return
466-
}
467-
468-
if !bucket.DeletionTimestamp.IsZero() {
469-
r.MetricsRecorder.RecordSuspend(*objRef, false)
470-
} else {
471-
r.MetricsRecorder.RecordSuspend(*objRef, bucket.Spec.Suspend)
472-
}
473-
}
474-
475406
func (r *BucketReconciler) updateStatus(ctx context.Context, req ctrl.Request, newStatus sourcev1.BucketStatus) error {
476407
var bucket sourcev1.Bucket
477408
if err := r.Get(ctx, req.NamespacedName, &bucket); err != nil {

controllers/gitrepository_controller.go

Lines changed: 22 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,7 @@ import (
2929
"github.com/go-logr/logr"
3030
corev1 "k8s.io/api/core/v1"
3131
apimeta "k8s.io/apimachinery/pkg/api/meta"
32-
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
33-
"k8s.io/apimachinery/pkg/runtime"
3432
"k8s.io/apimachinery/pkg/types"
35-
kuberecorder "k8s.io/client-go/tools/record"
36-
"k8s.io/client-go/tools/reference"
3733
ctrl "sigs.k8s.io/controller-runtime"
3834
"sigs.k8s.io/controller-runtime/pkg/builder"
3935
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -42,8 +38,8 @@ import (
4238
"sigs.k8s.io/controller-runtime/pkg/predicate"
4339

4440
"github.com/fluxcd/pkg/apis/meta"
41+
helper "github.com/fluxcd/pkg/runtime/controller"
4542
"github.com/fluxcd/pkg/runtime/events"
46-
"github.com/fluxcd/pkg/runtime/metrics"
4743
"github.com/fluxcd/pkg/runtime/predicates"
4844

4945
sourcev1 "github.com/fluxcd/source-controller/api/v1beta1"
@@ -60,12 +56,12 @@ import (
6056
// GitRepositoryReconciler reconciles a GitRepository object
6157
type GitRepositoryReconciler struct {
6258
client.Client
63-
requeueDependency time.Duration
64-
Scheme *runtime.Scheme
65-
Storage *Storage
66-
EventRecorder kuberecorder.EventRecorder
67-
ExternalEventRecorder *events.Recorder
68-
MetricsRecorder *metrics.Recorder
59+
helper.Events
60+
helper.Metrics
61+
62+
Storage *Storage
63+
64+
requeueDependency time.Duration
6965
}
7066

7167
type GitRepositoryReconcilerOptions struct {
@@ -98,7 +94,7 @@ func (r *GitRepositoryReconciler) Reconcile(ctx context.Context, req ctrl.Reques
9894
}
9995

10096
// Record suspended status metric
101-
defer r.recordSuspension(ctx, repository)
97+
defer r.RecordSuspend(ctx, &repository, repository.Spec.Suspend)
10298

10399
// Add our finalizer if it does not exist
104100
if !controllerutil.ContainsFinalizer(&repository, sourcev1.SourceFinalizer) {
@@ -132,21 +128,15 @@ func (r *GitRepositoryReconciler) Reconcile(ctx context.Context, req ctrl.Reques
132128
// instead we requeue on a fix interval.
133129
msg := fmt.Sprintf("Dependencies do not meet ready condition, retrying in %s", r.requeueDependency.String())
134130
log.Info(msg)
135-
r.event(ctx, repository, events.EventSeverityInfo, msg)
136-
r.recordReadiness(ctx, repository)
131+
r.Events.Event(ctx, &repository, nil, events.EventSeverityInfo, "DependencyNotReady", msg)
132+
r.Metrics.RecordReadinessMetric(ctx, &repository)
137133
return ctrl.Result{RequeueAfter: r.requeueDependency}, nil
138134
}
139135
log.Info("All dependencies area ready, proceeding with reconciliation")
140136
}
141137

142138
// record reconciliation duration
143-
if r.MetricsRecorder != nil {
144-
objRef, err := reference.GetReference(r.Scheme, &repository)
145-
if err != nil {
146-
return ctrl.Result{}, err
147-
}
148-
defer r.MetricsRecorder.RecordDuration(*objRef, start)
149-
}
139+
defer r.Metrics.RecordDuration(ctx, &repository, start)
150140

151141
// set initial status
152142
if resetRepository, ok := r.resetStatus(repository); ok {
@@ -155,7 +145,7 @@ func (r *GitRepositoryReconciler) Reconcile(ctx context.Context, req ctrl.Reques
155145
log.Error(err, "unable to update status")
156146
return ctrl.Result{Requeue: true}, err
157147
}
158-
r.recordReadiness(ctx, repository)
148+
r.Metrics.RecordReadinessMetric(ctx, &repository)
159149
}
160150

161151
// record the value of the reconciliation request, if any
@@ -181,16 +171,18 @@ func (r *GitRepositoryReconciler) Reconcile(ctx context.Context, req ctrl.Reques
181171

182172
// if reconciliation failed, record the failure and requeue immediately
183173
if reconcileErr != nil {
184-
r.event(ctx, reconciledRepository, events.EventSeverityError, reconcileErr.Error())
185-
r.recordReadiness(ctx, reconciledRepository)
174+
r.Events.Event(ctx, &reconciledRepository, nil, events.EventSeverityError, "ReconciliationFailed", reconcileErr.Error())
175+
r.Metrics.RecordReadinessMetric(ctx, &reconciledRepository)
186176
return ctrl.Result{Requeue: true}, reconcileErr
187177
}
188178

189179
// emit revision change event
190-
if repository.Status.Artifact == nil || reconciledRepository.Status.Artifact.Revision != repository.Status.Artifact.Revision {
191-
r.event(ctx, reconciledRepository, events.EventSeverityInfo, sourcev1.GitRepositoryReadyMessage(reconciledRepository))
180+
if repository.GetArtifact() == nil || reconciledRepository.GetArtifact().Revision != repository.GetArtifact().Revision {
181+
r.Events.Event(ctx, &reconciledRepository, map[string]string{
182+
"revision": reconciledRepository.GetArtifact().Revision,
183+
}, events.EventSeverityInfo, "NewRevision", sourcev1.GitRepositoryReadyMessage(reconciledRepository))
192184
}
193-
r.recordReadiness(ctx, reconciledRepository)
185+
r.Metrics.RecordReadinessMetric(ctx, &reconciledRepository)
194186

195187
log.Info(fmt.Sprintf("Reconciliation finished in %s, next run in %s",
196188
time.Now().Sub(start).String(),
@@ -376,14 +368,14 @@ func (r *GitRepositoryReconciler) reconcile(ctx context.Context, repository sour
376368

377369
func (r *GitRepositoryReconciler) reconcileDelete(ctx context.Context, repository sourcev1.GitRepository) (ctrl.Result, error) {
378370
if err := r.gc(repository); err != nil {
379-
r.event(ctx, repository, events.EventSeverityError,
380-
fmt.Sprintf("garbage collection for deleted resource failed: %s", err.Error()))
371+
r.Events.Eventf(ctx, &repository, nil, events.EventSeverityError, "GarbageCollectionFailed",
372+
"garbage collection for deleted resource failed: %s", err.Error())
381373
// Return the error so we retry the failed garbage collection
382374
return ctrl.Result{}, err
383375
}
384376

385377
// Record deleted status
386-
r.recordReadiness(ctx, repository)
378+
r.Metrics.RecordReadinessMetric(ctx, &repository)
387379

388380
// Remove our finalizer from the list and update it
389381
controllerutil.RemoveFinalizer(&repository, sourcev1.SourceFinalizer)
@@ -424,66 +416,6 @@ func (r *GitRepositoryReconciler) gc(repository sourcev1.GitRepository) error {
424416
return nil
425417
}
426418

427-
// event emits a Kubernetes event and forwards the event to notification controller if configured
428-
func (r *GitRepositoryReconciler) event(ctx context.Context, repository sourcev1.GitRepository, severity, msg string) {
429-
log := logr.FromContext(ctx)
430-
431-
if r.EventRecorder != nil {
432-
r.EventRecorder.Eventf(&repository, "Normal", severity, msg)
433-
}
434-
if r.ExternalEventRecorder != nil {
435-
objRef, err := reference.GetReference(r.Scheme, &repository)
436-
if err != nil {
437-
log.Error(err, "unable to send event")
438-
return
439-
}
440-
441-
if err := r.ExternalEventRecorder.Eventf(*objRef, nil, severity, severity, msg); err != nil {
442-
log.Error(err, "unable to send event")
443-
return
444-
}
445-
}
446-
}
447-
448-
func (r *GitRepositoryReconciler) recordReadiness(ctx context.Context, repository sourcev1.GitRepository) {
449-
log := logr.FromContext(ctx)
450-
if r.MetricsRecorder == nil {
451-
return
452-
}
453-
objRef, err := reference.GetReference(r.Scheme, &repository)
454-
if err != nil {
455-
log.Error(err, "unable to record readiness metric")
456-
return
457-
}
458-
if rc := apimeta.FindStatusCondition(repository.Status.Conditions, meta.ReadyCondition); rc != nil {
459-
r.MetricsRecorder.RecordCondition(*objRef, *rc, !repository.DeletionTimestamp.IsZero())
460-
} else {
461-
r.MetricsRecorder.RecordCondition(*objRef, metav1.Condition{
462-
Type: meta.ReadyCondition,
463-
Status: metav1.ConditionUnknown,
464-
}, !repository.DeletionTimestamp.IsZero())
465-
}
466-
}
467-
468-
func (r *GitRepositoryReconciler) recordSuspension(ctx context.Context, gitrepository sourcev1.GitRepository) {
469-
if r.MetricsRecorder == nil {
470-
return
471-
}
472-
log := logr.FromContext(ctx)
473-
474-
objRef, err := reference.GetReference(r.Scheme, &gitrepository)
475-
if err != nil {
476-
log.Error(err, "unable to record suspended metric")
477-
return
478-
}
479-
480-
if !gitrepository.DeletionTimestamp.IsZero() {
481-
r.MetricsRecorder.RecordSuspend(*objRef, false)
482-
} else {
483-
r.MetricsRecorder.RecordSuspend(*objRef, gitrepository.Spec.Suspend)
484-
}
485-
}
486-
487419
func (r *GitRepositoryReconciler) updateStatus(ctx context.Context, req ctrl.Request, newStatus sourcev1.GitRepositoryStatus) error {
488420
var repository sourcev1.GitRepository
489421
if err := r.Get(ctx, req.NamespacedName, &repository); err != nil {

0 commit comments

Comments
 (0)