Skip to content

Commit

Permalink
feat(controller): Reused existing workflow informer. Resolves argopro…
Browse files Browse the repository at this point in the history
…j#5202 (argoproj#5204)

Signed-off-by: Alex Collins <alex_collins@intuit.com>
  • Loading branch information
alexec authored Feb 26, 2021
1 parent e5f3dc1 commit cb9676e
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 21 deletions.
2 changes: 1 addition & 1 deletion workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (wfc *WorkflowController) runTTLController(ctx context.Context, workflowTTL
}

func (wfc *WorkflowController) runCronController(ctx context.Context) {
cronController := cron.NewCronController(wfc.wfclientset, wfc.dynamicInterface, wfc.namespace, wfc.GetManagedNamespace(), wfc.Config.InstanceID, wfc.metrics, wfc.eventRecorderManager)
cronController := cron.NewCronController(wfc.wfclientset, wfc.dynamicInterface, wfc.wfInformer, wfc.namespace, wfc.GetManagedNamespace(), wfc.Config.InstanceID, wfc.metrics, wfc.eventRecorderManager)
cronController.Run(ctx)
}

Expand Down
24 changes: 4 additions & 20 deletions workflow/cron/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
Expand All @@ -26,7 +25,6 @@ import (
"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow"
"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned"
"github.com/argoproj/argo-workflows/v3/workflow/common"
"github.com/argoproj/argo-workflows/v3/workflow/events"
"github.com/argoproj/argo-workflows/v3/workflow/metrics"
"github.com/argoproj/argo-workflows/v3/workflow/util"
Expand All @@ -42,6 +40,7 @@ type Controller struct {
wfClientset versioned.Interface
wfLister util.WorkflowLister
wfQueue workqueue.RateLimitingInterface
wfInformer cache.SharedIndexInformer
cronWfInformer informers.GenericInformer
cronWfQueue workqueue.RateLimitingInterface
dynamicInterface dynamic.Interface
Expand All @@ -54,9 +53,10 @@ const (
cronWorkflowWorkers = 8
)

func NewCronController(wfclientset versioned.Interface, dynamicInterface dynamic.Interface, namespace string, managedNamespace string, instanceId string, metrics *metrics.Metrics, eventRecorderManager events.EventRecorderManager) *Controller {
func NewCronController(wfclientset versioned.Interface, dynamicInterface dynamic.Interface, wfInformer cache.SharedIndexInformer, namespace string, managedNamespace string, instanceId string, metrics *metrics.Metrics, eventRecorderManager events.EventRecorderManager) *Controller {
return &Controller{
wfClientset: wfclientset,
wfInformer: wfInformer,
namespace: namespace,
managedNamespace: managedNamespace,
instanceId: instanceId,
Expand All @@ -83,12 +83,7 @@ func (cc *Controller) Run(ctx context.Context) {
}).ForResource(schema.GroupVersionResource{Group: workflow.Group, Version: workflow.Version, Resource: workflow.CronWorkflowPlural})
cc.addCronWorkflowInformerHandler()

wfInformer := util.NewWorkflowInformer(cc.dynamicInterface, cc.managedNamespace, cronWorkflowResyncPeriod, func(options *v1.ListOptions) {
wfInformerListOptionsFunc(options, cc.instanceId)
}, cache.Indexers{})
go wfInformer.Run(ctx.Done())

cc.wfLister = util.NewWorkflowLister(wfInformer)
cc.wfLister = util.NewWorkflowLister(cc.wfInformer)

cc.cron.Start()
defer cc.cron.Stop()
Expand Down Expand Up @@ -273,14 +268,3 @@ func cronWfInformerListOptionsFunc(options *v1.ListOptions, instanceId string) {
labelSelector := labels.NewSelector().Add(util.InstanceIDRequirement(instanceId))
options.LabelSelector = labelSelector.String()
}

func wfInformerListOptionsFunc(options *v1.ListOptions, instanceId string) {
options.FieldSelector = fields.Everything().String()
isCronWorkflowChildReq, err := labels.NewRequirement(common.LabelKeyCronWorkflow, selection.Exists, []string{})
if err != nil {
panic(err)
}
labelSelector := labels.NewSelector().Add(*isCronWorkflowChildReq)
labelSelector = labelSelector.Add(util.InstanceIDRequirement(instanceId))
options.LabelSelector = labelSelector.String()
}

0 comments on commit cb9676e

Please sign in to comment.