diff --git a/config/core/resources/apiserversource.yaml b/config/core/resources/apiserversource.yaml
index 8d78dcd3bd5..8938cb0a312 100644
--- a/config/core/resources/apiserversource.yaml
+++ b/config/core/resources/apiserversource.yaml
@@ -137,6 +137,32 @@ spec:
uri:
description: URI can be an absolute URL(non-empty scheme and non-empty host) pointing to the target or a relative URI. Relative URIs will be resolved using the base URI retrieved from Ref.
type: string
+ namespaceSelector:
+ description: NamespaceSelector is a label selector to capture the namespaces that should be watched by the source.
+ type: object
+ properties:
+ matchExpressions:
+ description: matchExpressions is a list of label selector requirements. The requirements are ANDed.
+ type: array
+ items:
+ type: object
+ properties:
+ key:
+ description: key is the label key that the selector applies to.
+ type: string
+ operator:
+ description: operator represents a key's relationship to a set of values. Valid operators are In, NotIn, Exists and DoesNotExist.
+ type: string
+ values:
+ description: values is an array of string values. If the operator is In or NotIn, the values array must be non-empty. If the operator is Exists or DoesNotExist, the values array must be empty. This array is replaced during a strategic merge patch.
+ type: array
+ items:
+ type: string
+ matchLabels:
+ description: matchLabels is a map of {key,value} pairs. A single {key,value} in the matchLabels map is equivalent to an element of matchExpressions, whose key field is "key", the operator is "In", and the values array contains only "value". The requirements are ANDed.
+ type: object
+ x-kubernetes-preserve-unknown-fields: true
+
status:
type: object
properties:
@@ -190,6 +216,11 @@ spec:
sinkUri:
description: SinkURI is the current active sink URI that has been configured for the Source.
type: string
+ namespaces:
+ description: Namespaces show the namespaces currently watched by the ApiServerSource
+ type: array
+ items:
+ type: string
additionalPrinterColumns:
- name: Sink
type: string
diff --git a/docs/eventing-api.md b/docs/eventing-api.md
index 36c9165d1ed..c68ff38c26e 100644
--- a/docs/eventing-api.md
+++ b/docs/eventing-api.md
@@ -4410,6 +4410,21 @@ string
source. Defaults to default if not set.
+
+
+namespaceSelector
+
+
+Kubernetes meta/v1.LabelSelector
+
+
+ |
+
+(Optional)
+ NamespaceSelector is a label selector to capture the namespaces that
+should be watched by the source.
+ |
+
@@ -5002,6 +5017,21 @@ string
source. Defaults to default if not set.
+
+
+namespaceSelector
+
+
+Kubernetes meta/v1.LabelSelector
+
+
+ |
+
+(Optional)
+ NamespaceSelector is a label selector to capture the namespaces that
+should be watched by the source.
+ |
+
ApiServerSourceStatus
@@ -5042,6 +5072,17 @@ state.
Source.
+
+
+namespaces
+
+[]string
+
+ |
+
+ Namespaces show the namespaces currently watched by the ApiServerSource
+ |
+
ContainerSourceSpec
diff --git a/pkg/adapter/apiserver/adapter.go b/pkg/adapter/apiserver/adapter.go
index 775301a0991..66609a03489 100644
--- a/pkg/adapter/apiserver/adapter.go
+++ b/pkg/adapter/apiserver/adapter.go
@@ -95,21 +95,25 @@ func (a *apiServerAdapter) start(ctx context.Context, stopCh <-chan struct{}) er
exists := false
for _, apires := range resources.APIResources {
if apires.Name == configRes.GVR.Resource {
-
- var res dynamic.ResourceInterface
- if apires.Namespaced {
- res = a.k8s.Resource(configRes.GVR).Namespace(a.config.Namespace)
+ var resources []dynamic.ResourceInterface
+ if apires.Namespaced && !a.config.AllNamespaces {
+ for _, ns := range a.config.Namespaces {
+ resources = append(resources, a.k8s.Resource(configRes.GVR).Namespace(ns))
+ }
} else {
- res = a.k8s.Resource(configRes.GVR)
+ resources = append(resources, a.k8s.Resource(configRes.GVR))
}
- lw := &cache.ListWatch{
- ListFunc: asUnstructuredLister(ctx, res.List, configRes.LabelSelector),
- WatchFunc: asUnstructuredWatcher(ctx, res.Watch, configRes.LabelSelector),
+ for _, res := range resources {
+ lw := &cache.ListWatch{
+ ListFunc: asUnstructuredLister(ctx, res.List, configRes.LabelSelector),
+ WatchFunc: asUnstructuredWatcher(ctx, res.Watch, configRes.LabelSelector),
+ }
+
+ reflector := cache.NewReflector(lw, &unstructured.Unstructured{}, delegate, resyncPeriod)
+ go reflector.Run(stop)
}
- reflector := cache.NewReflector(lw, &unstructured.Unstructured{}, delegate, resyncPeriod)
- go reflector.Run(stop)
exists = true
break
}
diff --git a/pkg/adapter/apiserver/adapter_test.go b/pkg/adapter/apiserver/adapter_test.go
index 85524d28a1a..71b9a86f7cc 100644
--- a/pkg/adapter/apiserver/adapter_test.go
+++ b/pkg/adapter/apiserver/adapter_test.go
@@ -45,7 +45,7 @@ func TestAdapter_StartRef(t *testing.T) {
ce := adaptertest.NewTestClient()
config := Config{
- Namespace: "default",
+ Namespaces: []string{"default"},
Resources: []ResourceWatch{{
GVR: schema.GroupVersionResource{
Version: "v1",
@@ -93,7 +93,7 @@ func TestAdapter_StartResource(t *testing.T) {
ce := adaptertest.NewTestClient()
config := Config{
- Namespace: "default",
+ Namespaces: []string{"default"},
Resources: []ResourceWatch{{
GVR: schema.GroupVersionResource{
Version: "v1",
@@ -140,7 +140,7 @@ func TestAdapter_StartNonNamespacedResource(t *testing.T) {
ce := adaptertest.NewTestClient()
config := Config{
- Namespace: "default",
+ Namespaces: []string{"default"},
Resources: []ResourceWatch{{
GVR: schema.GroupVersionResource{
Version: "v1",
diff --git a/pkg/adapter/apiserver/config.go b/pkg/adapter/apiserver/config.go
index e3d383ff837..19fca01177a 100644
--- a/pkg/adapter/apiserver/config.go
+++ b/pkg/adapter/apiserver/config.go
@@ -31,9 +31,13 @@ type ResourceWatch struct {
}
type Config struct {
- // Namespace specifies the namespace that Resources[] exist.
+ // Namespaces specifies the namespaces where Resources[] exist.
// +required
- Namespace string `json:"namespace"`
+ Namespaces []string `json:"namespaces"`
+
+ // AllNamespaces indicates whether this source is watching all
+ // existing namespaces
+ AllNamespaces bool `json:"allNamespaces"`
// Resource is the resource this source will track and send related
// lifecycle events from the Kubernetes ApiServer.
diff --git a/pkg/apis/sources/v1/apiserver_types.go b/pkg/apis/sources/v1/apiserver_types.go
index b70516d49f9..cfe41a956b1 100644
--- a/pkg/apis/sources/v1/apiserver_types.go
+++ b/pkg/apis/sources/v1/apiserver_types.go
@@ -80,6 +80,11 @@ type ApiServerSourceSpec struct {
// source. Defaults to default if not set.
// +optional
ServiceAccountName string `json:"serviceAccountName,omitempty"`
+
+ // NamespaceSelector is a label selector to capture the namespaces that
+ // should be watched by the source.
+ // +optional
+ NamespaceSelector *metav1.LabelSelector `json:"namespaceSelector,omitempty"`
}
// ApiServerSourceStatus defines the observed state of ApiServerSource
@@ -92,6 +97,9 @@ type ApiServerSourceStatus struct {
// * SinkURI - the current active sink URI that has been configured for the
// Source.
duckv1.SourceStatus `json:",inline"`
+
+ // Namespaces show the namespaces currently watched by the ApiServerSource
+ Namespaces []string `json:"namespaces"`
}
// APIVersionKind is an APIVersion and Kind tuple.
diff --git a/pkg/apis/sources/v1/zz_generated.deepcopy.go b/pkg/apis/sources/v1/zz_generated.deepcopy.go
index fd7b8b7177d..551322eab40 100644
--- a/pkg/apis/sources/v1/zz_generated.deepcopy.go
+++ b/pkg/apis/sources/v1/zz_generated.deepcopy.go
@@ -140,6 +140,11 @@ func (in *ApiServerSourceSpec) DeepCopyInto(out *ApiServerSourceSpec) {
*out = new(APIVersionKind)
**out = **in
}
+ if in.NamespaceSelector != nil {
+ in, out := &in.NamespaceSelector, &out.NamespaceSelector
+ *out = new(metav1.LabelSelector)
+ (*in).DeepCopyInto(*out)
+ }
return
}
@@ -157,6 +162,11 @@ func (in *ApiServerSourceSpec) DeepCopy() *ApiServerSourceSpec {
func (in *ApiServerSourceStatus) DeepCopyInto(out *ApiServerSourceStatus) {
*out = *in
in.SourceStatus.DeepCopyInto(&out.SourceStatus)
+ if in.Namespaces != nil {
+ in, out := &in.Namespaces, &out.Namespaces
+ *out = make([]string, len(*in))
+ copy(*out, *in)
+ }
return
}
diff --git a/pkg/reconciler/apiserversource/apiserversource.go b/pkg/reconciler/apiserversource/apiserversource.go
index 36ae5b8df38..7964a4a5825 100644
--- a/pkg/reconciler/apiserversource/apiserversource.go
+++ b/pkg/reconciler/apiserversource/apiserversource.go
@@ -20,6 +20,7 @@ import (
"context"
"encoding/json"
"fmt"
+ "sort"
"go.uber.org/zap"
appsv1 "k8s.io/api/apps/v1"
@@ -32,6 +33,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes"
+ clientv1 "k8s.io/client-go/listers/core/v1"
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
@@ -67,7 +69,8 @@ type Reconciler struct {
ceSource string
sinkResolver *resolver.URIResolver
- configs reconcilersource.ConfigAccessor
+ configs reconcilersource.ConfigAccessor
+ namespaceLister clientv1.NamespaceLister
}
var _ apiserversourcereconciler.Interface = (*Reconciler)(nil)
@@ -98,13 +101,23 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, source *v1.ApiServerSour
}
source.Status.MarkSink(sinkURI)
- err = r.runAccessCheck(ctx, source)
+ // resolve namespaces to watch
+ namespaces, err := r.namespacesFromSelector(source)
+ if err != nil {
+ logging.FromContext(ctx).Errorw("cannot retrieve namespaces to watch", zap.Error(err))
+ return err
+ }
+ source.Status.Namespaces = namespaces
+
+ err = r.runAccessCheck(ctx, source, namespaces)
if err != nil {
logging.FromContext(ctx).Errorw("Not enough permission", zap.Error(err))
return err
}
- ra, err := r.createReceiveAdapter(ctx, source, sinkURI.String())
+ // An empty selector targets all namespaces.
+ allNamespaces := isEmptySelector(source.Spec.NamespaceSelector)
+ ra, err := r.createReceiveAdapter(ctx, source, sinkURI.String(), namespaces, allNamespaces)
if err != nil {
logging.FromContext(ctx).Errorw("Unable to create the receive adapter", zap.Error(err))
return err
@@ -121,18 +134,55 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, source *v1.ApiServerSour
return nil
}
-func (r *Reconciler) createReceiveAdapter(ctx context.Context, src *v1.ApiServerSource, sinkURI string) (*appsv1.Deployment, error) {
+func (r *Reconciler) namespacesFromSelector(src *v1.ApiServerSource) ([]string, error) {
+ if src.Spec.NamespaceSelector == nil {
+ return []string{src.Namespace}, nil
+ }
+
+ selector, err := metav1.LabelSelectorAsSelector(src.Spec.NamespaceSelector)
+ if err != nil {
+ return nil, err
+ }
+
+ namespaces, err := r.namespaceLister.List(selector)
+ if err != nil {
+ return nil, err
+ }
+
+ nsString := make([]string, 0, len(namespaces))
+ for _, ns := range namespaces {
+ nsString = append(nsString, ns.Name)
+ }
+ sort.Strings(nsString)
+ return nsString, nil
+}
+
+func isEmptySelector(selector *metav1.LabelSelector) bool {
+ if selector == nil {
+ return false
+ }
+
+ if len(selector.MatchLabels) == 0 && len(selector.MatchExpressions) == 0 {
+ return true
+ }
+
+ return false
+}
+
+func (r *Reconciler) createReceiveAdapter(ctx context.Context, src *v1.ApiServerSource, sinkURI string, namespaces []string, allNamespaces bool) (*appsv1.Deployment, error) {
// TODO: missing.
// if err := checkResourcesStatus(src); err != nil {
// return nil, err
// }
adapterArgs := resources.ReceiveAdapterArgs{
- Image: r.receiveAdapterImage,
- Source: src,
- Labels: resources.Labels(src.Name),
- SinkURI: sinkURI,
- Configs: r.configs,
+ Image: r.receiveAdapterImage,
+ Source: src,
+ Labels: resources.Labels(src.Name),
+ SinkURI: sinkURI,
+ Configs: r.configs,
+ Namespaces: namespaces,
+ AllNamespaces: allNamespaces,
}
expected, err := resources.MakeReceiveAdapter(&adapterArgs)
if err != nil {
@@ -180,7 +230,7 @@ func (r *Reconciler) podSpecChanged(oldPodSpec corev1.PodSpec, newPodSpec corev1
return false
}
-func (r *Reconciler) runAccessCheck(ctx context.Context, src *v1.ApiServerSource) error {
+func (r *Reconciler) runAccessCheck(ctx context.Context, src *v1.ApiServerSource, namespaces []string) error {
if src.Spec.Resources == nil || len(src.Spec.Resources) == 0 {
src.Status.MarkSufficientPermissions()
return nil
@@ -206,35 +256,39 @@ func (r *Reconciler) runAccessCheck(ctx context.Context, src *v1.ApiServerSource
return err
}
gvr, _ := meta.UnsafeGuessKindToResource(schema.GroupVersionKind{Kind: res.Kind, Group: gv.Group, Version: gv.Version}) // TODO: Test for nil Kind.
- missingVerbs := ""
- sep1 := ""
- for _, verb := range verbs {
- sar := &authorizationv1.SubjectAccessReview{
- Spec: authorizationv1.SubjectAccessReviewSpec{
- ResourceAttributes: &authorizationv1.ResourceAttributes{
- Namespace: src.Namespace,
- Verb: verb,
- Group: gv.Group,
- Resource: gvr.Resource,
+
+ for _, ns := range namespaces {
+ missingVerbs := ""
+ sep1 := ""
+ for _, verb := range verbs {
+ sar := &authorizationv1.SubjectAccessReview{
+ Spec: authorizationv1.SubjectAccessReviewSpec{
+ ResourceAttributes: &authorizationv1.ResourceAttributes{
+ Namespace: ns,
+ Verb: verb,
+ Group: gv.Group,
+ Resource: gvr.Resource,
+ },
+ User: user,
},
- User: user,
- },
- }
+ }
+
+ response, err := r.kubeClientSet.AuthorizationV1().SubjectAccessReviews().Create(ctx, sar, metav1.CreateOptions{})
+ if err != nil {
+ return err
+ }
- response, err := r.kubeClientSet.AuthorizationV1().SubjectAccessReviews().Create(ctx, sar, metav1.CreateOptions{})
- if err != nil {
- return err
+ if !response.Status.Allowed {
+ missingVerbs += sep1 + verb
+ sep1 = ", "
+ }
}
- if !response.Status.Allowed {
- missingVerbs += sep1 + verb
- sep1 = ", "
+ if missingVerbs != "" {
+ missing += sep + missingVerbs + ` resource "` + gvr.Resource + `" in API group "` + gv.Group + `" in Namespace "` + ns + `"`
+ sep = ", "
}
}
- if missingVerbs != "" {
- missing += sep + missingVerbs + ` resource "` + gvr.Resource + `" in API group "` + gv.Group + `"`
- sep = ", "
- }
}
if missing == "" {
src.Status.MarkSufficientPermissions()
@@ -242,7 +296,7 @@ func (r *Reconciler) runAccessCheck(ctx context.Context, src *v1.ApiServerSource
}
src.Status.MarkNoSufficientPermissions(lastReason, "User %s cannot %s", user, missing)
- return fmt.Errorf("Insufficient permission: user %s cannot %s", user, missing)
+ return fmt.Errorf("insufficient permissions: User %s cannot %s", user, missing)
}
diff --git a/pkg/reconciler/apiserversource/apiserversource_test.go b/pkg/reconciler/apiserversource/apiserversource_test.go
index 7b0fbba7b77..a09f72aa0a4 100644
--- a/pkg/reconciler/apiserversource/apiserversource_test.go
+++ b/pkg/reconciler/apiserversource/apiserversource_test.go
@@ -25,6 +25,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
authorizationv1 "k8s.io/api/authorization/v1"
corev1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
clientgotesting "k8s.io/client-go/testing"
@@ -126,6 +127,7 @@ func TestReconcile(t *testing.T) {
rttestingv1.WithApiServerSourceStatusObservedGeneration(generation),
rttestingv1.WithApiServerSourceSink(sinkURI),
rttestingv1.WithApiServerSourceNoSufficientPermissions,
+ rttestingv1.WithApiServerSourceStatusNamespaces([]string{testNS}),
),
}},
WantCreates: []runtime.Object{
@@ -135,7 +137,7 @@ func TestReconcile(t *testing.T) {
},
WantErr: true,
WantEvents: []string{
- Eventf(corev1.EventTypeWarning, "InternalError", `Insufficient permission: user system:serviceaccount:testnamespace:default cannot get, list, watch resource "namespaces" in API group ""`),
+ Eventf(corev1.EventTypeWarning, "InternalError", `insufficient permissions: User system:serviceaccount:testnamespace:default cannot get, list, watch resource "namespaces" in API group "" in Namespace "testnamespace"`),
},
WithReactors: []clientgotesting.ReactionFunc{subjectAccessReviewCreateReactor(false)},
SkipNamespaceValidation: true, // SubjectAccessReview objects are cluster-scoped.
@@ -178,6 +180,7 @@ func TestReconcile(t *testing.T) {
rttestingv1.WithApiServerSourceSufficientPermissions,
rttestingv1.WithApiServerSourceReferenceModeEventTypes(source),
rttestingv1.WithApiServerSourceStatusObservedGeneration(generation),
+ rttestingv1.WithApiServerSourceStatusNamespaces([]string{testNS}),
),
}},
WantCreates: []runtime.Object{
@@ -187,6 +190,135 @@ func TestReconcile(t *testing.T) {
},
WithReactors: []clientgotesting.ReactionFunc{subjectAccessReviewCreateReactor(true)},
SkipNamespaceValidation: true, // SubjectAccessReview objects are cluster-scoped.
+ }, {
+ Name: "valid with namespace selector",
+ Objects: []runtime.Object{
+ rttestingv1.NewApiServerSource(sourceName, testNS,
+ rttestingv1.WithApiServerSourceSpec(sourcesv1.ApiServerSourceSpec{
+ Resources: []sourcesv1.APIVersionKindSelector{{
+ APIVersion: "v1",
+ Kind: "Namespace",
+ }},
+ SourceSpec: duckv1.SourceSpec{Sink: sinkDest},
+ }),
+ rttestingv1.WithApiServerSourceUID(sourceUID),
+ rttestingv1.WithApiServerSourceObjectMetaGeneration(generation),
+ rttestingv1.WithApiServerSourceNamespaceSelector(metav1.LabelSelector{MatchLabels: map[string]string{"target": "yes"}}),
+ ),
+ rttestingv1.NewChannel(sinkName, testNS,
+ rttestingv1.WithInitChannelConditions,
+ rttestingv1.WithChannelAddress(sinkDNS),
+ ),
+ makeAvailableReceiveAdapter(t),
+ rttesting.NewNamespace("test-a", rttesting.WithNamespaceLabeled(map[string]string{"target": "yes"})),
+ rttesting.NewNamespace("test-b", rttesting.WithNamespaceLabeled(map[string]string{"target": "yes"})),
+ rttesting.NewNamespace("test-c", rttesting.WithNamespaceLabeled(map[string]string{"target": "no"})),
+ },
+ Key: testNS + "/" + sourceName,
+ WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
+ Object: rttestingv1.NewApiServerSource(sourceName, testNS,
+ rttestingv1.WithApiServerSourceSpec(sourcesv1.ApiServerSourceSpec{
+ Resources: []sourcesv1.APIVersionKindSelector{{
+ APIVersion: "v1",
+ Kind: "Namespace",
+ }},
+ SourceSpec: duckv1.SourceSpec{Sink: sinkDest},
+ }),
+ rttestingv1.WithApiServerSourceUID(sourceUID),
+ rttestingv1.WithApiServerSourceObjectMetaGeneration(generation),
+ // Status Update:
+ rttestingv1.WithInitApiServerSourceConditions,
+ rttestingv1.WithApiServerSourceDeployed,
+ rttestingv1.WithApiServerSourceSink(sinkURI),
+ rttestingv1.WithApiServerSourceSufficientPermissions,
+ rttestingv1.WithApiServerSourceReferenceModeEventTypes(source),
+ rttestingv1.WithApiServerSourceStatusObservedGeneration(generation),
+ rttestingv1.WithApiServerSourceNamespaceSelector(metav1.LabelSelector{MatchLabels: map[string]string{"target": "yes"}}),
+ rttestingv1.WithApiServerSourceStatusNamespaces([]string{"test-a", "test-b"}),
+ ),
+ }},
+ WantCreates: []runtime.Object{
+ makeNamespacedSubjectAccessReview("namespaces", "get", "default", "test-a"),
+ makeNamespacedSubjectAccessReview("namespaces", "list", "default", "test-a"),
+ makeNamespacedSubjectAccessReview("namespaces", "watch", "default", "test-a"),
+ makeNamespacedSubjectAccessReview("namespaces", "get", "default", "test-b"),
+ makeNamespacedSubjectAccessReview("namespaces", "list", "default", "test-b"),
+ makeNamespacedSubjectAccessReview("namespaces", "watch", "default", "test-b"),
+ },
+ WantUpdates: []clientgotesting.UpdateActionImpl{{
+ Object: makeAvailableReceiveAdapterWithNamespaces(t, []string{"test-a", "test-b"}, false),
+ }},
+ WantEvents: []string{
+ Eventf(corev1.EventTypeNormal, "ApiServerSourceDeploymentUpdated", `Deployment "apiserversource-test-apiserver-source-1234" updated`),
+ },
+ WithReactors: []clientgotesting.ReactionFunc{subjectAccessReviewCreateReactor(true)},
+ SkipNamespaceValidation: true, // SubjectAccessReview objects are cluster-scoped.
+ }, {
+ Name: "valid with an empty namespace selector",
+ Objects: []runtime.Object{
+ rttestingv1.NewApiServerSource(sourceName, testNS,
+ rttestingv1.WithApiServerSourceSpec(sourcesv1.ApiServerSourceSpec{
+ Resources: []sourcesv1.APIVersionKindSelector{{
+ APIVersion: "v1",
+ Kind: "Namespace",
+ }},
+ SourceSpec: duckv1.SourceSpec{Sink: sinkDest},
+ }),
+ rttestingv1.WithApiServerSourceUID(sourceUID),
+ rttestingv1.WithApiServerSourceObjectMetaGeneration(generation),
+ rttestingv1.WithApiServerSourceNamespaceSelector(metav1.LabelSelector{}),
+ ),
+ rttestingv1.NewChannel(sinkName, testNS,
+ rttestingv1.WithInitChannelConditions,
+ rttestingv1.WithChannelAddress(sinkDNS),
+ ),
+ makeAvailableReceiveAdapter(t),
+ rttesting.NewNamespace("test-a"),
+ rttesting.NewNamespace("test-b"),
+ rttesting.NewNamespace("test-c"),
+ },
+ Key: testNS + "/" + sourceName,
+ WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
+ Object: rttestingv1.NewApiServerSource(sourceName, testNS,
+ rttestingv1.WithApiServerSourceSpec(sourcesv1.ApiServerSourceSpec{
+ Resources: []sourcesv1.APIVersionKindSelector{{
+ APIVersion: "v1",
+ Kind: "Namespace",
+ }},
+ SourceSpec: duckv1.SourceSpec{Sink: sinkDest},
+ }),
+ rttestingv1.WithApiServerSourceUID(sourceUID),
+ rttestingv1.WithApiServerSourceObjectMetaGeneration(generation),
+ // Status Update:
+ rttestingv1.WithInitApiServerSourceConditions,
+ rttestingv1.WithApiServerSourceDeployed,
+ rttestingv1.WithApiServerSourceSink(sinkURI),
+ rttestingv1.WithApiServerSourceSufficientPermissions,
+ rttestingv1.WithApiServerSourceReferenceModeEventTypes(source),
+ rttestingv1.WithApiServerSourceStatusObservedGeneration(generation),
+ rttestingv1.WithApiServerSourceNamespaceSelector(metav1.LabelSelector{}),
+ rttestingv1.WithApiServerSourceStatusNamespaces([]string{"test-a", "test-b", "test-c"}),
+ ),
+ }},
+ WantCreates: []runtime.Object{
+ makeNamespacedSubjectAccessReview("namespaces", "get", "default", "test-a"),
+ makeNamespacedSubjectAccessReview("namespaces", "list", "default", "test-a"),
+ makeNamespacedSubjectAccessReview("namespaces", "watch", "default", "test-a"),
+ makeNamespacedSubjectAccessReview("namespaces", "get", "default", "test-b"),
+ makeNamespacedSubjectAccessReview("namespaces", "list", "default", "test-b"),
+ makeNamespacedSubjectAccessReview("namespaces", "watch", "default", "test-b"),
+ makeNamespacedSubjectAccessReview("namespaces", "get", "default", "test-c"),
+ makeNamespacedSubjectAccessReview("namespaces", "list", "default", "test-c"),
+ makeNamespacedSubjectAccessReview("namespaces", "watch", "default", "test-c"),
+ },
+ WantUpdates: []clientgotesting.UpdateActionImpl{{
+ Object: makeAvailableReceiveAdapterWithNamespaces(t, []string{"test-a", "test-b", "test-c"}, true),
+ }},
+ WantEvents: []string{
+ Eventf(corev1.EventTypeNormal, "ApiServerSourceDeploymentUpdated", `Deployment "apiserversource-test-apiserver-source-1234" updated`),
+ },
+ WithReactors: []clientgotesting.ReactionFunc{subjectAccessReviewCreateReactor(true)},
+ SkipNamespaceValidation: true, // SubjectAccessReview objects are cluster-scoped.
}, {
Name: "valid with eventmode of resourcemode",
Objects: []runtime.Object{
@@ -228,6 +360,7 @@ func TestReconcile(t *testing.T) {
rttestingv1.WithApiServerSourceSufficientPermissions,
rttestingv1.WithApiServerSourceResourceModeEventTypes(source),
rttestingv1.WithApiServerSourceStatusObservedGeneration(generation),
+ rttestingv1.WithApiServerSourceStatusNamespaces([]string{testNS}),
),
}},
WantCreates: []runtime.Object{
@@ -276,6 +409,7 @@ func TestReconcile(t *testing.T) {
rttestingv1.WithApiServerSourceSufficientPermissions,
rttestingv1.WithApiServerSourceReferenceModeEventTypes(source),
rttestingv1.WithApiServerSourceStatusObservedGeneration(generation),
+ rttestingv1.WithApiServerSourceStatusNamespaces([]string{testNS}),
),
}},
WantCreates: []runtime.Object{
@@ -367,6 +501,7 @@ func TestReconcile(t *testing.T) {
rttestingv1.WithApiServerSourceSink(sinkURI),
rttestingv1.WithApiServerSourceStatusObservedGeneration(generation),
rttestingv1.WithApiServerSourceSufficientPermissions,
+ rttestingv1.WithApiServerSourceStatusNamespaces([]string{testNS}),
),
}},
WantCreates: []runtime.Object{
@@ -430,6 +565,7 @@ func TestReconcile(t *testing.T) {
rttestingv1.WithApiServerSourceSufficientPermissions,
rttestingv1.WithApiServerSourceReferenceModeEventTypes(source),
rttestingv1.WithApiServerSourceStatusObservedGeneration(generation),
+ rttestingv1.WithApiServerSourceStatusNamespaces([]string{testNS}),
),
}},
WantCreates: []runtime.Object{
@@ -481,6 +617,7 @@ func TestReconcile(t *testing.T) {
rttestingv1.WithApiServerSourceReferenceModeEventTypes(source),
rttestingv1.WithApiServerSourceDeploymentUnavailable,
rttestingv1.WithApiServerSourceStatusObservedGeneration(generation),
+ rttestingv1.WithApiServerSourceStatusNamespaces([]string{testNS}),
),
}},
WantUpdates: []clientgotesting.UpdateActionImpl{{
@@ -541,6 +678,7 @@ func TestReconcile(t *testing.T) {
rttestingv1.WithApiServerSourceSufficientPermissions,
rttestingv1.WithApiServerSourceReferenceModeEventTypes(source),
rttestingv1.WithApiServerSourceStatusObservedGeneration(generation),
+ rttestingv1.WithApiServerSourceStatusNamespaces([]string{testNS}),
),
}},
WantUpdates: []clientgotesting.UpdateActionImpl{{
@@ -595,6 +733,7 @@ func TestReconcile(t *testing.T) {
rttestingv1.WithApiServerSourceSufficientPermissions,
rttestingv1.WithApiServerSourceReferenceModeEventTypes(source),
rttestingv1.WithApiServerSourceStatusObservedGeneration(generation),
+ rttestingv1.WithApiServerSourceStatusNamespaces([]string{testNS}),
),
}},
WantUpdates: []clientgotesting.UpdateActionImpl{{
@@ -646,6 +785,7 @@ func TestReconcile(t *testing.T) {
rttestingv1.WithApiServerSourceSufficientPermissions,
rttestingv1.WithApiServerSourceReferenceModeEventTypes(source),
rttestingv1.WithApiServerSourceStatusObservedGeneration(generation),
+ rttestingv1.WithApiServerSourceStatusNamespaces([]string{testNS}),
),
}},
WantCreates: []runtime.Object{
@@ -666,6 +806,7 @@ func TestReconcile(t *testing.T) {
receiveAdapterImage: image,
sinkResolver: resolver.NewURIResolverFromTracker(ctx, tracker.New(func(types.NamespacedName) {}, 0)),
configs: &reconcilersource.EmptyVarsGenerator{},
+ namespaceLister: listers.GetNamespaceLister(),
}
return apiserversource.NewReconciler(ctx, logger,
fakeeventingclient.Get(ctx), listers.GetApiServerSourceLister(),
@@ -699,11 +840,12 @@ func makeReceiveAdapterWithName(t *testing.T, sourceName string) *appsv1.Deploym
)
args := resources.ReceiveAdapterArgs{
- Image: image,
- Source: src,
- Labels: resources.Labels(sourceName),
- SinkURI: sinkURI.String(),
- Configs: &reconcilersource.EmptyVarsGenerator{},
+ Image: image,
+ Source: src,
+ Labels: resources.Labels(sourceName),
+ SinkURI: sinkURI.String(),
+ Configs: &reconcilersource.EmptyVarsGenerator{},
+ Namespaces: []string{testNS},
}
ra, err := resources.MakeReceiveAdapter(&args)
@@ -737,11 +879,12 @@ func makeAvailableReceiveAdapterWithTargetURI(t *testing.T) *appsv1.Deployment {
)
args := resources.ReceiveAdapterArgs{
- Image: image,
- Source: src,
- Labels: resources.Labels(sourceName),
- SinkURI: sinkTargetURI.String(),
- Configs: &reconcilersource.EmptyVarsGenerator{},
+ Image: image,
+ Source: src,
+ Labels: resources.Labels(sourceName),
+ SinkURI: sinkTargetURI.String(),
+ Configs: &reconcilersource.EmptyVarsGenerator{},
+ Namespaces: []string{testNS},
}
ra, err := resources.MakeReceiveAdapter(&args)
@@ -771,11 +914,47 @@ func makeAvailableReceiveAdapterWithEventMode(t *testing.T, eventMode string) *a
)
args := resources.ReceiveAdapterArgs{
- Image: image,
- Source: src,
- Labels: resources.Labels(sourceName),
- SinkURI: sinkURI.String(),
- Configs: &reconcilersource.EmptyVarsGenerator{},
+ Image: image,
+ Source: src,
+ Labels: resources.Labels(sourceName),
+ SinkURI: sinkURI.String(),
+ Configs: &reconcilersource.EmptyVarsGenerator{},
+ Namespaces: []string{testNS},
+ }
+
+ ra, err := resources.MakeReceiveAdapter(&args)
+ require.NoError(t, err)
+
+ rttesting.WithDeploymentAvailable()(ra)
+ return ra
+}
+
+func makeAvailableReceiveAdapterWithNamespaces(t *testing.T, namespaces []string, allNamespaces bool) *appsv1.Deployment {
+ t.Helper()
+
+ src := rttestingv1.NewApiServerSource(sourceName, testNS,
+ rttestingv1.WithApiServerSourceSpec(sourcesv1.ApiServerSourceSpec{
+ Resources: []sourcesv1.APIVersionKindSelector{{
+ APIVersion: "v1",
+ Kind: "Namespace",
+ }},
+ SourceSpec: duckv1.SourceSpec{Sink: sinkDest},
+ }),
+ rttestingv1.WithApiServerSourceUID(sourceUID),
+ // Status Update:
+ rttestingv1.WithInitApiServerSourceConditions,
+ rttestingv1.WithApiServerSourceDeployed,
+ rttestingv1.WithApiServerSourceSink(sinkURI),
+ )
+
+ args := resources.ReceiveAdapterArgs{
+ Image: image,
+ Source: src,
+ Labels: resources.Labels(sourceName),
+ SinkURI: sinkURI.String(),
+ Configs: &reconcilersource.EmptyVarsGenerator{},
+ Namespaces: namespaces,
+ AllNamespaces: allNamespaces,
}
ra, err := resources.MakeReceiveAdapter(&args)
@@ -806,11 +985,11 @@ func makeReceiveAdapterWithDifferentContainerCount(t *testing.T) *appsv1.Deploym
return ra
}
-func makeSubjectAccessReview(resource, verb, sa string) *authorizationv1.SubjectAccessReview {
+func makeNamespacedSubjectAccessReview(resource, verb, sa, ns string) *authorizationv1.SubjectAccessReview {
return &authorizationv1.SubjectAccessReview{
Spec: authorizationv1.SubjectAccessReviewSpec{
ResourceAttributes: &authorizationv1.ResourceAttributes{
- Namespace: testNS,
+ Namespace: ns,
Verb: verb,
Group: "",
Resource: resource,
@@ -820,6 +999,10 @@ func makeSubjectAccessReview(resource, verb, sa string) *authorizationv1.Subject
}
}
+func makeSubjectAccessReview(resource, verb, sa string) *authorizationv1.SubjectAccessReview {
+ return makeNamespacedSubjectAccessReview(resource, verb, sa, testNS)
+}
+
func subjectAccessReviewCreateReactor(allowed bool) clientgotesting.ReactionFunc {
return func(action clientgotesting.Action) (handled bool, ret runtime.Object, err error) {
if action.GetVerb() == "create" && action.GetResource().Resource == "subjectaccessreviews" {
diff --git a/pkg/reconciler/apiserversource/controller.go b/pkg/reconciler/apiserversource/controller.go
index 3fadb4e0f48..5dfe5f2f714 100644
--- a/pkg/reconciler/apiserversource/controller.go
+++ b/pkg/reconciler/apiserversource/controller.go
@@ -31,6 +31,7 @@ import (
kubeclient "knative.dev/pkg/client/injection/kube/client"
deploymentinformer "knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment"
+ "knative.dev/pkg/client/injection/kube/informers/core/v1/namespace"
apiserversourceinformer "knative.dev/eventing/pkg/client/injection/informers/sources/v1/apiserversource"
apiserversourcereconciler "knative.dev/eventing/pkg/client/injection/reconciler/sources/v1/apiserversource"
@@ -52,11 +53,13 @@ func NewController(
deploymentInformer := deploymentinformer.Get(ctx)
apiServerSourceInformer := apiserversourceinformer.Get(ctx)
+ namespaceInformer := namespace.Get(ctx)
r := &Reconciler{
- kubeClientSet: kubeclient.Get(ctx),
- ceSource: GetCfgHost(ctx),
- configs: reconcilersource.WatchConfigurations(ctx, component, cmw),
+ kubeClientSet: kubeclient.Get(ctx),
+ ceSource: GetCfgHost(ctx),
+ configs: reconcilersource.WatchConfigurations(ctx, component, cmw),
+ namespaceLister: namespaceInformer.Lister(),
}
env := &envConfig{}
@@ -76,5 +79,16 @@ func NewController(
Handler: controller.HandleAll(impl.EnqueueControllerOf),
})
+ cb := func() {
+ logging.FromContext(ctx).Info("Global resync of APIServerSources due to namespaces changing.")
+ impl.GlobalResync(apiServerSourceInformer.Informer())
+ }
+
+ namespaceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+ AddFunc: func(obj interface{}) { cb() },
+ UpdateFunc: func(oldObj, newObj interface{}) { cb() },
+ DeleteFunc: func(obj interface{}) { cb() },
+ })
+
return impl
}
diff --git a/pkg/reconciler/apiserversource/controller_test.go b/pkg/reconciler/apiserversource/controller_test.go
index 56f33b01108..23d5f810d3f 100644
--- a/pkg/reconciler/apiserversource/controller_test.go
+++ b/pkg/reconciler/apiserversource/controller_test.go
@@ -33,6 +33,7 @@ import (
// Fake injection informers
_ "knative.dev/eventing/pkg/client/injection/informers/sources/v1/apiserversource/fake"
_ "knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment/fake"
+ _ "knative.dev/pkg/client/injection/kube/informers/core/v1/namespace/fake"
. "knative.dev/pkg/reconciler/testing"
)
diff --git a/pkg/reconciler/apiserversource/resources/receive_adapter.go b/pkg/reconciler/apiserversource/resources/receive_adapter.go
index 9f61d52570f..499fe8141d8 100644
--- a/pkg/reconciler/apiserversource/resources/receive_adapter.go
+++ b/pkg/reconciler/apiserversource/resources/receive_adapter.go
@@ -41,11 +41,13 @@ import (
// ReceiveAdapterArgs are the arguments needed to create a ApiServer Receive Adapter.
// Every field is required.
type ReceiveAdapterArgs struct {
- Image string
- Source *v1.ApiServerSource
- Labels map[string]string
- SinkURI string
- Configs reconcilersource.ConfigAccessor
+ Image string
+ Source *v1.ApiServerSource
+ Labels map[string]string
+ SinkURI string
+ Configs reconcilersource.ConfigAccessor
+ Namespaces []string
+ AllNamespaces bool
}
// MakeReceiveAdapter generates (but does not insert into K8s) the Receive Adapter Deployment for
@@ -111,10 +113,11 @@ func MakeReceiveAdapter(args *ReceiveAdapterArgs) (*appsv1.Deployment, error) {
func makeEnv(args *ReceiveAdapterArgs) ([]corev1.EnvVar, error) {
cfg := &apiserver.Config{
- Namespace: args.Source.Namespace,
+ Namespaces: args.Namespaces,
Resources: make([]apiserver.ResourceWatch, 0, len(args.Source.Spec.Resources)),
ResourceOwner: args.Source.Spec.ResourceOwner,
EventMode: args.Source.Spec.EventMode,
+ AllNamespaces: args.AllNamespaces,
}
for _, r := range args.Source.Spec.Resources {
diff --git a/pkg/reconciler/apiserversource/resources/receive_adapter_test.go b/pkg/reconciler/apiserversource/resources/receive_adapter_test.go
index f28413373a5..d61c8963e22 100644
--- a/pkg/reconciler/apiserversource/resources/receive_adapter_test.go
+++ b/pkg/reconciler/apiserversource/resources/receive_adapter_test.go
@@ -129,7 +129,7 @@ func TestMakeReceiveAdapters(t *testing.T) {
Value: "sink-uri",
}, {
Name: "K_SOURCE_CONFIG",
- Value: `{"namespace":"source-namespace","resources":[{"gvr":{"Group":"","Version":"","Resource":"namespaces"}},{"gvr":{"Group":"batch","Version":"v1","Resource":"jobs"}},{"gvr":{"Group":"","Version":"","Resource":"pods"},"selector":"test-key1=test-value1"}],"owner":{"apiVersion":"custom/v1","kind":"Parent"},"mode":"Resource"}`,
+ Value: `{"namespaces":["source-namespace"],"allNamespaces":false,"resources":[{"gvr":{"Group":"","Version":"","Resource":"namespaces"}},{"gvr":{"Group":"batch","Version":"v1","Resource":"jobs"}},{"gvr":{"Group":"","Version":"","Resource":"pods"},"selector":"test-key1=test-value1"}],"owner":{"apiVersion":"custom/v1","kind":"Parent"},"mode":"Resource"}`,
}, {
Name: "SYSTEM_NAMESPACE",
Value: "knative-testing",
@@ -202,8 +202,9 @@ func TestMakeReceiveAdapters(t *testing.T) {
"test-key1": "test-value1",
"test-key2": "test-value2",
},
- SinkURI: "sink-uri",
- Configs: &source.EmptyVarsGenerator{},
+ SinkURI: "sink-uri",
+ Configs: &source.EmptyVarsGenerator{},
+ Namespaces: []string{"source-namespace"},
})
if diff := cmp.Diff(tc.want, got); diff != "" {
diff --git a/pkg/reconciler/testing/v1/apiserversouce.go b/pkg/reconciler/testing/v1/apiserversouce.go
index f78ab6a8350..04bb90f63e5 100644
--- a/pkg/reconciler/testing/v1/apiserversouce.go
+++ b/pkg/reconciler/testing/v1/apiserversouce.go
@@ -111,7 +111,7 @@ func WithApiServerSourceSufficientPermissions(s *v1.ApiServerSource) {
}
func WithApiServerSourceNoSufficientPermissions(s *v1.ApiServerSource) {
- s.Status.MarkNoSufficientPermissions("", `User system:serviceaccount:testnamespace:default cannot get, list, watch resource "namespaces" in API group ""`)
+ s.Status.MarkNoSufficientPermissions("", `User system:serviceaccount:testnamespace:default cannot get, list, watch resource "namespaces" in API group "" in Namespace "testnamespace"`)
}
func WithApiServerSourceDeleted(c *v1.ApiServerSource) {
@@ -136,3 +136,15 @@ func WithApiServerSourceObjectMetaGeneration(generation int64) ApiServerSourceOp
c.ObjectMeta.Generation = generation
}
}
+
+func WithApiServerSourceNamespaceSelector(nsSelector metav1.LabelSelector) ApiServerSourceOption {
+ return func(c *v1.ApiServerSource) {
+ c.Spec.NamespaceSelector = &nsSelector
+ }
+}
+
+func WithApiServerSourceStatusNamespaces(namespaces []string) ApiServerSourceOption {
+ return func(c *v1.ApiServerSource) {
+ c.Status.Namespaces = namespaces
+ }
+}
diff --git a/test/rekt/apiserversource_test.go b/test/rekt/apiserversource_test.go
index 56b7f71f78d..0d865f4e054 100644
--- a/test/rekt/apiserversource_test.go
+++ b/test/rekt/apiserversource_test.go
@@ -120,3 +120,31 @@ func TestApiServerSourceDataPlane_EventsRetries(t *testing.T) {
env.Test(ctx, t, apiserversourcefeatures.SendsEventsWithRetries())
}
+
+func TestApiServerSourceDataPlane_MultipleNamespaces(t *testing.T) {
+ t.Parallel()
+
+ ctx, env := global.Environment(
+ knative.WithKnativeNamespace(system.Namespace()),
+ knative.WithLoggingConfig,
+ knative.WithTracingConfig,
+ k8s.WithEventListener,
+ environment.Managed(t),
+ )
+
+ env.Test(ctx, t, apiserversourcefeatures.SendsEventsForAllResourcesWithNamespaceSelector())
+}
+
+func TestApiServerSourceDataPlane_MultipleNamespacesEmptySelector(t *testing.T) {
+ t.Parallel()
+
+ ctx, env := global.Environment(
+ knative.WithKnativeNamespace(system.Namespace()),
+ knative.WithLoggingConfig,
+ knative.WithTracingConfig,
+ k8s.WithEventListener,
+ environment.Managed(t),
+ )
+
+ env.Test(ctx, t, apiserversourcefeatures.SendsEventsForAllResourcesWithEmptyNamespaceSelector())
+}
diff --git a/test/rekt/features/apiserversource/data_plane.go b/test/rekt/features/apiserversource/data_plane.go
index b539a0f92bd..f3478ae276e 100644
--- a/test/rekt/features/apiserversource/data_plane.go
+++ b/test/rekt/features/apiserversource/data_plane.go
@@ -38,6 +38,8 @@ import (
"knative.dev/eventing/test/rekt/resources/apiserversource"
"knative.dev/eventing/test/rekt/resources/broker"
"knative.dev/eventing/test/rekt/resources/eventtype"
+ "knative.dev/eventing/test/rekt/resources/namespace"
+ "knative.dev/eventing/test/rekt/resources/pingsource"
"knative.dev/eventing/test/rekt/resources/pod"
"knative.dev/eventing/test/rekt/resources/trigger"
)
@@ -345,6 +347,151 @@ func SendsEventsForAllResources() *feature.Feature {
return f
}
+func SendsEventsForAllResourcesWithNamespaceSelector() *feature.Feature {
+ source := feature.MakeRandomK8sName("apiserversource")
+ sink := feature.MakeRandomK8sName("sink")
+ f := feature.NewFeatureNamed("Send events for select resources within multiple namespaces")
+
+ f.Setup("install sink", eventshub.Install(sink, eventshub.StartReceiver))
+
+ sacmName := feature.MakeRandomK8sName("apiserversource")
+ f.Setup("Create Service Account for ApiServerSource with RBAC for v1.Pod resources",
+ setupAccountAndRoleForPods(sacmName))
+
+ testNS1 := feature.MakeRandomK8sName("source-namespace-1")
+ testNS2 := feature.MakeRandomK8sName("source-namespace-2")
+ testNS3 := feature.MakeRandomK8sName("source-namespace-3")
+
+ // create two new namespaces with matching selector
+ f.Setup("create a namespace with matching label", namespace.Install(testNS1, namespace.WithLabels(map[string]string{"env": "development"})))
+ f.Setup("create a namespace with matching label", namespace.Install(testNS2, namespace.WithLabels(map[string]string{"env": "development"})))
+
+ // create one new namespace that doesn't match selector
+ f.Setup("create a namespace without matching label", namespace.Install(testNS3, namespace.WithLabels(map[string]string{"env": "production"})))
+
+ cfg := []manifest.CfgFn{
+ apiserversource.WithServiceAccountName(sacmName),
+ apiserversource.WithEventMode("Reference"),
+ apiserversource.WithSink(svc.AsKReference(sink), ""),
+ apiserversource.WithResources(v1.APIVersionKindSelector{
+ APIVersion: "v1",
+ Kind: "Pod",
+ }),
+ apiserversource.WithNamespaceSelector(&metav1.LabelSelector{
+ MatchLabels: map[string]string{"env": "development"},
+ }),
+ }
+
+ f.Setup("install ApiServerSource", apiserversource.Install(source, cfg...))
+ f.Setup("ApiServerSource goes ready", apiserversource.IsReady(source))
+
+ pod1 := feature.MakeRandomK8sName("example-pod-1")
+ pod2 := feature.MakeRandomK8sName("example-pod-2")
+ pod3 := feature.MakeRandomK8sName("example-pod-3")
+ f.Requirement("install example pod 1",
+ pod.Install(pod1, pod.WithImage(exampleImage), pod.WithNamespace(testNS1)),
+ )
+ f.Requirement("install example pod 2",
+ pod.Install(pod2, pod.WithImage(exampleImage), pod.WithNamespace(testNS2)),
+ )
+ f.Requirement("install example pod 3",
+ pod.Install(pod3, pod.WithImage(exampleImage), pod.WithNamespace(testNS3)),
+ )
+
+ f.Stable("ApiServerSource as event source").
+ Must("delivers events from matching namespace",
+ eventasssert.OnStore(sink).MatchEvent(
+ test.HasType("dev.knative.apiserver.ref.add"),
+ test.DataContains(`"kind":"Pod"`),
+ test.DataContains(fmt.Sprintf(`"name":"%s"`, pod1)),
+ ).Exact(1))
+ f.Stable("ApiServerSource as event source").
+ Must("delivers events from matching namespace",
+ eventasssert.OnStore(sink).MatchEvent(
+ test.HasType("dev.knative.apiserver.ref.add"),
+ test.DataContains(`"kind":"Pod"`),
+ test.DataContains(fmt.Sprintf(`"name":"%s"`, pod2)),
+ ).Exact(1))
+ f.Stable("ApiServerSource as event source").
+ MustNot("must not deliver events from non-matching namespace",
+ eventasssert.OnStore(sink).MatchEvent(
+ test.HasType("dev.knative.apiserver.ref.add"),
+ test.DataContains(`"kind":"Pod"`),
+ test.DataContains(fmt.Sprintf(`"name":"%s"`, pod3)),
+ ).Not())
+
+ // Delete resources including temporary namespaces
+ f.Teardown("Deleting resources", f.DeleteResources)
+ return f
+}
+
+// SendsEventsForAllResourcesWithEmptyNamespaceSelector tests an APIServerSource with an empty namespace selector
+// which will target all namespaces
+func SendsEventsForAllResourcesWithEmptyNamespaceSelector() *feature.Feature {
+ source := feature.MakeRandomK8sName("apiserversource")
+ sink := feature.MakeRandomK8sName("sink")
+ f := feature.NewFeatureNamed("Send events for select resources within all namespaces")
+
+ f.Setup("install sink", eventshub.Install(sink, eventshub.StartReceiver))
+
+ sacmName := feature.MakeRandomK8sName("apiserversource")
+ f.Setup("Create Service Account for ApiServerSource with RBAC for sources.knative.dev/v1 PingSources",
+ setupAccountAndRoleForPingSources(sacmName))
+
+ testNS1 := feature.MakeRandomK8sName("source-namespace-1")
+ testNS2 := feature.MakeRandomK8sName("source-namespace-2")
+
+ // create two new namespaces
+ f.Setup("create a namespace", namespace.Install(testNS1))
+ f.Setup("create a namespace", namespace.Install(testNS2))
+
+ cfg := []manifest.CfgFn{
+ apiserversource.WithServiceAccountName(sacmName),
+ apiserversource.WithEventMode("Reference"),
+ apiserversource.WithSink(svc.AsKReference(sink), ""),
+ apiserversource.WithResources(v1.APIVersionKindSelector{
+ APIVersion: "sources.knative.dev/v1",
+ Kind: "PingSource",
+ }),
+ apiserversource.WithNamespaceSelector(&metav1.LabelSelector{
+ MatchLabels: map[string]string{},
+ MatchExpressions: []metav1.LabelSelectorRequirement{},
+ }),
+ }
+
+ f.Setup("install ApiServerSource", apiserversource.Install(source, cfg...))
+ f.Setup("ApiServerSource goes ready", apiserversource.IsReady(source))
+
+ pingSource1 := feature.MakeRandomK8sName("ping-source-1")
+ pingSource2 := feature.MakeRandomK8sName("ping-source-2")
+
+ f.Requirement("install PingSource 1",
+ pingsource.Install(pingSource1, pingsource.WithSink(nil, "http://example.com")),
+ )
+ f.Requirement("install PingSource 2",
+ pingsource.Install(pingSource2, pingsource.WithSink(nil, "http://example.com")),
+ )
+
+ f.Stable("ApiServerSource as event source").
+ Must("delivers events from new namespace",
+ eventasssert.OnStore(sink).MatchEvent(
+ test.HasType("dev.knative.apiserver.ref.add"),
+ test.DataContains(`"kind":"PingSource"`),
+ test.DataContains(fmt.Sprintf(`"name":"%s"`, pingSource1)),
+ ).Exact(1))
+ f.Stable("ApiServerSource as event source").
+ Must("delivers events from new namespace",
+ eventasssert.OnStore(sink).MatchEvent(
+ test.HasType("dev.knative.apiserver.ref.add"),
+ test.DataContains(`"kind":"PingSource"`),
+ test.DataContains(fmt.Sprintf(`"name":"%s"`, pingSource2)),
+ ).Exact(1))
+
+ // Delete resources including temporary namespaces
+ f.Teardown("Deleting resources", f.DeleteResources)
+ return f
+}
+
func SendsEventsForLabelMatchingResources() *feature.Feature {
source := feature.MakeRandomK8sName("apiserversource")
sink := feature.MakeRandomK8sName("sink")
@@ -498,6 +645,17 @@ func setupAccountAndRoleForPods(sacmName string) feature.StepFn {
)
}
+func setupAccountAndRoleForPingSources(sacmName string) feature.StepFn {
+ return account_role.Install(sacmName,
+ account_role.WithRole(sacmName+"-clusterrole"),
+ account_role.WithRules(rbacv1.PolicyRule{
+ APIGroups: []string{"", "sources.knative.dev"},
+ Resources: []string{"events", "pingsources"},
+ Verbs: []string{"get", "list", "watch"},
+ }),
+ )
+}
+
//// any matches any event
//func any() test.EventMatcher {
// return func(have cloudevent.Event) error {
diff --git a/test/rekt/resources/apiserversource/apiserversource.go b/test/rekt/resources/apiserversource/apiserversource.go
index ed089e89a54..abcd8479d09 100644
--- a/test/rekt/resources/apiserversource/apiserversource.go
+++ b/test/rekt/resources/apiserversource/apiserversource.go
@@ -120,6 +120,13 @@ func WithResources(resources ...v1.APIVersionKindSelector) manifest.CfgFn {
}
}
+// WithNamespaceSelector adds a namespace selector to an ApiServerSource spec.
+func WithNamespaceSelector(selector *metav1.LabelSelector) manifest.CfgFn {
+ return func(cfg map[string]interface{}) {
+ cfg["namespaceSelector"] = labelSelectorToStringMap(selector)
+ }
+}
+
func labelSelectorToStringMap(selector *metav1.LabelSelector) map[string]interface{} {
if selector == nil {
return nil
diff --git a/test/rekt/resources/apiserversource/apiserversource.yaml b/test/rekt/resources/apiserversource/apiserversource.yaml
index 4c4f4f190f8..f14976c8051 100644
--- a/test/rekt/resources/apiserversource/apiserversource.yaml
+++ b/test/rekt/resources/apiserversource/apiserversource.yaml
@@ -24,6 +24,22 @@ spec:
{{ if .mode }}
mode: {{ .mode }}
{{ end }}
+ {{ if .namespaceSelector }}
+ namespaceSelector:
+ matchLabels:
+ {{ range $key, $value := .namespaceSelector.matchLabels }}
+ {{ $key }}: {{ $value }}
+ {{ end }}
+ matchExpressions:
+ {{ range $_, $expr := .namespaceSelector.matchExpressions }}
+ - key: {{ $expr.key }}
+ operator: {{ $expr.operator }}
+ values:
+ {{ range $_, $exprValue := $expr.values }}
+ - {{ $exprValue }}
+ {{ end }}
+ {{ end }}
+ {{ end }}
{{ if .resources }}
resources:
{{ range $_, $resource := .resources }}
diff --git a/test/rekt/resources/apiserversource/apiserversource_test.go b/test/rekt/resources/apiserversource/apiserversource_test.go
index c52d0950131..4d49d07a19f 100644
--- a/test/rekt/resources/apiserversource/apiserversource_test.go
+++ b/test/rekt/resources/apiserversource/apiserversource_test.go
@@ -222,6 +222,78 @@ func Example_withResources() {
// - b
}
+func Example_withNamespaceSelector() {
+ ctx := testlog.NewContext()
+ images := map[string]string{}
+ cfg := map[string]interface{}{
+ "name": "foo",
+ "namespace": "bar",
+ }
+
+ apiserversource.WithNamespaceSelector(&metav1.LabelSelector{
+ MatchLabels: map[string]string{"env": "development"},
+ MatchExpressions: []metav1.LabelSelectorRequirement{{
+ Key: "daf",
+ Operator: "uk",
+ Values: []string{"a", "b"},
+ }},
+ })(cfg)
+
+ files, err := manifest.ExecuteYAML(ctx, yaml, images, cfg)
+ if err != nil {
+ panic(err)
+ }
+
+ manifest.OutputYAML(os.Stdout, files)
+ // Output:
+ // apiVersion: sources.knative.dev/v1
+ // kind: ApiServerSource
+ // metadata:
+ // name: foo
+ // namespace: bar
+ // spec:
+ // namespaceSelector:
+ // matchLabels:
+ // env: development
+ // matchExpressions:
+ // - key: daf
+ // operator: uk
+ // values:
+ // - a
+ // - b
+}
+
+func Example_withEmptyNamespaceSelector() {
+ ctx := testlog.NewContext()
+ images := map[string]string{}
+ cfg := map[string]interface{}{
+ "name": "foo",
+ "namespace": "bar",
+ }
+
+ apiserversource.WithNamespaceSelector(&metav1.LabelSelector{
+ MatchLabels: map[string]string{},
+ MatchExpressions: []metav1.LabelSelectorRequirement{},
+ })(cfg)
+
+ files, err := manifest.ExecuteYAML(ctx, yaml, images, cfg)
+ if err != nil {
+ panic(err)
+ }
+
+ manifest.OutputYAML(os.Stdout, files)
+ // Output:
+ // apiVersion: sources.knative.dev/v1
+ // kind: ApiServerSource
+ // metadata:
+ // name: foo
+ // namespace: bar
+ // spec:
+ // namespaceSelector:
+ // matchLabels:
+ // matchExpressions:
+}
+
func Example_full() {
ctx := testlog.NewContext()
images := map[string]string{}
diff --git a/test/rekt/resources/namespace/namespace.go b/test/rekt/resources/namespace/namespace.go
new file mode 100644
index 00000000000..2984cb435ec
--- /dev/null
+++ b/test/rekt/resources/namespace/namespace.go
@@ -0,0 +1,53 @@
+/*
+Copyright 2023 The Knative Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package namespace
+
+import (
+ "context"
+ "embed"
+
+ "knative.dev/reconciler-test/pkg/feature"
+ "knative.dev/reconciler-test/pkg/manifest"
+)
+
+//go:embed *.yaml
+var yaml embed.FS
+
+func Install(name string, opts ...manifest.CfgFn) feature.StepFn {
+ cfg := map[string]interface{}{
+ "name": name,
+ }
+
+ for _, fn := range opts {
+ fn(cfg)
+ }
+
+ return func(ctx context.Context, t feature.T) {
+ if _, err := manifest.InstallYamlFS(ctx, yaml, cfg); err != nil {
+ t.Fatal(err)
+ }
+ }
+}
+
+// WithLabels sets the given labels on the Namespace.
+func WithLabels(labels map[string]string) manifest.CfgFn {
+ return func(cfg map[string]interface{}) {
+ if labels != nil {
+ cfg["labels"] = labels
+ }
+ }
+}
diff --git a/test/rekt/resources/namespace/namespace.yaml b/test/rekt/resources/namespace/namespace.yaml
new file mode 100644
index 00000000000..59dacf5bb74
--- /dev/null
+++ b/test/rekt/resources/namespace/namespace.yaml
@@ -0,0 +1,24 @@
+# Copyright 2023 The Knative Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+apiVersion: v1
+kind: Namespace
+metadata:
+ name: {{ .name }}
+ {{ if .labels }}
+ labels:
+ {{ range $key, $value := .labels }}
+ {{ $key }}: {{ $value }}
+ {{ end }}
+ {{ end}}
diff --git a/test/rekt/resources/namespace/namespace_test.go b/test/rekt/resources/namespace/namespace_test.go
new file mode 100644
index 00000000000..f84a65da359
--- /dev/null
+++ b/test/rekt/resources/namespace/namespace_test.go
@@ -0,0 +1,74 @@
+/*
+Copyright 2023 The Knative Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package namespace_test
+
+import (
+ "embed"
+ "os"
+
+ "knative.dev/eventing/test/rekt/resources/namespace"
+ testlog "knative.dev/reconciler-test/pkg/logging"
+ "knative.dev/reconciler-test/pkg/manifest"
+)
+
+//go:embed *.yaml
+var yaml embed.FS
+
+func Example_min() {
+ ctx := testlog.NewContext()
+ images := map[string]string{}
+ cfg := map[string]interface{}{
+ "name": "foo",
+ }
+
+ files, err := manifest.ExecuteYAML(ctx, yaml, images, cfg)
+ if err != nil {
+ panic(err)
+ }
+
+ manifest.OutputYAML(os.Stdout, files)
+ // Output:
+ // apiVersion: v1
+ // kind: Namespace
+ // metadata:
+ // name: foo
+}
+
+func Example_withLabels() {
+ ctx := testlog.NewContext()
+ images := map[string]string{}
+ cfg := map[string]interface{}{
+ "name": "foo",
+ "labels": map[string]string{"target": "yes"},
+ }
+
+ namespace.WithLabels(map[string]string{"overwrite": "yes"})(cfg)
+
+ files, err := manifest.ExecuteYAML(ctx, yaml, images, cfg)
+ if err != nil {
+ panic(err)
+ }
+
+ manifest.OutputYAML(os.Stdout, files)
+ // Output:
+ // apiVersion: v1
+ // kind: Namespace
+ // metadata:
+ // name: foo
+ // labels:
+ // overwrite: yes
+}
diff --git a/test/rekt/resources/pod/pod.go b/test/rekt/resources/pod/pod.go
index 66cf8d6d39a..eae541d2946 100644
--- a/test/rekt/resources/pod/pod.go
+++ b/test/rekt/resources/pod/pod.go
@@ -66,3 +66,12 @@ func WithImage(image string) manifest.CfgFn {
}
}
}
+
+// WithOverriddenNamespace will modify the namespace of the pod from the specs to the provided one
+func WithNamespace(ns string) manifest.CfgFn {
+ return func(cfg map[string]interface{}) {
+ if ns != "" {
+ cfg["namespace"] = ns
+ }
+ }
+}
diff --git a/test/rekt/resources/pod/pod_test.go b/test/rekt/resources/pod/pod_test.go
index 12e54953b96..c4dd4542fec 100644
--- a/test/rekt/resources/pod/pod_test.go
+++ b/test/rekt/resources/pod/pod_test.go
@@ -131,6 +131,41 @@ func Example_withImage() {
// - containerPort: 8080
}
+func Example_withNamespace() {
+ ctx := testlog.NewContext()
+ images := map[string]string{}
+ cfg := map[string]interface{}{
+ "name": "foo",
+ "namespace": "bar",
+ "image": "baz",
+ "port": "8080",
+ "labels": map[string]string{"app": "bla"},
+ }
+
+ pod.WithNamespace("new-namespace")(cfg)
+
+ files, err := manifest.ExecuteYAML(ctx, yaml, images, cfg)
+ if err != nil {
+ panic(err)
+ }
+
+ manifest.OutputYAML(os.Stdout, files)
+ // Output:
+ // apiVersion: v1
+ // kind: Pod
+ // metadata:
+ // name: foo
+ // namespace: new-namespace
+ // labels:
+ // app: bla
+ // spec:
+ // containers:
+ // - name: user-container
+ // image: baz
+ // ports:
+ // - containerPort: 8080
+}
+
func Example_full() {
ctx := testlog.NewContext()
images := map[string]string{}