Skip to content

Commit

Permalink
Populate/update cache on ClusterCatalog reconcile (#1284)
Browse files Browse the repository at this point in the history
Currently we fetch catalog data and populate
cache on demand during ClusterExtension reconciliation.
This works but the first reconciliation after ClusterCatalog
creation or update is slow due to the need to fetch data.

With this change we proactively populate cache on ClusterCatalog
creation and check if cache needs to be updated
on ClusterCatalog update.

Signed-off-by: Mikalai Radchuk <mradchuk@redhat.com>
  • Loading branch information
m1kola authored Nov 1, 2024
1 parent cd0b096 commit a0f12e6
Show file tree
Hide file tree
Showing 5 changed files with 206 additions and 75 deletions.
5 changes: 3 additions & 2 deletions cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,8 +314,9 @@ func main() {
}

if err = (&controllers.ClusterCatalogReconciler{
Client: cl,
Cache: catalogClientBackend,
Client: cl,
CatalogCache: catalogClientBackend,
CatalogCachePopulator: catalogClient,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ClusterCatalog")
os.Exit(1)
Expand Down
11 changes: 2 additions & 9 deletions internal/catalogmetadata/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,17 +65,10 @@ func (c *Client) GetPackage(ctx context.Context, catalog *catalogd.ClusterCatalo

catalogFsys, err := c.cache.Get(catalog.Name, catalog.Status.ResolvedSource.Image.Ref)
if err != nil {
return nil, fmt.Errorf("error retrieving catalog cache: %v", err)
return nil, fmt.Errorf("error retrieving cache for catalog %q: %v", catalog.Name, err)
}
if catalogFsys == nil {
// TODO: https://github.com/operator-framework/operator-controller/pull/1284
// For now we are still populating cache (if absent) on-demand,
// but we might end up just returning a "cache not found" error here
// once we implement cache population in the controller.
catalogFsys, err = c.PopulateCache(ctx, catalog)
if err != nil {
return nil, fmt.Errorf("error fetching catalog contents: %v", err)
}
return nil, fmt.Errorf("cache for catalog %q not found", catalog.Name)
}

pkgFsys, err := fs.Sub(catalogFsys, pkgName)
Expand Down
19 changes: 2 additions & 17 deletions internal/catalogmetadata/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestClientGetPackage(t *testing.T) {
catalog: defaultCatalog,
cache: &fakeCache{getErr: errors.New("fetch error")},
assert: func(t *testing.T, dc *declcfg.DeclarativeConfig, err error) {
assert.ErrorContains(t, err, `error retrieving catalog cache`)
assert.ErrorContains(t, err, `error retrieving cache for catalog "catalog-1"`)
},
},
{
Expand Down Expand Up @@ -114,18 +114,7 @@ func TestClientGetPackage(t *testing.T) {
return testFS, nil
}},
assert: func(t *testing.T, fbc *declcfg.DeclarativeConfig, err error) {
require.NoError(t, err)
assert.Equal(t, &declcfg.DeclarativeConfig{Packages: []declcfg.Package{{Schema: declcfg.SchemaPackage, Name: "pkg-present"}}}, fbc)
},
},
{
name: "cache unpopulated and fails to populate",
catalog: defaultCatalog,
pkgName: "pkg-present",
cache: &fakeCache{putErr: errors.New("fake cache put error")},
assert: func(t *testing.T, fbc *declcfg.DeclarativeConfig, err error) {
assert.Nil(t, fbc)
assert.ErrorContains(t, err, "error fetching catalog contents")
assert.ErrorContains(t, err, `cache for catalog "catalog-1" not found`)
},
},
} {
Expand Down Expand Up @@ -278,7 +267,6 @@ type fakeCache struct {
getErr error

putFunc func(source string, errToCache error) (fs.FS, error)
putErr error
}

func (c *fakeCache) Get(catalogName, resolvedRef string) (fs.FS, error) {
Expand All @@ -293,9 +281,6 @@ func (c *fakeCache) Put(catalogName, resolvedRef string, source io.Reader, errTo
}
return c.putFunc(buf.String(), errToCache)
}
if c.putErr != nil {
return nil, c.putErr
}

return nil, errors.New("unexpected error")
}
Expand Down
58 changes: 38 additions & 20 deletions internal/controllers/clustercatalog_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,30 @@ package controllers

import (
"context"
"fmt"
"io/fs"

apierrors "k8s.io/apimachinery/pkg/api/errors"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"

catalogd "github.com/operator-framework/catalogd/api/core/v1alpha1"
)

type CatalogCacheRemover interface {
type CatalogCache interface {
Get(catalogName, resolvedRef string) (fs.FS, error)
Remove(catalogName string) error
}

type CatalogCachePopulator interface {
PopulateCache(ctx context.Context, catalog *catalogd.ClusterCatalog) (fs.FS, error)
}

// ClusterCatalogReconciler reconciles a ClusterCatalog object
type ClusterCatalogReconciler struct {
client.Client
Cache CatalogCacheRemover
CatalogCache CatalogCache
CatalogCachePopulator CatalogCachePopulator
}

//+kubebuilder:rbac:groups=olm.operatorframework.io,resources=clustercatalogs,verbs=get;list;watch
Expand All @@ -45,31 +50,44 @@ func (r *ClusterCatalogReconciler) Reconcile(ctx context.Context, req ctrl.Reque
existingCatalog := &catalogd.ClusterCatalog{}
err := r.Client.Get(ctx, req.NamespacedName, existingCatalog)
if apierrors.IsNotFound(err) {
return ctrl.Result{}, r.Cache.Remove(req.Name)
if err := r.CatalogCache.Remove(req.Name); err != nil {
return ctrl.Result{}, fmt.Errorf("error removing cache for catalog %q: %v", req.Name, err)
}
return ctrl.Result{}, nil
}
if err != nil {
return ctrl.Result{}, err
}

if existingCatalog.Status.ResolvedSource == nil ||
existingCatalog.Status.ResolvedSource.Image == nil ||
existingCatalog.Status.ResolvedSource.Image.Ref == "" {
// Reference is not known yet - skip cache population with no error.
// Once the reference is resolved another reconcile cycle
// will be triggered and we will progress further.
return ctrl.Result{}, nil
}

catalogFsys, err := r.CatalogCache.Get(existingCatalog.Name, existingCatalog.Status.ResolvedSource.Image.Ref)
if err != nil {
return ctrl.Result{}, fmt.Errorf("error retrieving cache for catalog %q: %v", existingCatalog.Name, err)
}
if catalogFsys != nil {
// Cache already exists so we do not need to populate it
return ctrl.Result{}, nil
}

if _, err = r.CatalogCachePopulator.PopulateCache(ctx, existingCatalog); err != nil {
return ctrl.Result{}, fmt.Errorf("error populating cache for catalog %q: %v", existingCatalog.Name, err)
}

return ctrl.Result{}, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *ClusterCatalogReconciler) SetupWithManager(mgr ctrl.Manager) error {
_, err := ctrl.NewControllerManagedBy(mgr).
For(&catalogd.ClusterCatalog{}, builder.WithPredicates(predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
return false
},
UpdateFunc: func(e event.UpdateEvent) bool {
return false
},
DeleteFunc: func(e event.DeleteEvent) bool {
return true
},
GenericFunc: func(e event.GenericEvent) bool {
return false
},
})).
For(&catalogd.ClusterCatalog{}).
Build(r)

return err
Expand Down
Loading

0 comments on commit a0f12e6

Please sign in to comment.