This repository was archived by the owner on Jan 28, 2025. It is now read-only.

Conversation

@ankitathomas
Contributor

An implementation of a RegistryEntitySource that watches CatalogSources in all namespaces and maintains an internal cache.

See #76

@ankitathomas ankitathomas requested a review from a team as a code owner January 17, 2023 18:17
@openshift-ci openshift-ci bot added the do-not-merge/work-in-progress Indicates that a PR should not merge because it is a work in progress. label Jan 17, 2023
@ankitathomas ankitathomas force-pushed the watchclient branch 2 times, most recently from 3eb56a3 to 65fc0d5 Compare January 17, 2023 18:25
@ankitathomas ankitathomas changed the title [WIP] EntitySource implementation for the v1 registry client EntitySource implementation for the v1 registry client Feb 14, 2023
@openshift-ci openshift-ci bot removed the do-not-merge/work-in-progress Indicates that a PR should not merge because it is a work in progress. label Feb 14, 2023
@ankitathomas ankitathomas force-pushed the watchclient branch 4 times, most recently from 229c10e to 636c6e6 Compare February 15, 2023 21:02
SkipRange string `json:"skipRange,omitempty"`
}

func entityFromBundle(catsrcID string, bundle *catalogsourceapi.Bundle) (*input.Entity, error) {
Contributor

it might make sense to break this function down into sub-functions to make it a bit smaller and more readable. You may also want to make a type declaration for the key of the property map, e.g. type NamespacedName string or some such.
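The suggested typed key might look like the sketch below; this is hypothetical illustration, not code from the PR.

```go
package main

import "fmt"

// NamespacedName is a defined string type for the property-map key,
// as suggested in the review; the "namespace/name" format is illustrative.
type NamespacedName string

// keyFor builds the typed key, so arbitrary strings can't be used
// as keys by accident without an explicit conversion.
func keyFor(namespace, name string) NamespacedName {
	return NamespacedName(fmt.Sprintf("%s/%s", namespace, name))
}

func main() {
	props := map[NamespacedName]string{}
	props[keyFor("olm", "community-operators")] = "some-property"
	fmt.Println(props[NamespacedName("olm/community-operators")]) // prints some-property
}
```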

func (r *CachedRegistryEntitySource) StopCache() {
r.Mutex.Lock()
defer r.Mutex.Unlock()
r.done <- struct{}{}
Contributor

you can just close(r.done)
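As an aside, close broadcasts: a receive on a closed channel never blocks, so one close unblocks every goroutine waiting on r.done, whereas a send wakes exactly one receiver. A minimal illustrative sketch (names are not from the PR):

```go
package main

import (
	"fmt"
	"sync"
)

// stopWithClose signals all workers at once: a closed channel is
// readable forever, so every <-done receive unblocks.
func stopWithClose(workers int) int {
	done := make(chan struct{})
	var wg sync.WaitGroup
	var mu sync.Mutex
	stopped := 0
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-done // unblocks for every goroutine once done is closed
			mu.Lock()
			stopped++
			mu.Unlock()
		}()
	}
	close(done) // one close reaches all receivers; a send would reach only one
	wg.Wait()
	return stopped
}

func main() {
	fmt.Println(stopWithClose(3)) // prints 3
}
```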

"github.com/operator-framework/deppy/pkg/lib/grpc"
)

type RegistryClient interface {
Contributor

can't we straight up use the registry client coming from operator-registry?

Contributor

oh nvm - we do

func TestDeppy(t *testing.T) {
RegisterFailHandler(Fail)

RunSpecs(t, "Deppy Suite")
Contributor

Should it be "RegistryQuerier Suite" or something like that?

}
}

for _, prvAPI := range bundle.ProvidedApis {
Contributor

It might be good to break these steps into their own functions to make this function a bit smaller and more readable. wdyt?

}

// Since multiple instances of bundle may exist for different channels, entityID must include reference to channel
entityIDFromBundle := func(catsrcID string, bundle *catalogsourceapi.Bundle) deppy.Identifier {
Contributor

should we just push this out into a proper function rather than an inline one?

}
}

func NewCachedRegistryQuerier(client client.WithWatch, rClient RegistryClient, logger *logr.Logger, options ...Option) *CachedRegistryEntitySource {
Contributor

could the registry client be internal to this struct?

Contributor

might also be worth adding the logger as an option with zap.New() as the default value.
if you want to allow for custom registry client implementations, it might also be worth moving this to an option and using yours as the default.

r.logger.Info("Completed cache update", "catalogSource", catalogSourceKey)
}

func (r *CachedRegistryEntitySource) Get(ctx context.Context, id deppy.Identifier) *input.Entity {
Contributor

rather than iterating over the whole cache to find the item, could you break out the catalog source component of the key and go directly to the right map?
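The suggestion above could look like the sketch below, assuming identifiers embed the catalog source as a `namespace/name/entity` prefix (the actual ID format in the PR may differ):

```go
package main

import (
	"fmt"
	"strings"
)

// lookup assumes identifiers of the form "namespace/name/entity";
// it splits off the catalog-source key and indexes the per-source map
// directly instead of iterating over the whole cache.
func lookup(cache map[string]map[string]string, id string) (string, bool) {
	parts := strings.SplitN(id, "/", 3)
	if len(parts) != 3 {
		return "", false // malformed id: no catalog-source prefix
	}
	sourceKey := parts[0] + "/" + parts[1]
	entities, ok := cache[sourceKey]
	if !ok {
		return "", false
	}
	v, ok := entities[id]
	return v, ok
}

func main() {
	cache := map[string]map[string]string{
		"olm/community-operators": {"olm/community-operators/prometheus.v1": "entity-data"},
	}
	v, _ := lookup(cache, "olm/community-operators/prometheus.v1")
	fmt.Println(v) // prints entity-data
}
```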

return nil
}

func (r *CachedRegistryEntitySource) Filter(ctx context.Context, filter input.Predicate) (input.EntityList, error) {
Contributor

you could re-use Iterate here to reduce/reuse some of the code

return resultSet, nil
}

func (r *CachedRegistryEntitySource) GroupBy(ctx context.Context, fn input.GroupByFunction) (input.EntityListMap, error) {
Contributor

same here - you could re-use Iterate

r.logger.Error(err, "failed to start catalogsource watch")
return
}
if err := r.populate(ctx); err != nil {
Contributor

I wonder if we should have a client-go style WaitForCacheSync method, here's an example:

// stop signal for the informer
stopper := make(chan struct{})
defer close(stopper)

informer := podInformer.Informer()

// start informer ->
go factory.Start(stopper)

// start to sync and call list
if !cache.WaitForCacheSync(stopper, informer.HasSynced) {
    runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
    return
}


switch entry.Type {
case watch.Deleted:
func() {
Contributor

I'd say move this to its own function, wdyt?

case <-r.done:
return
case entry := <-catalogSourceWatch.ResultChan():
encodedObj, err := json.Marshal(entry.Object)
Contributor

another option might be to just cast it to a *v1alpha1.CatalogSource, e.g.

catalogsource, ok := entry.Object.(*v1alpha1.CatalogSource)
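With the comma-ok form, the cast fails gracefully instead of panicking when the watch delivers an unexpected object. A self-contained sketch using a stand-in type for *v1alpha1.CatalogSource:

```go
package main

import "fmt"

// CatalogSource stands in for *v1alpha1.CatalogSource; the watch
// event's Object field is an interface value, modeled here as interface{}.
type CatalogSource struct{ Name string }

// asCatalogSource uses the comma-ok type assertion, so a different
// concrete type yields ok == false rather than a panic.
func asCatalogSource(obj interface{}) (*CatalogSource, bool) {
	cs, ok := obj.(*CatalogSource)
	return cs, ok
}

func main() {
	var obj interface{} = &CatalogSource{Name: "community-operators"}
	if cs, ok := asCatalogSource(obj); ok {
		fmt.Println(cs.Name) // prints community-operators
	}
}
```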

}
// watching catalogSource for changes works only with OLM managed catalogSources.
go r.ProcessQueue(ctx)
for {
Contributor

might be worth wrapping this into its own function and adding defer r.Stop() + any other shutdown bits

func() {
r.RWMutex.Lock()
defer r.RWMutex.Unlock()
catalogSourceKey := types.NamespacedName{Namespace: catalogSource.Namespace, Name: catalogSource.Name}
Contributor

maybe push this to its own function. I think I've seen it around a couple of times in the code

r.logger.Info("Completed cache delete", "catalogSource", catalogSourceKey)
}()
case watch.Added, watch.Modified:
r.syncCatalogSource(ctx, catalogSource)
Contributor

we should consider using the workqueue to drive sync in a different go routine. When something is added or modified add that resource to the workqueue. This could also help us with error handling and retries. E.g. if there's a connection failure or transient issue, we can just re-enqueue the resource after some time.

careful with modified because it can happen frequently - e.g. every time the connection state changes - we should only trigger a resync for certain changes to the resource. Maybe when the connection state is READY?
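The workqueue idea can be sketched with a buffered channel: watch events only enqueue a key, and a dedicated goroutine drains the queue and runs the sync, so slow or failing syncs never block event handling. This is a simplified stand-in for client-go's workqueue package; all names are hypothetical.

```go
package main

import "fmt"

// processQueue drains keys enqueued by watch events and syncs each one,
// decoupling event receipt from (possibly slow) cache syncs.
// It exits when the queue channel is closed.
func processQueue(queue <-chan string, syncFn func(string)) {
	for key := range queue {
		syncFn(key)
	}
}

func main() {
	queue := make(chan string, 16)
	var synced []string
	finished := make(chan struct{})
	go func() {
		processQueue(queue, func(k string) { synced = append(synced, k) })
		close(finished)
	}()
	// a watch event handler would do nothing but enqueue the resource key
	queue <- "olm/community-operators"
	queue <- "olm/redhat-operators"
	close(queue) // real code would keep the queue open and use a done channel
	<-finished
	fmt.Println(len(synced)) // prints 2
}
```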

r.logger.Info("cannot find catalogSource, skipping cache update", "CatalogSource", item)
return
}
r.syncCatalogSource(ctx, catalogSource)
Contributor

we should catch any errors here and re-enqueue the catalog if they happen. We may want to understand how this work queue works so we can have an exponential backoff. It may also be worth emitting a kube event saying that the cache couldn't be synced for a particular catalog and that it will retry in so many seconds.

Contributor

you can also do the check for whether the catalog is managed or not and re-enqueue here

}
r.cache[catalogSourceKey.String()] = sourceCache{
Items: entities,
//imageID: imageID,
Contributor

is this comment still valid?

}
return
}
r.cache[catalogSourceKey.String()] = sourceCache{
Contributor

nice - change the whole entry at once - makes it atomic ^^

const defaultCatalogSourceSyncInterval = 5 * time.Minute

type CachedRegistryEntitySource struct {
sync.RWMutex
Contributor

how come this is exported rather than an internal variable? is this a Go pattern?
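For context, embedding sync.RWMutex is a common Go pattern: it promotes Lock/RLock and friends onto the struct itself, but it also exposes them to callers, whereas an unexported field keeps locking an implementation detail. A small sketch of the two styles (names are illustrative):

```go
package main

import (
	"fmt"
	"sync"
)

// cacheEmbedded mirrors the PR's style: embedding promotes the mutex
// methods, so c.Lock() works, but external packages can call it too.
type cacheEmbedded struct {
	sync.RWMutex
	items map[string]string
}

// cacheUnexported hides the mutex; only this package can lock it.
type cacheUnexported struct {
	mu    sync.RWMutex
	items map[string]string
}

func (c *cacheUnexported) get(k string) (string, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	v, ok := c.items[k]
	return v, ok
}

func main() {
	c := &cacheUnexported{items: map[string]string{"a": "1"}}
	v, _ := c.get("a")
	fmt.Println(v) // prints 1
}
```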

func (r *registryGRPCClient) ListEntities(ctx context.Context, catalogSource *v1alpha1.CatalogSource) ([]*input.Entity, error) {
// TODO: create GRPC connections separately
conn, err := grpc.ConnectWithTimeout(ctx, catalogSource.Address(), r.timeout)
if conn != nil {
Contributor

nit: you could change this to

if err != nil {
  return ...
}
defer conn.Close()

catalogPackages[packageKey] = pkg
}

entity, err := EntityFromBundle(catalogSourceID, pkg, bundle)
Contributor

since this function is in the same package we can get away with not exporting it - or did you export it to make testing easier?


entity, err := EntityFromBundle(catalogSourceID, pkg, bundle)
if err != nil {
return entities, fmt.Errorf("failed to parse entity %s: %v", entity.Identifier(), err)
Contributor

do you mean to return partial results? it might be worth just returning nil

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/operator-framework/api/pkg/operators/v1alpha1"
api2 "github.com/operator-framework/operator-registry/pkg/api"
Contributor

maybe registryapi rather than api2?

Contributor

or registry

"olm.bundle.path": "quay.io/openshift-community-operators/prometheus@sha256:6fbd3eaa123054c5023323d1f9ab7cbea178087fcb7cb4f3e83872c6a88d39a1",
},
}}))
})
Contributor

might be worth adding error test cases as well

@@ -0,0 +1,125 @@
package grpc
Contributor

doesn't need to be in this PR, but I think we should open a ticket and move all of this to the registry repo. It seems like it could be useful beyond this use-case

return grpc.Dial(address, dialOptions...)
}

func grpcProxyURL(addr string) (*url.URL, error) {
Contributor

I feel like I've seen this code before. Is it from the olm repo? if we move this to operator-registry we should also update olm.

}

var _ = Describe("Registry EntitySource", func() {
var querier input.EntitySource
Contributor

maybe just make it a straight-up RegistryCacheQuerier so you don't need to cast it back in the AfterEach?

)

var _ = Describe("RegistryBundleConverter", func() {
It("generates entity from bundle", func() {
Contributor

you may want to also poke at some of the error cases

Contributor

@perdasilva perdasilva left a comment

Looks really good and like you've put a lot of effort into this. Thank you!
I think we should move all of this code to the operator-controller and remove the dependencies on olm and registry from deppy.
I've left a few comments that I hope will help ^^ Please let me know if you need any help or have any questions =D

@github-actions

This PR has become stale because it has been open for 30 days with no activity. Please update this PR or remove the lifecycle/stale label before it is automatically closed in 30 days. Adding the lifecycle/frozen label will cause this PR to ignore lifecycle events.

@github-actions github-actions bot added the lifecycle/stale Denotes an issue or PR has remained open with no activity and has become stale. label Mar 26, 2023
@ankitathomas
Contributor Author

Closing in favor of operator-framework/operator-controller#129
