Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
223 changes: 201 additions & 22 deletions cmd/auth_proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ package main

import (
"context"
"encoding/json"
"errors"
"net"
"os"
"sync"
"time"

//nolint:gosec
"crypto/tls"
Expand All @@ -36,11 +38,17 @@ import (

"github.com/kelseyhightower/envconfig"
"go.uber.org/zap"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/utils/ptr"
cmdbroker "knative.dev/eventing/cmd/broker"
"knative.dev/eventing/pkg/apis/feature"
"knative.dev/eventing/pkg/auth"
eventingclient "knative.dev/eventing/pkg/client/clientset/versioned"
eventinginformers "knative.dev/eventing/pkg/client/informers/externalversions"
eventingv1alpha1listers "knative.dev/eventing/pkg/client/listers/eventing/v1alpha1"
"knative.dev/eventing/pkg/eventingtls"
"knative.dev/eventing/pkg/kncloudevents"
"knative.dev/pkg/apis"
Expand All @@ -64,12 +72,17 @@ type envConfig struct {
TargetHTTPSPort int `envconfig:"TARGET_HTTPS_PORT" default:"8443"`
ProxyHTTPPort int `envconfig:"PROXY_HTTP_PORT" default:"3128"`
ProxyHTTPSPort int `envconfig:"PROXY_HTTPS_PORT" default:"3129"`
ProxyHealthPort int `envconfig:"PROXY_HEALTH_PORT" default:"8090"`

SinkURI string `envconfig:"SINK_URI" required:"true"`
SinkNamespace string `envconfig:"SINK_NAMESPACE" required:"true"`
SinkAudience *string `envconfig:"SINK_AUDIENCE"`

AuthPolicies string `envconfig:"AUTH_POLICIES"`
// Parent resource information for dynamic EventPolicy lookup
ParentAPIVersion string `envconfig:"PARENT_API_VERSION" required:"true"`
ParentKind string `envconfig:"PARENT_KIND" required:"true"`
ParentName string `envconfig:"PARENT_NAME" required:"true"`
ParentNamespace string `envconfig:"PARENT_NAMESPACE" required:"true"`

SinkTLSCertPath *string `envconfig:"SINK_TLS_CERT_FILE"`
SinkTLSKeyPath *string `envconfig:"SINK_TLS_KEY_FILE"`
Expand All @@ -79,13 +92,18 @@ type envConfig struct {
// ProxyHandler handles HTTP requests and performs authentication/authorization
// before forwarding to the target service
type ProxyHandler struct {
kubeClient kubernetes.Interface
withContext func(ctx context.Context) context.Context
authVerifier *auth.Verifier
httpProxy *httputil.ReverseProxy
httpsProxy *httputil.ReverseProxy
config envConfig
authSubjects []auth.SubjectsWithFilters
kubeClient kubernetes.Interface
withContext func(ctx context.Context) context.Context
authVerifier *auth.Verifier
httpProxy *httputil.ReverseProxy
httpsProxy *httputil.ReverseProxy
config envConfig
eventPolicyLister eventingv1alpha1listers.EventPolicyLister
parentResourceGVK schema.GroupVersionKind

// Cache for subjects with filters
subjectsWithFiltersMu sync.RWMutex
cachedSubjectsWithFilters []auth.SubjectsWithFilters
}

func main() {
Expand All @@ -104,11 +122,21 @@ func main() {

featureStore := setupFeatureStore(ctx, logger, configMapWatcher)

handler, err := createProxyHandler(ctx, config, logger, featureStore, configMapWatcher)
// Create namespace-scoped EventPolicy informer
eventPolicyLister, eventPolicyInformer, err := setupEventPolicyInformer(ctx, config, logger)
if err != nil {
logger.Fatalw("Failed to setup EventPolicy informer", zap.Error(err))
}
informers = append(informers, eventPolicyInformer)

handler, err := createProxyHandler(ctx, config, logger, featureStore, configMapWatcher, eventPolicyLister)
if err != nil {
logger.Fatalw("Failed to create proxy handler", zap.Error(err))
}

// Register event handler to invalidate cache when EventPolicies change
registerEventPolicyHandler(eventPolicyInformer, handler, logger)

serverManager, err := createServerManager(ctx, config, handler, logger, configMapWatcher)
if err != nil {
logger.Fatalw("Failed to create server manager", zap.Error(err))
Expand All @@ -118,6 +146,20 @@ func main() {
logger.Fatalw("Failed to start services", zap.Error(err))
}

// Start health server
healthServer := startHealthServer(config, handler)
go func() {
logger.Infof("Starting health server on port %d", config.ProxyHealthPort)
if err := healthServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
logger.Errorw("Health server error", zap.Error(err))
}
}()
defer func() {
if err := healthServer.Shutdown(context.Background()); err != nil {
logger.Errorw("Error shutting down health server", zap.Error(err))
}
}()

logger.Info("Starting auth proxy servers...")
if err = serverManager.StartServers(ctx); err != nil {
logger.Fatalw("StartServers() returned an error", zap.Error(err))
Expand Down Expand Up @@ -166,21 +208,45 @@ func setupFeatureStore(_ context.Context, logger *zap.SugaredLogger, configMapWa
return featureStore
}

// createProxyHandler creates and configures the proxy handler
func createProxyHandler(ctx context.Context, config envConfig, logger *zap.SugaredLogger, featureStore *feature.Store, configMapWatcher *configmap.InformedWatcher) (*ProxyHandler, error) {
var authSubjects []auth.SubjectsWithFilters
// setupEventPolicyInformer creates a namespace-scoped EventPolicy informer
func setupEventPolicyInformer(ctx context.Context, config envConfig, logger *zap.SugaredLogger) (eventingv1alpha1listers.EventPolicyLister, cache.SharedIndexInformer, error) {
cfg := injection.GetConfig(ctx)

if len(config.AuthPolicies) > 0 {
if err := json.Unmarshal([]byte(config.AuthPolicies), &authSubjects); err != nil {
return nil, fmt.Errorf("failed to parse policies: %w", err)
}
// Create eventing client
eventingClient, err := eventingclient.NewForConfig(cfg)
if err != nil {
return nil, nil, fmt.Errorf("failed to create eventing client: %w", err)
}

// Create namespace-scoped informer factory
eventingInformerFactory := eventinginformers.NewSharedInformerFactoryWithOptions(
eventingClient,
controller.GetResyncPeriod(ctx),
eventinginformers.WithNamespace(config.ParentNamespace),
)

eventPolicyInformer := eventingInformerFactory.Eventing().V1alpha1().EventPolicies()

logger.Infof("Created namespace-scoped EventPolicy informer for namespace %s", config.ParentNamespace)

return eventPolicyInformer.Lister(), eventPolicyInformer.Informer(), nil
}

// createProxyHandler creates and configures the proxy handler
func createProxyHandler(ctx context.Context, config envConfig, logger *zap.SugaredLogger, featureStore *feature.Store, configMapWatcher *configmap.InformedWatcher, eventPolicyLister eventingv1alpha1listers.EventPolicyLister) (*ProxyHandler, error) {
// Parse parent resource GVK
gv, err := schema.ParseGroupVersion(config.ParentAPIVersion)
if err != nil {
return nil, fmt.Errorf("failed to parse parent API version %q: %w", config.ParentAPIVersion, err)
}
parentGVK := gv.WithKind(config.ParentKind)

handler := &ProxyHandler{
kubeClient: kubeclient.Get(ctx),
authVerifier: auth.NewVerifier(ctx, nil, nil, configMapWatcher),
config: config,
authSubjects: authSubjects,
kubeClient: kubeclient.Get(ctx),
authVerifier: auth.NewVerifier(ctx, nil, nil, configMapWatcher),
config: config,
eventPolicyLister: eventPolicyLister,
parentResourceGVK: parentGVK,
}

handler.withContext = func(ctx context.Context) context.Context {
Expand Down Expand Up @@ -244,14 +310,74 @@ func startServices(ctx context.Context, informers []controller.Informer, configM
return nil
}

// registerEventPolicyHandler registers an event handler on the EventPolicy informer
// to invalidate the cache when policies change
func registerEventPolicyHandler(eventPolicyInformer cache.SharedIndexInformer, handler *ProxyHandler, logger *zap.SugaredLogger) {
_, _ = eventPolicyInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
handler.invalidateSubjectsCache(logger)
},
UpdateFunc: func(oldObj, newObj interface{}) {
handler.invalidateSubjectsCache(logger)
},
DeleteFunc: func(obj interface{}) {
handler.invalidateSubjectsCache(logger)
},
})
}

// invalidateSubjectsCache clears the cached subjects with filters
func (h *ProxyHandler) invalidateSubjectsCache(logger *zap.SugaredLogger) {
h.subjectsWithFiltersMu.Lock()
defer h.subjectsWithFiltersMu.Unlock()
h.cachedSubjectsWithFilters = nil
logger.Debug("Invalidated subjects cache due to EventPolicy change")
}

// startHealthServer creates and returns an HTTP server for health checks
func startHealthServer(config envConfig, handler *ProxyHandler) *http.Server {
mux := http.NewServeMux()

// Readiness endpoint - checks if auth-proxy is ready to validate tokens
mux.HandleFunc("/readyz", func(w http.ResponseWriter, r *http.Request) {
if handler.authVerifier.IsReady() {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("ready"))
} else {
w.WriteHeader(http.StatusServiceUnavailable)
_, _ = w.Write([]byte("not ready"))
}
})

// Liveness endpoint - always returns OK if the server is running
mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("alive"))
})

return &http.Server{
Addr: fmt.Sprintf(":%d", config.ProxyHealthPort),
Handler: mux,
ReadHeaderTimeout: 10 * time.Second,
}
}

func (h *ProxyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ctx := h.withContext(r.Context())
logger := logging.FromContext(ctx)
features := feature.FromContext(ctx)

logger.Debugf("Handling request to %s", r.RequestURI)

err := h.authVerifier.VerifyRequestFromSubjectsWithFilters(ctx, features, h.config.SinkAudience, h.authSubjects, h.config.SinkNamespace, r, w)
// Get EventPolicies dynamically for the parent resource
authSubjects, err := h.getAuthSubjects(logger)
if err != nil {
logger.Errorw("Failed to get EventPolicies", zap.Error(err))
http.Error(w, "Internal server error", http.StatusInternalServerError)
return
}

err = h.authVerifier.VerifyRequestFromSubjectsWithFilters(ctx, features, h.config.SinkAudience, authSubjects, h.config.SinkNamespace, r, w)
if err != nil {
logger.Debugw("Failed to verify AuthN and AuthZ", zap.Error(err))
return
Expand All @@ -266,6 +392,59 @@ func (h *ProxyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}

// getAuthSubjects retrieves EventPolicies for the parent resource and converts them to SubjectsWithFilters
func (h *ProxyHandler) getAuthSubjects(logger *zap.SugaredLogger) ([]auth.SubjectsWithFilters, error) {
// Try to use cached value (fast path with read lock)
h.subjectsWithFiltersMu.RLock()
if h.cachedSubjectsWithFilters != nil {
cached := h.cachedSubjectsWithFilters
h.subjectsWithFiltersMu.RUnlock()
logger.Debug("Using cached subjects with filters")
return cached, nil
}
h.subjectsWithFiltersMu.RUnlock()

// Cache miss - acquire write lock and compute
h.subjectsWithFiltersMu.Lock()
defer h.subjectsWithFiltersMu.Unlock()

// Double-check after acquiring write lock (another goroutine might have populated the cache)
if h.cachedSubjectsWithFilters != nil {
logger.Debug("Using cached subjects with filters (after lock)")
return h.cachedSubjectsWithFilters, nil
}

// Get EventPolicies applying to this resource
policies, err := auth.GetEventPoliciesForResource(
h.eventPolicyLister,
h.parentResourceGVK,
metav1.ObjectMeta{
Name: h.config.ParentName,
Namespace: h.config.ParentNamespace,
},
)
if err != nil {
return nil, fmt.Errorf("failed to get EventPolicies: %w", err)
}

logger.Debugf("Found %d EventPolicies for %s/%s", len(policies), h.config.ParentNamespace, h.config.ParentName)

// Convert EventPolicies to SubjectsWithFilters
subjectsWithFilters := make([]auth.SubjectsWithFilters, 0, len(policies))
for _, policy := range policies {
subjectsWithFilters = append(subjectsWithFilters, auth.SubjectsWithFilters{
Subjects: policy.Status.From,
Filters: policy.Spec.Filters,
})
}

// Store in cache
h.cachedSubjectsWithFilters = subjectsWithFilters
logger.Debugf("Cached %d subjects with filters for %s/%s", len(subjectsWithFilters), h.config.ParentNamespace, h.config.ParentName)

return subjectsWithFilters, nil
}

// httpReverseProxy creates a reverse proxy for HTTP traffic to the target service
func httpReverseProxy(config envConfig) (*httputil.ReverseProxy, error) {
httpTarget := fmt.Sprintf("http://%s:%d", config.TargetHost, config.TargetHTTPPort)
Expand Down
13 changes: 13 additions & 0 deletions config/core/roles/clusterrole-namespaced.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -115,3 +115,16 @@ rules:
- apiGroups: ["eventing.knative.dev", "messaging.knative.dev", "sources.knative.dev", "flows.knative.dev", "bindings.knative.dev", "sinks.knative.dev"]
resources: ["*"]
verbs: ["get", "list", "watch"]
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: knative-eventing-eventpolicy-reader
labels:
rbac.authorization.k8s.io/aggregate-to-view: "true"
app.kubernetes.io/version: devel
app.kubernetes.io/name: knative-eventing
rules:
- apiGroups: ["eventing.knative.dev"]
resources: ["eventpolicies"]
verbs: ["get", "list", "watch"]
9 changes: 9 additions & 0 deletions pkg/auth/verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,15 @@ func (v *Verifier) initOIDCProvider(ctx context.Context, features feature.Flags)
return nil
}

// IsReady returns true if the OIDC provider has been initialized and the verifier
// is ready to validate tokens. This is used by health checks to ensure the auth-proxy
// doesn't receive traffic before it can properly validate authentication.
func (v *Verifier) IsReady() bool {
v.m.RLock()
defer v.m.RUnlock()
return v.provider != nil
}

func (v *Verifier) getHTTPClientForKubeAPIServer() (*http.Client, error) {
client, err := rest.HTTPClientFor(v.restConfig)
if err != nil {
Expand Down
Loading