Skip to content

Commit b73ea5c

Browse files
authored
✨ wire up ServiceAccount based caching layer (#1074)
* wire up serviceaccount based caching layer Signed-off-by: everettraven <everettraven@gmail.com> * remove body close Signed-off-by: everettraven <everettraven@gmail.com> --------- Signed-off-by: everettraven <everettraven@gmail.com>
1 parent e3e6b03 commit b73ea5c

File tree

9 files changed

+304
-58
lines changed

9 files changed

+304
-58
lines changed

cmd/manager/main.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ import (
5050
"github.com/operator-framework/operator-controller/internal/authentication"
5151
"github.com/operator-framework/operator-controller/internal/catalogmetadata/cache"
5252
catalogclient "github.com/operator-framework/operator-controller/internal/catalogmetadata/client"
53+
"github.com/operator-framework/operator-controller/internal/contentmanager"
5354
"github.com/operator-framework/operator-controller/internal/controllers"
5455
"github.com/operator-framework/operator-controller/internal/httputil"
5556
"github.com/operator-framework/operator-controller/internal/labels"
@@ -180,12 +181,14 @@ func main() {
180181
Name: cExt.Spec.ServiceAccount.Name,
181182
Namespace: cExt.Spec.InstallNamespace,
182183
}
183-
token, err := tokenGetter.Get(ctx, namespacedName)
184-
if err != nil {
185-
return nil, fmt.Errorf("failed to extract SA token, %w", err)
186-
}
187184
tempConfig := rest.AnonymousClientConfig(c)
188-
tempConfig.BearerToken = token
185+
tempConfig.WrapTransport = func(rt http.RoundTripper) http.RoundTripper {
186+
return &authentication.TokenInjectingRoundTripper{
187+
Tripper: rt,
188+
TokenGetter: tokenGetter,
189+
Key: namespacedName,
190+
}
191+
}
189192
return tempConfig, nil
190193
}
191194
cfgGetter, err := helmclient.NewActionConfigGetter(mgr.GetConfig(), mgr.GetRESTMapper(),
@@ -201,7 +204,6 @@ func main() {
201204
acg, err := action.NewWrappedActionClientGetter(cfgGetter,
202205
helmclient.WithFailureRollbacks(false),
203206
)
204-
205207
if err != nil {
206208
setupLog.Error(err, "unable to create helm client")
207209
os.Exit(1)
@@ -272,6 +274,7 @@ func main() {
272274
InstalledBundleGetter: &controllers.DefaultInstalledBundleGetter{ActionClientGetter: acg},
273275
Finalizers: clusterExtensionFinalizers,
274276
Preflights: preflights,
277+
Watcher: contentmanager.New(restConfigMapper, mgr.GetConfig(), mgr.GetRESTMapper()),
275278
}).SetupWithManager(mgr); err != nil {
276279
setupLog.Error(err, "unable to create controller", "controller", "ClusterExtension")
277280
os.Exit(1)

config/base/rbac/role.yaml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,6 @@ kind: ClusterRole
44
metadata:
55
name: manager-role
66
rules:
7-
- apiGroups:
8-
- '*'
9-
resources:
10-
- '*'
11-
verbs:
12-
- '*'
137
- apiGroups:
148
- apiextensions.k8s.io
159
resources:

hack/test/pre-upgrade-setup.sh

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,60 @@ metadata:
4848
name: upgrade-e2e
4949
rules:
5050
- apiGroups:
51-
- "*"
51+
- ""
5252
resources:
53-
- "*"
53+
- "secrets"
54+
- "services"
55+
- "serviceaccounts"
5456
verbs:
55-
- "*"
57+
- "create"
58+
- "update"
59+
- "patch"
60+
- "delete"
61+
- "get"
62+
- "list"
63+
- "watch"
64+
- apiGroups:
65+
- "apps"
66+
resources:
67+
- "deployments"
68+
verbs:
69+
- "create"
70+
- "update"
71+
- "patch"
72+
- "delete"
73+
- "get"
74+
- "list"
75+
- "watch"
76+
- apiGroups:
77+
- "apiextensions.k8s.io"
78+
resources:
79+
- "customresourcedefinitions"
80+
verbs:
81+
- "create"
82+
- "update"
83+
- "patch"
84+
- "delete"
85+
- "get"
86+
- "list"
87+
- "watch"
88+
- apiGroups:
89+
- "rbac.authorization.k8s.io"
90+
resources:
91+
- "clusterroles"
92+
- "clusterrolebindings"
93+
- "roles"
94+
- "rolebindings"
95+
verbs:
96+
- "create"
97+
- "update"
98+
- "patch"
99+
- "delete"
100+
- "get"
101+
- "list"
102+
- "watch"
103+
- "bind"
104+
- "escalate"
56105
EOF
57106

58107
kubectl apply -f - <<EOF

internal/authentication/tokengetter.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,3 +100,9 @@ func (t *TokenGetter) reapExpiredTokens() {
100100
}
101101
}
102102
}
103+
104+
func (t *TokenGetter) Delete(key types.NamespacedName) {
105+
t.mu.Lock()
106+
defer t.mu.Unlock()
107+
delete(t.tokens, key)
108+
}

internal/authentication/tripper.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package authentication
2+
3+
import (
4+
"fmt"
5+
"net/http"
6+
7+
"k8s.io/apimachinery/pkg/types"
8+
utilnet "k8s.io/apimachinery/pkg/util/net"
9+
)
10+
11+
var _ http.RoundTripper = (*TokenInjectingRoundTripper)(nil)
12+
13+
type TokenInjectingRoundTripper struct {
14+
Tripper http.RoundTripper
15+
TokenGetter *TokenGetter
16+
Key types.NamespacedName
17+
}
18+
19+
func (tt *TokenInjectingRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
20+
resp, err := tt.do(req)
21+
if resp != nil && resp.StatusCode == http.StatusUnauthorized {
22+
tt.TokenGetter.Delete(tt.Key)
23+
resp, err = tt.do(req)
24+
}
25+
return resp, err
26+
}
27+
28+
func (tt *TokenInjectingRoundTripper) do(req *http.Request) (*http.Response, error) {
29+
reqClone := utilnet.CloneRequest(req)
30+
token, err := tt.TokenGetter.Get(reqClone.Context(), tt.Key)
31+
if err != nil {
32+
return nil, err
33+
}
34+
35+
// Always set the Authorization header to our retrieved token
36+
reqClone.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token))
37+
return tt.Tripper.RoundTrip(reqClone)
38+
}

internal/contentmanager/contentmanager.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@ import (
1515
"sigs.k8s.io/controller-runtime/pkg/cache"
1616
"sigs.k8s.io/controller-runtime/pkg/client"
1717
"sigs.k8s.io/controller-runtime/pkg/controller"
18+
"sigs.k8s.io/controller-runtime/pkg/event"
1819
"sigs.k8s.io/controller-runtime/pkg/handler"
20+
"sigs.k8s.io/controller-runtime/pkg/predicate"
1921
"sigs.k8s.io/controller-runtime/pkg/source"
2022

2123
"github.com/operator-framework/operator-controller/api/v1alpha1"
@@ -156,7 +158,14 @@ func (i *instance) Watch(ctx context.Context, ctrl controller.Controller, ce *v1
156158
scheme,
157159
i.mapper,
158160
ce,
161+
handler.OnlyControllerOwner(),
159162
),
163+
predicate.Funcs{
164+
CreateFunc: func(tce event.TypedCreateEvent[client.Object]) bool { return false },
165+
UpdateFunc: func(tue event.TypedUpdateEvent[client.Object]) bool { return true },
166+
DeleteFunc: func(tde event.TypedDeleteEvent[client.Object]) bool { return true },
167+
GenericFunc: func(tge event.TypedGenericEvent[client.Object]) bool { return true },
168+
},
160169
),
161170
)
162171
if err != nil {

internal/controllers/clusterextension_controller.go

Lines changed: 9 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323
"fmt"
2424
"io"
2525
"strings"
26-
"sync"
2726
"time"
2827

2928
"github.com/go-logr/logr"
@@ -38,7 +37,6 @@ import (
3837
apimeta "k8s.io/apimachinery/pkg/api/meta"
3938
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
4039
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
41-
"k8s.io/apimachinery/pkg/runtime/schema"
4240
"k8s.io/apimachinery/pkg/types"
4341
"k8s.io/apimachinery/pkg/util/sets"
4442
apimachyaml "k8s.io/apimachinery/pkg/util/yaml"
@@ -53,7 +51,6 @@ import (
5351
"sigs.k8s.io/controller-runtime/pkg/log"
5452
"sigs.k8s.io/controller-runtime/pkg/predicate"
5553
"sigs.k8s.io/controller-runtime/pkg/reconcile"
56-
"sigs.k8s.io/controller-runtime/pkg/source"
5754

5855
"github.com/operator-framework/api/pkg/operators/v1alpha1"
5956
catalogd "github.com/operator-framework/catalogd/api/core/v1alpha1"
@@ -64,6 +61,7 @@ import (
6461
ocv1alpha1 "github.com/operator-framework/operator-controller/api/v1alpha1"
6562
"github.com/operator-framework/operator-controller/internal/bundleutil"
6663
"github.com/operator-framework/operator-controller/internal/conditionsets"
64+
"github.com/operator-framework/operator-controller/internal/contentmanager"
6765
"github.com/operator-framework/operator-controller/internal/labels"
6866
"github.com/operator-framework/operator-controller/internal/resolve"
6967
"github.com/operator-framework/operator-controller/internal/rukpak/convert"
@@ -82,8 +80,7 @@ type ClusterExtensionReconciler struct {
8280
Resolver resolve.Resolver
8381
Unpacker rukpaksource.Unpacker
8482
ActionClientGetter helmclient.ActionClientGetter
85-
dynamicWatchMutex sync.RWMutex
86-
dynamicWatchGVKs sets.Set[schema.GroupVersionKind]
83+
Watcher contentmanager.Watcher
8784
controller crcontroller.Controller
8885
cache cache.Cache
8986
InstalledBundleGetter InstalledBundleGetter
@@ -117,9 +114,6 @@ type Preflight interface {
117114
//+kubebuilder:rbac:groups=core,resources=serviceaccounts/token,verbs=create
118115
//+kubebuilder:rbac:groups=apiextensions.k8s.io,resources=customresourcedefinitions,verbs=get
119116

120-
// TODO: Remove these permissions as part of resolving https://github.com/operator-framework/operator-controller/issues/975
121-
//+kubebuilder:rbac:groups=*,resources=*,verbs=*
122-
123117
//+kubebuilder:rbac:groups=catalogd.operatorframework.io,resources=clustercatalogs,verbs=list;watch
124118
//+kubebuilder:rbac:groups=catalogd.operatorframework.io,resources=catalogmetadata,verbs=list;watch
125119

@@ -132,7 +126,7 @@ func (r *ClusterExtensionReconciler) Reconcile(ctx context.Context, req ctrl.Req
132126
l.V(1).Info("reconcile starting")
133127
defer l.V(1).Info("reconcile ending")
134128

135-
var existingExt = &ocv1alpha1.ClusterExtension{}
129+
existingExt := &ocv1alpha1.ClusterExtension{}
136130
if err := r.Client.Get(ctx, req.NamespacedName, existingExt); err != nil {
137131
return ctrl.Result{}, client.IgnoreNotFound(err)
138132
}
@@ -401,36 +395,17 @@ func (r *ClusterExtensionReconciler) reconcile(ctx context.Context, ext *ocv1alp
401395
return ctrl.Result{}, err
402396
}
403397

404-
for _, obj := range relObjects {
405-
if err := func() error {
406-
r.dynamicWatchMutex.Lock()
407-
defer r.dynamicWatchMutex.Unlock()
408-
409-
_, isWatched := r.dynamicWatchGVKs[obj.GetObjectKind().GroupVersionKind()]
410-
if !isWatched {
411-
if err := r.controller.Watch(
412-
source.Kind(r.cache,
413-
obj,
414-
crhandler.EnqueueRequestForOwner(r.Scheme(), r.RESTMapper(), ext, crhandler.OnlyControllerOwner()),
415-
predicate.Funcs{
416-
CreateFunc: func(ce event.CreateEvent) bool { return false },
417-
UpdateFunc: func(ue event.UpdateEvent) bool { return true },
418-
DeleteFunc: func(de event.DeleteEvent) bool { return true },
419-
GenericFunc: func(ge event.GenericEvent) bool { return true },
420-
},
421-
),
422-
); err != nil {
423-
return err
424-
}
425-
r.dynamicWatchGVKs[obj.GetObjectKind().GroupVersionKind()] = sets.Empty{}
426-
}
427-
return nil
428-
}(); err != nil {
398+
// Only attempt to watch resources if we are
399+
// installing / upgrading. Otherwise we may restart
400+
// watches that have already been established
401+
if state != stateUnchanged {
402+
if err := r.Watcher.Watch(ctx, r.controller, ext, relObjects); err != nil {
429403
ext.Status.InstalledBundle = nil
430404
setInstalledStatusConditionFailed(ext, fmt.Sprintf("%s:%v", ocv1alpha1.ReasonInstallationFailed, err))
431405
return ctrl.Result{}, err
432406
}
433407
}
408+
434409
ext.Status.InstalledBundle = bundleutil.MetadataFor(resolvedBundle.Name, *resolvedBundleVersion)
435410
setInstalledStatusConditionSuccess(ext, fmt.Sprintf("Installed bundle %s successfully", resolvedBundle.Image))
436411

@@ -531,13 +506,11 @@ func (r *ClusterExtensionReconciler) SetupWithManager(mgr ctrl.Manager) error {
531506
},
532507
})).
533508
Build(r)
534-
535509
if err != nil {
536510
return err
537511
}
538512
r.controller = controller
539513
r.cache = mgr.GetCache()
540-
r.dynamicWatchGVKs = sets.New[schema.GroupVersionKind]()
541514

542515
return nil
543516
}

0 commit comments

Comments
 (0)