Skip to content

Commit

Permalink
mt-broker-filter: Allow only requests from Triggers Subscriptions OID…
Browse files Browse the repository at this point in the history
…C ID (#8147)

mt-broker-filter: Allow only requests from Subscriptions OIDC ID
  • Loading branch information
creydr authored Aug 12, 2024
1 parent 941a9e1 commit f0ccedc
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 24 deletions.
3 changes: 2 additions & 1 deletion cmd/broker/filter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
brokerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker"
triggerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger"
eventtypeinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1beta2/eventtype"
subscriptioninformer "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription"
"knative.dev/eventing/pkg/eventingtls"
"knative.dev/eventing/pkg/eventtype"
"knative.dev/eventing/pkg/reconciler/names"
Expand Down Expand Up @@ -153,7 +154,7 @@ func main() {
// the messages to the triggers' subscribers) in this binary.
oidcTokenVerifier := auth.NewOIDCTokenVerifier(ctx)
trustBundleConfigMapInformer := configmapinformer.Get(ctx, eventingtls.TrustBundleLabelSelector).Lister().ConfigMaps(system.Namespace())
handler, err = filter.NewHandler(logger, oidcTokenVerifier, oidcTokenProvider, triggerinformer.Get(ctx), brokerinformer.Get(ctx), reporter, trustBundleConfigMapInformer, ctxFunc)
handler, err = filter.NewHandler(logger, oidcTokenVerifier, oidcTokenProvider, triggerinformer.Get(ctx), brokerinformer.Get(ctx), subscriptioninformer.Get(ctx), reporter, trustBundleConfigMapInformer, ctxFunc)
if err != nil {
logger.Fatal("Error creating Handler", zap.Error(err))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,15 @@ rules:
- get
- list
- watch
# get subscription of trigger for AuthZ
- apiGroups:
- messaging.knative.dev
resources:
- subscriptions
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
Expand Down
23 changes: 23 additions & 0 deletions pkg/auth/token_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,29 @@ func (v *OIDCTokenVerifier) VerifyRequest(ctx context.Context, features feature.
return nil
}

// VerifyRequestFromSubject verifies AuthN and AuthZ in the request.
// In the AuthZ part it checks if the request comes from the given allowedSubject.
// On verification errors, it sets the responses HTTP status and returns an error.
// This method is similar to VerifyRequest() except that VerifyRequestFromSubject()
// verifies in the AuthZ part that the request comes from a given subject.
func (v *OIDCTokenVerifier) VerifyRequestFromSubject(ctx context.Context, features feature.Flags, requiredOIDCAudience *string, allowedSubject string, req *http.Request, resp http.ResponseWriter) error {
if !features.IsOIDCAuthentication() {
return nil
}

idToken, err := v.verifyAuthN(ctx, requiredOIDCAudience, req, resp)
if err != nil {
return fmt.Errorf("authentication of request could not be verified: %w", err)
}

if idToken.Subject != allowedSubject {
resp.WriteHeader(http.StatusForbidden)
return fmt.Errorf("token is from subject %q, but only %q is allowed", idToken.Subject, allowedSubject)
}

return nil
}

// verifyAuthN verifies if the incoming request contains a correct JWT token
func (v *OIDCTokenVerifier) verifyAuthN(ctx context.Context, audience *string, req *http.Request, resp http.ResponseWriter) (*IDToken, error) {
token := GetJWTFromHeader(req.Header)
Expand Down
63 changes: 45 additions & 18 deletions pkg/broker/filter/filter_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ import (
"net/http"
"time"

messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1"
messaginginformers "knative.dev/eventing/pkg/client/informers/externalversions/messaging/v1"
"knative.dev/eventing/pkg/reconciler/broker/resources"

opencensusclient "github.com/cloudevents/sdk-go/observability/opencensus/v2/client"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/binding"
Expand All @@ -49,6 +53,7 @@ import (
eventingbroker "knative.dev/eventing/pkg/broker"
v1 "knative.dev/eventing/pkg/client/informers/externalversions/eventing/v1"
eventinglisters "knative.dev/eventing/pkg/client/listers/eventing/v1"
messaginglisters "knative.dev/eventing/pkg/client/listers/messaging/v1"
"knative.dev/eventing/pkg/eventfilter"
"knative.dev/eventing/pkg/eventfilter/attributes"
"knative.dev/eventing/pkg/eventfilter/subscriptionsapi"
Expand Down Expand Up @@ -79,17 +84,18 @@ type Handler struct {

eventDispatcher *kncloudevents.Dispatcher

triggerLister eventinglisters.TriggerLister
brokerLister eventinglisters.BrokerLister
logger *zap.Logger
withContext func(ctx context.Context) context.Context
filtersMap *subscriptionsapi.FiltersMap
tokenVerifier *auth.OIDCTokenVerifier
EventTypeCreator *eventtype.EventTypeAutoHandler
triggerLister eventinglisters.TriggerLister
brokerLister eventinglisters.BrokerLister
subscriptionLister messaginglisters.SubscriptionLister
logger *zap.Logger
withContext func(ctx context.Context) context.Context
filtersMap *subscriptionsapi.FiltersMap
tokenVerifier *auth.OIDCTokenVerifier
EventTypeCreator *eventtype.EventTypeAutoHandler
}

// NewHandler creates a new Handler and its associated EventReceiver.
func NewHandler(logger *zap.Logger, tokenVerifier *auth.OIDCTokenVerifier, oidcTokenProvider *auth.OIDCTokenProvider, triggerInformer v1.TriggerInformer, brokerInformer v1.BrokerInformer, reporter StatsReporter, trustBundleConfigMapLister corev1listers.ConfigMapNamespaceLister, wc func(ctx context.Context) context.Context) (*Handler, error) {
func NewHandler(logger *zap.Logger, tokenVerifier *auth.OIDCTokenVerifier, oidcTokenProvider *auth.OIDCTokenProvider, triggerInformer v1.TriggerInformer, brokerInformer v1.BrokerInformer, subscriptionInformer messaginginformers.SubscriptionInformer, reporter StatsReporter, trustBundleConfigMapLister corev1listers.ConfigMapNamespaceLister, wc func(ctx context.Context) context.Context) (*Handler, error) {
kncloudevents.ConfigureConnectionArgs(&kncloudevents.ConnectionArgs{
MaxIdleConns: defaultMaxIdleConnections,
MaxIdleConnsPerHost: defaultMaxIdleConnectionsPerHost,
Expand Down Expand Up @@ -141,19 +147,21 @@ func NewHandler(logger *zap.Logger, tokenVerifier *auth.OIDCTokenVerifier, oidcT
})

return &Handler{
reporter: reporter,
eventDispatcher: kncloudevents.NewDispatcher(clientConfig, oidcTokenProvider),
triggerLister: triggerInformer.Lister(),
brokerLister: brokerInformer.Lister(),
logger: logger,
tokenVerifier: tokenVerifier,
withContext: wc,
filtersMap: fm,
reporter: reporter,
eventDispatcher: kncloudevents.NewDispatcher(clientConfig, oidcTokenProvider),
triggerLister: triggerInformer.Lister(),
brokerLister: brokerInformer.Lister(),
subscriptionLister: subscriptionInformer.Lister(),
logger: logger,
tokenVerifier: tokenVerifier,
withContext: wc,
filtersMap: fm,
}, nil
}

func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
ctx := h.withContext(request.Context())
features := feature.FromContext(ctx)

writer.Header().Set("Allow", "POST")

Expand All @@ -176,6 +184,13 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
return
}

subscription, err := h.getSubscription(features, trigger)
if err != nil {
h.logger.Info("Unable to get the Subscription of the Trigger", zap.Error(err), zap.Any("triggerRef", triggerRef))
writer.WriteHeader(http.StatusInternalServerError)
return
}

event, err := cehttp.NewEventFromHTTPRequest(request)
if err != nil {
h.logger.Warn("failed to extract event from request", zap.Error(err))
Expand All @@ -198,13 +213,19 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
span.AddAttributes(opencensusclient.EventTraceAttributes(event)...)
}

features := feature.FromContext(ctx)
if features.IsOIDCAuthentication() {
h.logger.Debug("OIDC authentication is enabled")

audience := FilterAudience

err = h.tokenVerifier.VerifyJWTFromRequest(ctx, request, &audience, writer)
if subscription.Status.Auth == nil || subscription.Status.Auth.ServiceAccountName == nil {
h.logger.Warn("Subscription does not have an OIDC identity set, while OIDC is enabled", zap.String("subscription", subscription.Name), zap.String("subscription-namespace", subscription.Namespace))
writer.WriteHeader(http.StatusInternalServerError)
return
}

subscriptionFullIdentity := fmt.Sprintf("system:serviceaccount:%s:%s", subscription.Namespace, *subscription.Status.Auth.ServiceAccountName)
err = h.tokenVerifier.VerifyRequestFromSubject(ctx, features, &audience, subscriptionFullIdentity, request, writer)
if err != nil {
h.logger.Warn("Error when validating the JWT token in the request", zap.Error(err))
return
Expand Down Expand Up @@ -561,6 +582,12 @@ func (h *Handler) getTrigger(ref path.NamespacedNameUID) (*eventingv1.Trigger, e
return t, nil
}

func (h *Handler) getSubscription(features feature.Flags, trigger *eventingv1.Trigger) (*messagingv1.Subscription, error) {
subscriptionName := resources.SubscriptionName(features, trigger)

return h.subscriptionLister.Subscriptions(trigger.Namespace).Get(subscriptionName)
}

func (h *Handler) filterEvent(ctx context.Context, trigger *eventingv1.Trigger, event cloudevents.Event) eventfilter.FilterResult {
switch {
case len(trigger.Spec.Filters) > 0:
Expand Down
24 changes: 24 additions & 0 deletions pkg/broker/filter/filter_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ import (
"testing"
"time"

messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1"
"knative.dev/eventing/pkg/reconciler/broker/resources"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/event"
Expand All @@ -51,6 +54,7 @@ import (

brokerinformerfake "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker/fake"
triggerinformerfake "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger/fake"
subscriptioninformerfake "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription/fake"

// Fake injection client
_ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy/fake"
Expand Down Expand Up @@ -453,6 +457,15 @@ func TestReceiver(t *testing.T) {
}
triggerinformerfake.Get(ctx).Informer().GetStore().Add(trig)

// create needed triggers subscription object
sub := &messagingv1.Subscription{
ObjectMeta: metav1.ObjectMeta{
Name: resources.SubscriptionName(feature.FromContext(ctx), trig),
Namespace: trig.Namespace,
},
}
subscriptioninformerfake.Get(ctx).Informer().GetStore().Add(sub)

// create the needed broker object
b := &v1.Broker{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -470,6 +483,7 @@ func TestReceiver(t *testing.T) {
oidcTokenProvider,
triggerinformerfake.Get(ctx),
brokerinformerfake.Get(ctx),
subscriptioninformerfake.Get(ctx),
reporter,
configmapinformer.Get(ctx).Lister().ConfigMaps("ns"),
func(ctx context.Context) context.Context {
Expand Down Expand Up @@ -653,6 +667,15 @@ func TestReceiver_WithSubscriptionsAPI(t *testing.T) {
triggerinformerfake.Get(ctx).Informer().GetStore().Add(trig)
filtersMap.Set(trig, createSubscriptionsAPIFilters(logging.FromContext(ctx).Desugar(), trig))

// create needed triggers subscription object
sub := &messagingv1.Subscription{
ObjectMeta: metav1.ObjectMeta{
Name: resources.SubscriptionName(feature.FromContext(ctx), trig),
Namespace: trig.Namespace,
},
}
subscriptioninformerfake.Get(ctx).Informer().GetStore().Add(sub)

// create the needed broker object
b := &v1.Broker{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -669,6 +692,7 @@ func TestReceiver_WithSubscriptionsAPI(t *testing.T) {
oidcTokenProvider,
triggerinformerfake.Get(ctx),
brokerinformerfake.Get(ctx),
subscriptioninformerfake.Get(ctx),
reporter,
configmapinformer.Get(ctx).Lister().ConfigMaps("ns"),
func(ctx context.Context) context.Context {
Expand Down
21 changes: 16 additions & 5 deletions pkg/reconciler/broker/resources/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,17 @@ import (
// NewSubscription returns a placeholder subscription for trigger 't', from brokerTrigger to 'dest'
// replying to brokerIngress.
func NewSubscription(ctx context.Context, t *eventingv1.Trigger, brokerTrigger *corev1.ObjectReference, dest, reply *duckv1.Destination, delivery *eventingduckv1.DeliverySpec) *messagingv1.Subscription {
var broker, channelNamespace string
if t.Spec.BrokerRef != nil && feature.FromContext(ctx).IsEnabled(feature.CrossNamespaceEventLinks) {
broker = t.Spec.BrokerRef.Name
features := feature.FromContext(ctx)
var channelNamespace string
if t.Spec.BrokerRef != nil && features.IsEnabled(feature.CrossNamespaceEventLinks) {
channelNamespace = t.Spec.BrokerRef.Namespace
} else {
broker = t.Spec.Broker
channelNamespace = ""
}
return &messagingv1.Subscription{
ObjectMeta: metav1.ObjectMeta{
Namespace: t.Namespace,
Name: kmeta.ChildName(fmt.Sprintf("%s-%s-", broker, t.Name), string(t.GetUID())),
Name: SubscriptionName(features, t),
OwnerReferences: []metav1.OwnerReference{
*kmeta.NewControllerRef(t),
},
Expand All @@ -66,6 +65,18 @@ func NewSubscription(ctx context.Context, t *eventingv1.Trigger, brokerTrigger *
}
}

// SubscriptionName returns the name of the Subscription for the given Trigger
func SubscriptionName(features feature.Flags, trigger *eventingv1.Trigger) string {
var brokerName string
if trigger.Spec.BrokerRef != nil && features.IsEnabled(feature.CrossNamespaceEventLinks) {
brokerName = trigger.Spec.BrokerRef.Name
} else {
brokerName = trigger.Spec.Broker
}

return kmeta.ChildName(fmt.Sprintf("%s-%s-", brokerName, trigger.Name), string(trigger.GetUID()))
}

// SubscriptionLabels generates the labels present on the Subscription linking this Trigger to the
// Broker's Channels.
func SubscriptionLabels(ctx context.Context, t *eventingv1.Trigger) map[string]string {
Expand Down

0 comments on commit f0ccedc

Please sign in to comment.