diff --git a/go.mod b/go.mod index 0b9786844dc2..3dba3012fe1c 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,6 @@ require ( github.com/stretchr/testify v1.8.3 github.com/vektra/mockery/v2 v2.10.0 github.com/yuin/gopher-lua v0.0.0-20220504180219-658193537a64 - go.uber.org/atomic v1.9.0 golang.org/x/net v0.12.0 golang.org/x/term v0.10.0 golang.org/x/text v0.11.0 @@ -163,6 +162,7 @@ require ( go.opentelemetry.io/otel/trace v1.10.0 // indirect go.opentelemetry.io/proto/otlp v0.19.0 // indirect go.starlark.net v0.0.0-20200306205701-8dd3e2ee1dd5 // indirect + go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.8.0 // indirect go.uber.org/zap v1.24.0 // indirect golang.org/x/crypto v0.11.0 // indirect diff --git a/pkg/util/worker.go b/pkg/util/worker.go index d873542aa901..d38c135737b1 100644 --- a/pkg/util/worker.go +++ b/pkg/util/worker.go @@ -12,15 +12,6 @@ import ( "github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag" ) -const ( - // maxRetries is the number of times a resource will be retried before it is dropped out of the queue. - // With the current rate-limiter in use (5ms*2^(maxRetries-1)) the following numbers represent the times - // a resource is going to be re-queued: - // - // 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s, 20.4s, 41s, 82s - maxRetries = 15 -) - // AsyncWorker maintains a rate limiting queue and the items in the queue will be reconciled by a "ReconcileFunc". // The item will be re-queued if "ReconcileFunc" returns an error, maximum re-queue times defined by "maxRetries" above, // after that the item will be discarded from the queue. @@ -113,21 +104,6 @@ func (w *asyncWorker) AddAfter(item interface{}, duration time.Duration) { w.queue.AddAfter(item, duration) } -func (w *asyncWorker) handleError(err error, key interface{}) { - if err == nil { - w.queue.Forget(key) - return - } - - if w.queue.NumRequeues(key) < maxRetries { - w.queue.AddRateLimited(key) - return - } - - klog.V(2).Infof("Dropping resource %q out of the queue: %v", key, err) - w.queue.Forget(key) -} - func (w *asyncWorker) worker() { key, quit := w.queue.Get() if quit { @@ -136,7 +112,12 @@ func (w *asyncWorker) worker() { defer w.queue.Done(key) err := w.reconcileFunc(key) - w.handleError(err, key) + if err != nil { + w.queue.AddRateLimited(key) + return + } + + w.queue.Forget(key) } func (w *asyncWorker) Run(workerNumber int, stopChan <-chan struct{}) { diff --git a/pkg/util/worker_test.go b/pkg/util/worker_test.go index ab631b44cf0d..da0860d2ea3d 100644 --- a/pkg/util/worker_test.go +++ b/pkg/util/worker_test.go @@ -7,7 +7,6 @@ import ( "testing" "time" - "go.uber.org/atomic" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -147,44 +146,6 @@ func Test_asyncWorker_Run(t *testing.T) { } } -type asyncWorkerReconciler2 struct { - receivedTimes atomic.Int64 -} - -func (a *asyncWorkerReconciler2) ReconcileFunc(_ QueueKey) error { - a.receivedTimes.Inc() - return errors.New("always fail") -} - -func Test_asyncWorker_drop_resource(t *testing.T) { - const name = "fake_node" - const wantReceivedTimes = maxRetries + 1 - - reconcile := new(asyncWorkerReconciler2) - worker := newTestAsyncWorker(reconcile.ReconcileFunc) - - stopChan := make(chan struct{}) - defer close(stopChan) - - worker.Run(5, stopChan) - - worker.Add(name) - - err := assertUntil(20*time.Second, func() error { - receivedTimes := reconcile.receivedTimes.Load() - - if receivedTimes != wantReceivedTimes { - return fmt.Errorf("receivedTimes = %v, want = %v", receivedTimes, wantReceivedTimes) - } - - return nil - }) - - if err != nil { - t.Error(err.Error()) - } -} - // Block running assertion func periodically to check if condition match. // Fail if: maxDuration is reached // Success if: assertion return nil error