Skip to content

Commit 274df7e

Browse files
committed
wire up serviceaccount based caching layer
Signed-off-by: everettraven <everettraven@gmail.com>
1 parent 245436c commit 274df7e

File tree

9 files changed

+310
-59
lines changed

9 files changed

+310
-59
lines changed

cmd/manager/main.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"flag"
2222
"fmt"
23+
"net/http"
2324
"os"
2425
"path/filepath"
2526
"time"
@@ -49,6 +50,7 @@ import (
4950
"github.com/operator-framework/operator-controller/internal/authentication"
5051
"github.com/operator-framework/operator-controller/internal/catalogmetadata/cache"
5152
catalogclient "github.com/operator-framework/operator-controller/internal/catalogmetadata/client"
53+
"github.com/operator-framework/operator-controller/internal/contentmanager"
5254
"github.com/operator-framework/operator-controller/internal/controllers"
5355
"github.com/operator-framework/operator-controller/internal/httputil"
5456
"github.com/operator-framework/operator-controller/internal/labels"
@@ -179,12 +181,14 @@ func main() {
179181
Name: cExt.Spec.ServiceAccount.Name,
180182
Namespace: cExt.Spec.InstallNamespace,
181183
}
182-
token, err := tokenGetter.Get(ctx, namespacedName)
183-
if err != nil {
184-
return nil, fmt.Errorf("failed to extract SA token, %w", err)
185-
}
186184
tempConfig := rest.AnonymousClientConfig(c)
187-
tempConfig.BearerToken = token
185+
tempConfig.WrapTransport = func(rt http.RoundTripper) http.RoundTripper {
186+
return &authentication.TokenInjectingRoundTripper{
187+
Tripper: rt,
188+
TokenGetter: tokenGetter,
189+
Key: namespacedName,
190+
}
191+
}
188192
return tempConfig, nil
189193
}
190194
cfgGetter, err := helmclient.NewActionConfigGetter(mgr.GetConfig(), mgr.GetRESTMapper(),
@@ -200,7 +204,6 @@ func main() {
200204
acg, err := action.NewWrappedActionClientGetter(cfgGetter,
201205
helmclient.WithFailureRollbacks(false),
202206
)
203-
204207
if err != nil {
205208
setupLog.Error(err, "unable to create helm client")
206209
os.Exit(1)
@@ -275,6 +278,7 @@ func main() {
275278
Finalizers: clusterExtensionFinalizers,
276279
CaCertPool: certPool,
277280
Preflights: preflights,
281+
Watcher: contentmanager.New(restConfigMapper, mgr.GetConfig(), mgr.GetRESTMapper()),
278282
}).SetupWithManager(mgr); err != nil {
279283
setupLog.Error(err, "unable to create controller", "controller", "ClusterExtension")
280284
os.Exit(1)

config/base/rbac/role.yaml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,6 @@ kind: ClusterRole
44
metadata:
55
name: manager-role
66
rules:
7-
- apiGroups:
8-
- '*'
9-
resources:
10-
- '*'
11-
verbs:
12-
- '*'
137
- apiGroups:
148
- apiextensions.k8s.io
159
resources:

hack/test/pre-upgrade-setup.sh

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,60 @@ metadata:
4848
name: upgrade-e2e
4949
rules:
5050
- apiGroups:
51-
- "*"
51+
- ""
5252
resources:
53-
- "*"
53+
- "secrets"
54+
- "services"
55+
- "serviceaccounts"
5456
verbs:
55-
- "*"
57+
- "create"
58+
- "update"
59+
- "patch"
60+
- "delete"
61+
- "get"
62+
- "list"
63+
- "watch"
64+
- apiGroups:
65+
- "apps"
66+
resources:
67+
- "deployments"
68+
verbs:
69+
- "create"
70+
- "update"
71+
- "patch"
72+
- "delete"
73+
- "get"
74+
- "list"
75+
- "watch"
76+
- apiGroups:
77+
- "apiextensions.k8s.io"
78+
resources:
79+
- "customresourcedefinitions"
80+
verbs:
81+
- "create"
82+
- "update"
83+
- "patch"
84+
- "delete"
85+
- "get"
86+
- "list"
87+
- "watch"
88+
- apiGroups:
89+
- "rbac.authorization.k8s.io"
90+
resources:
91+
- "clusterroles"
92+
- "clusterrolebindings"
93+
- "roles"
94+
- "rolebindings"
95+
verbs:
96+
- "create"
97+
- "update"
98+
- "patch"
99+
- "delete"
100+
- "get"
101+
- "list"
102+
- "watch"
103+
- "bind"
104+
- "escalate"
56105
EOF
57106

58107
kubectl apply -f - <<EOF

internal/authentication/tokengetter.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,3 +100,9 @@ func (t *TokenGetter) reapExpiredTokens() {
100100
}
101101
}
102102
}
103+
104+
func (t *TokenGetter) Delete(key types.NamespacedName) {
105+
t.mu.Lock()
106+
defer t.mu.Unlock()
107+
delete(t.tokens, key)
108+
}

internal/authentication/tripper.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package authentication
2+
3+
import (
4+
"fmt"
5+
"net/http"
6+
7+
"k8s.io/apimachinery/pkg/types"
8+
utilnet "k8s.io/apimachinery/pkg/util/net"
9+
)
10+
11+
var _ http.RoundTripper = (*TokenInjectingRoundTripper)(nil)
12+
13+
type TokenInjectingRoundTripper struct {
14+
Tripper http.RoundTripper
15+
TokenGetter *TokenGetter
16+
Key types.NamespacedName
17+
}
18+
19+
func (tt *TokenInjectingRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
20+
resp, err := tt.do(req)
21+
if resp != nil && resp.StatusCode == http.StatusUnauthorized {
22+
tt.TokenGetter.Delete(tt.Key)
23+
resp, err = tt.do(req)
24+
}
25+
// RoundTrip must always close the response body even on errors.
26+
// For more information see https://pkg.go.dev/net/http#RoundTripper
27+
if resp != nil {
28+
resp.Body.Close()
29+
}
30+
return resp, err
31+
}
32+
33+
func (tt *TokenInjectingRoundTripper) do(req *http.Request) (*http.Response, error) {
34+
reqClone := utilnet.CloneRequest(req)
35+
token, err := tt.TokenGetter.Get(reqClone.Context(), tt.Key)
36+
if err != nil {
37+
return nil, err
38+
}
39+
40+
// Always set the Authorization header to our retrieved token
41+
reqClone.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token))
42+
return tt.Tripper.RoundTrip(reqClone)
43+
}

internal/contentmanager/contentmanager.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@ import (
1515
"sigs.k8s.io/controller-runtime/pkg/cache"
1616
"sigs.k8s.io/controller-runtime/pkg/client"
1717
"sigs.k8s.io/controller-runtime/pkg/controller"
18+
"sigs.k8s.io/controller-runtime/pkg/event"
1819
"sigs.k8s.io/controller-runtime/pkg/handler"
20+
"sigs.k8s.io/controller-runtime/pkg/predicate"
1921
"sigs.k8s.io/controller-runtime/pkg/source"
2022

2123
"github.com/operator-framework/operator-controller/api/v1alpha1"
@@ -156,7 +158,14 @@ func (i *instance) Watch(ctx context.Context, ctrl controller.Controller, ce *v1
156158
scheme,
157159
i.mapper,
158160
ce,
161+
handler.OnlyControllerOwner(),
159162
),
163+
predicate.Funcs{
164+
CreateFunc: func(tce event.TypedCreateEvent[client.Object]) bool { return false },
165+
UpdateFunc: func(tue event.TypedUpdateEvent[client.Object]) bool { return true },
166+
DeleteFunc: func(tde event.TypedDeleteEvent[client.Object]) bool { return true },
167+
GenericFunc: func(tge event.TypedGenericEvent[client.Object]) bool { return true },
168+
},
160169
),
161170
)
162171
if err != nil {

internal/controllers/clusterextension_controller.go

Lines changed: 9 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import (
2424
"fmt"
2525
"io"
2626
"strings"
27-
"sync"
2827
"time"
2928

3029
"github.com/go-logr/logr"
@@ -39,7 +38,6 @@ import (
3938
apimeta "k8s.io/apimachinery/pkg/api/meta"
4039
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
4140
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
42-
"k8s.io/apimachinery/pkg/runtime/schema"
4341
"k8s.io/apimachinery/pkg/types"
4442
"k8s.io/apimachinery/pkg/util/sets"
4543
apimachyaml "k8s.io/apimachinery/pkg/util/yaml"
@@ -54,7 +52,6 @@ import (
5452
"sigs.k8s.io/controller-runtime/pkg/log"
5553
"sigs.k8s.io/controller-runtime/pkg/predicate"
5654
"sigs.k8s.io/controller-runtime/pkg/reconcile"
57-
"sigs.k8s.io/controller-runtime/pkg/source"
5855

5956
"github.com/operator-framework/api/pkg/operators/v1alpha1"
6057
catalogd "github.com/operator-framework/catalogd/api/core/v1alpha1"
@@ -65,6 +62,7 @@ import (
6562
ocv1alpha1 "github.com/operator-framework/operator-controller/api/v1alpha1"
6663
"github.com/operator-framework/operator-controller/internal/bundleutil"
6764
"github.com/operator-framework/operator-controller/internal/conditionsets"
65+
"github.com/operator-framework/operator-controller/internal/contentmanager"
6866
"github.com/operator-framework/operator-controller/internal/labels"
6967
"github.com/operator-framework/operator-controller/internal/resolve"
7068
"github.com/operator-framework/operator-controller/internal/rukpak/convert"
@@ -83,8 +81,7 @@ type ClusterExtensionReconciler struct {
8381
Resolver resolve.Resolver
8482
Unpacker rukpaksource.Unpacker
8583
ActionClientGetter helmclient.ActionClientGetter
86-
dynamicWatchMutex sync.RWMutex
87-
dynamicWatchGVKs sets.Set[schema.GroupVersionKind]
84+
Watcher contentmanager.Watcher
8885
controller crcontroller.Controller
8986
cache cache.Cache
9087
InstalledBundleGetter InstalledBundleGetter
@@ -119,9 +116,6 @@ type Preflight interface {
119116
//+kubebuilder:rbac:groups=core,resources=serviceaccounts/token,verbs=create
120117
//+kubebuilder:rbac:groups=apiextensions.k8s.io,resources=customresourcedefinitions,verbs=get
121118

122-
// TODO: Remove these permissions as part of resolving https://github.com/operator-framework/operator-controller/issues/975
123-
//+kubebuilder:rbac:groups=*,resources=*,verbs=*
124-
125119
//+kubebuilder:rbac:groups=catalogd.operatorframework.io,resources=clustercatalogs,verbs=list;watch
126120
//+kubebuilder:rbac:groups=catalogd.operatorframework.io,resources=catalogmetadata,verbs=list;watch
127121

@@ -134,7 +128,7 @@ func (r *ClusterExtensionReconciler) Reconcile(ctx context.Context, req ctrl.Req
134128
l.V(1).Info("reconcile starting")
135129
defer l.V(1).Info("reconcile ending")
136130

137-
var existingExt = &ocv1alpha1.ClusterExtension{}
131+
existingExt := &ocv1alpha1.ClusterExtension{}
138132
if err := r.Client.Get(ctx, req.NamespacedName, existingExt); err != nil {
139133
return ctrl.Result{}, client.IgnoreNotFound(err)
140134
}
@@ -404,36 +398,16 @@ func (r *ClusterExtensionReconciler) reconcile(ctx context.Context, ext *ocv1alp
404398
return ctrl.Result{}, err
405399
}
406400

407-
for _, obj := range relObjects {
408-
if err := func() error {
409-
r.dynamicWatchMutex.Lock()
410-
defer r.dynamicWatchMutex.Unlock()
411-
412-
_, isWatched := r.dynamicWatchGVKs[obj.GetObjectKind().GroupVersionKind()]
413-
if !isWatched {
414-
if err := r.controller.Watch(
415-
source.Kind(r.cache,
416-
obj,
417-
crhandler.EnqueueRequestForOwner(r.Scheme(), r.RESTMapper(), ext, crhandler.OnlyControllerOwner()),
418-
predicate.Funcs{
419-
CreateFunc: func(ce event.CreateEvent) bool { return false },
420-
UpdateFunc: func(ue event.UpdateEvent) bool { return true },
421-
DeleteFunc: func(de event.DeleteEvent) bool { return true },
422-
GenericFunc: func(ge event.GenericEvent) bool { return true },
423-
},
424-
),
425-
); err != nil {
426-
return err
427-
}
428-
r.dynamicWatchGVKs[obj.GetObjectKind().GroupVersionKind()] = sets.Empty{}
429-
}
430-
return nil
431-
}(); err != nil {
432-
ext.Status.InstalledBundle = nil
401+
// Only attempt to watch resources if we are
402+
// installing / upgrading. Otherwise we may restart
403+
// watches that have already been established
404+
if state != stateUnchanged {
405+
if err := r.Watcher.Watch(ctx, r.controller, ext, relObjects); err != nil {
433406
setInstalledStatusConditionFailed(ext, fmt.Sprintf("%s:%v", ocv1alpha1.ReasonCreateDynamicWatchFailed, err))
434407
return ctrl.Result{}, err
435408
}
436409
}
410+
437411
ext.Status.InstalledBundle = bundleutil.MetadataFor(resolvedBundle.Name, *resolvedBundleVersion)
438412
setInstalledStatusConditionSuccess(ext, fmt.Sprintf("Installed bundle %s successfully", resolvedBundle.Image))
439413

@@ -534,13 +508,11 @@ func (r *ClusterExtensionReconciler) SetupWithManager(mgr ctrl.Manager) error {
534508
},
535509
})).
536510
Build(r)
537-
538511
if err != nil {
539512
return err
540513
}
541514
r.controller = controller
542515
r.cache = mgr.GetCache()
543-
r.dynamicWatchGVKs = sets.New[schema.GroupVersionKind]()
544516

545517
return nil
546518
}

0 commit comments

Comments
 (0)