Skip to content

Commit

Permalink
fix: Mitigate the bug where items are re-added constantly to the work…
Browse files Browse the repository at this point in the history
…queue. argoproj#1193 (argoproj#1243)

This will prevent argo from hanging for up to 16 minutes at a time while processing a rollout.

Signed-off-by: Mark Robinson <mrobinson@plaid.com>
Signed-off-by: caoyang001 <caoyang001@foxmail.com>
  • Loading branch information
MarkSRobinson authored and caoyang001 committed Jun 12, 2021
1 parent 917dba7 commit 7ec525a
Show file tree
Hide file tree
Showing 12 changed files with 72 additions and 34 deletions.
4 changes: 3 additions & 1 deletion analysis/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"testing"
"time"

"github.com/argoproj/argo-rollouts/utils/queue"

log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/undefinedlabs/go-mpatch"
Expand Down Expand Up @@ -87,7 +89,7 @@ func (f *fixture) newController(resync resyncFunc) (*Controller, informers.Share
i := informers.NewSharedInformerFactory(f.client, resync())
k8sI := kubeinformers.NewSharedInformerFactory(f.kubeclient, resync())

analysisRunWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AnalysisRuns")
analysisRunWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "AnalysisRuns")

metricsServer := metrics.NewMetricsServer(metrics.ServerConfig{
Addr: "localhost:8080",
Expand Down
12 changes: 7 additions & 5 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"os"
"time"

"github.com/argoproj/argo-rollouts/utils/queue"

"github.com/pkg/errors"
smiclientset "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/split/clientset/versioned"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -169,11 +171,11 @@ func NewManager(
K8SRequestProvider: k8sRequestProvider,
})

rolloutWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts")
experimentWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Experiments")
analysisRunWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AnalysisRuns")
serviceWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Services")
ingressWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Ingresses")
rolloutWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Rollouts")
experimentWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Experiments")
analysisRunWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "AnalysisRuns")
serviceWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Services")
ingressWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Ingresses")

refResolver := rollout.NewInformerBasedWorkloadRefResolver(namespace, dynamicclientset, discoveryClient, rolloutWorkqueue, rolloutsInformer.Informer())

Expand Down
9 changes: 6 additions & 3 deletions experiments/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/undefinedlabs/go-mpatch"

"github.com/argoproj/argo-rollouts/utils/queue"

"github.com/stretchr/testify/assert"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
Expand Down Expand Up @@ -316,8 +319,8 @@ func (f *fixture) newController(resync resyncFunc) (*Controller, informers.Share
i := informers.NewSharedInformerFactory(f.client, resync())
k8sI := kubeinformers.NewSharedInformerFactory(f.kubeclient, resync())

rolloutWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts")
experimentWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Experiments")
rolloutWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Rollouts")
experimentWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Experiments")

metricsServer := metrics.NewMetricsServer(metrics.ServerConfig{
Addr: "localhost:8080",
Expand Down
6 changes: 4 additions & 2 deletions ingress/ingress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"sync"
"testing"

"github.com/argoproj/argo-rollouts/utils/queue"

"github.com/stretchr/testify/assert"
extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -63,8 +65,8 @@ func newFakeIngressController(ing *extensionsv1beta1.Ingress, rollout *v1alpha1.
i := informers.NewSharedInformerFactory(client, 0)
k8sI := kubeinformers.NewSharedInformerFactory(kubeclient, 0)

rolloutWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts")
ingressWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Ingresses")
rolloutWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Rollouts")
ingressWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Ingresses")

c := NewController(ControllerConfig{
Client: kubeclient,
Expand Down
4 changes: 3 additions & 1 deletion pkg/kubectl-argo-rollouts/viewcontroller/viewcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"reflect"
"time"

"github.com/argoproj/argo-rollouts/utils/queue"

log "github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -98,7 +100,7 @@ func newViewController(namespace string, name string, kubeClient kubernetes.Inte
rolloutLister: rolloutsInformerFactory.Argoproj().V1alpha1().Rollouts().Lister().Rollouts(namespace),
experimentLister: rolloutsInformerFactory.Argoproj().V1alpha1().Experiments().Lister().Experiments(namespace),
analysisRunLister: rolloutsInformerFactory.Argoproj().V1alpha1().AnalysisRuns().Lister().AnalysisRuns(namespace),
workqueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
workqueue: workqueue.NewRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter()),
}

controller.cacheSyncs = append(controller.cacheSyncs,
Expand Down
4 changes: 4 additions & 0 deletions rollout/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ func (c *Controller) syncHandler(key string) error {
resolveErr := c.refResolver.Resolve(r)
roCtx, err := c.newRolloutContext(r)
if err != nil {
logCtx.Errorf("newRolloutContext err %v", err)
return err
}
if resolveErr != nil {
Expand All @@ -388,6 +389,9 @@ func (c *Controller) syncHandler(key string) error {
if roCtx.newRollout != nil {
c.writeBackToInformer(roCtx.newRollout)
}
if err != nil {
logCtx.Errorf("roCtx.reconcile err %v", err)
}
return err
}

Expand Down
8 changes: 5 additions & 3 deletions rollout/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"testing"
"time"

"github.com/argoproj/argo-rollouts/utils/queue"

"github.com/ghodss/yaml"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -487,9 +489,9 @@ func (f *fixture) newController(resync resyncFunc) (*Controller, informers.Share
istioVirtualServiceInformer := dynamicInformerFactory.ForResource(istioutil.GetIstioVirtualServiceGVR()).Informer()
istioDestinationRuleInformer := dynamicInformerFactory.ForResource(istioutil.GetIstioDestinationRuleGVR()).Informer()

rolloutWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts")
serviceWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Services")
ingressWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Ingresses")
rolloutWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(time.Millisecond, 10*time.Second), "Rollouts")
serviceWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Services")
ingressWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Ingresses")

metricsServer := metrics.NewMetricsServer(metrics.ServerConfig{
Addr: "localhost:8080",
Expand Down
4 changes: 3 additions & 1 deletion rollout/trafficrouting/istio/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"time"

"github.com/argoproj/argo-rollouts/utils/queue"

log "github.com/sirupsen/logrus"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
Expand Down Expand Up @@ -58,7 +60,7 @@ type IstioController struct {
func NewIstioController(cfg IstioControllerConfig) *IstioController {
c := IstioController{
IstioControllerConfig: cfg,
destinationRuleWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DestinationRules"),
destinationRuleWorkqueue: workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "DestinationRules"),
VirtualServiceLister: dynamiclister.New(cfg.VirtualServiceInformer.GetIndexer(), istioutil.GetIstioVirtualServiceGVR()),
DestinationRuleLister: dynamiclister.New(cfg.DestinationRuleInformer.GetIndexer(), istioutil.GetIstioDestinationRuleGVR()),
}
Expand Down
6 changes: 4 additions & 2 deletions service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package service
import (
"testing"

"github.com/argoproj/argo-rollouts/utils/queue"

"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -65,8 +67,8 @@ func newFakeServiceController(svc *corev1.Service, rollout *v1alpha1.Rollout) (*
i := informers.NewSharedInformerFactory(client, 0)
k8sI := kubeinformers.NewSharedInformerFactory(kubeclient, 0)

rolloutWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts")
serviceWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Services")
rolloutWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Rollouts")
serviceWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Services")
metricsServer := metrics.NewMetricsServer(metrics.ServerConfig{
Addr: "localhost:8080",
K8SRequestProvider: &metrics.K8sRequestsCountProvider{},
Expand Down
2 changes: 2 additions & 0 deletions utils/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ func processNextWorkItem(workqueue workqueue.RateLimitingInterface, objType stri
// Put the item back on
// the workqueue to handle any transient errors.
workqueue.AddRateLimited(key)

logCtx.Infof("%s syncHandler queue retries: %v : key \"%v\"", objType, workqueue.NumRequeues(key), key)
return err
}
// Finally, if no error occurs we Forget this item so it does not
Expand Down
34 changes: 18 additions & 16 deletions utils/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"testing"
"time"

"github.com/argoproj/argo-rollouts/utils/queue"

"github.com/stretchr/testify/assert"
appsv1 "k8s.io/api/apps/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -30,7 +32,7 @@ import (
)

func TestProcessNextWorkItemHandlePanic(t *testing.T) {
q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts")
q := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Rollouts")
q.Add("valid/key")

metricServer := metrics.NewMetricsServer(metrics.ServerConfig{
Expand All @@ -44,7 +46,7 @@ func TestProcessNextWorkItemHandlePanic(t *testing.T) {
}

func TestProcessNextWorkItemShutDownQueue(t *testing.T) {
q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts")
q := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Rollouts")
syncHandler := func(key string) error {
return nil
}
Expand All @@ -53,7 +55,7 @@ func TestProcessNextWorkItemShutDownQueue(t *testing.T) {
}

func TestProcessNextWorkItemNoTStringKey(t *testing.T) {
q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts")
q := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Rollouts")
q.Add(1)
syncHandler := func(key string) error {
return nil
Expand All @@ -62,7 +64,7 @@ func TestProcessNextWorkItemNoTStringKey(t *testing.T) {
}

func TestProcessNextWorkItemNoValidKey(t *testing.T) {
q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts")
q := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Rollouts")
q.Add("invalid.key")
syncHandler := func(key string) error {
return nil
Expand All @@ -71,7 +73,7 @@ func TestProcessNextWorkItemNoValidKey(t *testing.T) {
}

func TestProcessNextWorkItemNormalSync(t *testing.T) {
q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts")
q := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Rollouts")
q.Add("valid/key")
syncHandler := func(key string) error {
return nil
Expand All @@ -80,7 +82,7 @@ func TestProcessNextWorkItemNormalSync(t *testing.T) {
}

func TestProcessNextWorkItemSyncHandlerReturnError(t *testing.T) {
q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts")
q := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Rollouts")
q.Add("valid/key")
metricServer := metrics.NewMetricsServer(metrics.ServerConfig{
Addr: "localhost:8080",
Expand All @@ -93,7 +95,7 @@ func TestProcessNextWorkItemSyncHandlerReturnError(t *testing.T) {
}

func TestEnqueue(t *testing.T) {
q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts")
q := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Rollouts")
r := &v1alpha1.Rollout{
ObjectMeta: metav1.ObjectMeta{
Name: "testName",
Expand All @@ -105,13 +107,13 @@ func TestEnqueue(t *testing.T) {
}

func TestEnqueueInvalidObj(t *testing.T) {
q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts")
q := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Rollouts")
Enqueue(struct{}{}, q)
assert.Equal(t, 0, q.Len())
}

func TestEnqueueAfter(t *testing.T) {
q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts")
q := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Rollouts")
r := &v1alpha1.Rollout{
ObjectMeta: metav1.ObjectMeta{
Name: "testName",
Expand All @@ -125,21 +127,21 @@ func TestEnqueueAfter(t *testing.T) {
}

func TestEnqueueString(t *testing.T) {
q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts")
q := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Rollouts")
Enqueue("default/foo", q)
assert.Equal(t, 1, q.Len())
}

func TestEnqueueAfterInvalidObj(t *testing.T) {
q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts")
q := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Rollouts")
EnqueueAfter(struct{}{}, time.Duration(1), q)
assert.Equal(t, 0, q.Len())
time.Sleep(2 * time.Second)
assert.Equal(t, 0, q.Len())
}

func TestEnqueueRateLimited(t *testing.T) {
q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts")
q := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Rollouts")
r := &v1alpha1.Rollout{
ObjectMeta: metav1.ObjectMeta{
Name: "testName",
Expand All @@ -153,7 +155,7 @@ func TestEnqueueRateLimited(t *testing.T) {
}

func TestEnqueueRateLimitedInvalidObject(t *testing.T) {
q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts")
q := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Rollouts")
EnqueueRateLimited(struct{}{}, q)
assert.Equal(t, 0, q.Len())
time.Sleep(time.Second)
Expand Down Expand Up @@ -440,23 +442,23 @@ func TestProcessNextWatchObj(t *testing.T) {
})
indexer.Add(obj)
{
wq := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts")
wq := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Rollouts")
watchEvent := watch.Event{
Object: obj,
}
processNextWatchObj(watchEvent, wq, indexer, "testIndexer")
assert.Equal(t, 1, wq.Len())
}
{
wq := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts")
wq := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Rollouts")
watchEvent := watch.Event{
Object: obj,
}
processNextWatchObj(watchEvent, wq, indexer, "no-indexer")
assert.Equal(t, 0, wq.Len())
}
{
wq := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts")
wq := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Rollouts")
invalidWatchEvent := watch.Event{}
processNextWatchObj(invalidWatchEvent, wq, indexer, "testIndexer")
assert.Equal(t, 0, wq.Len())
Expand Down
13 changes: 13 additions & 0 deletions utils/queue/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package queue

import (
"time"

"k8s.io/client-go/util/workqueue"
)

// DefaultArgoRolloutsRateLimiter is the default queue rate limiter.
// Similar to workqueue.DefaultControllerRateLimiter() but the max limit is 10 seconds instead of 16 minutes
func DefaultArgoRolloutsRateLimiter() workqueue.RateLimiter {
return workqueue.NewItemExponentialFailureRateLimiter(time.Millisecond, 10*time.Second)
}

0 comments on commit 7ec525a

Please sign in to comment.