Skip to content

Commit

Permalink
Adds informer waits for scheduler namespace informer
Browse files Browse the repository at this point in the history
Signed-off-by: joshvanl <me@joshvanl.dev>
  • Loading branch information
JoshVanL committed Oct 16, 2024
1 parent 28d4927 commit ff0e423
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,15 @@ limitations under the License.
package informer

import (
"bytes"
"context"
"encoding/json"
"math/rand/v2"
"net/http"
"strings"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -41,11 +45,14 @@ import (
type Informer struct {
lock sync.Mutex
active map[string][][]byte

informed map[uint64]chan *metav1.WatchEvent
}

func New() *Informer {
return &Informer{
active: make(map[string][][]byte),
active: make(map[string][][]byte),
informed: make(map[uint64]chan *metav1.WatchEvent),
}
}

Expand Down Expand Up @@ -89,8 +96,18 @@ func (i *Informer) Handler(t *testing.T, wrapped http.Handler) http.HandlerFunc
w.WriteHeader(http.StatusOK)

if len(i.active[gvk.String()]) > 0 {
var event metav1.WatchEvent
assert.NoError(t, json.Unmarshal(i.active[gvk.String()][0], &event))
w.Write(i.active[gvk.String()][0])
i.active[gvk.String()] = i.active[gvk.String()][1:]

for _, ch := range i.informed {
select {
case ch <- &event:
case <-time.After(3 * time.Second):
t.Errorf("failed to send informed event to subscriber")
}
}
}
w.(http.Flusher).Flush()
}
Expand All @@ -111,6 +128,47 @@ func (i *Informer) Delete(t *testing.T, obj runtime.Object) {
i.inform(t, obj, string(watch.Deleted))
}

func (i *Informer) DeleteWait(t *testing.T, ctx context.Context, obj runtime.Object) {
t.Helper()

i.lock.Lock()
//nolint:gosec
ui := rand.Uint64()
ch := make(chan *metav1.WatchEvent)
i.informed[ui] = ch
i.lock.Unlock()

defer func() {
i.lock.Lock()
close(ch)
delete(i.informed, ui)
i.lock.Unlock()
}()

i.Delete(t, obj)

exp, err := json.Marshal(obj)
require.NoError(t, err)

for {
select {
case <-ctx.Done():
assert.Fail(t, "failed to wait for delete event to occur")
return
case e := <-ch:
if e.Type != string(watch.Deleted) {
continue
}

if !bytes.Equal(exp, e.Object.Raw) {
continue
}

return
}
}
}

func (i *Informer) inform(t *testing.T, obj runtime.Object, event string) {
t.Helper()
i.lock.Lock()
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/suite/daprd/jobs/kubernetes/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (n *namespace) Run(t *testing.T, ctx context.Context) {
assert.Len(c, resp.Kvs, 2)
}, time.Second*20, 10*time.Millisecond)

n.kubeapi.Informer().Delete(t, &corev1.Namespace{
n.kubeapi.Informer().DeleteWait(t, ctx, &corev1.Namespace{
TypeMeta: metav1.TypeMeta{APIVersion: "v1", Kind: "Namespace"},
ObjectMeta: metav1.ObjectMeta{Name: "default"},
})
Expand Down
27 changes: 11 additions & 16 deletions tests/integration/suite/scheduler/kubernetes/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
schedulerv1pb "github.com/dapr/dapr/pkg/proto/scheduler/v1"
"github.com/dapr/dapr/tests/integration/framework"
"github.com/dapr/dapr/tests/integration/framework/process/kubernetes"
"github.com/dapr/dapr/tests/integration/framework/process/kubernetes/store"
"github.com/dapr/dapr/tests/integration/framework/process/scheduler"
"github.com/dapr/dapr/tests/integration/framework/process/sentry"
"github.com/dapr/dapr/tests/integration/suite"
Expand All @@ -41,33 +40,29 @@ func init() {
type namespace struct {
sentry *sentry.Sentry
scheduler *scheduler.Scheduler
store *store.Store
kubeapi *kubernetes.Kubernetes
}

func (n *namespace) Setup(t *testing.T) []framework.Option {
n.sentry = sentry.New(t)

n.store = store.New(metav1.GroupVersionKind{
Version: "v1",
Kind: "Namespace",
})
n.store.Add(&corev1.Namespace{
TypeMeta: metav1.TypeMeta{APIVersion: "v1", Kind: "Namespace"},
ObjectMeta: metav1.ObjectMeta{Name: "default"},
})

kubeapi := kubernetes.New(t,
kubernetes.WithClusterNamespaceListFromStore(t, n.store),
n.kubeapi = kubernetes.New(t,
kubernetes.WithClusterNamespaceList(t, &corev1.NamespaceList{
Items: []corev1.Namespace{{
TypeMeta: metav1.TypeMeta{Kind: "Namespace", APIVersion: "v1"},
ObjectMeta: metav1.ObjectMeta{Name: "default"},
}},
}),
)

n.scheduler = scheduler.New(t,
scheduler.WithSentry(n.sentry),
scheduler.WithKubeconfig(kubeapi.KubeconfigPath(t)),
scheduler.WithKubeconfig(n.kubeapi.KubeconfigPath(t)),
scheduler.WithMode("kubernetes"),
)

return []framework.Option{
framework.WithProcesses(n.sentry, kubeapi, n.scheduler),
framework.WithProcesses(n.sentry, n.kubeapi, n.scheduler),
}
}

Expand Down Expand Up @@ -114,7 +109,7 @@ func (n *namespace) Run(t *testing.T, ctx context.Context) {
assert.Len(c, resp.Kvs, 2)
}, time.Second*10, 10*time.Millisecond)

n.store.Delete(&corev1.Namespace{
n.kubeapi.Informer().DeleteWait(t, ctx, &corev1.Namespace{
TypeMeta: metav1.TypeMeta{APIVersion: "v1", Kind: "Namespace"},
ObjectMeta: metav1.ObjectMeta{Name: "default"},
})
Expand Down

0 comments on commit ff0e423

Please sign in to comment.