Skip to content

✨ wire up ServiceAccount based caching layer #1074

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"flag"
"fmt"
"net/http"
"os"
"path/filepath"
"time"
Expand Down Expand Up @@ -49,6 +50,7 @@ import (
"github.com/operator-framework/operator-controller/internal/authentication"
"github.com/operator-framework/operator-controller/internal/catalogmetadata/cache"
catalogclient "github.com/operator-framework/operator-controller/internal/catalogmetadata/client"
"github.com/operator-framework/operator-controller/internal/contentmanager"
"github.com/operator-framework/operator-controller/internal/controllers"
"github.com/operator-framework/operator-controller/internal/httputil"
"github.com/operator-framework/operator-controller/internal/labels"
Expand Down Expand Up @@ -179,12 +181,14 @@ func main() {
Name: cExt.Spec.ServiceAccount.Name,
Namespace: cExt.Spec.InstallNamespace,
}
token, err := tokenGetter.Get(ctx, namespacedName)
if err != nil {
return nil, fmt.Errorf("failed to extract SA token, %w", err)
}
tempConfig := rest.AnonymousClientConfig(c)
tempConfig.BearerToken = token
tempConfig.WrapTransport = func(rt http.RoundTripper) http.RoundTripper {
return &authentication.TokenInjectingRoundTripper{
Tripper: rt,
TokenGetter: tokenGetter,
Key: namespacedName,
}
}
return tempConfig, nil
}
cfgGetter, err := helmclient.NewActionConfigGetter(mgr.GetConfig(), mgr.GetRESTMapper(),
Expand All @@ -200,7 +204,6 @@ func main() {
acg, err := action.NewWrappedActionClientGetter(cfgGetter,
helmclient.WithFailureRollbacks(false),
)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: change in whitespace

if err != nil {
setupLog.Error(err, "unable to create helm client")
os.Exit(1)
Expand Down Expand Up @@ -275,6 +278,7 @@ func main() {
Finalizers: clusterExtensionFinalizers,
CaCertPool: certPool,
Preflights: preflights,
Watcher: contentmanager.New(restConfigMapper, mgr.GetConfig(), mgr.GetRESTMapper()),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ClusterExtension")
os.Exit(1)
Expand Down
6 changes: 0 additions & 6 deletions config/base/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,6 @@ kind: ClusterRole
metadata:
name: manager-role
rules:
- apiGroups:
- '*'
resources:
- '*'
verbs:
- '*'
- apiGroups:
- apiextensions.k8s.io
resources:
Expand Down
55 changes: 52 additions & 3 deletions hack/test/pre-upgrade-setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,60 @@ metadata:
name: upgrade-e2e
rules:
- apiGroups:
- "*"
- ""
resources:
- "*"
- "secrets"
- "services"
- "serviceaccounts"
verbs:
- "*"
- "create"
- "update"
- "patch"
- "delete"
- "get"
- "list"
- "watch"
- apiGroups:
- "apps"
resources:
- "deployments"
verbs:
- "create"
- "update"
- "patch"
- "delete"
- "get"
- "list"
- "watch"
- apiGroups:
- "apiextensions.k8s.io"
resources:
- "customresourcedefinitions"
verbs:
- "create"
- "update"
- "patch"
- "delete"
- "get"
- "list"
- "watch"
- apiGroups:
- "rbac.authorization.k8s.io"
resources:
- "clusterroles"
- "clusterrolebindings"
- "roles"
- "rolebindings"
verbs:
- "create"
- "update"
- "patch"
- "delete"
- "get"
- "list"
- "watch"
- "bind"
- "escalate"
EOF

kubectl apply -f - <<EOF
Expand Down
6 changes: 6 additions & 0 deletions internal/authentication/tokengetter.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,9 @@ func (t *TokenGetter) reapExpiredTokens() {
}
}
}

func (t *TokenGetter) Delete(key types.NamespacedName) {
t.mu.Lock()
defer t.mu.Unlock()
delete(t.tokens, key)
}
38 changes: 38 additions & 0 deletions internal/authentication/tripper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package authentication

import (
"fmt"
"net/http"

"k8s.io/apimachinery/pkg/types"
utilnet "k8s.io/apimachinery/pkg/util/net"
)

var _ http.RoundTripper = (*TokenInjectingRoundTripper)(nil)

type TokenInjectingRoundTripper struct {
Tripper http.RoundTripper
TokenGetter *TokenGetter
Key types.NamespacedName
}

func (tt *TokenInjectingRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
resp, err := tt.do(req)
if resp != nil && resp.StatusCode == http.StatusUnauthorized {
tt.TokenGetter.Delete(tt.Key)
resp, err = tt.do(req)
}
return resp, err
}

func (tt *TokenInjectingRoundTripper) do(req *http.Request) (*http.Response, error) {
reqClone := utilnet.CloneRequest(req)
token, err := tt.TokenGetter.Get(reqClone.Context(), tt.Key)
if err != nil {
return nil, err
}

// Always set the Authorization header to our retrieved token
reqClone.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token))
return tt.Tripper.RoundTrip(reqClone)
}
9 changes: 9 additions & 0 deletions internal/contentmanager/contentmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ import (
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/source"

"github.com/operator-framework/operator-controller/api/v1alpha1"
Expand Down Expand Up @@ -156,7 +158,14 @@ func (i *instance) Watch(ctx context.Context, ctrl controller.Controller, ce *v1
scheme,
i.mapper,
ce,
handler.OnlyControllerOwner(),
),
predicate.Funcs{
CreateFunc: func(tce event.TypedCreateEvent[client.Object]) bool { return false },
UpdateFunc: func(tue event.TypedUpdateEvent[client.Object]) bool { return true },
DeleteFunc: func(tde event.TypedDeleteEvent[client.Object]) bool { return true },
GenericFunc: func(tge event.TypedGenericEvent[client.Object]) bool { return true },
},
),
)
if err != nil {
Expand Down
45 changes: 9 additions & 36 deletions internal/controllers/clusterextension_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"fmt"
"io"
"strings"
"sync"
"time"

"github.com/go-logr/logr"
Expand All @@ -39,7 +38,6 @@ import (
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
apimachyaml "k8s.io/apimachinery/pkg/util/yaml"
Expand All @@ -54,7 +52,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

"github.com/operator-framework/api/pkg/operators/v1alpha1"
catalogd "github.com/operator-framework/catalogd/api/core/v1alpha1"
Expand All @@ -65,6 +62,7 @@ import (
ocv1alpha1 "github.com/operator-framework/operator-controller/api/v1alpha1"
"github.com/operator-framework/operator-controller/internal/bundleutil"
"github.com/operator-framework/operator-controller/internal/conditionsets"
"github.com/operator-framework/operator-controller/internal/contentmanager"
"github.com/operator-framework/operator-controller/internal/labels"
"github.com/operator-framework/operator-controller/internal/resolve"
"github.com/operator-framework/operator-controller/internal/rukpak/convert"
Expand All @@ -83,8 +81,7 @@ type ClusterExtensionReconciler struct {
Resolver resolve.Resolver
Unpacker rukpaksource.Unpacker
ActionClientGetter helmclient.ActionClientGetter
dynamicWatchMutex sync.RWMutex
dynamicWatchGVKs sets.Set[schema.GroupVersionKind]
Watcher contentmanager.Watcher
controller crcontroller.Controller
cache cache.Cache
InstalledBundleGetter InstalledBundleGetter
Expand Down Expand Up @@ -119,9 +116,6 @@ type Preflight interface {
//+kubebuilder:rbac:groups=core,resources=serviceaccounts/token,verbs=create
//+kubebuilder:rbac:groups=apiextensions.k8s.io,resources=customresourcedefinitions,verbs=get

// TODO: Remove these permissions as part of resolving https://github.com/operator-framework/operator-controller/issues/975
//+kubebuilder:rbac:groups=*,resources=*,verbs=*

//+kubebuilder:rbac:groups=catalogd.operatorframework.io,resources=clustercatalogs,verbs=list;watch
//+kubebuilder:rbac:groups=catalogd.operatorframework.io,resources=catalogmetadata,verbs=list;watch

Expand All @@ -134,7 +128,7 @@ func (r *ClusterExtensionReconciler) Reconcile(ctx context.Context, req ctrl.Req
l.V(1).Info("reconcile starting")
defer l.V(1).Info("reconcile ending")

var existingExt = &ocv1alpha1.ClusterExtension{}
existingExt := &ocv1alpha1.ClusterExtension{}
if err := r.Client.Get(ctx, req.NamespacedName, existingExt); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}
Expand Down Expand Up @@ -403,36 +397,17 @@ func (r *ClusterExtensionReconciler) reconcile(ctx context.Context, ext *ocv1alp
return ctrl.Result{}, err
}

for _, obj := range relObjects {
if err := func() error {
r.dynamicWatchMutex.Lock()
defer r.dynamicWatchMutex.Unlock()

_, isWatched := r.dynamicWatchGVKs[obj.GetObjectKind().GroupVersionKind()]
if !isWatched {
if err := r.controller.Watch(
source.Kind(r.cache,
obj,
crhandler.EnqueueRequestForOwner(r.Scheme(), r.RESTMapper(), ext, crhandler.OnlyControllerOwner()),
predicate.Funcs{
CreateFunc: func(ce event.CreateEvent) bool { return false },
UpdateFunc: func(ue event.UpdateEvent) bool { return true },
DeleteFunc: func(de event.DeleteEvent) bool { return true },
GenericFunc: func(ge event.GenericEvent) bool { return true },
},
),
); err != nil {
return err
}
r.dynamicWatchGVKs[obj.GetObjectKind().GroupVersionKind()] = sets.Empty{}
}
return nil
}(); err != nil {
// Only attempt to watch resources if we are
// installing / upgrading. Otherwise we may restart
// watches that have already been established
if state != stateUnchanged {
if err := r.Watcher.Watch(ctx, r.controller, ext, relObjects); err != nil {
ext.Status.InstalledBundle = nil
setInstalledStatusConditionFailed(ext, fmt.Sprintf("%s:%v", ocv1alpha1.ReasonInstallationFailed, err))
return ctrl.Result{}, err
}
}

ext.Status.InstalledBundle = bundleutil.MetadataFor(resolvedBundle.Name, *resolvedBundleVersion)
setInstalledStatusConditionSuccess(ext, fmt.Sprintf("Installed bundle %s successfully", resolvedBundle.Image))

Expand Down Expand Up @@ -533,13 +508,11 @@ func (r *ClusterExtensionReconciler) SetupWithManager(mgr ctrl.Manager) error {
},
})).
Build(r)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: whitespace-only change

if err != nil {
return err
}
r.controller = controller
r.cache = mgr.GetCache()
r.dynamicWatchGVKs = sets.New[schema.GroupVersionKind]()

return nil
}
Expand Down
Loading