- 
                Notifications
    You must be signed in to change notification settings 
- Fork 22
EntitySource implementation for the v1 registry client #84
Conversation
3eb56a3    to
    65fc0d5      
    Compare
  
    43feec0    to
    861ab51      
    Compare
  
    229c10e    to
    636c6e6      
    Compare
  
    636c6e6    to
    fe8cc76      
    Compare
  
    | SkipRange string `json:"skipRange,omitempty"` | ||
| } | ||
|  | ||
| func entityFromBundle(catsrcID string, bundle *catalogsourceapi.Bundle) (*input.Entity, error) { | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it might make sense to break this function down into sub-functions to make it a bit smaller and more readable. You may also want to make a type declaration for the key of the property map, e.g. type NamespacedName string or some such.
| func (r *CachedRegistryEntitySource) StopCache() { | ||
| r.Mutex.Lock() | ||
| defer r.Mutex.Unlock() | ||
| r.done <- struct{}{} | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you can just close(r.done)
| "github.com/operator-framework/deppy/pkg/lib/grpc" | ||
| ) | ||
|  | ||
| type RegistryClient interface { | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can't we straight up use the registry client coming from operator-registry?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh nvm - we do
| func TestDeppy(t *testing.T) { | ||
| RegisterFailHandler(Fail) | ||
|  | ||
| RunSpecs(t, "Deppy Suite") | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should it be "RegistryQuerier Suite" or something like that?
| } | ||
| } | ||
|  | ||
| for _, prvAPI := range bundle.ProvidedApis { | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It might be good to break these steps into their own functions to make this function a bit smaller and more readable. wdyt?
| } | ||
|  | ||
| // Since multiple instances of bundle may exist for different channels, entityID must include reference to channel | ||
| entityIDFromBundle := func(catsrcID string, bundle *catalogsourceapi.Bundle) deppy.Identifier { | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we just push this out into a proper function rather than an inline one?
| } | ||
| } | ||
|  | ||
| func NewCachedRegistryQuerier(client client.WithWatch, rClient RegistryClient, logger *logr.Logger, options ...Option) *CachedRegistryEntitySource { | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could the registry client be internal to this struct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
might also be worth adding the logger as an option with zap.New() as the default value.
if you want to allow for custom registry client implementations, it might also be worth moving this to an option and using yours as the default.
| r.logger.Info("Completed cache update", "catalogSource", catalogSourceKey) | ||
| } | ||
|  | ||
| func (r *CachedRegistryEntitySource) Get(ctx context.Context, id deppy.Identifier) *input.Entity { | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rather than iterating over the whole cache to find the item, could you break out the catalog source component of the key and go directly to the right map?
| return nil | ||
| } | ||
|  | ||
| func (r *CachedRegistryEntitySource) Filter(ctx context.Context, filter input.Predicate) (input.EntityList, error) { | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you could re-use Iterate here to reduce /reuse some of the code
| return resultSet, nil | ||
| } | ||
|  | ||
| func (r *CachedRegistryEntitySource) GroupBy(ctx context.Context, fn input.GroupByFunction) (input.EntityListMap, error) { | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here - you could re-use Iterate
| r.logger.Error(err, "failed to start catalogsource watch") | ||
| return | ||
| } | ||
| if err := r.populate(ctx); err != nil { | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we should have a client-go style WaitForCacheSync method, here's an example:
// stop signal for the informer
stopper := make(chan struct{})
defer close(stopper)
informer := podInformer.Informer()
// start informer ->
go factory.Start(stopper)
// start to sync and call list
if !cache.WaitForCacheSync(stopper, informer.HasSynced) {
    runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
    return
}
|  | ||
| switch entry.Type { | ||
| case watch.Deleted: | ||
| func() { | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd say move this to its own function, wdyt?
| case <-r.done: | ||
| return | ||
| case entry := <-catalogSourceWatch.ResultChan(): | ||
| encodedObj, err := json.Marshal(entry.Object) | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
another option might to be just cast it to a *v1alpha1.CatalogSource, e.g.
catalogsource, ok := entry.Object.(*v1alpha1.CatalogSource)
| } | ||
| // watching catalogSource for changes works only with OLM managed catalogSources. | ||
| go r.ProcessQueue(ctx) | ||
| for { | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
might be worth wrapping this into its own function and using adding defer r.Stop() + any other shutdown bits
| func() { | ||
| r.RWMutex.Lock() | ||
| defer r.RWMutex.Unlock() | ||
| catalogSourceKey := types.NamespacedName{Namespace: catalogSource.Namespace, Name: catalogSource.Name} | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe push this to its own function. I think I've seen it around a couple of times in the code
| r.logger.Info("Completed cache delete", "catalogSource", catalogSourceKey) | ||
| }() | ||
| case watch.Added, watch.Modified: | ||
| r.syncCatalogSource(ctx, catalogSource) | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should consider using the workqueue to drive sync in a different go routine. When something is added or modified add that resource to the workqueue. This could also help us with error handling and retries. E.g. if there's a connection failure or transient issue, we can just re-enqueue the resource after some time.
careful with modified because it can happen frequently - e.g. everytime the connection state changes - we should only trigger a resync for certain changes to the resource. Maybe when the connection state is READY?
| r.logger.Info("cannot find catalogSource, skipping cache update", "CatalogSource", item) | ||
| return | ||
| } | ||
| r.syncCatalogSource(ctx, catalogSource) | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should catch any errors here and re-enqueue the catalog if it they happen. We may want to understand how this work queue works so we can have an exponential backoff. It may also be worth emitting a kube event that says that the cache couldn't be sync'ed for a particular catalog and that it will retry in so many seconds.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you can also do the check for whether the catalog is managed or not and re-enqueue here
| } | ||
| r.cache[catalogSourceKey.String()] = sourceCache{ | ||
| Items: entities, | ||
| //imageID: imageID, | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this comment still valid?
| } | ||
| return | ||
| } | ||
| r.cache[catalogSourceKey.String()] = sourceCache{ | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice - change the whole entry at once - makes it atomic ^^
| const defaultCatalogSourceSyncInterval = 5 * time.Minute | ||
|  | ||
| type CachedRegistryEntitySource struct { | ||
| sync.RWMutex | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how come this is exported rather than an internal variable? is this a Go pattern?
| func (r *registryGRPCClient) ListEntities(ctx context.Context, catalogSource *v1alpha1.CatalogSource) ([]*input.Entity, error) { | ||
| // TODO: create GRPC connections separately | ||
| conn, err := grpc.ConnectWithTimeout(ctx, catalogSource.Address(), r.timeout) | ||
| if conn != nil { | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: you could change this to
if err != nil {
  return ...
}
defer conn.Close()
| catalogPackages[packageKey] = pkg | ||
| } | ||
|  | ||
| entity, err := EntityFromBundle(catalogSourceID, pkg, bundle) | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since this function is in the same package we can get away with not exporting it - or did you export it to make testing easier?
|  | ||
| entity, err := EntityFromBundle(catalogSourceID, pkg, bundle) | ||
| if err != nil { | ||
| return entities, fmt.Errorf("failed to parse entity %s: %v", entity.Identifier(), err) | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do you mean to return partial results? it might be worth just returning nil
| . "github.com/onsi/ginkgo/v2" | ||
| . "github.com/onsi/gomega" | ||
| "github.com/operator-framework/api/pkg/operators/v1alpha1" | ||
| api2 "github.com/operator-framework/operator-registry/pkg/api" | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe registryapi rather than api2?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
or registry
| "olm.bundle.path": "quay.io/openshift-community-operators/prometheus@sha256:6fbd3eaa123054c5023323d1f9ab7cbea178087fcb7cb4f3e83872c6a88d39a1", | ||
| }, | ||
| }})) | ||
| }) | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
might be worth adding error test cases as well
| @@ -0,0 +1,125 @@ | |||
| package grpc | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
doesn't need to be in this PR, but I think we should open a ticket and move all of this to the registry repo. It seems like it could be useful beyond this use-case
| return grpc.Dial(address, dialOptions...) | ||
| } | ||
|  | ||
| func grpcProxyURL(addr string) (*url.URL, error) { | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel like I've seen this code before. Is it from the olm repo? if we move this to operator-registry we should also update olm.
| } | ||
|  | ||
| var _ = Describe("Registry EntitySource", func() { | ||
| var querier input.EntitySource | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe just make it a straigh up RegistryCacheQuerier so you don't need to cast it back in the AfterEach?
| ) | ||
|  | ||
| var _ = Describe("RegistryBundleConverter", func() { | ||
| It("generates entity from bundle", func() { | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you may want to also poke at some of the error cases
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks really good and like you've put a lot of effort into this. Thank you!
I think we should move all of this code to the operator-controller and remove the dependencies on olm and registry from deppy.
I've left a few comments that I hope will help ^^ Please let me know if you need any help or have any questions =D
| This PR has become stale because it has been open for 30 days with no activity. Please update this PR or remove the  | 
| Closing in favor of operator-framework/operator-controller#129 | 
An implementation of a RegistryEntitySource that watches CatalogSources on all namespaces. It maintains an internal cache
See #76