-
Notifications
You must be signed in to change notification settings - Fork 54
/
clusterextension_controller.go
506 lines (451 loc) · 20.2 KB
/
clusterextension_controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
/*
Copyright 2023.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controllers
import (
"context"
"errors"
"fmt"
"io/fs"
"strings"
"time"
"github.com/go-logr/logr"
"helm.sh/helm/v3/pkg/release"
"helm.sh/helm/v3/pkg/storage/driver"
"k8s.io/apimachinery/pkg/api/equality"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
crcontroller "sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
crfinalizer "sigs.k8s.io/controller-runtime/pkg/finalizer"
crhandler "sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"github.com/operator-framework/api/pkg/operators/v1alpha1"
catalogd "github.com/operator-framework/catalogd/api/core/v1alpha1"
helmclient "github.com/operator-framework/helm-operator-plugins/pkg/client"
"github.com/operator-framework/operator-registry/alpha/declcfg"
ocv1alpha1 "github.com/operator-framework/operator-controller/api/v1alpha1"
"github.com/operator-framework/operator-controller/internal/bundleutil"
"github.com/operator-framework/operator-controller/internal/conditionsets"
"github.com/operator-framework/operator-controller/internal/contentmanager"
"github.com/operator-framework/operator-controller/internal/labels"
"github.com/operator-framework/operator-controller/internal/resolve"
rukpaksource "github.com/operator-framework/operator-controller/internal/rukpak/source"
)
const (
ClusterExtensionCleanupUnpackCacheFinalizer = "olm.operatorframework.io/cleanup-unpack-cache"
ClusterExtensionCleanupContentManagerCacheFinalizer = "olm.operatorframework.io/cleanup-contentmanager-cache"
)
// ClusterExtensionReconciler reconciles a ClusterExtension object
type ClusterExtensionReconciler struct {
client.Client
Resolver resolve.Resolver
Unpacker rukpaksource.Unpacker
Applier Applier
Manager contentmanager.Manager
controller crcontroller.Controller
cache cache.Cache
InstalledBundleGetter InstalledBundleGetter
Finalizers crfinalizer.Finalizers
}
type Applier interface {
// Apply applies the content in the provided fs.FS using the configuration of the provided ClusterExtension.
// It also takes in a map[string]string to be applied to all applied resources as labels and another
// map[string]string used to create a unique identifier for a stored reference to the resources created.
Apply(context.Context, fs.FS, *ocv1alpha1.ClusterExtension, map[string]string, map[string]string) ([]client.Object, string, error)
}
type InstalledBundleGetter interface {
GetInstalledBundle(ctx context.Context, ext *ocv1alpha1.ClusterExtension) (*InstalledBundle, error)
}
//+kubebuilder:rbac:groups=olm.operatorframework.io,resources=clusterextensions,verbs=get;list;watch;update;patch
//+kubebuilder:rbac:groups=olm.operatorframework.io,resources=clusterextensions/status,verbs=update;patch
//+kubebuilder:rbac:groups=olm.operatorframework.io,resources=clusterextensions/finalizers,verbs=update
//+kubebuilder:rbac:namespace=system,groups=core,resources=secrets,verbs=create;update;patch;delete;deletecollection;get;list;watch
//+kubebuilder:rbac:groups=core,resources=serviceaccounts/token,verbs=create
//+kubebuilder:rbac:groups=apiextensions.k8s.io,resources=customresourcedefinitions,verbs=get
//+kubebuilder:rbac:groups=olm.operatorframework.io,resources=clustercatalogs,verbs=list;watch
// The operator controller needs to watch all the bundle objects and reconcile accordingly. Though not ideal, but these permissions are required.
// This has been taken from rukpak, and an issue was created before to discuss it: https://github.com/operator-framework/rukpak/issues/800.
func (r *ClusterExtensionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
l := log.FromContext(ctx).WithName("operator-controller")
ctx = log.IntoContext(ctx, l)
l.Info("reconcile starting")
defer l.Info("reconcile ending")
existingExt := &ocv1alpha1.ClusterExtension{}
if err := r.Client.Get(ctx, req.NamespacedName, existingExt); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}
reconciledExt := existingExt.DeepCopy()
res, reconcileErr := r.reconcile(ctx, reconciledExt)
// Do checks before any Update()s, as Update() may modify the resource structure!
updateStatus := !equality.Semantic.DeepEqual(existingExt.Status, reconciledExt.Status)
updateFinalizers := !equality.Semantic.DeepEqual(existingExt.Finalizers, reconciledExt.Finalizers)
// If any unexpected fields have changed, panic before updating the resource
unexpectedFieldsChanged := checkForUnexpectedFieldChange(*existingExt, *reconciledExt)
if unexpectedFieldsChanged {
panic("spec or metadata changed by reconciler")
}
// Save the finalizers off to the side. If we update the status, the reconciledExt will be updated
// to contain the new state of the ClusterExtension, which contains the status update, but (critically)
// does not contain the finalizers. After the status update, we need to re-add the finalizers to the
// reconciledExt before updating the object.
finalizers := reconciledExt.Finalizers
if updateStatus {
if err := r.Client.Status().Update(ctx, reconciledExt); err != nil {
reconcileErr = errors.Join(reconcileErr, fmt.Errorf("error updating status: %v", err))
}
}
reconciledExt.Finalizers = finalizers
if updateFinalizers {
if err := r.Client.Update(ctx, reconciledExt); err != nil {
reconcileErr = errors.Join(reconcileErr, fmt.Errorf("error updating finalizers: %v", err))
}
}
return res, reconcileErr
}
// ensureAllConditionsWithReason checks that all defined condition types exist in the given ClusterExtension,
// and assigns a specified reason and custom message to any missing condition.
func ensureAllConditionsWithReason(ext *ocv1alpha1.ClusterExtension, reason v1alpha1.ConditionReason, message string) {
for _, condType := range conditionsets.ConditionTypes {
cond := apimeta.FindStatusCondition(ext.Status.Conditions, condType)
if cond == nil {
// Create a new condition with a valid reason and add it
newCond := metav1.Condition{
Type: condType,
Status: metav1.ConditionFalse,
Reason: string(reason),
Message: message,
ObservedGeneration: ext.GetGeneration(),
LastTransitionTime: metav1.NewTime(time.Now()),
}
ext.Status.Conditions = append(ext.Status.Conditions, newCond)
}
}
}
// Compare resources - ignoring status & metadata.finalizers
func checkForUnexpectedFieldChange(a, b ocv1alpha1.ClusterExtension) bool {
a.Status, b.Status = ocv1alpha1.ClusterExtensionStatus{}, ocv1alpha1.ClusterExtensionStatus{}
a.Finalizers, b.Finalizers = []string{}, []string{}
return !equality.Semantic.DeepEqual(a, b)
}
// Helper function to do the actual reconcile
//
// Today we always return ctrl.Result{} and an error.
// But in the future we might update this function
// to return different results (e.g. requeue).
//
/* The reconcile functions performs the following major tasks:
1. Resolution: Run the resolution to find the bundle from the catalog which needs to be installed.
2. Validate: Ensure that the bundle returned from the resolution for install meets our requirements.
3. Unpack: Unpack the contents from the bundle and store in a localdir in the pod.
4. Install: The process of installing involves:
4.1 Converting the CSV in the bundle into a set of plain k8s objects.
4.2 Generating a chart from k8s objects.
4.3 Apply the release on cluster.
*/
//nolint:unparam
func (r *ClusterExtensionReconciler) reconcile(ctx context.Context, ext *ocv1alpha1.ClusterExtension) (ctrl.Result, error) {
l := log.FromContext(ctx)
l.Info("handling finalizers")
finalizeResult, err := r.Finalizers.Finalize(ctx, ext)
if err != nil {
setStatusProgressing(ext, err)
return ctrl.Result{}, err
}
if finalizeResult.Updated || finalizeResult.StatusUpdated {
// On create: make sure the finalizer is applied before we do anything
// On delete: make sure we do nothing after the finalizer is removed
return ctrl.Result{}, nil
}
l.Info("getting installed bundle")
installedBundle, err := r.InstalledBundleGetter.GetInstalledBundle(ctx, ext)
if err != nil {
setInstallStatus(ext, nil)
setInstalledStatusConditionUnknown(ext, err.Error())
setStatusProgressing(ext, errors.New("retrying to get installed bundle"))
return ctrl.Result{}, err
}
// run resolution
l.Info("resolving bundle")
var bm *ocv1alpha1.BundleMetadata
if installedBundle != nil {
bm = &installedBundle.BundleMetadata
}
resolvedBundle, resolvedBundleVersion, resolvedDeprecation, err := r.Resolver.Resolve(ctx, ext, bm)
if err != nil {
// Note: We don't distinguish between resolution-specific errors and generic errors
setStatusProgressing(ext, err)
setInstalledStatusFromBundle(ext, installedBundle)
ensureAllConditionsWithReason(ext, ocv1alpha1.ReasonFailed, err.Error())
return ctrl.Result{}, err
}
// set deprecation status after _successful_ resolution
// TODO:
// 1. It seems like deprecation status should reflect the currently installed bundle, not the resolved
// bundle. So perhaps we should set package and channel deprecations directly after resolution, but
// defer setting the bundle deprecation until we successfully install the bundle.
// 2. If resolution fails because it can't find a bundle, that doesn't mean we wouldn't be able to find
// a deprecation for the ClusterExtension's spec.packageName. Perhaps we should check for a non-nil
// resolvedDeprecation even if resolution returns an error. If present, we can still update some of
// our deprecation status.
// - Open question though: what if different catalogs have different opinions of what's deprecated.
// If we can't resolve a bundle, how do we know which catalog to trust for deprecation information?
// Perhaps if the package shows up in multiple catalogs and deprecations don't match, we can set
// the deprecation status to unknown? Or perhaps we somehow combine the deprecation information from
// all catalogs?
SetDeprecationStatus(ext, resolvedBundle.Name, resolvedDeprecation)
resolvedBundleMetadata := bundleutil.MetadataFor(resolvedBundle.Name, *resolvedBundleVersion)
bundleSource := &rukpaksource.BundleSource{
Name: ext.GetName(),
Type: rukpaksource.SourceTypeImage,
Image: &rukpaksource.ImageSource{
Ref: resolvedBundle.Image,
},
}
l.Info("unpacking resolved bundle")
unpackResult, err := r.Unpacker.Unpack(ctx, bundleSource)
if err != nil {
// Wrap the error passed to this with the resolution information until we have successfully
// installed since we intend for the progressing condition to replace the resolved condition
// and will be removing the .status.resolution field from the ClusterExtension status API
setStatusProgressing(ext, wrapErrorWithResolutionInfo(resolvedBundleMetadata, err))
setInstalledStatusFromBundle(ext, installedBundle)
return ctrl.Result{}, err
}
if unpackResult.State != rukpaksource.StateUnpacked {
panic(fmt.Sprintf("unexpected unpack state %q", unpackResult.State))
}
objLbls := map[string]string{
labels.OwnerKindKey: ocv1alpha1.ClusterExtensionKind,
labels.OwnerNameKey: ext.GetName(),
}
storeLbls := map[string]string{
labels.BundleNameKey: resolvedBundle.Name,
labels.PackageNameKey: resolvedBundle.Package,
labels.BundleVersionKey: resolvedBundleVersion.String(),
labels.BundleReferenceKey: resolvedBundle.Image,
}
l.Info("applying bundle contents")
// NOTE: We need to be cautious of eating errors here.
// We should always return any error that occurs during an
// attempt to apply content to the cluster. Only when there is
// a verifiable reason to eat the error (i.e it is recoverable)
// should an exception be made.
// The following kinds of errors should be returned up the stack
// to ensure exponential backoff can occur:
// - Permission errors (it is not possible to watch changes to permissions.
// The only way to eventually recover from permission errors is to keep retrying).
managedObjs, _, err := r.Applier.Apply(ctx, unpackResult.Bundle, ext, objLbls, storeLbls)
if err != nil {
setStatusProgressing(ext, wrapErrorWithResolutionInfo(resolvedBundleMetadata, err))
// Now that we're actually trying to install, use the error
setInstalledStatusFromBundle(ext, installedBundle)
return ctrl.Result{}, err
}
newInstalledBundle := &InstalledBundle{
BundleMetadata: resolvedBundleMetadata,
Image: resolvedBundle.Image,
}
// Successful install
setInstalledStatusFromBundle(ext, newInstalledBundle)
l.Info("watching managed objects")
cache, err := r.Manager.Get(ctx, ext)
if err != nil {
// No need to wrap error with resolution information here (or beyond) since the
// bundle was successfully installed and the information will be present in
// the .status.installed field
setStatusProgressing(ext, err)
return ctrl.Result{}, err
}
if err := cache.Watch(ctx, r.controller, managedObjs...); err != nil {
setStatusProgressing(ext, err)
return ctrl.Result{}, err
}
// If we made it here, we have successfully reconciled the ClusterExtension
// and have reached the desired state. Since the Progressing status should reflect
// our progress towards the desired state, we also set it when we have reached
// the desired state by providing a nil error value.
setStatusProgressing(ext, nil)
return ctrl.Result{}, nil
}
// SetDeprecationStatus will set the appropriate deprecation statuses for a ClusterExtension
// based on the provided bundle
func SetDeprecationStatus(ext *ocv1alpha1.ClusterExtension, bundleName string, deprecation *declcfg.Deprecation) {
deprecations := map[string][]declcfg.DeprecationEntry{}
channelSet := sets.New[string]()
if ext.Spec.Source.Catalog != nil {
for _, channel := range ext.Spec.Source.Catalog.Channels {
channelSet.Insert(channel)
}
}
if deprecation != nil {
for _, entry := range deprecation.Entries {
switch entry.Reference.Schema {
case declcfg.SchemaPackage:
deprecations[ocv1alpha1.TypePackageDeprecated] = []declcfg.DeprecationEntry{entry}
case declcfg.SchemaChannel:
if channelSet.Has(entry.Reference.Name) {
deprecations[ocv1alpha1.TypeChannelDeprecated] = append(deprecations[ocv1alpha1.TypeChannelDeprecated], entry)
}
case declcfg.SchemaBundle:
if bundleName != entry.Reference.Name {
continue
}
deprecations[ocv1alpha1.TypeBundleDeprecated] = []declcfg.DeprecationEntry{entry}
}
}
}
// first get ordered deprecation messages that we'll join in the Deprecated condition message
var deprecationMessages []string
for _, conditionType := range []string{
ocv1alpha1.TypePackageDeprecated,
ocv1alpha1.TypeChannelDeprecated,
ocv1alpha1.TypeBundleDeprecated,
} {
if entries, ok := deprecations[conditionType]; ok {
for _, entry := range entries {
deprecationMessages = append(deprecationMessages, entry.Message)
}
}
}
// next, set the Deprecated condition
status, reason, message := metav1.ConditionFalse, ocv1alpha1.ReasonDeprecated, ""
if len(deprecationMessages) > 0 {
status, reason, message = metav1.ConditionTrue, ocv1alpha1.ReasonDeprecated, strings.Join(deprecationMessages, ";")
}
apimeta.SetStatusCondition(&ext.Status.Conditions, metav1.Condition{
Type: ocv1alpha1.TypeDeprecated,
Reason: reason,
Status: status,
Message: message,
ObservedGeneration: ext.Generation,
})
// finally, set the individual deprecation conditions for package, channel, and bundle
for _, conditionType := range []string{
ocv1alpha1.TypePackageDeprecated,
ocv1alpha1.TypeChannelDeprecated,
ocv1alpha1.TypeBundleDeprecated,
} {
entries, ok := deprecations[conditionType]
status, reason, message := metav1.ConditionFalse, ocv1alpha1.ReasonDeprecated, ""
if ok {
status, reason = metav1.ConditionTrue, ocv1alpha1.ReasonDeprecated
for _, entry := range entries {
message = fmt.Sprintf("%s\n%s", message, entry.Message)
}
}
apimeta.SetStatusCondition(&ext.Status.Conditions, metav1.Condition{
Type: conditionType,
Reason: reason,
Status: status,
Message: message,
ObservedGeneration: ext.Generation,
})
}
}
// SetupWithManager sets up the controller with the Manager.
func (r *ClusterExtensionReconciler) SetupWithManager(mgr ctrl.Manager) error {
controller, err := ctrl.NewControllerManagedBy(mgr).
For(&ocv1alpha1.ClusterExtension{}).
Watches(&catalogd.ClusterCatalog{},
crhandler.EnqueueRequestsFromMapFunc(clusterExtensionRequestsForCatalog(mgr.GetClient(), mgr.GetLogger())),
builder.WithPredicates(predicate.Funcs{
UpdateFunc: func(ue event.UpdateEvent) bool {
oldObject, isOldCatalog := ue.ObjectOld.(*catalogd.ClusterCatalog)
newObject, isNewCatalog := ue.ObjectNew.(*catalogd.ClusterCatalog)
if !isOldCatalog || !isNewCatalog {
return true
}
if oldObject.Status.ResolvedSource != nil && newObject.Status.ResolvedSource != nil {
if oldObject.Status.ResolvedSource.Image != nil && newObject.Status.ResolvedSource.Image != nil {
return oldObject.Status.ResolvedSource.Image.Ref != newObject.Status.ResolvedSource.Image.Ref
}
}
return true
},
})).
Build(r)
if err != nil {
return err
}
r.controller = controller
r.cache = mgr.GetCache()
return nil
}
func wrapErrorWithResolutionInfo(resolved ocv1alpha1.BundleMetadata, err error) error {
return fmt.Errorf("%w for resolved bundle %q with version %q", err, resolved.Name, resolved.Version)
}
// Generate reconcile requests for all cluster extensions affected by a catalog change
func clusterExtensionRequestsForCatalog(c client.Reader, logger logr.Logger) crhandler.MapFunc {
return func(ctx context.Context, _ client.Object) []reconcile.Request {
// no way of associating an extension to a catalog so create reconcile requests for everything
clusterExtensions := metav1.PartialObjectMetadataList{}
clusterExtensions.SetGroupVersionKind(ocv1alpha1.GroupVersion.WithKind("ClusterExtensionList"))
err := c.List(ctx, &clusterExtensions)
if err != nil {
logger.Error(err, "unable to enqueue cluster extensions for catalog reconcile")
return nil
}
var requests []reconcile.Request
for _, ext := range clusterExtensions.Items {
requests = append(requests, reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: ext.GetNamespace(),
Name: ext.GetName(),
},
})
}
return requests
}
}
type DefaultInstalledBundleGetter struct {
helmclient.ActionClientGetter
}
type InstalledBundle struct {
ocv1alpha1.BundleMetadata
Image string
}
func (d *DefaultInstalledBundleGetter) GetInstalledBundle(ctx context.Context, ext *ocv1alpha1.ClusterExtension) (*InstalledBundle, error) {
cl, err := d.ActionClientFor(ctx, ext)
if err != nil {
return nil, err
}
relhis, err := cl.History(ext.GetName())
if err != nil && !errors.Is(err, driver.ErrReleaseNotFound) {
return nil, err
}
if len(relhis) == 0 {
return nil, nil
}
// relhis[0].Info.Status is the status of the most recent install attempt.
// But we need to look for the most-recent _Deployed_ release
for _, rel := range relhis {
if rel.Info != nil && rel.Info.Status == release.StatusDeployed {
return &InstalledBundle{
BundleMetadata: ocv1alpha1.BundleMetadata{
Name: rel.Labels[labels.BundleNameKey],
Version: rel.Labels[labels.BundleVersionKey],
},
Image: rel.Labels[labels.BundleReferenceKey],
}, nil
}
}
return nil, nil
}