From ff0e4231e8b2dd8bc00b7f3992c4e410b14b79fc Mon Sep 17 00:00:00 2001 From: joshvanl Date: Tue, 8 Oct 2024 16:19:11 +0100 Subject: [PATCH] Adds informer waits for scheduler namespace informer Signed-off-by: joshvanl --- .../process/kubernetes/informer/informer.go | 60 ++++++++++++++++++- .../suite/daprd/jobs/kubernetes/namespace.go | 2 +- .../suite/scheduler/kubernetes/namespace.go | 27 ++++----- 3 files changed, 71 insertions(+), 18 deletions(-) diff --git a/tests/integration/framework/process/kubernetes/informer/informer.go b/tests/integration/framework/process/kubernetes/informer/informer.go index b6fb5c12807..e4a0ac70e8c 100644 --- a/tests/integration/framework/process/kubernetes/informer/informer.go +++ b/tests/integration/framework/process/kubernetes/informer/informer.go @@ -14,11 +14,15 @@ limitations under the License. package informer import ( + "bytes" + "context" "encoding/json" + "math/rand/v2" "net/http" "strings" "sync" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -41,11 +45,14 @@ import ( type Informer struct { lock sync.Mutex active map[string][][]byte + + informed map[uint64]chan *metav1.WatchEvent } func New() *Informer { return &Informer{ - active: make(map[string][][]byte), + active: make(map[string][][]byte), + informed: make(map[uint64]chan *metav1.WatchEvent), } } @@ -89,8 +96,18 @@ func (i *Informer) Handler(t *testing.T, wrapped http.Handler) http.HandlerFunc w.WriteHeader(http.StatusOK) if len(i.active[gvk.String()]) > 0 { + var event metav1.WatchEvent + assert.NoError(t, json.Unmarshal(i.active[gvk.String()][0], &event)) w.Write(i.active[gvk.String()][0]) i.active[gvk.String()] = i.active[gvk.String()][1:] + + for _, ch := range i.informed { + select { + case ch <- &event: + case <-time.After(3 * time.Second): + t.Errorf("failed to send informed event to subscriber") + } + } } w.(http.Flusher).Flush() } @@ -111,6 +128,47 @@ func (i *Informer) Delete(t *testing.T, obj runtime.Object) { i.inform(t, obj, string(watch.Deleted)) } +func (i *Informer) DeleteWait(t *testing.T, ctx context.Context, obj runtime.Object) { + t.Helper() + + i.lock.Lock() + //nolint:gosec + ui := rand.Uint64() + ch := make(chan *metav1.WatchEvent) + i.informed[ui] = ch + i.lock.Unlock() + + defer func() { + i.lock.Lock() + close(ch) + delete(i.informed, ui) + i.lock.Unlock() + }() + + i.Delete(t, obj) + + exp, err := json.Marshal(obj) + require.NoError(t, err) + + for { + select { + case <-ctx.Done(): + assert.Fail(t, "failed to wait for delete event to occur") + return + case e := <-ch: + if e.Type != string(watch.Deleted) { + continue + } + + if !bytes.Equal(exp, e.Object.Raw) { + continue + } + + return + } + } +} + func (i *Informer) inform(t *testing.T, obj runtime.Object, event string) { t.Helper() i.lock.Lock() diff --git a/tests/integration/suite/daprd/jobs/kubernetes/namespace.go b/tests/integration/suite/daprd/jobs/kubernetes/namespace.go index b9843420a6d..37400838ee4 100644 --- a/tests/integration/suite/daprd/jobs/kubernetes/namespace.go +++ b/tests/integration/suite/daprd/jobs/kubernetes/namespace.go @@ -139,7 +139,7 @@ func (n *namespace) Run(t *testing.T, ctx context.Context) { assert.Len(c, resp.Kvs, 2) }, time.Second*20, 10*time.Millisecond) - n.kubeapi.Informer().Delete(t, &corev1.Namespace{ + n.kubeapi.Informer().DeleteWait(t, ctx, &corev1.Namespace{ TypeMeta: metav1.TypeMeta{APIVersion: "v1", Kind: "Namespace"}, ObjectMeta: metav1.ObjectMeta{Name: "default"}, }) diff --git a/tests/integration/suite/scheduler/kubernetes/namespace.go b/tests/integration/suite/scheduler/kubernetes/namespace.go index 2cdc9e985c3..ac1814cb4f0 100644 --- a/tests/integration/suite/scheduler/kubernetes/namespace.go +++ b/tests/integration/suite/scheduler/kubernetes/namespace.go @@ -27,7 +27,6 @@ import ( schedulerv1pb "github.com/dapr/dapr/pkg/proto/scheduler/v1" "github.com/dapr/dapr/tests/integration/framework" "github.com/dapr/dapr/tests/integration/framework/process/kubernetes" - "github.com/dapr/dapr/tests/integration/framework/process/kubernetes/store" "github.com/dapr/dapr/tests/integration/framework/process/scheduler" "github.com/dapr/dapr/tests/integration/framework/process/sentry" "github.com/dapr/dapr/tests/integration/suite" @@ -41,33 +40,29 @@ func init() { type namespace struct { sentry *sentry.Sentry scheduler *scheduler.Scheduler - store *store.Store + kubeapi *kubernetes.Kubernetes } func (n *namespace) Setup(t *testing.T) []framework.Option { n.sentry = sentry.New(t) - n.store = store.New(metav1.GroupVersionKind{ - Version: "v1", - Kind: "Namespace", - }) - n.store.Add(&corev1.Namespace{ - TypeMeta: metav1.TypeMeta{APIVersion: "v1", Kind: "Namespace"}, - ObjectMeta: metav1.ObjectMeta{Name: "default"}, - }) - - kubeapi := kubernetes.New(t, - kubernetes.WithClusterNamespaceListFromStore(t, n.store), + n.kubeapi = kubernetes.New(t, + kubernetes.WithClusterNamespaceList(t, &corev1.NamespaceList{ + Items: []corev1.Namespace{{ + TypeMeta: metav1.TypeMeta{Kind: "Namespace", APIVersion: "v1"}, + ObjectMeta: metav1.ObjectMeta{Name: "default"}, + }}, + }), ) n.scheduler = scheduler.New(t, scheduler.WithSentry(n.sentry), - scheduler.WithKubeconfig(kubeapi.KubeconfigPath(t)), + scheduler.WithKubeconfig(n.kubeapi.KubeconfigPath(t)), scheduler.WithMode("kubernetes"), ) return []framework.Option{ - framework.WithProcesses(n.sentry, kubeapi, n.scheduler), + framework.WithProcesses(n.sentry, n.kubeapi, n.scheduler), } } @@ -114,7 +109,7 @@ func (n *namespace) Run(t *testing.T, ctx context.Context) { assert.Len(c, resp.Kvs, 2) }, time.Second*10, 10*time.Millisecond) - n.store.Delete(&corev1.Namespace{ + n.kubeapi.Informer().DeleteWait(t, ctx, &corev1.Namespace{ TypeMeta: metav1.TypeMeta{APIVersion: "v1", Kind: "Namespace"}, ObjectMeta: metav1.ObjectMeta{Name: "default"}, })