From 13c945432076826629d017cef538e51ff9b25b71 Mon Sep 17 00:00:00 2001 From: James Peach Date: Fri, 31 Jul 2020 17:36:04 +1000 Subject: [PATCH] internal/contour: decouple the xDS cache handlers The `contour.CacheHandler` struct contains the xDS resource caches as embedded members. If we extract a DAG observer interface, however, we can remove knowledge of specific resources from `CacheHandler`. All it really needs is a set of objects that can visit the DAG when it changes to generate any relevant xDS resources, and that can be called on to publish those into the xDS gRPC interface. Separating out the new `contour.ResourceCache` interface gives us the opportunity to add new DAG consumers by modifying only the code where the Contour server starts up. There's no longer any cache handler plumbing needed. Once we can compose the xDS caches under the `Observer` interface, the need for a separate cache handler struct goes away, and we can attach everything we need directly to the event handler. This updates #2775. Signed-off-by: James Peach --- cmd/contour/serve.go | 74 ++++++++++---------- internal/contour/cachehandler.go | 65 ++++++------------ internal/contour/cluster.go | 9 +++ internal/contour/endpointstranslator.go | 9 +++ internal/contour/handler.go | 39 ++++++----- internal/contour/listener.go | 78 +++++++++++++--------- internal/contour/listener_test.go | 34 +++++----- internal/contour/route.go | 9 +++ internal/contour/secret.go | 9 +++ internal/{grpc => contour}/server_test.go | 35 +++++----- internal/contour/visitor_test.go | 8 +-- internal/dag/dag.go | 36 +++++++--- internal/e2e/e2e.go | 36 +++++----- internal/e2e/lds_test.go | 28 ++++---- internal/e2e/rds_test.go | 4 +- internal/featuretests/fallbackcert_test.go | 9 ++- internal/featuretests/featuretests.go | 46 ++++++------- internal/featuretests/timeouts_test.go | 12 ++-- internal/grpc/server.go | 10 ++- 19 files changed, 305 insertions(+), 245 deletions(-) rename internal/{grpc => contour}/server_test.go (92%) diff --git a/cmd/contour/serve.go b/cmd/contour/serve.go index dd5343d7a92..21d6ee02281 100644 --- a/cmd/contour/serve.go +++ b/cmd/contour/serve.go @@ -127,15 +127,12 @@ func registerServe(app *kingpin.Application) (*kingpin.CmdClause, *serveContext) // doServe runs the contour serve subcommand. func doServe(log logrus.FieldLogger, ctx *serveContext) error { - - // step 1. establish k8s core & dynamic client connections + // Establish k8s core & dynamic client connections. clients, err := k8s.NewClients(ctx.Kubeconfig, ctx.InCluster) if err != nil { return fmt.Errorf("failed to create Kubernetes clients: %w", err) } - // step 2. create informer factories - // Factory for cluster-wide informers. clusterInformerFactory := clients.NewInformerFactory() @@ -162,7 +159,7 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error { } } - // setup prometheus registry and register base metrics. + // Set up Prometheus registry and register base metrics.
registry := prometheus.NewRegistry() registry.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{})) registry.MustRegister(prometheus.NewGoCollector()) @@ -175,7 +172,7 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error { return err } - listenerConfig := contour.ListenerVisitorConfig{ + listenerConfig := contour.ListenerConfig{ UseProxyProto: ctx.useProxyProto, HTTPAddress: ctx.httpAddr, HTTPPort: ctx.httpPort, @@ -200,16 +197,25 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error { listenerConfig.DefaultHTTPVersions = defaultHTTPVersions - // step 3. build our mammoth Kubernetes event handler. + // Endpoints updates are handled directly by the EndpointsTranslator + // due to their high update rate and their orthogonal nature. + endpointHandler := &contour.EndpointsTranslator{ + FieldLogger: log.WithField("context", "endpointstranslator"), + } + + resources := []contour.ResourceCache{ + contour.NewListenerCache(listenerConfig, ctx.statsAddr, ctx.statsPort), + &contour.SecretCache{}, + &contour.RouteCache{}, + &contour.ClusterCache{}, + endpointHandler, + } + + // Build the core Kubernetes event handler. eventHandler := &contour.EventHandler{ - CacheHandler: &contour.CacheHandler{ - ListenerVisitorConfig: listenerConfig, - ListenerCache: contour.NewListenerCache(ctx.statsAddr, ctx.statsPort), - FieldLogger: log.WithField("context", "CacheHandler"), - Metrics: metrics.NewMetrics(registry), - }, HoldoffDelay: 100 * time.Millisecond, HoldoffMaxDelay: 500 * time.Millisecond, + Observer: dag.ComposeObservers(contour.ObserversOf(resources)...), Builder: dag.Builder{ Source: dag.KubernetesCache{ RootNamespaces: ctx.proxyRootNamespaces(), @@ -219,15 +225,16 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error { DisablePermitInsecure: ctx.DisablePermitInsecure, }, FieldLogger: log.WithField("context", "contourEventHandler"), + Metrics: metrics.NewMetrics(registry), } // Set the fallback certificate if configured. if fallbackCert != nil { log.WithField("context", "fallback-certificate").Infof("enabled fallback certificate with secret: %q", fallbackCert) - eventHandler.FallbackCertificate = fallbackCert + eventHandler.Builder.FallbackCertificate = fallbackCert } - // wrap eventHandler in a converter for objects from the dynamic client. + // Wrap eventHandler in a converter for objects from the dynamic client. // and an EventRecorder which tracks API server events. dynamicHandler := &k8s.DynamicClientHandler{ Next: &contour.EventRecorder{ @@ -238,7 +245,7 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error { Logger: log.WithField("context", "dynamicHandler"), } - // step 4. register our resource event handler with the k8s informers, + // Register our resource event handler with the k8s informers, // using the SyncList to keep track of what to sync later. var informerSyncList k8s.InformerSyncList @@ -270,23 +277,17 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error { informerSyncList.InformOnResources(clusterInformerFactory, dynamicHandler, k8s.SecretsResources()...) } - // step 5. endpoints updates are handled directly by the EndpointsTranslator - // due to their high update rate and their orthogonal nature. 
- et := &contour.EndpointsTranslator{ - FieldLogger: log.WithField("context", "endpointstranslator"), - } - informerSyncList.InformOnResources(clusterInformerFactory, &k8s.DynamicClientHandler{ Next: &contour.EventRecorder{ - Next: et, + Next: endpointHandler, Counter: eventHandler.Metrics.EventHandlerOperations, }, Converter: converter, Logger: log.WithField("context", "endpointstranslator"), }, k8s.EndpointsResources()...) - // step 6. setup workgroup runner and register informers. + // Set up workgroup runner and register informers. var g workgroup.Group g.Add(startInformer(clusterInformerFactory, log.WithField("context", "contourinformers"))) @@ -294,10 +295,10 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error { g.Add(startInformer(factory, log.WithField("context", "corenamespacedinformers").WithField("namespace", ns))) } - // step 7. register our event handler with the workgroup + // Register our event handler with the workgroup. g.Add(eventHandler.Start()) - // step 8. create metrics service and register with workgroup. + // Create metrics service and register with workgroup. metricsvc := httpsvc.Service{ Addr: ctx.metricsAddr, Port: ctx.metricsPort, @@ -315,7 +316,7 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error { g.Add(metricsvc.Start) - // step 9. create a separate health service if required. + // Create a separate health service if required. if ctx.healthAddr != ctx.metricsAddr || ctx.healthPort != ctx.metricsPort { healthsvc := httpsvc.Service{ Addr: ctx.healthAddr, @@ -330,7 +331,7 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error { g.Add(healthsvc.Start) } - // step 10. create debug service and register with workgroup. + // Create debug service and register with workgroup. debugsvc := debug.Service{ Service: httpsvc.Service{ Addr: ctx.debugAddr, @@ -341,7 +342,7 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error { } g.Add(debugsvc.Start) - // step 11. register leadership election. + // Register leadership election. eventHandler.IsLeader = setupLeadershipElection(&g, log, ctx, clients, eventHandler.UpdateNow) sh := k8s.StatusUpdateHandler{ @@ -358,7 +359,7 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error { Updater: sh.Writer(), } - // step 11. set up ingress load balancer status writer + // Set up ingress load balancer status writer. lbsw := loadBalancerStatusWriter{ log: log.WithField("context", "loadBalancerStatusWriter"), clients: clients, @@ -370,7 +371,7 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error { } g.Add(lbsw.Start) - // step 12. register an informer to watch envoy's service if we haven't been given static details. + // Register an informer to watch envoy's service if we haven't been given static details. if ctx.IngressStatusAddress == "" { dynamicServiceHandler := &k8s.DynamicClientHandler{ Next: &k8s.ServiceStatusLoadBalancerWatcher{ @@ -402,15 +403,8 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error { } log.Printf("informer caches synced") - resources := map[string]cgrpc.Resource{ - eventHandler.CacheHandler.ClusterCache.TypeURL(): &eventHandler.CacheHandler.ClusterCache, - eventHandler.CacheHandler.RouteCache.TypeURL(): &eventHandler.CacheHandler.RouteCache, - eventHandler.CacheHandler.ListenerCache.TypeURL(): &eventHandler.CacheHandler.ListenerCache, - eventHandler.CacheHandler.SecretCache.TypeURL(): &eventHandler.CacheHandler.SecretCache, - et.TypeURL(): et, - } opts := ctx.grpcOptions() - s := cgrpc.NewAPI(log, resources, registry, opts...) 
+ s := cgrpc.NewAPI(log, contour.ResourcesOf(resources), registry, opts...) addr := net.JoinHostPort(ctx.xdsAddr, strconv.Itoa(ctx.xdsPort)) l, err := net.Listen("tcp", addr) if err != nil { @@ -433,7 +427,7 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error { return s.Serve(l) }) - // step 14. Setup SIGTERM handler + // Set up SIGTERM handler for graceful shutdown. g.Add(func(stop <-chan struct{}) error { c := make(chan os.Signal, 1) signal.Notify(c, syscall.SIGTERM, syscall.SIGINT) diff --git a/internal/contour/cachehandler.go b/internal/contour/cachehandler.go index b3029d69906..9eae11d73b6 100644 --- a/internal/contour/cachehandler.go +++ b/internal/contour/cachehandler.go @@ -17,55 +17,32 @@ package contour import ( - "time" - "github.com/projectcontour/contour/internal/dag" - "github.com/projectcontour/contour/internal/metrics" - "github.com/prometheus/client_golang/prometheus" - "github.com/sirupsen/logrus" + "github.com/projectcontour/contour/internal/grpc" ) -// CacheHandler manages the state of xDS caches. -type CacheHandler struct { - ListenerVisitorConfig - ListenerCache - RouteCache - ClusterCache - SecretCache - - *metrics.Metrics - - logrus.FieldLogger -} - -func (ch *CacheHandler) OnChange(dag *dag.DAG) { - timer := prometheus.NewTimer(ch.CacheHandlerOnUpdateSummary) - defer timer.ObserveDuration() - - ch.updateSecrets(dag) - ch.updateListeners(dag) - ch.updateRoutes(dag) - ch.updateClusters(dag) - - ch.SetDAGLastRebuilt(time.Now()) -} - -func (ch *CacheHandler) updateSecrets(root dag.Visitable) { - secrets := visitSecrets(root) - ch.SecretCache.Update(secrets) -} - -func (ch *CacheHandler) updateListeners(root dag.Visitable) { - listeners := visitListeners(root, &ch.ListenerVisitorConfig) - ch.ListenerCache.Update(listeners) +// ResourceCache is a store of an xDS resource type. It is able to +// visit the dag.DAG to update its resource collection, then +// serve those resources over xDS. +type ResourceCache interface { + dag.Observer + grpc.Resource } -func (ch *CacheHandler) updateRoutes(root dag.Visitable) { - routes := visitRoutes(root) - ch.RouteCache.Update(routes) +// ResourcesOf transliterates a slice of ResourceCache into a slice of grpc.Resource. +func ResourcesOf(in []ResourceCache) []grpc.Resource { + out := make([]grpc.Resource, len(in)) + for i := range in { + out[i] = in[i] + } + return out } -func (ch *CacheHandler) updateClusters(root dag.Visitable) { - clusters := visitClusters(root) - ch.ClusterCache.Update(clusters) +// ObserversOf transliterates a slice of ResourceCache into a slice of dag.Observer. +func ObserversOf(in []ResourceCache) []dag.Observer { + out := make([]dag.Observer, len(in)) + for i := range in { + out[i] = in[i] + } + return out } diff --git a/internal/contour/cluster.go b/internal/contour/cluster.go index 5c3c5fd5450..6f46bc4bc13 100644 --- a/internal/contour/cluster.go +++ b/internal/contour/cluster.go @@ -18,6 +18,7 @@ import ( "sync" resource "github.com/envoyproxy/go-control-plane/pkg/resource/v2" + "github.com/projectcontour/contour/internal/grpc" v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2" "github.com/golang/protobuf/proto" @@ -34,6 +35,9 @@ type ClusterCache struct { Cond } +var _ dag.Observer = &ClusterCache{} +var _ grpc.Resource = &ClusterCache{} + // Update replaces the contents of the cache with the supplied map.
func (c *ClusterCache) Update(v map[string]*v2.Cluster) { c.mu.Lock() @@ -75,6 +79,11 @@ func (c *ClusterCache) Query(names []string) []proto.Message { func (*ClusterCache) TypeURL() string { return resource.ClusterType } +func (c *ClusterCache) OnChange(root *dag.DAG) { + clusters := visitClusters(root) + c.Update(clusters) +} + type clusterVisitor struct { clusters map[string]*v2.Cluster } diff --git a/internal/contour/endpointstranslator.go b/internal/contour/endpointstranslator.go index 2c9629e380b..350d433ed63 100644 --- a/internal/contour/endpointstranslator.go +++ b/internal/contour/endpointstranslator.go @@ -21,6 +21,7 @@ import ( envoy_api_v2_endpoint "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint" resource "github.com/envoyproxy/go-control-plane/pkg/resource/v2" "github.com/golang/protobuf/proto" + "github.com/projectcontour/contour/internal/dag" "github.com/projectcontour/contour/internal/envoy" "github.com/projectcontour/contour/internal/k8s" "github.com/projectcontour/contour/internal/protobuf" @@ -40,6 +41,14 @@ type EndpointsTranslator struct { entries map[string]*v2.ClusterLoadAssignment } +var _ k8scache.ResourceEventHandler = &EndpointsTranslator{} +var _ dag.Observer = &EndpointsTranslator{} + +func (e *EndpointsTranslator) OnChange(d *dag.DAG) { + // TODO(jpeach) Update the internal model to map which + // services are targets of which cluster load assignments. +} + func (e *EndpointsTranslator) OnAdd(obj interface{}) { switch obj := obj.(type) { case *v1.Endpoints: diff --git a/internal/contour/handler.go b/internal/contour/handler.go index b45600e1a97..cbbe9492fb3 100644 --- a/internal/contour/handler.go +++ b/internal/contour/handler.go @@ -24,18 +24,20 @@ import ( projcontour "github.com/projectcontour/contour/apis/projectcontour/v1" "github.com/projectcontour/contour/internal/dag" "github.com/projectcontour/contour/internal/k8s" + "github.com/projectcontour/contour/internal/metrics" + "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" ) // EventHandler implements cache.ResourceEventHandler, filters k8s events towards -// a dag.Builder and calls through to the CacheHandler to notify it that a new DAG +// a dag.Builder and calls through to the Observer to notify it that a new DAG // is available. type EventHandler struct { - dag.Builder - - *CacheHandler + Builder dag.Builder + Observer dag.Observer + Metrics *metrics.Metrics HoldoffDelay, HoldoffMaxDelay time.Duration @@ -53,7 +55,8 @@ type EventHandler struct { // Sequence is a channel that receives a incrementing sequence number // for each update processed. The updates may be processed immediately, or // delayed by a holdoff timer. In each case a non blocking send to Sequence - // will be made once CacheHandler.OnUpdate has been called. + // will be made once the resource update is received (note + // that the DAG is not guaranteed to be rebuilt each time). Sequence chan int // seq is the sequence counter of the number of times @@ -104,7 +107,7 @@ func (e *EventHandler) run(stop <-chan struct{}) error { var ( // outstanding counts the number of events received but not - // yet send to the CacheHandler. + // yet included in a DAG rebuild. outstanding int // timer holds the timer which will expire after e.HoldoffDelay @@ -113,7 +116,7 @@ func (e *EventHandler) run(stop <-chan struct{}) error { // pending is a reference to the current timer's channel.
pending <-chan time.Time - // lastDAGUpdate holds the last time updateDAG was called. + // lastDAGUpdate holds the last time rebuildDAG was called. // lastDAGUpdate is seeded to the current time on entry to // run to allow the holdoff timer to batch the updates from // the API informers. @@ -131,7 +134,7 @@ func (e *EventHandler) run(stop <-chan struct{}) error { // pending may be nil if there are no pending events. // 2. We're processing an event. // 3. The holdoff timer from a previous event has fired and we're - // building a new DAG and sending to the CacheHandler. + // building a new DAG and sending to the Observer. // 4. We're stopping. // // Only one of these things can happen at a time. @@ -159,7 +162,7 @@ func (e *EventHandler) run(stop <-chan struct{}) error { } case <-pending: e.WithField("last_update", time.Since(lastDAGUpdate)).WithField("outstanding", reset()).Info("performing delayed update") - e.updateDAG() + e.rebuildDAG() e.incSequence() lastDAGUpdate = time.Now() case <-stop: @@ -171,7 +174,7 @@ func (e *EventHandler) run(stop <-chan struct{}) error { // onUpdate processes the event received. onUpdate returns // true if the event changed the cache in a way that requires -// notifying the CacheHandler. +// notifying the Observer. func (e *EventHandler) onUpdate(op interface{}) bool { switch op := op.(type) { case opAdd: @@ -206,16 +209,20 @@ func (e *EventHandler) incSequence() { } } -// updateDAG builds a new DAG and sends it to the CacheHandler -// the updates the status on objects and updates the metrics. -func (e *EventHandler) updateDAG() { - dag := e.Builder.Build() - e.CacheHandler.OnChange(dag) +// rebuildDAG builds a new DAG and sends it to the Observer, +// then updates the status on objects, and updates the metrics. +func (e *EventHandler) rebuildDAG() { + latestDAG := e.Builder.Build() + e.Metrics.SetDAGLastRebuilt(time.Now()) + + timer := prometheus.NewTimer(e.Metrics.CacheHandlerOnUpdateSummary) + e.Observer.OnChange(latestDAG) + timer.ObserveDuration() select { case <-e.IsLeader: // we're the leader, update status and metrics - statuses := dag.Statuses() + statuses := latestDAG.Statuses() e.setStatus(statuses) metrics := calculateRouteMetric(statuses) diff --git a/internal/contour/listener.go b/internal/contour/listener.go index 7c56e2ddc96..6b618d7ca49 100644 --- a/internal/contour/listener.go +++ b/internal/contour/listener.go @@ -26,6 +26,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/projectcontour/contour/internal/dag" "github.com/projectcontour/contour/internal/envoy" + "github.com/projectcontour/contour/internal/grpc" "github.com/projectcontour/contour/internal/protobuf" "github.com/projectcontour/contour/internal/sorter" "github.com/projectcontour/contour/internal/timeout" @@ -44,8 +45,8 @@ const ( DEFAULT_ACCESS_LOG_TYPE = "envoy" ) -// ListenerVisitorConfig holds configuration parameters for visitListeners. -type ListenerVisitorConfig struct { +// ListenerConfig holds configuration parameters for Listener. +type ListenerConfig struct { // Envoy's HTTP (non TLS) listener address. // If not set, defaults to DEFAULT_HTTP_LISTENER_ADDRESS. HTTPAddress string @@ -115,7 +116,7 @@ type ListenerVisitorConfig struct { // httpAddress returns the port for the HTTP (non TLS) // listener or DEFAULT_HTTP_LISTENER_ADDRESS if not configured.
-func (lvc *ListenerVisitorConfig) httpAddress() string { +func (lvc *ListenerConfig) httpAddress() string { if lvc.HTTPAddress != "" { return lvc.HTTPAddress } @@ -124,7 +125,7 @@ func (lvc *ListenerVisitorConfig) httpAddress() string { // httpPort returns the port for the HTTP (non TLS) // listener or DEFAULT_HTTP_LISTENER_PORT if not configured. -func (lvc *ListenerVisitorConfig) httpPort() int { +func (lvc *ListenerConfig) httpPort() int { if lvc.HTTPPort != 0 { return lvc.HTTPPort } @@ -133,7 +134,7 @@ func (lvc *ListenerVisitorConfig) httpPort() int { // httpAccessLog returns the access log for the HTTP (non TLS) // listener or DEFAULT_HTTP_ACCESS_LOG if not configured. -func (lvc *ListenerVisitorConfig) httpAccessLog() string { +func (lvc *ListenerConfig) httpAccessLog() string { if lvc.HTTPAccessLog != "" { return lvc.HTTPAccessLog } @@ -142,7 +143,7 @@ func (lvc *ListenerVisitorConfig) httpAccessLog() string { // httpsAddress returns the port for the HTTPS (TLS) // listener or DEFAULT_HTTPS_LISTENER_ADDRESS if not configured. -func (lvc *ListenerVisitorConfig) httpsAddress() string { +func (lvc *ListenerConfig) httpsAddress() string { if lvc.HTTPSAddress != "" { return lvc.HTTPSAddress } @@ -151,7 +152,7 @@ func (lvc *ListenerVisitorConfig) httpsAddress() string { // httpsPort returns the port for the HTTPS (TLS) listener // or DEFAULT_HTTPS_LISTENER_PORT if not configured. -func (lvc *ListenerVisitorConfig) httpsPort() int { +func (lvc *ListenerConfig) httpsPort() int { if lvc.HTTPSPort != 0 { return lvc.HTTPSPort } @@ -160,7 +161,7 @@ func (lvc *ListenerVisitorConfig) httpsPort() int { // httpsAccessLog returns the access log for the HTTPS (TLS) // listener or DEFAULT_HTTPS_ACCESS_LOG if not configured. -func (lvc *ListenerVisitorConfig) httpsAccessLog() string { +func (lvc *ListenerConfig) httpsAccessLog() string { if lvc.HTTPSAccessLog != "" { return lvc.HTTPSAccessLog } @@ -169,7 +170,7 @@ func (lvc *ListenerVisitorConfig) httpsAccessLog() string { // accesslogType returns the access log type that should be configured // across all listener types or DEFAULT_ACCESS_LOG_TYPE if not configured. -func (lvc *ListenerVisitorConfig) accesslogType() string { +func (lvc *ListenerConfig) accesslogType() string { if lvc.AccessLogType != "" { return lvc.AccessLogType } @@ -178,14 +179,14 @@ func (lvc *ListenerVisitorConfig) accesslogType() string { // accesslogFields returns the access log fields that should be configured // for Envoy, or a default set if not configured. 
-func (lvc *ListenerVisitorConfig) accesslogFields() []string { +func (lvc *ListenerConfig) accesslogFields() []string { if lvc.AccessLogFields != nil { return lvc.AccessLogFields } return envoy.DefaultFields } -func (lvc *ListenerVisitorConfig) newInsecureAccessLog() []*envoy_api_v2_accesslog.AccessLog { +func (lvc *ListenerConfig) newInsecureAccessLog() []*envoy_api_v2_accesslog.AccessLog { switch lvc.accesslogType() { case "json": return envoy.FileAccessLogJSON(lvc.httpAccessLog(), lvc.accesslogFields()) @@ -194,7 +195,7 @@ func (lvc *ListenerVisitorConfig) newInsecureAccessLog() []*envoy_api_v2_accessl } } -func (lvc *ListenerVisitorConfig) newSecureAccessLog() []*envoy_api_v2_accesslog.AccessLog { +func (lvc *ListenerConfig) newSecureAccessLog() []*envoy_api_v2_accesslog.AccessLog { switch lvc.accesslogType() { case "json": return envoy.FileAccessLogJSON(lvc.httpsAccessLog(), lvc.accesslogFields()) @@ -205,7 +206,7 @@ func (lvc *ListenerVisitorConfig) newSecureAccessLog() []*envoy_api_v2_accesslog // minTLSVersion returns the requested minimum TLS protocol // version or envoy_api_v2_auth.TlsParameters_TLSv1_1 if not configured. -func (lvc *ListenerVisitorConfig) minTLSVersion() envoy_api_v2_auth.TlsParameters_TlsProtocol { +func (lvc *ListenerConfig) minTLSVersion() envoy_api_v2_auth.TlsParameters_TlsProtocol { if lvc.MinimumTLSVersion > envoy_api_v2_auth.TlsParameters_TLSv1_1 { return lvc.MinimumTLSVersion } @@ -217,13 +218,19 @@ type ListenerCache struct { mu sync.Mutex values map[string]*v2.Listener staticValues map[string]*v2.Listener + + Config ListenerConfig Cond } +var _ dag.Observer = &ListenerCache{} +var _ grpc.Resource = &ListenerCache{} + // NewListenerCache returns an instance of a ListenerCache -func NewListenerCache(address string, port int) ListenerCache { +func NewListenerCache(config ListenerConfig, address string, port int) *ListenerCache { stats := envoy.StatsListener(address, port) - return ListenerCache{ + return &ListenerCache{ + Config: config, staticValues: map[string]*v2.Listener{ stats.Name: stats, }, @@ -281,16 +288,21 @@ func (c *ListenerCache) Query(names []string) []proto.Message { func (*ListenerCache) TypeURL() string { return resource.ListenerType } +func (l *ListenerCache) OnChange(root *dag.DAG) { + listeners := visitListeners(root, &l.Config) + l.Update(listeners) +} + type listenerVisitor struct { - *ListenerVisitorConfig + *ListenerConfig listeners map[string]*v2.Listener http bool // at least one dag.VirtualHost encountered } -func visitListeners(root dag.Vertex, lvc *ListenerVisitorConfig) map[string]*v2.Listener { +func visitListeners(root dag.Vertex, lvc *ListenerConfig) map[string]*v2.Listener { lv := listenerVisitor{ - ListenerVisitorConfig: lvc, + ListenerConfig: lvc, listeners: map[string]*v2.Listener{ ENVOY_HTTPS_LISTENER: envoy.Listener( ENVOY_HTTPS_LISTENER, @@ -385,12 +397,12 @@ func (v *listenerVisitor) visit(vertex dag.Vertex) { DefaultFilters(). RouteConfigName(path.Join("https", vh.VirtualHost.Name)). MetricsPrefix(ENVOY_HTTPS_LISTENER). - AccessLoggers(v.ListenerVisitorConfig.newSecureAccessLog()). - RequestTimeout(v.ListenerVisitorConfig.RequestTimeout). - ConnectionIdleTimeout(v.ListenerVisitorConfig.ConnectionIdleTimeout). - StreamIdleTimeout(v.ListenerVisitorConfig.StreamIdleTimeout). - MaxConnectionDuration(v.ListenerVisitorConfig.MaxConnectionDuration). - ConnectionShutdownGracePeriod(v.ListenerVisitorConfig.ConnectionShutdownGracePeriod). + AccessLoggers(v.ListenerConfig.newSecureAccessLog()). 
+ RequestTimeout(v.ListenerConfig.RequestTimeout). + ConnectionIdleTimeout(v.ListenerConfig.ConnectionIdleTimeout). + StreamIdleTimeout(v.ListenerConfig.StreamIdleTimeout). + MaxConnectionDuration(v.ListenerConfig.MaxConnectionDuration). + ConnectionShutdownGracePeriod(v.ListenerConfig.ConnectionShutdownGracePeriod). Get(), ) @@ -399,7 +411,7 @@ func (v *listenerVisitor) visit(vertex dag.Vertex) { filters = envoy.Filters( envoy.TCPProxy(ENVOY_HTTPS_LISTENER, vh.TCPProxy, - v.ListenerVisitorConfig.newSecureAccessLog()), + v.ListenerConfig.newSecureAccessLog()), ) // Do not offer ALPN for TCP proxying, since @@ -412,7 +424,7 @@ func (v *listenerVisitor) visit(vertex dag.Vertex) { // Secret is provided when TLS is terminated and nil when TLS passthrough is used. if vh.Secret != nil { // Choose the higher of the configured or requested TLS version. - vers := max(v.ListenerVisitorConfig.minTLSVersion(), vh.MinTLSVersion) + vers := max(v.ListenerConfig.minTLSVersion(), vh.MinTLSVersion) downstreamTLS = envoy.DownstreamTLSContext( vh.Secret, @@ -434,7 +446,7 @@ func (v *listenerVisitor) visit(vertex dag.Vertex) { // the value defined in the Contour Configuration file if defined. downstreamTLS = envoy.DownstreamTLSContext( vh.FallbackCertificate, - v.ListenerVisitorConfig.minTLSVersion(), + v.ListenerConfig.minTLSVersion(), vh.DownstreamValidation, alpnProtos...) @@ -444,12 +456,12 @@ func (v *listenerVisitor) visit(vertex dag.Vertex) { DefaultFilters(). RouteConfigName(ENVOY_FALLBACK_ROUTECONFIG). MetricsPrefix(ENVOY_HTTPS_LISTENER). - AccessLoggers(v.ListenerVisitorConfig.newSecureAccessLog()). - RequestTimeout(v.ListenerVisitorConfig.RequestTimeout). - ConnectionIdleTimeout(v.ListenerVisitorConfig.ConnectionIdleTimeout). - StreamIdleTimeout(v.ListenerVisitorConfig.StreamIdleTimeout). - MaxConnectionDuration(v.ListenerVisitorConfig.MaxConnectionDuration). - ConnectionShutdownGracePeriod(v.ListenerVisitorConfig.ConnectionShutdownGracePeriod). + AccessLoggers(v.ListenerConfig.newSecureAccessLog()). + RequestTimeout(v.ListenerConfig.RequestTimeout). + ConnectionIdleTimeout(v.ListenerConfig.ConnectionIdleTimeout). + StreamIdleTimeout(v.ListenerConfig.StreamIdleTimeout). + MaxConnectionDuration(v.ListenerConfig.MaxConnectionDuration). + ConnectionShutdownGracePeriod(v.ListenerConfig.ConnectionShutdownGracePeriod). 
Get(), ) diff --git a/internal/contour/listener_test.go b/internal/contour/listener_test.go index 3e7fd5aaca7..eeaa2abde02 100644 --- a/internal/contour/listener_test.go +++ b/internal/contour/listener_test.go @@ -152,7 +152,7 @@ func TestListenerVisit(t *testing.T) { Get() tests := map[string]struct { - ListenerVisitorConfig + ListenerConfig fallbackCertificate *types.NamespacedName objs []interface{} want map[string]*v2.Listener @@ -599,7 +599,7 @@ func TestListenerVisit(t *testing.T) { }), }, "http listener on non default port": { // issue 72 - ListenerVisitorConfig: ListenerVisitorConfig{ + ListenerConfig: ListenerConfig{ HTTPAddress: "127.0.0.100", HTTPPort: 9100, HTTPSAddress: "127.0.0.200", @@ -672,7 +672,7 @@ func TestListenerVisit(t *testing.T) { }), }, "use proxy proto": { - ListenerVisitorConfig: ListenerVisitorConfig{ + ListenerConfig: ListenerConfig{ UseProxyProto: true, }, objs: []interface{}{ @@ -746,7 +746,7 @@ func TestListenerVisit(t *testing.T) { }), }, "--envoy-http-access-log": { - ListenerVisitorConfig: ListenerVisitorConfig{ + ListenerConfig: ListenerConfig{ HTTPAccessLog: "/tmp/http_access.log", HTTPSAccessLog: "/tmp/https_access.log", }, @@ -823,7 +823,7 @@ func TestListenerVisit(t *testing.T) { }), }, "tls-min-protocol-version from config": { - ListenerVisitorConfig: ListenerVisitorConfig{ + ListenerConfig: ListenerConfig{ MinimumTLSVersion: envoy_api_v2_auth.TlsParameters_TLSv1_3, }, objs: []interface{}{ @@ -893,7 +893,7 @@ func TestListenerVisit(t *testing.T) { }), }, "tls-min-protocol-version from config overridden by annotation": { - ListenerVisitorConfig: ListenerVisitorConfig{ + ListenerConfig: ListenerConfig{ MinimumTLSVersion: envoy_api_v2_auth.TlsParameters_TLSv1_3, }, objs: []interface{}{ @@ -966,7 +966,7 @@ func TestListenerVisit(t *testing.T) { }), }, "tls-min-protocol-version from config overridden by legacy annotation": { - ListenerVisitorConfig: ListenerVisitorConfig{ + ListenerConfig: ListenerConfig{ MinimumTLSVersion: envoy_api_v2_auth.TlsParameters_TLSv1_3, }, objs: []interface{}{ @@ -1039,7 +1039,7 @@ func TestListenerVisit(t *testing.T) { }), }, "tls-min-protocol-version from config overridden by httpproxy": { - ListenerVisitorConfig: ListenerVisitorConfig{ + ListenerConfig: ListenerConfig{ MinimumTLSVersion: envoy_api_v2_auth.TlsParameters_TLSv1_3, }, objs: []interface{}{ @@ -1449,7 +1449,7 @@ func TestListenerVisit(t *testing.T) { }), }, "httpproxy with connection idle timeout set in visitor config": { - ListenerVisitorConfig: ListenerVisitorConfig{ + ListenerConfig: ListenerConfig{ ConnectionIdleTimeout: timeout.DurationSetting(90 * time.Second), }, objs: []interface{}{ @@ -1503,7 +1503,7 @@ func TestListenerVisit(t *testing.T) { }), }, "httpproxy with stream idle timeout set in visitor config": { - ListenerVisitorConfig: ListenerVisitorConfig{ + ListenerConfig: ListenerConfig{ StreamIdleTimeout: timeout.DurationSetting(90 * time.Second), }, objs: []interface{}{ @@ -1557,7 +1557,7 @@ func TestListenerVisit(t *testing.T) { }), }, "httpproxy with max connection duration set in visitor config": { - ListenerVisitorConfig: ListenerVisitorConfig{ + ListenerConfig: ListenerConfig{ MaxConnectionDuration: timeout.DurationSetting(90 * time.Second), }, objs: []interface{}{ @@ -1611,7 +1611,7 @@ func TestListenerVisit(t *testing.T) { }), }, "httpproxy with connection shutdown grace period set in visitor config": { - ListenerVisitorConfig: ListenerVisitorConfig{ + ListenerConfig: ListenerConfig{ ConnectionShutdownGracePeriod: timeout.DurationSetting(90 * 
time.Second), }, objs: []interface{}{ @@ -1665,7 +1665,7 @@ func TestListenerVisit(t *testing.T) { }), }, "httpsproxy with secret with connection idle timeout set in visitor config": { - ListenerVisitorConfig: ListenerVisitorConfig{ + ListenerConfig: ListenerConfig{ ConnectionIdleTimeout: timeout.DurationSetting(90 * time.Second), }, objs: []interface{}{ @@ -1747,7 +1747,7 @@ func TestListenerVisit(t *testing.T) { }), }, "httpsproxy with secret with stream idle timeout set in visitor config": { - ListenerVisitorConfig: ListenerVisitorConfig{ + ListenerConfig: ListenerConfig{ StreamIdleTimeout: timeout.DurationSetting(90 * time.Second), }, objs: []interface{}{ @@ -1829,7 +1829,7 @@ func TestListenerVisit(t *testing.T) { }), }, "httpsproxy with secret with max connection duration set in visitor config": { - ListenerVisitorConfig: ListenerVisitorConfig{ + ListenerConfig: ListenerConfig{ MaxConnectionDuration: timeout.DurationSetting(90 * time.Second), }, objs: []interface{}{ @@ -1911,7 +1911,7 @@ func TestListenerVisit(t *testing.T) { }), }, "httpsproxy with secret with connection shutdown grace period set in visitor config": { - ListenerVisitorConfig: ListenerVisitorConfig{ + ListenerConfig: ListenerConfig{ ConnectionShutdownGracePeriod: timeout.DurationSetting(90 * time.Second), }, objs: []interface{}{ @@ -1997,7 +1997,7 @@ func TestListenerVisit(t *testing.T) { for name, tc := range tests { t.Run(name, func(t *testing.T) { root := buildDAGFallback(t, tc.fallbackCertificate, tc.objs...) - got := visitListeners(root, &tc.ListenerVisitorConfig) + got := visitListeners(root, &tc.ListenerConfig) assert.Equal(t, tc.want, got) }) } diff --git a/internal/contour/route.go b/internal/contour/route.go index ad5bd5d48f5..f373834a1b1 100644 --- a/internal/contour/route.go +++ b/internal/contour/route.go @@ -24,6 +24,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/projectcontour/contour/internal/dag" "github.com/projectcontour/contour/internal/envoy" + "github.com/projectcontour/contour/internal/grpc" "github.com/projectcontour/contour/internal/protobuf" "github.com/projectcontour/contour/internal/sorter" ) @@ -35,6 +36,9 @@ type RouteCache struct { Cond } +var _ dag.Observer = &RouteCache{} +var _ grpc.Resource = &RouteCache{} + // Update replaces the contents of the cache with the supplied map. func (c *RouteCache) Update(v map[string]*v2.RouteConfiguration) { c.mu.Lock() @@ -87,6 +91,11 @@ func (c *RouteCache) Query(names []string) []proto.Message { // TypeURL returns the string type of RouteCache Resource. func (*RouteCache) TypeURL() string { return resource.RouteType } +func (r *RouteCache) OnChange(root *dag.DAG) { + routes := visitRoutes(root) + r.Update(routes) +} + type routeVisitor struct { routes map[string]*v2.RouteConfiguration } diff --git a/internal/contour/secret.go b/internal/contour/secret.go index b3ef96f92d8..1a50199de0b 100644 --- a/internal/contour/secret.go +++ b/internal/contour/secret.go @@ -22,6 +22,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/projectcontour/contour/internal/dag" "github.com/projectcontour/contour/internal/envoy" + "github.com/projectcontour/contour/internal/grpc" "github.com/projectcontour/contour/internal/protobuf" "github.com/projectcontour/contour/internal/sorter" ) @@ -33,6 +34,9 @@ type SecretCache struct { Cond } +var _ dag.Observer = &SecretCache{} +var _ grpc.Resource = &SecretCache{} + // Update replaces the contents of the cache with the supplied map. 
func (c *SecretCache) Update(v map[string]*envoy_api_v2_auth.Secret) { c.mu.Lock() @@ -72,6 +76,11 @@ func (c *SecretCache) Query(names []string) []proto.Message { func (*SecretCache) TypeURL() string { return resource.SecretType } +func (s *SecretCache) OnChange(root *dag.DAG) { + secrets := visitSecrets(root) + s.Update(secrets) +} + type secretVisitor struct { secrets map[string]*envoy_api_v2_auth.Secret } diff --git a/internal/grpc/server_test.go b/internal/contour/server_test.go similarity index 92% rename from internal/grpc/server_test.go rename to internal/contour/server_test.go index 784e7a3814f..dc7f1db569d 100644 --- a/internal/grpc/server_test.go +++ b/internal/contour/server_test.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package grpc +package contour import ( "context" @@ -23,7 +23,8 @@ import ( v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2" discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2" resource "github.com/envoyproxy/go-control-plane/pkg/resource/v2" - "github.com/projectcontour/contour/internal/contour" + "github.com/projectcontour/contour/internal/dag" + cgrpc "github.com/projectcontour/contour/internal/grpc" "github.com/projectcontour/contour/internal/metrics" "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" @@ -38,8 +39,8 @@ import ( func TestGRPC(t *testing.T) { // tr and et is recreated before the start of each test. - var et *contour.EndpointsTranslator - var eh *contour.EventHandler + var et *EndpointsTranslator + var eh *EventHandler tests := map[string]func(*testing.T, *grpc.ClientConn){ "StreamClusters": func(t *testing.T, cc *grpc.ClientConn) { @@ -188,25 +189,25 @@ func TestGRPC(t *testing.T) { log.SetOutput(ioutil.Discard) for name, fn := range tests { t.Run(name, func(t *testing.T) { - et = &contour.EndpointsTranslator{ + et = &EndpointsTranslator{ FieldLogger: log, } - ch := contour.CacheHandler{ - Metrics: metrics.NewMetrics(prometheus.NewRegistry()), + + resources := []ResourceCache{ + NewListenerCache(ListenerConfig{}, "", 0), + &SecretCache{}, + &RouteCache{}, + &ClusterCache{}, } - eh = &contour.EventHandler{ - CacheHandler: &ch, - FieldLogger: log, + eh = &EventHandler{ + Observer: dag.ComposeObservers(ObserversOf(resources)...), + FieldLogger: log, + Metrics: metrics.NewMetrics(prometheus.NewRegistry()), } + r := prometheus.NewRegistry() - srv := NewAPI(log, map[string]Resource{ - ch.ClusterCache.TypeURL(): &ch.ClusterCache, - ch.RouteCache.TypeURL(): &ch.RouteCache, - ch.ListenerCache.TypeURL(): &ch.ListenerCache, - ch.SecretCache.TypeURL(): &ch.SecretCache, - et.TypeURL(): et, - }, r) + srv := cgrpc.NewAPI(log, append(ResourcesOf(resources), et), r) l, err := net.Listen("tcp", "127.0.0.1:0") check(t, err) done := make(chan error, 1) diff --git a/internal/contour/visitor_test.go b/internal/contour/visitor_test.go index 69a2b949d19..0ca30bacfca 100644 --- a/internal/contour/visitor_test.go +++ b/internal/contour/visitor_test.go @@ -30,7 +30,7 @@ import ( func TestVisitClusters(t *testing.T) { tests := map[string]struct { - root dag.Visitable + root dag.Vertex want map[string]*envoy_api_v2.Cluster }{ "TCPService forward": { @@ -96,7 +96,7 @@ func TestVisitListeners(t *testing.T) { } tests := map[string]struct { - root dag.Visitable + root dag.Vertex want map[string]*envoy_api_v2.Listener }{ "TCPService forward": { @@ -143,7 +143,7 @@ func TestVisitListeners(t *testing.T) { for name, tc := range tests { 
t.Run(name, func(t *testing.T) { - got := visitListeners(tc.root, new(ListenerVisitorConfig)) + got := visitListeners(tc.root, new(ListenerConfig)) assert.Equal(t, tc.want, got) }) } @@ -151,7 +151,7 @@ func TestVisitListeners(t *testing.T) { func TestVisitSecrets(t *testing.T) { tests := map[string]struct { - root dag.Visitable + root dag.Vertex want map[string]*envoy_api_v2_auth.Secret }{ "TCPService forward": { diff --git a/internal/dag/dag.go b/internal/dag/dag.go index 0d63dde12df..f6a7b1091f9 100644 --- a/internal/dag/dag.go +++ b/internal/dag/dag.go @@ -27,7 +27,33 @@ import ( "k8s.io/apimachinery/pkg/types" ) -// A DAG represents a directed acylic graph of objects representing the relationship +// Vertex is a node in the DAG that can be visited. +type Vertex interface { + Visit(func(Vertex)) +} + +// Observer is an interface for receiving notification of DAG updates. +type Observer interface { + OnChange(*DAG) +} + +type ComposingObserver struct { + Observers []Observer +} + +func (c ComposingObserver) OnChange(d *DAG) { + for _, o := range c.Observers { + o.OnChange(d) + } +} + +var _ Observer = &ComposingObserver{} + +func ComposeObservers(o ...Observer) Observer { + return ComposingObserver{Observers: o} +} + +// A DAG represents a directed acyclic graph of objects representing the relationship // between Kubernetes Ingress objects, the backend Services, and Secret objects. // The DAG models these relationships as Roots and Vertices. type DAG struct { @@ -304,14 +330,6 @@ func (s *SecureVirtualHost) Valid() bool { return (s.Secret != nil && len(s.routes) > 0) || s.TCPProxy != nil } -type Visitable interface { - Visit(func(Vertex)) -} - -type Vertex interface { - Visitable -} - // A Listener represents a TCP socket that accepts // incoming connections. 
type Listener struct { diff --git a/internal/e2e/e2e.go b/internal/e2e/e2e.go index 9e255da249b..e4ccbb9e3eb 100644 --- a/internal/e2e/e2e.go +++ b/internal/e2e/e2e.go @@ -65,7 +65,7 @@ func (d *discardWriter) Write(buf []byte) (int, error) { return len(buf), nil } -func setup(t *testing.T, opts ...func(*contour.EventHandler)) (cache.ResourceEventHandler, *grpc.ClientConn, func()) { +func setup(t *testing.T, opts ...interface{}) (cache.ResourceEventHandler, *grpc.ClientConn, func()) { t.Parallel() log := logrus.New() @@ -75,23 +75,33 @@ func setup(t *testing.T, opts ...func(*contour.EventHandler)) (cache.ResourceEve FieldLogger: log, } - r := prometheus.NewRegistry() - ch := &contour.CacheHandler{ - Metrics: metrics.NewMetrics(r), - ListenerCache: contour.NewListenerCache(statsAddress, statsPort), - FieldLogger: log, + conf := contour.ListenerConfig{} + for _, opt := range opts { + if opt, ok := opt.(func(*contour.ListenerConfig)); ok { + opt(&conf) + } + } + + resources := []contour.ResourceCache{ + contour.NewListenerCache(conf, statsAddress, statsPort), + &contour.SecretCache{}, + &contour.RouteCache{}, + &contour.ClusterCache{}, } + r := prometheus.NewRegistry() + rand.Seed(time.Now().Unix()) eh := &contour.EventHandler{ + Observer: dag.ComposeObservers(contour.ObserversOf(resources)...), Builder: dag.Builder{ Source: dag.KubernetesCache{ FieldLogger: log, }, }, - CacheHandler: ch, StatusClient: &k8s.StatusCacher{}, + Metrics: metrics.NewMetrics(r), FieldLogger: log, Sequence: make(chan int, 1), HoldoffDelay: time.Duration(rand.Intn(100)) * time.Millisecond, @@ -99,7 +109,9 @@ func setup(t *testing.T, opts ...func(*contour.EventHandler)) (cache.ResourceEve } for _, opt := range opts { - opt(eh) + if opt, ok := opt.(func(*contour.EventHandler)); ok { + opt(eh) + } } l, err := net.Listen("tcp", "127.0.0.1:0") @@ -107,13 +119,7 @@ func setup(t *testing.T, opts ...func(*contour.EventHandler)) (cache.ResourceEve discard := logrus.New() discard.Out = new(discardWriter) // Resource types in xDS v2. 
- srv := cgrpc.NewAPI(discard, map[string]cgrpc.Resource{ - ch.ClusterCache.TypeURL(): &ch.ClusterCache, - ch.RouteCache.TypeURL(): &ch.RouteCache, - ch.ListenerCache.TypeURL(): &ch.ListenerCache, - ch.SecretCache.TypeURL(): &ch.SecretCache, - et.TypeURL(): et, - }, r) + srv := cgrpc.NewAPI(discard, append(contour.ResourcesOf(resources), et), r) var g workgroup.Group diff --git a/internal/e2e/lds_test.go b/internal/e2e/lds_test.go index f4ea1364c9b..61d432e818a 100644 --- a/internal/e2e/lds_test.go +++ b/internal/e2e/lds_test.go @@ -611,8 +611,8 @@ func TestLDSStreamEmpty(t *testing.T) { } func TestLDSIngressHTTPUseProxyProtocol(t *testing.T) { - rh, cc, done := setup(t, func(reh *contour.EventHandler) { - reh.CacheHandler.UseProxyProto = true + rh, cc, done := setup(t, func(conf *contour.ListenerConfig) { + conf.UseProxyProto = true }) defer done() @@ -674,8 +674,8 @@ func TestLDSIngressHTTPUseProxyProtocol(t *testing.T) { } func TestLDSIngressHTTPSUseProxyProtocol(t *testing.T) { - rh, cc, done := setup(t, func(reh *contour.EventHandler) { - reh.CacheHandler.UseProxyProto = true + rh, cc, done := setup(t, func(conf *contour.ListenerConfig) { + conf.UseProxyProto = true }) defer done() @@ -777,11 +777,11 @@ func TestLDSIngressHTTPSUseProxyProtocol(t *testing.T) { } func TestLDSCustomAddressAndPort(t *testing.T) { - rh, cc, done := setup(t, func(reh *contour.EventHandler) { - reh.CacheHandler.HTTPAddress = "127.0.0.100" - reh.CacheHandler.HTTPPort = 9100 - reh.CacheHandler.HTTPSAddress = "127.0.0.200" - reh.CacheHandler.HTTPSPort = 9200 + rh, cc, done := setup(t, func(conf *contour.ListenerConfig) { + conf.HTTPAddress = "127.0.0.100" + conf.HTTPPort = 9100 + conf.HTTPSAddress = "127.0.0.200" + conf.HTTPSPort = 9200 }) defer done() @@ -882,9 +882,9 @@ func TestLDSCustomAddressAndPort(t *testing.T) { } func TestLDSCustomAccessLogPaths(t *testing.T) { - rh, cc, done := setup(t, func(reh *contour.EventHandler) { - reh.CacheHandler.HTTPAccessLog = "/tmp/http_access.log" - reh.CacheHandler.HTTPSAccessLog = "/tmp/https_access.log" + rh, cc, done := setup(t, func(conf *contour.ListenerConfig) { + conf.HTTPAccessLog = "/tmp/http_access.log" + conf.HTTPSAccessLog = "/tmp/https_access.log" }) defer done() @@ -1093,8 +1093,8 @@ func TestHTTPProxyHTTPS(t *testing.T) { } func TestHTTPProxyMinimumTLSVersion(t *testing.T) { - rh, cc, done := setup(t, func(reh *contour.EventHandler) { - reh.CacheHandler.MinimumTLSVersion = envoy_api_v2_auth.TlsParameters_TLSv1_2 + rh, cc, done := setup(t, func(conf *contour.ListenerConfig) { + conf.MinimumTLSVersion = envoy_api_v2_auth.TlsParameters_TLSv1_2 }) defer done() diff --git a/internal/e2e/rds_test.go b/internal/e2e/rds_test.go index 79f6d780585..38dc049e640 100644 --- a/internal/e2e/rds_test.go +++ b/internal/e2e/rds_test.go @@ -1599,7 +1599,7 @@ func TestRouteWithTLS_InsecurePaths(t *testing.T) { func TestRouteWithTLS_InsecurePaths_DisablePermitInsecureTrue(t *testing.T) { rh, cc, done := setup(t, func(reh *contour.EventHandler) { - reh.DisablePermitInsecure = true + reh.Builder.DisablePermitInsecure = true }) defer done() @@ -2133,7 +2133,7 @@ func TestHTTPProxyRouteWithTLS_InsecurePaths(t *testing.T) { func TestHTTPProxyRouteWithTLS_InsecurePaths_DisablePermitInsecureTrue(t *testing.T) { rh, cc, done := setup(t, func(reh *contour.EventHandler) { - reh.DisablePermitInsecure = true + reh.Builder.DisablePermitInsecure = true }) defer done() diff --git a/internal/featuretests/fallbackcert_test.go b/internal/featuretests/fallbackcert_test.go index 
01719b332ad..c8b9db40189 100644 --- a/internal/featuretests/fallbackcert_test.go +++ b/internal/featuretests/fallbackcert_test.go @@ -20,14 +20,21 @@ import ( envoy_api_v2_auth "github.com/envoyproxy/go-control-plane/envoy/api/v2/auth" envoy_api_v2_core "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" projcontour "github.com/projectcontour/contour/apis/projectcontour/v1" + "github.com/projectcontour/contour/internal/contour" "github.com/projectcontour/contour/internal/envoy" "github.com/projectcontour/contour/internal/fixture" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" ) func TestFallbackCertificate(t *testing.T) { - rh, c, done := setupWithFallbackCert(t, "fallbacksecret", "admin") + rh, c, done := setup(t, func(eh *contour.EventHandler) { + eh.Builder.FallbackCertificate = &types.NamespacedName{ + Name: "fallbacksecret", + Namespace: "admin", + } + }) defer done() sec1 := &v1.Secret{ diff --git a/internal/featuretests/featuretests.go b/internal/featuretests/featuretests.go index 02dc997c2d4..1e99a935e11 100644 --- a/internal/featuretests/featuretests.go +++ b/internal/featuretests/featuretests.go @@ -41,7 +41,6 @@ import ( "github.com/sirupsen/logrus" "google.golang.org/grpc" v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/cache" ) @@ -61,11 +60,7 @@ func (d *discardWriter) Write(buf []byte) (int, error) { return len(buf), nil } -func setup(t *testing.T, opts ...func(*contour.EventHandler)) (cache.ResourceEventHandler, *Contour, func()) { - return setupWithFallbackCert(t, "", "", opts...) -} - -func setupWithFallbackCert(t *testing.T, fallbackCertName, fallbackCertNamespace string, opts ...func(*contour.EventHandler)) (cache.ResourceEventHandler, *Contour, func()) { +func setup(t *testing.T, opts ...interface{}) (cache.ResourceEventHandler, *Contour, func()) { t.Parallel() log := logrus.New() @@ -75,38 +70,45 @@ func setupWithFallbackCert(t *testing.T, fallbackCertName, fallbackCertNamespace FieldLogger: log, } - r := prometheus.NewRegistry() - ch := &contour.CacheHandler{ - Metrics: metrics.NewMetrics(r), - ListenerCache: contour.NewListenerCache(statsAddress, statsPort), - FieldLogger: log, + conf := contour.ListenerConfig{} + for _, opt := range opts { + if opt, ok := opt.(func(*contour.ListenerConfig)); ok { + opt(&conf) + } + } + + resources := []contour.ResourceCache{ + contour.NewListenerCache(conf, statsAddress, statsPort), + &contour.SecretCache{}, + &contour.RouteCache{}, + &contour.ClusterCache{}, } + r := prometheus.NewRegistry() + rand.Seed(time.Now().Unix()) statusCache := &k8s.StatusCacher{} eh := &contour.EventHandler{ IsLeader: make(chan struct{}), - CacheHandler: ch, StatusClient: statusCache, FieldLogger: log, + Metrics: metrics.NewMetrics(r), Sequence: make(chan int, 1), HoldoffDelay: time.Duration(rand.Intn(100)) * time.Millisecond, HoldoffMaxDelay: time.Duration(rand.Intn(500)) * time.Millisecond, + Observer: dag.ComposeObservers(contour.ObserversOf(resources)...), Builder: dag.Builder{ Source: dag.KubernetesCache{ FieldLogger: log, }, - FallbackCertificate: &types.NamespacedName{ - Name: fallbackCertName, - Namespace: fallbackCertNamespace, - }, }, } - for _, opt := range opts { - opt(eh) + if opt, ok := opt.(func(*contour.EventHandler)); ok { + opt(eh) + } } // Make this event handler win the leader election. 
@@ -117,13 +119,7 @@ func setupWithFallbackCert(t *testing.T, fallbackCertName, fallbackCertNamespace discard := logrus.New() discard.Out = new(discardWriter) // Resource types in xDS v2. - srv := cgrpc.NewAPI(discard, map[string]cgrpc.Resource{ - ch.ClusterCache.TypeURL(): &ch.ClusterCache, - ch.RouteCache.TypeURL(): &ch.RouteCache, - ch.ListenerCache.TypeURL(): &ch.ListenerCache, - ch.SecretCache.TypeURL(): &ch.SecretCache, - et.TypeURL(): et, - }, r) + srv := cgrpc.NewAPI(discard, append(contour.ResourcesOf(resources), et), r) var g workgroup.Group diff --git a/internal/featuretests/timeouts_test.go b/internal/featuretests/timeouts_test.go index 74bb0ad086a..69d575692eb 100644 --- a/internal/featuretests/timeouts_test.go +++ b/internal/featuretests/timeouts_test.go @@ -27,7 +27,7 @@ import ( ) func TestTimeoutsNotSpecified(t *testing.T) { - // the contour.EventHandler.ListenerVisitorConfig has no timeout values specified + // the contour.EventHandler.ListenerConfig has no timeout values specified rh, c, done := setup(t) defer done() @@ -85,11 +85,11 @@ func TestTimeoutsNotSpecified(t *testing.T) { } func TestNonZeroTimeoutsSpecified(t *testing.T) { - withTimeouts := func(eh *contour.EventHandler) { - eh.ListenerVisitorConfig.ConnectionIdleTimeout = timeout.DurationSetting(7 * time.Second) - eh.ListenerVisitorConfig.StreamIdleTimeout = timeout.DurationSetting(70 * time.Second) - eh.ListenerVisitorConfig.MaxConnectionDuration = timeout.DurationSetting(700 * time.Second) - eh.ListenerVisitorConfig.ConnectionShutdownGracePeriod = timeout.DurationSetting(7000 * time.Second) + withTimeouts := func(conf *contour.ListenerConfig) { + conf.ConnectionIdleTimeout = timeout.DurationSetting(7 * time.Second) + conf.StreamIdleTimeout = timeout.DurationSetting(70 * time.Second) + conf.MaxConnectionDuration = timeout.DurationSetting(700 * time.Second) + conf.ConnectionShutdownGracePeriod = timeout.DurationSetting(7000 * time.Second) } rh, c, done := setup(t, withTimeouts) diff --git a/internal/grpc/server.go b/internal/grpc/server.go index 4b068585655..754588215c0 100644 --- a/internal/grpc/server.go +++ b/internal/grpc/server.go @@ -31,11 +31,17 @@ import ( ) // NewAPI returns a *grpc.Server which responds to the Envoy v2 xDS gRPC API. -func NewAPI(log logrus.FieldLogger, resources map[string]Resource, registry *prometheus.Registry, opts ...grpc.ServerOption) *grpc.Server { +func NewAPI(log logrus.FieldLogger, resources []Resource, registry *prometheus.Registry, opts ...grpc.ServerOption) *grpc.Server { + resourceMap := map[string]Resource{} + + for i, r := range resources { + resourceMap[r.TypeURL()] = resources[i] + } + s := &grpcServer{ xdsHandler{ FieldLogger: log, - resources: resources, + resources: resourceMap, }, grpc_prometheus.NewServerMetrics(), }