diff --git a/cmd/contour/serve.go b/cmd/contour/serve.go index dd5343d7a92..21d6ee02281 100644 --- a/cmd/contour/serve.go +++ b/cmd/contour/serve.go @@ -127,15 +127,12 @@ func registerServe(app *kingpin.Application) (*kingpin.CmdClause, *serveContext) // doServe runs the contour serve subcommand. func doServe(log logrus.FieldLogger, ctx *serveContext) error { - - // step 1. establish k8s core & dynamic client connections + // Establish k8s core & dynamic client connections. clients, err := k8s.NewClients(ctx.Kubeconfig, ctx.InCluster) if err != nil { return fmt.Errorf("failed to create Kubernetes clients: %w", err) } - // step 2. create informer factories - // Factory for cluster-wide informers. clusterInformerFactory := clients.NewInformerFactory() @@ -162,7 +159,7 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error { } } - // setup prometheus registry and register base metrics. + // Set up Prometheus registry and register base metrics. registry := prometheus.NewRegistry() registry.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{})) registry.MustRegister(prometheus.NewGoCollector()) @@ -175,7 +172,7 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error { return err } - listenerConfig := contour.ListenerVisitorConfig{ + listenerConfig := contour.ListenerConfig{ UseProxyProto: ctx.useProxyProto, HTTPAddress: ctx.httpAddr, HTTPPort: ctx.httpPort, @@ -200,16 +197,25 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error { listenerConfig.DefaultHTTPVersions = defaultHTTPVersions - // step 3. build our mammoth Kubernetes event handler. + // Endpoints updates are handled directly by the EndpointsTranslator + // due to their high update rate and their orthogonal nature. + endpointHandler := &contour.EndpointsTranslator{ + FieldLogger: log.WithField("context", "endpointstranslator"), + } + + resources := []contour.ResourceCache{ + contour.NewListenerCache(listenerConfig, ctx.statsAddr, ctx.statsPort), + &contour.SecretCache{}, + &contour.RouteCache{}, + &contour.ClusterCache{}, + endpointHandler, + } + + // Build the core Kubernetes event handler. eventHandler := &contour.EventHandler{ - CacheHandler: &contour.CacheHandler{ - ListenerVisitorConfig: listenerConfig, - ListenerCache: contour.NewListenerCache(ctx.statsAddr, ctx.statsPort), - FieldLogger: log.WithField("context", "CacheHandler"), - Metrics: metrics.NewMetrics(registry), - }, HoldoffDelay: 100 * time.Millisecond, HoldoffMaxDelay: 500 * time.Millisecond, + Observer: dag.ComposeObservers(contour.ObserversOf(resources)...), Builder: dag.Builder{ Source: dag.KubernetesCache{ RootNamespaces: ctx.proxyRootNamespaces(), @@ -219,15 +225,16 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error { DisablePermitInsecure: ctx.DisablePermitInsecure, }, FieldLogger: log.WithField("context", "contourEventHandler"), + Metrics: metrics.NewMetrics(registry), } // Set the fallback certificate if configured. if fallbackCert != nil { log.WithField("context", "fallback-certificate").Infof("enabled fallback certificate with secret: %q", fallbackCert) - eventHandler.FallbackCertificate = fallbackCert + eventHandler.Builder.FallbackCertificate = fallbackCert } - // wrap eventHandler in a converter for objects from the dynamic client. + // Wrap eventHandler in a converter for objects from the dynamic client. // and an EventRecorder which tracks API server events. dynamicHandler := &k8s.DynamicClientHandler{ Next: &contour.EventRecorder{ @@ -238,7 +245,7 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error { Logger: log.WithField("context", "dynamicHandler"), } - // step 4. register our resource event handler with the k8s informers, + // Register our resource event handler with the k8s informers, // using the SyncList to keep track of what to sync later. var informerSyncList k8s.InformerSyncList @@ -270,23 +277,17 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error { informerSyncList.InformOnResources(clusterInformerFactory, dynamicHandler, k8s.SecretsResources()...) } - // step 5. endpoints updates are handled directly by the EndpointsTranslator - // due to their high update rate and their orthogonal nature. - et := &contour.EndpointsTranslator{ - FieldLogger: log.WithField("context", "endpointstranslator"), - } - informerSyncList.InformOnResources(clusterInformerFactory, &k8s.DynamicClientHandler{ Next: &contour.EventRecorder{ - Next: et, + Next: endpointHandler, Counter: eventHandler.Metrics.EventHandlerOperations, }, Converter: converter, Logger: log.WithField("context", "endpointstranslator"), }, k8s.EndpointsResources()...) - // step 6. setup workgroup runner and register informers. + // Set up workgroup runner and register informers. var g workgroup.Group g.Add(startInformer(clusterInformerFactory, log.WithField("context", "contourinformers"))) @@ -294,10 +295,10 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error { g.Add(startInformer(factory, log.WithField("context", "corenamespacedinformers").WithField("namespace", ns))) } - // step 7. register our event handler with the workgroup + // Register our event handler with the workgroup. g.Add(eventHandler.Start()) - // step 8. create metrics service and register with workgroup. + // Create metrics service and register with workgroup. metricsvc := httpsvc.Service{ Addr: ctx.metricsAddr, Port: ctx.metricsPort, @@ -315,7 +316,7 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error { g.Add(metricsvc.Start) - // step 9. create a separate health service if required. + // Create a separate health service if required. if ctx.healthAddr != ctx.metricsAddr || ctx.healthPort != ctx.metricsPort { healthsvc := httpsvc.Service{ Addr: ctx.healthAddr, @@ -330,7 +331,7 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error { g.Add(healthsvc.Start) } - // step 10. create debug service and register with workgroup. + // Create debug service and register with workgroup. debugsvc := debug.Service{ Service: httpsvc.Service{ Addr: ctx.debugAddr, @@ -341,7 +342,7 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error { } g.Add(debugsvc.Start) - // step 11. register leadership election. + // Register leadership election. eventHandler.IsLeader = setupLeadershipElection(&g, log, ctx, clients, eventHandler.UpdateNow) sh := k8s.StatusUpdateHandler{ @@ -358,7 +359,7 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error { Updater: sh.Writer(), } - // step 11. set up ingress load balancer status writer + // Set up ingress load balancer status writer. lbsw := loadBalancerStatusWriter{ log: log.WithField("context", "loadBalancerStatusWriter"), clients: clients, @@ -370,7 +371,7 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error { } g.Add(lbsw.Start) - // step 12. register an informer to watch envoy's service if we haven't been given static details. + // Register an informer to watch envoy's service if we haven't been given static details. if ctx.IngressStatusAddress == "" { dynamicServiceHandler := &k8s.DynamicClientHandler{ Next: &k8s.ServiceStatusLoadBalancerWatcher{ @@ -402,15 +403,8 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error { } log.Printf("informer caches synced") - resources := map[string]cgrpc.Resource{ - eventHandler.CacheHandler.ClusterCache.TypeURL(): &eventHandler.CacheHandler.ClusterCache, - eventHandler.CacheHandler.RouteCache.TypeURL(): &eventHandler.CacheHandler.RouteCache, - eventHandler.CacheHandler.ListenerCache.TypeURL(): &eventHandler.CacheHandler.ListenerCache, - eventHandler.CacheHandler.SecretCache.TypeURL(): &eventHandler.CacheHandler.SecretCache, - et.TypeURL(): et, - } opts := ctx.grpcOptions() - s := cgrpc.NewAPI(log, resources, registry, opts...) + s := cgrpc.NewAPI(log, contour.ResourcesOf(resources), registry, opts...) addr := net.JoinHostPort(ctx.xdsAddr, strconv.Itoa(ctx.xdsPort)) l, err := net.Listen("tcp", addr) if err != nil { @@ -433,7 +427,7 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error { return s.Serve(l) }) - // step 14. Setup SIGTERM handler + // Set up SIGTERM handler for graceful shutdown. g.Add(func(stop <-chan struct{}) error { c := make(chan os.Signal, 1) signal.Notify(c, syscall.SIGTERM, syscall.SIGINT) diff --git a/internal/contour/cachehandler.go b/internal/contour/cachehandler.go index b3029d69906..9eae11d73b6 100644 --- a/internal/contour/cachehandler.go +++ b/internal/contour/cachehandler.go @@ -17,55 +17,32 @@ package contour import ( - "time" - "github.com/projectcontour/contour/internal/dag" - "github.com/projectcontour/contour/internal/metrics" - "github.com/prometheus/client_golang/prometheus" - "github.com/sirupsen/logrus" + "github.com/projectcontour/contour/internal/grpc" ) -// CacheHandler manages the state of xDS caches. -type CacheHandler struct { - ListenerVisitorConfig - ListenerCache - RouteCache - ClusterCache - SecretCache - - *metrics.Metrics - - logrus.FieldLogger -} - -func (ch *CacheHandler) OnChange(dag *dag.DAG) { - timer := prometheus.NewTimer(ch.CacheHandlerOnUpdateSummary) - defer timer.ObserveDuration() - - ch.updateSecrets(dag) - ch.updateListeners(dag) - ch.updateRoutes(dag) - ch.updateClusters(dag) - - ch.SetDAGLastRebuilt(time.Now()) -} - -func (ch *CacheHandler) updateSecrets(root dag.Visitable) { - secrets := visitSecrets(root) - ch.SecretCache.Update(secrets) -} - -func (ch *CacheHandler) updateListeners(root dag.Visitable) { - listeners := visitListeners(root, &ch.ListenerVisitorConfig) - ch.ListenerCache.Update(listeners) +// ResourceCache is a store of an xDS resource type. It is able to +// visit the dag.DAG to update the its resource collection, then +// serve those resources over xDS. +type ResourceCache interface { + dag.Observer + grpc.Resource } -func (ch *CacheHandler) updateRoutes(root dag.Visitable) { - routes := visitRoutes(root) - ch.RouteCache.Update(routes) +// ResourcesOf transliterates a slice of ResourceCache into a slice of grpc.Resource. +func ResourcesOf(in []ResourceCache) []grpc.Resource { + out := make([]grpc.Resource, len(in)) + for i := range in { + out[i] = in[i] + } + return out } -func (ch *CacheHandler) updateClusters(root dag.Visitable) { - clusters := visitClusters(root) - ch.ClusterCache.Update(clusters) +// ObserversOf transliterates a slice of ResourceCache into a slice of dag.Observer. +func ObserversOf(in []ResourceCache) []dag.Observer { + out := make([]dag.Observer, len(in)) + for i := range in { + out[i] = in[i] + } + return out } diff --git a/internal/contour/cluster.go b/internal/contour/cluster.go index 5c3c5fd5450..6f46bc4bc13 100644 --- a/internal/contour/cluster.go +++ b/internal/contour/cluster.go @@ -18,6 +18,7 @@ import ( "sync" resource "github.com/envoyproxy/go-control-plane/pkg/resource/v2" + "github.com/projectcontour/contour/internal/grpc" v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2" "github.com/golang/protobuf/proto" @@ -34,6 +35,9 @@ type ClusterCache struct { Cond } +var _ dag.Observer = &ClusterCache{} +var _ grpc.Resource = &ClusterCache{} + // Update replaces the contents of the cache with the supplied map. func (c *ClusterCache) Update(v map[string]*v2.Cluster) { c.mu.Lock() @@ -75,6 +79,11 @@ func (c *ClusterCache) Query(names []string) []proto.Message { func (*ClusterCache) TypeURL() string { return resource.ClusterType } +func (c *ClusterCache) OnChange(root *dag.DAG) { + clusters := visitClusters(root) + c.Update(clusters) +} + type clusterVisitor struct { clusters map[string]*v2.Cluster } diff --git a/internal/contour/endpointstranslator.go b/internal/contour/endpointstranslator.go index 2c9629e380b..350d433ed63 100644 --- a/internal/contour/endpointstranslator.go +++ b/internal/contour/endpointstranslator.go @@ -21,6 +21,7 @@ import ( envoy_api_v2_endpoint "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint" resource "github.com/envoyproxy/go-control-plane/pkg/resource/v2" "github.com/golang/protobuf/proto" + "github.com/projectcontour/contour/internal/dag" "github.com/projectcontour/contour/internal/envoy" "github.com/projectcontour/contour/internal/k8s" "github.com/projectcontour/contour/internal/protobuf" @@ -40,6 +41,14 @@ type EndpointsTranslator struct { entries map[string]*v2.ClusterLoadAssignment } +var _ k8scache.ResourceEventHandler = &EndpointsTranslator{} +var _ dag.Observer = &EndpointsTranslator{} + +func (e *EndpointsTranslator) OnChange(d *dag.DAG) { + // TODO(jpeach) Update the internal model to map which + // services are targets of which cluster load assignments. +} + func (e *EndpointsTranslator) OnAdd(obj interface{}) { switch obj := obj.(type) { case *v1.Endpoints: diff --git a/internal/contour/handler.go b/internal/contour/handler.go index b45600e1a97..cbbe9492fb3 100644 --- a/internal/contour/handler.go +++ b/internal/contour/handler.go @@ -24,18 +24,20 @@ import ( projcontour "github.com/projectcontour/contour/apis/projectcontour/v1" "github.com/projectcontour/contour/internal/dag" "github.com/projectcontour/contour/internal/k8s" + "github.com/projectcontour/contour/internal/metrics" + "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" ) // EventHandler implements cache.ResourceEventHandler, filters k8s events towards -// a dag.Builder and calls through to the CacheHandler to notify it that a new DAG +// a dag.Builder and calls through to the Observer to notify it that a new DAG // is available. type EventHandler struct { - dag.Builder - - *CacheHandler + Builder dag.Builder + Observer dag.Observer + Metrics *metrics.Metrics HoldoffDelay, HoldoffMaxDelay time.Duration @@ -53,7 +55,8 @@ type EventHandler struct { // Sequence is a channel that receives a incrementing sequence number // for each update processed. The updates may be processed immediately, or // delayed by a holdoff timer. In each case a non blocking send to Sequence - // will be made once CacheHandler.OnUpdate has been called. + // will be made once the resource update is received (note + // that the DAG is not guaranteed to be called each time). Sequence chan int // seq is the sequence counter of the number of times @@ -104,7 +107,7 @@ func (e *EventHandler) run(stop <-chan struct{}) error { var ( // outstanding counts the number of events received but not - // yet send to the CacheHandler. + // yet included in a DAG rebuild. outstanding int // timer holds the timer which will expire after e.HoldoffDelay @@ -113,7 +116,7 @@ func (e *EventHandler) run(stop <-chan struct{}) error { // pending is a reference to the current timer's channel. pending <-chan time.Time - // lastDAGUpdate holds the last time updateDAG was called. + // lastDAGUpdate holds the last time rebuildDAG was called. // lastDAGUpdate is seeded to the current time on entry to // run to allow the holdoff timer to batch the updates from // the API informers. @@ -131,7 +134,7 @@ func (e *EventHandler) run(stop <-chan struct{}) error { // pending may be nil if there are no pending events. // 2. We're processing an event. // 3. The holdoff timer from a previous event has fired and we're - // building a new DAG and sending to the CacheHandler. + // building a new DAG and sending to the Observer. // 4. We're stopping. // // Only one of these things can happen at a time. @@ -159,7 +162,7 @@ func (e *EventHandler) run(stop <-chan struct{}) error { } case <-pending: e.WithField("last_update", time.Since(lastDAGUpdate)).WithField("outstanding", reset()).Info("performing delayed update") - e.updateDAG() + e.rebuildDAG() e.incSequence() lastDAGUpdate = time.Now() case <-stop: @@ -171,7 +174,7 @@ func (e *EventHandler) run(stop <-chan struct{}) error { // onUpdate processes the event received. onUpdate returns // true if the event changed the cache in a way that requires -// notifying the CacheHandler. +// notifying the Observer. func (e *EventHandler) onUpdate(op interface{}) bool { switch op := op.(type) { case opAdd: @@ -206,16 +209,20 @@ func (e *EventHandler) incSequence() { } } -// updateDAG builds a new DAG and sends it to the CacheHandler -// the updates the status on objects and updates the metrics. -func (e *EventHandler) updateDAG() { - dag := e.Builder.Build() - e.CacheHandler.OnChange(dag) +// rebuildDAG builds a new DAG and sends it to the Observer, +// the updates the status on objects, and updates the metrics. +func (e *EventHandler) rebuildDAG() { + latestDAG := e.Builder.Build() + e.Metrics.SetDAGLastRebuilt(time.Now()) + + timer := prometheus.NewTimer(e.Metrics.CacheHandlerOnUpdateSummary) + e.Observer.OnChange(latestDAG) + timer.ObserveDuration() select { case <-e.IsLeader: // we're the leader, update status and metrics - statuses := dag.Statuses() + statuses := latestDAG.Statuses() e.setStatus(statuses) metrics := calculateRouteMetric(statuses) diff --git a/internal/contour/listener.go b/internal/contour/listener.go index 7c56e2ddc96..6b618d7ca49 100644 --- a/internal/contour/listener.go +++ b/internal/contour/listener.go @@ -26,6 +26,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/projectcontour/contour/internal/dag" "github.com/projectcontour/contour/internal/envoy" + "github.com/projectcontour/contour/internal/grpc" "github.com/projectcontour/contour/internal/protobuf" "github.com/projectcontour/contour/internal/sorter" "github.com/projectcontour/contour/internal/timeout" @@ -44,8 +45,8 @@ const ( DEFAULT_ACCESS_LOG_TYPE = "envoy" ) -// ListenerVisitorConfig holds configuration parameters for visitListeners. -type ListenerVisitorConfig struct { +// ListenerConfig holds configuration parameters for Listener. +type ListenerConfig struct { // Envoy's HTTP (non TLS) listener address. // If not set, defaults to DEFAULT_HTTP_LISTENER_ADDRESS. HTTPAddress string @@ -115,7 +116,7 @@ type ListenerVisitorConfig struct { // httpAddress returns the port for the HTTP (non TLS) // listener or DEFAULT_HTTP_LISTENER_ADDRESS if not configured. -func (lvc *ListenerVisitorConfig) httpAddress() string { +func (lvc *ListenerConfig) httpAddress() string { if lvc.HTTPAddress != "" { return lvc.HTTPAddress } @@ -124,7 +125,7 @@ func (lvc *ListenerVisitorConfig) httpAddress() string { // httpPort returns the port for the HTTP (non TLS) // listener or DEFAULT_HTTP_LISTENER_PORT if not configured. -func (lvc *ListenerVisitorConfig) httpPort() int { +func (lvc *ListenerConfig) httpPort() int { if lvc.HTTPPort != 0 { return lvc.HTTPPort } @@ -133,7 +134,7 @@ func (lvc *ListenerVisitorConfig) httpPort() int { // httpAccessLog returns the access log for the HTTP (non TLS) // listener or DEFAULT_HTTP_ACCESS_LOG if not configured. -func (lvc *ListenerVisitorConfig) httpAccessLog() string { +func (lvc *ListenerConfig) httpAccessLog() string { if lvc.HTTPAccessLog != "" { return lvc.HTTPAccessLog } @@ -142,7 +143,7 @@ func (lvc *ListenerVisitorConfig) httpAccessLog() string { // httpsAddress returns the port for the HTTPS (TLS) // listener or DEFAULT_HTTPS_LISTENER_ADDRESS if not configured. -func (lvc *ListenerVisitorConfig) httpsAddress() string { +func (lvc *ListenerConfig) httpsAddress() string { if lvc.HTTPSAddress != "" { return lvc.HTTPSAddress } @@ -151,7 +152,7 @@ func (lvc *ListenerVisitorConfig) httpsAddress() string { // httpsPort returns the port for the HTTPS (TLS) listener // or DEFAULT_HTTPS_LISTENER_PORT if not configured. -func (lvc *ListenerVisitorConfig) httpsPort() int { +func (lvc *ListenerConfig) httpsPort() int { if lvc.HTTPSPort != 0 { return lvc.HTTPSPort } @@ -160,7 +161,7 @@ func (lvc *ListenerVisitorConfig) httpsPort() int { // httpsAccessLog returns the access log for the HTTPS (TLS) // listener or DEFAULT_HTTPS_ACCESS_LOG if not configured. -func (lvc *ListenerVisitorConfig) httpsAccessLog() string { +func (lvc *ListenerConfig) httpsAccessLog() string { if lvc.HTTPSAccessLog != "" { return lvc.HTTPSAccessLog } @@ -169,7 +170,7 @@ func (lvc *ListenerVisitorConfig) httpsAccessLog() string { // accesslogType returns the access log type that should be configured // across all listener types or DEFAULT_ACCESS_LOG_TYPE if not configured. -func (lvc *ListenerVisitorConfig) accesslogType() string { +func (lvc *ListenerConfig) accesslogType() string { if lvc.AccessLogType != "" { return lvc.AccessLogType } @@ -178,14 +179,14 @@ func (lvc *ListenerVisitorConfig) accesslogType() string { // accesslogFields returns the access log fields that should be configured // for Envoy, or a default set if not configured. -func (lvc *ListenerVisitorConfig) accesslogFields() []string { +func (lvc *ListenerConfig) accesslogFields() []string { if lvc.AccessLogFields != nil { return lvc.AccessLogFields } return envoy.DefaultFields } -func (lvc *ListenerVisitorConfig) newInsecureAccessLog() []*envoy_api_v2_accesslog.AccessLog { +func (lvc *ListenerConfig) newInsecureAccessLog() []*envoy_api_v2_accesslog.AccessLog { switch lvc.accesslogType() { case "json": return envoy.FileAccessLogJSON(lvc.httpAccessLog(), lvc.accesslogFields()) @@ -194,7 +195,7 @@ func (lvc *ListenerVisitorConfig) newInsecureAccessLog() []*envoy_api_v2_accessl } } -func (lvc *ListenerVisitorConfig) newSecureAccessLog() []*envoy_api_v2_accesslog.AccessLog { +func (lvc *ListenerConfig) newSecureAccessLog() []*envoy_api_v2_accesslog.AccessLog { switch lvc.accesslogType() { case "json": return envoy.FileAccessLogJSON(lvc.httpsAccessLog(), lvc.accesslogFields()) @@ -205,7 +206,7 @@ func (lvc *ListenerVisitorConfig) newSecureAccessLog() []*envoy_api_v2_accesslog // minTLSVersion returns the requested minimum TLS protocol // version or envoy_api_v2_auth.TlsParameters_TLSv1_1 if not configured. -func (lvc *ListenerVisitorConfig) minTLSVersion() envoy_api_v2_auth.TlsParameters_TlsProtocol { +func (lvc *ListenerConfig) minTLSVersion() envoy_api_v2_auth.TlsParameters_TlsProtocol { if lvc.MinimumTLSVersion > envoy_api_v2_auth.TlsParameters_TLSv1_1 { return lvc.MinimumTLSVersion } @@ -217,13 +218,19 @@ type ListenerCache struct { mu sync.Mutex values map[string]*v2.Listener staticValues map[string]*v2.Listener + + Config ListenerConfig Cond } +var _ dag.Observer = &ListenerCache{} +var _ grpc.Resource = &ListenerCache{} + // NewListenerCache returns an instance of a ListenerCache -func NewListenerCache(address string, port int) ListenerCache { +func NewListenerCache(config ListenerConfig, address string, port int) *ListenerCache { stats := envoy.StatsListener(address, port) - return ListenerCache{ + return &ListenerCache{ + Config: config, staticValues: map[string]*v2.Listener{ stats.Name: stats, }, @@ -281,16 +288,21 @@ func (c *ListenerCache) Query(names []string) []proto.Message { func (*ListenerCache) TypeURL() string { return resource.ListenerType } +func (l *ListenerCache) OnChange(root *dag.DAG) { + listeners := visitListeners(root, &l.Config) + l.Update(listeners) +} + type listenerVisitor struct { - *ListenerVisitorConfig + *ListenerConfig listeners map[string]*v2.Listener http bool // at least one dag.VirtualHost encountered } -func visitListeners(root dag.Vertex, lvc *ListenerVisitorConfig) map[string]*v2.Listener { +func visitListeners(root dag.Vertex, lvc *ListenerConfig) map[string]*v2.Listener { lv := listenerVisitor{ - ListenerVisitorConfig: lvc, + ListenerConfig: lvc, listeners: map[string]*v2.Listener{ ENVOY_HTTPS_LISTENER: envoy.Listener( ENVOY_HTTPS_LISTENER, @@ -385,12 +397,12 @@ func (v *listenerVisitor) visit(vertex dag.Vertex) { DefaultFilters(). RouteConfigName(path.Join("https", vh.VirtualHost.Name)). MetricsPrefix(ENVOY_HTTPS_LISTENER). - AccessLoggers(v.ListenerVisitorConfig.newSecureAccessLog()). - RequestTimeout(v.ListenerVisitorConfig.RequestTimeout). - ConnectionIdleTimeout(v.ListenerVisitorConfig.ConnectionIdleTimeout). - StreamIdleTimeout(v.ListenerVisitorConfig.StreamIdleTimeout). - MaxConnectionDuration(v.ListenerVisitorConfig.MaxConnectionDuration). - ConnectionShutdownGracePeriod(v.ListenerVisitorConfig.ConnectionShutdownGracePeriod). + AccessLoggers(v.ListenerConfig.newSecureAccessLog()). + RequestTimeout(v.ListenerConfig.RequestTimeout). + ConnectionIdleTimeout(v.ListenerConfig.ConnectionIdleTimeout). + StreamIdleTimeout(v.ListenerConfig.StreamIdleTimeout). + MaxConnectionDuration(v.ListenerConfig.MaxConnectionDuration). + ConnectionShutdownGracePeriod(v.ListenerConfig.ConnectionShutdownGracePeriod). Get(), ) @@ -399,7 +411,7 @@ func (v *listenerVisitor) visit(vertex dag.Vertex) { filters = envoy.Filters( envoy.TCPProxy(ENVOY_HTTPS_LISTENER, vh.TCPProxy, - v.ListenerVisitorConfig.newSecureAccessLog()), + v.ListenerConfig.newSecureAccessLog()), ) // Do not offer ALPN for TCP proxying, since @@ -412,7 +424,7 @@ func (v *listenerVisitor) visit(vertex dag.Vertex) { // Secret is provided when TLS is terminated and nil when TLS passthrough is used. if vh.Secret != nil { // Choose the higher of the configured or requested TLS version. - vers := max(v.ListenerVisitorConfig.minTLSVersion(), vh.MinTLSVersion) + vers := max(v.ListenerConfig.minTLSVersion(), vh.MinTLSVersion) downstreamTLS = envoy.DownstreamTLSContext( vh.Secret, @@ -434,7 +446,7 @@ func (v *listenerVisitor) visit(vertex dag.Vertex) { // the value defined in the Contour Configuration file if defined. downstreamTLS = envoy.DownstreamTLSContext( vh.FallbackCertificate, - v.ListenerVisitorConfig.minTLSVersion(), + v.ListenerConfig.minTLSVersion(), vh.DownstreamValidation, alpnProtos...) @@ -444,12 +456,12 @@ func (v *listenerVisitor) visit(vertex dag.Vertex) { DefaultFilters(). RouteConfigName(ENVOY_FALLBACK_ROUTECONFIG). MetricsPrefix(ENVOY_HTTPS_LISTENER). - AccessLoggers(v.ListenerVisitorConfig.newSecureAccessLog()). - RequestTimeout(v.ListenerVisitorConfig.RequestTimeout). - ConnectionIdleTimeout(v.ListenerVisitorConfig.ConnectionIdleTimeout). - StreamIdleTimeout(v.ListenerVisitorConfig.StreamIdleTimeout). - MaxConnectionDuration(v.ListenerVisitorConfig.MaxConnectionDuration). - ConnectionShutdownGracePeriod(v.ListenerVisitorConfig.ConnectionShutdownGracePeriod). + AccessLoggers(v.ListenerConfig.newSecureAccessLog()). + RequestTimeout(v.ListenerConfig.RequestTimeout). + ConnectionIdleTimeout(v.ListenerConfig.ConnectionIdleTimeout). + StreamIdleTimeout(v.ListenerConfig.StreamIdleTimeout). + MaxConnectionDuration(v.ListenerConfig.MaxConnectionDuration). + ConnectionShutdownGracePeriod(v.ListenerConfig.ConnectionShutdownGracePeriod). Get(), ) diff --git a/internal/contour/listener_test.go b/internal/contour/listener_test.go index 3e7fd5aaca7..eeaa2abde02 100644 --- a/internal/contour/listener_test.go +++ b/internal/contour/listener_test.go @@ -152,7 +152,7 @@ func TestListenerVisit(t *testing.T) { Get() tests := map[string]struct { - ListenerVisitorConfig + ListenerConfig fallbackCertificate *types.NamespacedName objs []interface{} want map[string]*v2.Listener @@ -599,7 +599,7 @@ func TestListenerVisit(t *testing.T) { }), }, "http listener on non default port": { // issue 72 - ListenerVisitorConfig: ListenerVisitorConfig{ + ListenerConfig: ListenerConfig{ HTTPAddress: "127.0.0.100", HTTPPort: 9100, HTTPSAddress: "127.0.0.200", @@ -672,7 +672,7 @@ func TestListenerVisit(t *testing.T) { }), }, "use proxy proto": { - ListenerVisitorConfig: ListenerVisitorConfig{ + ListenerConfig: ListenerConfig{ UseProxyProto: true, }, objs: []interface{}{ @@ -746,7 +746,7 @@ func TestListenerVisit(t *testing.T) { }), }, "--envoy-http-access-log": { - ListenerVisitorConfig: ListenerVisitorConfig{ + ListenerConfig: ListenerConfig{ HTTPAccessLog: "/tmp/http_access.log", HTTPSAccessLog: "/tmp/https_access.log", }, @@ -823,7 +823,7 @@ func TestListenerVisit(t *testing.T) { }), }, "tls-min-protocol-version from config": { - ListenerVisitorConfig: ListenerVisitorConfig{ + ListenerConfig: ListenerConfig{ MinimumTLSVersion: envoy_api_v2_auth.TlsParameters_TLSv1_3, }, objs: []interface{}{ @@ -893,7 +893,7 @@ func TestListenerVisit(t *testing.T) { }), }, "tls-min-protocol-version from config overridden by annotation": { - ListenerVisitorConfig: ListenerVisitorConfig{ + ListenerConfig: ListenerConfig{ MinimumTLSVersion: envoy_api_v2_auth.TlsParameters_TLSv1_3, }, objs: []interface{}{ @@ -966,7 +966,7 @@ func TestListenerVisit(t *testing.T) { }), }, "tls-min-protocol-version from config overridden by legacy annotation": { - ListenerVisitorConfig: ListenerVisitorConfig{ + ListenerConfig: ListenerConfig{ MinimumTLSVersion: envoy_api_v2_auth.TlsParameters_TLSv1_3, }, objs: []interface{}{ @@ -1039,7 +1039,7 @@ func TestListenerVisit(t *testing.T) { }), }, "tls-min-protocol-version from config overridden by httpproxy": { - ListenerVisitorConfig: ListenerVisitorConfig{ + ListenerConfig: ListenerConfig{ MinimumTLSVersion: envoy_api_v2_auth.TlsParameters_TLSv1_3, }, objs: []interface{}{ @@ -1449,7 +1449,7 @@ func TestListenerVisit(t *testing.T) { }), }, "httpproxy with connection idle timeout set in visitor config": { - ListenerVisitorConfig: ListenerVisitorConfig{ + ListenerConfig: ListenerConfig{ ConnectionIdleTimeout: timeout.DurationSetting(90 * time.Second), }, objs: []interface{}{ @@ -1503,7 +1503,7 @@ func TestListenerVisit(t *testing.T) { }), }, "httpproxy with stream idle timeout set in visitor config": { - ListenerVisitorConfig: ListenerVisitorConfig{ + ListenerConfig: ListenerConfig{ StreamIdleTimeout: timeout.DurationSetting(90 * time.Second), }, objs: []interface{}{ @@ -1557,7 +1557,7 @@ func TestListenerVisit(t *testing.T) { }), }, "httpproxy with max connection duration set in visitor config": { - ListenerVisitorConfig: ListenerVisitorConfig{ + ListenerConfig: ListenerConfig{ MaxConnectionDuration: timeout.DurationSetting(90 * time.Second), }, objs: []interface{}{ @@ -1611,7 +1611,7 @@ func TestListenerVisit(t *testing.T) { }), }, "httpproxy with connection shutdown grace period set in visitor config": { - ListenerVisitorConfig: ListenerVisitorConfig{ + ListenerConfig: ListenerConfig{ ConnectionShutdownGracePeriod: timeout.DurationSetting(90 * time.Second), }, objs: []interface{}{ @@ -1665,7 +1665,7 @@ func TestListenerVisit(t *testing.T) { }), }, "httpsproxy with secret with connection idle timeout set in visitor config": { - ListenerVisitorConfig: ListenerVisitorConfig{ + ListenerConfig: ListenerConfig{ ConnectionIdleTimeout: timeout.DurationSetting(90 * time.Second), }, objs: []interface{}{ @@ -1747,7 +1747,7 @@ func TestListenerVisit(t *testing.T) { }), }, "httpsproxy with secret with stream idle timeout set in visitor config": { - ListenerVisitorConfig: ListenerVisitorConfig{ + ListenerConfig: ListenerConfig{ StreamIdleTimeout: timeout.DurationSetting(90 * time.Second), }, objs: []interface{}{ @@ -1829,7 +1829,7 @@ func TestListenerVisit(t *testing.T) { }), }, "httpsproxy with secret with max connection duration set in visitor config": { - ListenerVisitorConfig: ListenerVisitorConfig{ + ListenerConfig: ListenerConfig{ MaxConnectionDuration: timeout.DurationSetting(90 * time.Second), }, objs: []interface{}{ @@ -1911,7 +1911,7 @@ func TestListenerVisit(t *testing.T) { }), }, "httpsproxy with secret with connection shutdown grace period set in visitor config": { - ListenerVisitorConfig: ListenerVisitorConfig{ + ListenerConfig: ListenerConfig{ ConnectionShutdownGracePeriod: timeout.DurationSetting(90 * time.Second), }, objs: []interface{}{ @@ -1997,7 +1997,7 @@ func TestListenerVisit(t *testing.T) { for name, tc := range tests { t.Run(name, func(t *testing.T) { root := buildDAGFallback(t, tc.fallbackCertificate, tc.objs...) - got := visitListeners(root, &tc.ListenerVisitorConfig) + got := visitListeners(root, &tc.ListenerConfig) assert.Equal(t, tc.want, got) }) } diff --git a/internal/contour/route.go b/internal/contour/route.go index ad5bd5d48f5..f373834a1b1 100644 --- a/internal/contour/route.go +++ b/internal/contour/route.go @@ -24,6 +24,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/projectcontour/contour/internal/dag" "github.com/projectcontour/contour/internal/envoy" + "github.com/projectcontour/contour/internal/grpc" "github.com/projectcontour/contour/internal/protobuf" "github.com/projectcontour/contour/internal/sorter" ) @@ -35,6 +36,9 @@ type RouteCache struct { Cond } +var _ dag.Observer = &RouteCache{} +var _ grpc.Resource = &RouteCache{} + // Update replaces the contents of the cache with the supplied map. func (c *RouteCache) Update(v map[string]*v2.RouteConfiguration) { c.mu.Lock() @@ -87,6 +91,11 @@ func (c *RouteCache) Query(names []string) []proto.Message { // TypeURL returns the string type of RouteCache Resource. func (*RouteCache) TypeURL() string { return resource.RouteType } +func (r *RouteCache) OnChange(root *dag.DAG) { + routes := visitRoutes(root) + r.Update(routes) +} + type routeVisitor struct { routes map[string]*v2.RouteConfiguration } diff --git a/internal/contour/secret.go b/internal/contour/secret.go index b3ef96f92d8..1a50199de0b 100644 --- a/internal/contour/secret.go +++ b/internal/contour/secret.go @@ -22,6 +22,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/projectcontour/contour/internal/dag" "github.com/projectcontour/contour/internal/envoy" + "github.com/projectcontour/contour/internal/grpc" "github.com/projectcontour/contour/internal/protobuf" "github.com/projectcontour/contour/internal/sorter" ) @@ -33,6 +34,9 @@ type SecretCache struct { Cond } +var _ dag.Observer = &SecretCache{} +var _ grpc.Resource = &SecretCache{} + // Update replaces the contents of the cache with the supplied map. func (c *SecretCache) Update(v map[string]*envoy_api_v2_auth.Secret) { c.mu.Lock() @@ -72,6 +76,11 @@ func (c *SecretCache) Query(names []string) []proto.Message { func (*SecretCache) TypeURL() string { return resource.SecretType } +func (s *SecretCache) OnChange(root *dag.DAG) { + secrets := visitSecrets(root) + s.Update(secrets) +} + type secretVisitor struct { secrets map[string]*envoy_api_v2_auth.Secret } diff --git a/internal/grpc/server_test.go b/internal/contour/server_test.go similarity index 92% rename from internal/grpc/server_test.go rename to internal/contour/server_test.go index 784e7a3814f..dc7f1db569d 100644 --- a/internal/grpc/server_test.go +++ b/internal/contour/server_test.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package grpc +package contour import ( "context" @@ -23,7 +23,8 @@ import ( v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2" discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2" resource "github.com/envoyproxy/go-control-plane/pkg/resource/v2" - "github.com/projectcontour/contour/internal/contour" + "github.com/projectcontour/contour/internal/dag" + cgrpc "github.com/projectcontour/contour/internal/grpc" "github.com/projectcontour/contour/internal/metrics" "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" @@ -38,8 +39,8 @@ import ( func TestGRPC(t *testing.T) { // tr and et is recreated before the start of each test. - var et *contour.EndpointsTranslator - var eh *contour.EventHandler + var et *EndpointsTranslator + var eh *EventHandler tests := map[string]func(*testing.T, *grpc.ClientConn){ "StreamClusters": func(t *testing.T, cc *grpc.ClientConn) { @@ -188,25 +189,25 @@ func TestGRPC(t *testing.T) { log.SetOutput(ioutil.Discard) for name, fn := range tests { t.Run(name, func(t *testing.T) { - et = &contour.EndpointsTranslator{ + et = &EndpointsTranslator{ FieldLogger: log, } - ch := contour.CacheHandler{ - Metrics: metrics.NewMetrics(prometheus.NewRegistry()), + + resources := []ResourceCache{ + NewListenerCache(ListenerConfig{}, "", 0), + &SecretCache{}, + &RouteCache{}, + &ClusterCache{}, } - eh = &contour.EventHandler{ - CacheHandler: &ch, - FieldLogger: log, + eh = &EventHandler{ + Observer: dag.ComposeObservers(ObserversOf(resources)...), + FieldLogger: log, + Metrics: metrics.NewMetrics(prometheus.NewRegistry()), } + r := prometheus.NewRegistry() - srv := NewAPI(log, map[string]Resource{ - ch.ClusterCache.TypeURL(): &ch.ClusterCache, - ch.RouteCache.TypeURL(): &ch.RouteCache, - ch.ListenerCache.TypeURL(): &ch.ListenerCache, - ch.SecretCache.TypeURL(): &ch.SecretCache, - et.TypeURL(): et, - }, r) + srv := cgrpc.NewAPI(log, append(ResourcesOf(resources), et), r) l, err := net.Listen("tcp", "127.0.0.1:0") check(t, err) done := make(chan error, 1) diff --git a/internal/contour/visitor_test.go b/internal/contour/visitor_test.go index 69a2b949d19..0ca30bacfca 100644 --- a/internal/contour/visitor_test.go +++ b/internal/contour/visitor_test.go @@ -30,7 +30,7 @@ import ( func TestVisitClusters(t *testing.T) { tests := map[string]struct { - root dag.Visitable + root dag.Vertex want map[string]*envoy_api_v2.Cluster }{ "TCPService forward": { @@ -96,7 +96,7 @@ func TestVisitListeners(t *testing.T) { } tests := map[string]struct { - root dag.Visitable + root dag.Vertex want map[string]*envoy_api_v2.Listener }{ "TCPService forward": { @@ -143,7 +143,7 @@ func TestVisitListeners(t *testing.T) { for name, tc := range tests { t.Run(name, func(t *testing.T) { - got := visitListeners(tc.root, new(ListenerVisitorConfig)) + got := visitListeners(tc.root, new(ListenerConfig)) assert.Equal(t, tc.want, got) }) } @@ -151,7 +151,7 @@ func TestVisitListeners(t *testing.T) { func TestVisitSecrets(t *testing.T) { tests := map[string]struct { - root dag.Visitable + root dag.Vertex want map[string]*envoy_api_v2_auth.Secret }{ "TCPService forward": { diff --git a/internal/dag/dag.go b/internal/dag/dag.go index 0d63dde12df..f6a7b1091f9 100644 --- a/internal/dag/dag.go +++ b/internal/dag/dag.go @@ -27,7 +27,33 @@ import ( "k8s.io/apimachinery/pkg/types" ) -// A DAG represents a directed acylic graph of objects representing the relationship +// Vertex is a node in the DAG that can be visited. +type Vertex interface { + Visit(func(Vertex)) +} + +// Observer is an interface for receiving notification of DAG updates. +type Observer interface { + OnChange(*DAG) +} + +type ComposingObserver struct { + Observers []Observer +} + +func (c ComposingObserver) OnChange(d *DAG) { + for _, o := range c.Observers { + o.OnChange(d) + } +} + +var _ Observer = &ComposingObserver{} + +func ComposeObservers(o ...Observer) Observer { + return ComposingObserver{Observers: o} +} + +// A DAG represents a directed acyclic graph of objects representing the relationship // between Kubernetes Ingress objects, the backend Services, and Secret objects. // The DAG models these relationships as Roots and Vertices. type DAG struct { @@ -304,14 +330,6 @@ func (s *SecureVirtualHost) Valid() bool { return (s.Secret != nil && len(s.routes) > 0) || s.TCPProxy != nil } -type Visitable interface { - Visit(func(Vertex)) -} - -type Vertex interface { - Visitable -} - // A Listener represents a TCP socket that accepts // incoming connections. type Listener struct { diff --git a/internal/e2e/e2e.go b/internal/e2e/e2e.go index 9e255da249b..e4ccbb9e3eb 100644 --- a/internal/e2e/e2e.go +++ b/internal/e2e/e2e.go @@ -65,7 +65,7 @@ func (d *discardWriter) Write(buf []byte) (int, error) { return len(buf), nil } -func setup(t *testing.T, opts ...func(*contour.EventHandler)) (cache.ResourceEventHandler, *grpc.ClientConn, func()) { +func setup(t *testing.T, opts ...interface{}) (cache.ResourceEventHandler, *grpc.ClientConn, func()) { t.Parallel() log := logrus.New() @@ -75,23 +75,33 @@ func setup(t *testing.T, opts ...func(*contour.EventHandler)) (cache.ResourceEve FieldLogger: log, } - r := prometheus.NewRegistry() - ch := &contour.CacheHandler{ - Metrics: metrics.NewMetrics(r), - ListenerCache: contour.NewListenerCache(statsAddress, statsPort), - FieldLogger: log, + conf := contour.ListenerConfig{} + for _, opt := range opts { + if opt, ok := opt.(func(*contour.ListenerConfig)); ok { + opt(&conf) + } + } + + resources := []contour.ResourceCache{ + contour.NewListenerCache(conf, statsAddress, statsPort), + &contour.SecretCache{}, + &contour.RouteCache{}, + &contour.ClusterCache{}, } + r := prometheus.NewRegistry() + rand.Seed(time.Now().Unix()) eh := &contour.EventHandler{ + Observer: dag.ComposeObservers(contour.ObserversOf(resources)...), Builder: dag.Builder{ Source: dag.KubernetesCache{ FieldLogger: log, }, }, - CacheHandler: ch, StatusClient: &k8s.StatusCacher{}, + Metrics: metrics.NewMetrics(r), FieldLogger: log, Sequence: make(chan int, 1), HoldoffDelay: time.Duration(rand.Intn(100)) * time.Millisecond, @@ -99,7 +109,9 @@ func setup(t *testing.T, opts ...func(*contour.EventHandler)) (cache.ResourceEve } for _, opt := range opts { - opt(eh) + if opt, ok := opt.(func(*contour.EventHandler)); ok { + opt(eh) + } } l, err := net.Listen("tcp", "127.0.0.1:0") @@ -107,13 +119,7 @@ func setup(t *testing.T, opts ...func(*contour.EventHandler)) (cache.ResourceEve discard := logrus.New() discard.Out = new(discardWriter) // Resource types in xDS v2. - srv := cgrpc.NewAPI(discard, map[string]cgrpc.Resource{ - ch.ClusterCache.TypeURL(): &ch.ClusterCache, - ch.RouteCache.TypeURL(): &ch.RouteCache, - ch.ListenerCache.TypeURL(): &ch.ListenerCache, - ch.SecretCache.TypeURL(): &ch.SecretCache, - et.TypeURL(): et, - }, r) + srv := cgrpc.NewAPI(discard, append(contour.ResourcesOf(resources), et), r) var g workgroup.Group diff --git a/internal/e2e/lds_test.go b/internal/e2e/lds_test.go index f4ea1364c9b..61d432e818a 100644 --- a/internal/e2e/lds_test.go +++ b/internal/e2e/lds_test.go @@ -611,8 +611,8 @@ func TestLDSStreamEmpty(t *testing.T) { } func TestLDSIngressHTTPUseProxyProtocol(t *testing.T) { - rh, cc, done := setup(t, func(reh *contour.EventHandler) { - reh.CacheHandler.UseProxyProto = true + rh, cc, done := setup(t, func(conf *contour.ListenerConfig) { + conf.UseProxyProto = true }) defer done() @@ -674,8 +674,8 @@ func TestLDSIngressHTTPUseProxyProtocol(t *testing.T) { } func TestLDSIngressHTTPSUseProxyProtocol(t *testing.T) { - rh, cc, done := setup(t, func(reh *contour.EventHandler) { - reh.CacheHandler.UseProxyProto = true + rh, cc, done := setup(t, func(conf *contour.ListenerConfig) { + conf.UseProxyProto = true }) defer done() @@ -777,11 +777,11 @@ func TestLDSIngressHTTPSUseProxyProtocol(t *testing.T) { } func TestLDSCustomAddressAndPort(t *testing.T) { - rh, cc, done := setup(t, func(reh *contour.EventHandler) { - reh.CacheHandler.HTTPAddress = "127.0.0.100" - reh.CacheHandler.HTTPPort = 9100 - reh.CacheHandler.HTTPSAddress = "127.0.0.200" - reh.CacheHandler.HTTPSPort = 9200 + rh, cc, done := setup(t, func(conf *contour.ListenerConfig) { + conf.HTTPAddress = "127.0.0.100" + conf.HTTPPort = 9100 + conf.HTTPSAddress = "127.0.0.200" + conf.HTTPSPort = 9200 }) defer done() @@ -882,9 +882,9 @@ func TestLDSCustomAddressAndPort(t *testing.T) { } func TestLDSCustomAccessLogPaths(t *testing.T) { - rh, cc, done := setup(t, func(reh *contour.EventHandler) { - reh.CacheHandler.HTTPAccessLog = "/tmp/http_access.log" - reh.CacheHandler.HTTPSAccessLog = "/tmp/https_access.log" + rh, cc, done := setup(t, func(conf *contour.ListenerConfig) { + conf.HTTPAccessLog = "/tmp/http_access.log" + conf.HTTPSAccessLog = "/tmp/https_access.log" }) defer done() @@ -1093,8 +1093,8 @@ func TestHTTPProxyHTTPS(t *testing.T) { } func TestHTTPProxyMinimumTLSVersion(t *testing.T) { - rh, cc, done := setup(t, func(reh *contour.EventHandler) { - reh.CacheHandler.MinimumTLSVersion = envoy_api_v2_auth.TlsParameters_TLSv1_2 + rh, cc, done := setup(t, func(conf *contour.ListenerConfig) { + conf.MinimumTLSVersion = envoy_api_v2_auth.TlsParameters_TLSv1_2 }) defer done() diff --git a/internal/e2e/rds_test.go b/internal/e2e/rds_test.go index 79f6d780585..38dc049e640 100644 --- a/internal/e2e/rds_test.go +++ b/internal/e2e/rds_test.go @@ -1599,7 +1599,7 @@ func TestRouteWithTLS_InsecurePaths(t *testing.T) { func TestRouteWithTLS_InsecurePaths_DisablePermitInsecureTrue(t *testing.T) { rh, cc, done := setup(t, func(reh *contour.EventHandler) { - reh.DisablePermitInsecure = true + reh.Builder.DisablePermitInsecure = true }) defer done() @@ -2133,7 +2133,7 @@ func TestHTTPProxyRouteWithTLS_InsecurePaths(t *testing.T) { func TestHTTPProxyRouteWithTLS_InsecurePaths_DisablePermitInsecureTrue(t *testing.T) { rh, cc, done := setup(t, func(reh *contour.EventHandler) { - reh.DisablePermitInsecure = true + reh.Builder.DisablePermitInsecure = true }) defer done() diff --git a/internal/featuretests/fallbackcert_test.go b/internal/featuretests/fallbackcert_test.go index 01719b332ad..c8b9db40189 100644 --- a/internal/featuretests/fallbackcert_test.go +++ b/internal/featuretests/fallbackcert_test.go @@ -20,14 +20,21 @@ import ( envoy_api_v2_auth "github.com/envoyproxy/go-control-plane/envoy/api/v2/auth" envoy_api_v2_core "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" projcontour "github.com/projectcontour/contour/apis/projectcontour/v1" + "github.com/projectcontour/contour/internal/contour" "github.com/projectcontour/contour/internal/envoy" "github.com/projectcontour/contour/internal/fixture" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" ) func TestFallbackCertificate(t *testing.T) { - rh, c, done := setupWithFallbackCert(t, "fallbacksecret", "admin") + rh, c, done := setup(t, func(eh *contour.EventHandler) { + eh.Builder.FallbackCertificate = &types.NamespacedName{ + Name: "fallbacksecret", + Namespace: "admin", + } + }) defer done() sec1 := &v1.Secret{ diff --git a/internal/featuretests/featuretests.go b/internal/featuretests/featuretests.go index 02dc997c2d4..1e99a935e11 100644 --- a/internal/featuretests/featuretests.go +++ b/internal/featuretests/featuretests.go @@ -41,7 +41,6 @@ import ( "github.com/sirupsen/logrus" "google.golang.org/grpc" v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/cache" ) @@ -61,11 +60,7 @@ func (d *discardWriter) Write(buf []byte) (int, error) { return len(buf), nil } -func setup(t *testing.T, opts ...func(*contour.EventHandler)) (cache.ResourceEventHandler, *Contour, func()) { - return setupWithFallbackCert(t, "", "", opts...) -} - -func setupWithFallbackCert(t *testing.T, fallbackCertName, fallbackCertNamespace string, opts ...func(*contour.EventHandler)) (cache.ResourceEventHandler, *Contour, func()) { +func setup(t *testing.T, opts ...interface{}) (cache.ResourceEventHandler, *Contour, func()) { t.Parallel() log := logrus.New() @@ -75,38 +70,45 @@ func setupWithFallbackCert(t *testing.T, fallbackCertName, fallbackCertNamespace FieldLogger: log, } - r := prometheus.NewRegistry() - ch := &contour.CacheHandler{ - Metrics: metrics.NewMetrics(r), - ListenerCache: contour.NewListenerCache(statsAddress, statsPort), - FieldLogger: log, + conf := contour.ListenerConfig{} + for _, opt := range opts { + if opt, ok := opt.(func(*contour.ListenerConfig)); ok { + opt(&conf) + } + } + + resources := []contour.ResourceCache{ + contour.NewListenerCache(conf, statsAddress, statsPort), + &contour.SecretCache{}, + &contour.RouteCache{}, + &contour.ClusterCache{}, } + r := prometheus.NewRegistry() + rand.Seed(time.Now().Unix()) statusCache := &k8s.StatusCacher{} eh := &contour.EventHandler{ IsLeader: make(chan struct{}), - CacheHandler: ch, StatusClient: statusCache, FieldLogger: log, + Metrics: metrics.NewMetrics(r), Sequence: make(chan int, 1), HoldoffDelay: time.Duration(rand.Intn(100)) * time.Millisecond, HoldoffMaxDelay: time.Duration(rand.Intn(500)) * time.Millisecond, + Observer: dag.ComposeObservers(contour.ObserversOf(resources)...), Builder: dag.Builder{ Source: dag.KubernetesCache{ FieldLogger: log, }, - FallbackCertificate: &types.NamespacedName{ - Name: fallbackCertName, - Namespace: fallbackCertNamespace, - }, }, } - for _, opt := range opts { - opt(eh) + if opt, ok := opt.(func(*contour.EventHandler)); ok { + opt(eh) + } } // Make this event handler win the leader election. @@ -117,13 +119,7 @@ func setupWithFallbackCert(t *testing.T, fallbackCertName, fallbackCertNamespace discard := logrus.New() discard.Out = new(discardWriter) // Resource types in xDS v2. - srv := cgrpc.NewAPI(discard, map[string]cgrpc.Resource{ - ch.ClusterCache.TypeURL(): &ch.ClusterCache, - ch.RouteCache.TypeURL(): &ch.RouteCache, - ch.ListenerCache.TypeURL(): &ch.ListenerCache, - ch.SecretCache.TypeURL(): &ch.SecretCache, - et.TypeURL(): et, - }, r) + srv := cgrpc.NewAPI(discard, append(contour.ResourcesOf(resources), et), r) var g workgroup.Group diff --git a/internal/featuretests/timeouts_test.go b/internal/featuretests/timeouts_test.go index 74bb0ad086a..69d575692eb 100644 --- a/internal/featuretests/timeouts_test.go +++ b/internal/featuretests/timeouts_test.go @@ -27,7 +27,7 @@ import ( ) func TestTimeoutsNotSpecified(t *testing.T) { - // the contour.EventHandler.ListenerVisitorConfig has no timeout values specified + // the contour.EventHandler.ListenerConfig has no timeout values specified rh, c, done := setup(t) defer done() @@ -85,11 +85,11 @@ func TestTimeoutsNotSpecified(t *testing.T) { } func TestNonZeroTimeoutsSpecified(t *testing.T) { - withTimeouts := func(eh *contour.EventHandler) { - eh.ListenerVisitorConfig.ConnectionIdleTimeout = timeout.DurationSetting(7 * time.Second) - eh.ListenerVisitorConfig.StreamIdleTimeout = timeout.DurationSetting(70 * time.Second) - eh.ListenerVisitorConfig.MaxConnectionDuration = timeout.DurationSetting(700 * time.Second) - eh.ListenerVisitorConfig.ConnectionShutdownGracePeriod = timeout.DurationSetting(7000 * time.Second) + withTimeouts := func(conf *contour.ListenerConfig) { + conf.ConnectionIdleTimeout = timeout.DurationSetting(7 * time.Second) + conf.StreamIdleTimeout = timeout.DurationSetting(70 * time.Second) + conf.MaxConnectionDuration = timeout.DurationSetting(700 * time.Second) + conf.ConnectionShutdownGracePeriod = timeout.DurationSetting(7000 * time.Second) } rh, c, done := setup(t, withTimeouts) diff --git a/internal/grpc/server.go b/internal/grpc/server.go index 4b068585655..754588215c0 100644 --- a/internal/grpc/server.go +++ b/internal/grpc/server.go @@ -31,11 +31,17 @@ import ( ) // NewAPI returns a *grpc.Server which responds to the Envoy v2 xDS gRPC API. -func NewAPI(log logrus.FieldLogger, resources map[string]Resource, registry *prometheus.Registry, opts ...grpc.ServerOption) *grpc.Server { +func NewAPI(log logrus.FieldLogger, resources []Resource, registry *prometheus.Registry, opts ...grpc.ServerOption) *grpc.Server { + resourceMap := map[string]Resource{} + + for i, r := range resources { + resourceMap[r.TypeURL()] = resources[i] + } + s := &grpcServer{ xdsHandler{ FieldLogger: log, - resources: resources, + resources: resourceMap, }, grpc_prometheus.NewServerMetrics(), }