internal/contour: decouple the xDS cache handlers
The `contour.CacheHandler` struct contains the xDS resource caches as
embedded members. If we extract a DAG observer interface, however, we
can remove knowledge of specific resources from `CacheHandler`.
All it really needs is a set of objects that can visit the DAG when it
changes to generate any relevant xDS resources, and that can be called
on to publish those resources over the xDS gRPC interface.
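
As a rough sketch, that requirement boils down to two small interfaces. The
`contour.ResourceCache` definition below is taken from this diff; the
`dag.Observer` shape is assumed from the `OnChange` signatures the diff
introduces:

    // Assumed shape of the observer interface in package dag: an Observer
    // is notified whenever a new DAG has been built.
    type Observer interface {
        OnChange(*DAG)
    }

    // From internal/contour/cachehandler.go in this commit: a ResourceCache
    // can observe DAG rebuilds and serve the generated resources over xDS.
    type ResourceCache interface {
        dag.Observer
        grpc.Resource
    }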

Separating out the new `contour.ResourceCache` interface lets us add
new DAG consumers by modifying only the startup code in the Contour
server; there is no longer any cache handler plumbing needed.
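
Concretely, wiring in another DAG consumer now only means appending it to
the `resources` slice built in `cmd/contour/serve.go` (shape copied from
the hunk below; `myNewCache` is a hypothetical addition shown only for
illustration):

    resources := []contour.ResourceCache{
        contour.NewListenerCache(listenerConfig, ctx.statsAddr, ctx.statsPort),
        &contour.SecretCache{},
        &contour.RouteCache{},
        &contour.ClusterCache{},
        endpointHandler,
        myNewCache, // hypothetical: anything implementing contour.ResourceCache
    }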

Once we can compose the xDS caches under the `Observer` interface, the
need for a separate cache handler struct goes away, and we can attach
everything we need directly to the event handler.
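
`dag.ComposeObservers` is referenced in `serve.go` below but its body is
not part of this diff; a minimal sketch of one plausible implementation,
assuming the `Observer` interface above, could look like:

    // composite fans a DAG change out to a list of Observers.
    type composite []Observer

    func (c composite) OnChange(d *DAG) {
        for _, o := range c {
            o.OnChange(d)
        }
    }

    // ComposeObservers returns an Observer that notifies each of the given
    // Observers in turn whenever the DAG changes.
    func ComposeObservers(observers ...Observer) Observer {
        return composite(observers)
    }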

This updates projectcontour#2775.

Signed-off-by: James Peach <jpeach@vmware.com>
jpeach committed Aug 7, 2020
1 parent f4e352f commit 13c9454
Showing 19 changed files with 305 additions and 245 deletions.
74 changes: 34 additions & 40 deletions cmd/contour/serve.go
@@ -127,15 +127,12 @@ func registerServe(app *kingpin.Application) (*kingpin.CmdClause, *serveContext)

// doServe runs the contour serve subcommand.
func doServe(log logrus.FieldLogger, ctx *serveContext) error {

// step 1. establish k8s core & dynamic client connections
// Establish k8s core & dynamic client connections.
clients, err := k8s.NewClients(ctx.Kubeconfig, ctx.InCluster)
if err != nil {
return fmt.Errorf("failed to create Kubernetes clients: %w", err)
}

// step 2. create informer factories

// Factory for cluster-wide informers.
clusterInformerFactory := clients.NewInformerFactory()

@@ -162,7 +159,7 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
}
}

// setup prometheus registry and register base metrics.
// Set up Prometheus registry and register base metrics.
registry := prometheus.NewRegistry()
registry.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}))
registry.MustRegister(prometheus.NewGoCollector())
@@ -175,7 +172,7 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
return err
}

listenerConfig := contour.ListenerVisitorConfig{
listenerConfig := contour.ListenerConfig{
UseProxyProto: ctx.useProxyProto,
HTTPAddress: ctx.httpAddr,
HTTPPort: ctx.httpPort,
@@ -200,16 +197,25 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {

listenerConfig.DefaultHTTPVersions = defaultHTTPVersions

// step 3. build our mammoth Kubernetes event handler.
// Endpoints updates are handled directly by the EndpointsTranslator
// due to their high update rate and their orthogonal nature.
endpointHandler := &contour.EndpointsTranslator{
FieldLogger: log.WithField("context", "endpointstranslator"),
}

resources := []contour.ResourceCache{
contour.NewListenerCache(listenerConfig, ctx.statsAddr, ctx.statsPort),
&contour.SecretCache{},
&contour.RouteCache{},
&contour.ClusterCache{},
endpointHandler,
}

// Build the core Kubernetes event handler.
eventHandler := &contour.EventHandler{
CacheHandler: &contour.CacheHandler{
ListenerVisitorConfig: listenerConfig,
ListenerCache: contour.NewListenerCache(ctx.statsAddr, ctx.statsPort),
FieldLogger: log.WithField("context", "CacheHandler"),
Metrics: metrics.NewMetrics(registry),
},
HoldoffDelay: 100 * time.Millisecond,
HoldoffMaxDelay: 500 * time.Millisecond,
Observer: dag.ComposeObservers(contour.ObserversOf(resources)...),
Builder: dag.Builder{
Source: dag.KubernetesCache{
RootNamespaces: ctx.proxyRootNamespaces(),
@@ -219,15 +225,16 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
DisablePermitInsecure: ctx.DisablePermitInsecure,
},
FieldLogger: log.WithField("context", "contourEventHandler"),
Metrics: metrics.NewMetrics(registry),
}

// Set the fallback certificate if configured.
if fallbackCert != nil {
log.WithField("context", "fallback-certificate").Infof("enabled fallback certificate with secret: %q", fallbackCert)
eventHandler.FallbackCertificate = fallbackCert
eventHandler.Builder.FallbackCertificate = fallbackCert
}

// wrap eventHandler in a converter for objects from the dynamic client.
// Wrap eventHandler in a converter for objects from the dynamic client.
// and an EventRecorder which tracks API server events.
dynamicHandler := &k8s.DynamicClientHandler{
Next: &contour.EventRecorder{
@@ -238,7 +245,7 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
Logger: log.WithField("context", "dynamicHandler"),
}

// step 4. register our resource event handler with the k8s informers,
// Register our resource event handler with the k8s informers,
// using the SyncList to keep track of what to sync later.
var informerSyncList k8s.InformerSyncList

@@ -270,34 +277,28 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
informerSyncList.InformOnResources(clusterInformerFactory, dynamicHandler, k8s.SecretsResources()...)
}

// step 5. endpoints updates are handled directly by the EndpointsTranslator
// due to their high update rate and their orthogonal nature.
et := &contour.EndpointsTranslator{
FieldLogger: log.WithField("context", "endpointstranslator"),
}

informerSyncList.InformOnResources(clusterInformerFactory,
&k8s.DynamicClientHandler{
Next: &contour.EventRecorder{
Next: et,
Next: endpointHandler,
Counter: eventHandler.Metrics.EventHandlerOperations,
},
Converter: converter,
Logger: log.WithField("context", "endpointstranslator"),
}, k8s.EndpointsResources()...)

// step 6. setup workgroup runner and register informers.
// Set up workgroup runner and register informers.
var g workgroup.Group
g.Add(startInformer(clusterInformerFactory, log.WithField("context", "contourinformers")))

for ns, factory := range namespacedInformerFactories {
g.Add(startInformer(factory, log.WithField("context", "corenamespacedinformers").WithField("namespace", ns)))
}

// step 7. register our event handler with the workgroup
// Register our event handler with the workgroup.
g.Add(eventHandler.Start())

// step 8. create metrics service and register with workgroup.
// Create metrics service and register with workgroup.
metricsvc := httpsvc.Service{
Addr: ctx.metricsAddr,
Port: ctx.metricsPort,
@@ -315,7 +316,7 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {

g.Add(metricsvc.Start)

// step 9. create a separate health service if required.
// Create a separate health service if required.
if ctx.healthAddr != ctx.metricsAddr || ctx.healthPort != ctx.metricsPort {
healthsvc := httpsvc.Service{
Addr: ctx.healthAddr,
@@ -330,7 +331,7 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
g.Add(healthsvc.Start)
}

// step 10. create debug service and register with workgroup.
// Create debug service and register with workgroup.
debugsvc := debug.Service{
Service: httpsvc.Service{
Addr: ctx.debugAddr,
@@ -341,7 +342,7 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
}
g.Add(debugsvc.Start)

// step 11. register leadership election.
// Register leadership election.
eventHandler.IsLeader = setupLeadershipElection(&g, log, ctx, clients, eventHandler.UpdateNow)

sh := k8s.StatusUpdateHandler{
@@ -358,7 +359,7 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
Updater: sh.Writer(),
}

// step 11. set up ingress load balancer status writer
// Set up ingress load balancer status writer.
lbsw := loadBalancerStatusWriter{
log: log.WithField("context", "loadBalancerStatusWriter"),
clients: clients,
@@ -370,7 +371,7 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
}
g.Add(lbsw.Start)

// step 12. register an informer to watch envoy's service if we haven't been given static details.
// Register an informer to watch envoy's service if we haven't been given static details.
if ctx.IngressStatusAddress == "" {
dynamicServiceHandler := &k8s.DynamicClientHandler{
Next: &k8s.ServiceStatusLoadBalancerWatcher{
@@ -402,15 +403,8 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
}
log.Printf("informer caches synced")

resources := map[string]cgrpc.Resource{
eventHandler.CacheHandler.ClusterCache.TypeURL(): &eventHandler.CacheHandler.ClusterCache,
eventHandler.CacheHandler.RouteCache.TypeURL(): &eventHandler.CacheHandler.RouteCache,
eventHandler.CacheHandler.ListenerCache.TypeURL(): &eventHandler.CacheHandler.ListenerCache,
eventHandler.CacheHandler.SecretCache.TypeURL(): &eventHandler.CacheHandler.SecretCache,
et.TypeURL(): et,
}
opts := ctx.grpcOptions()
s := cgrpc.NewAPI(log, resources, registry, opts...)
s := cgrpc.NewAPI(log, contour.ResourcesOf(resources), registry, opts...)
addr := net.JoinHostPort(ctx.xdsAddr, strconv.Itoa(ctx.xdsPort))
l, err := net.Listen("tcp", addr)
if err != nil {
Expand All @@ -433,7 +427,7 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
return s.Serve(l)
})

// step 14. Setup SIGTERM handler
// Set up SIGTERM handler for graceful shutdown.
g.Add(func(stop <-chan struct{}) error {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGTERM, syscall.SIGINT)
65 changes: 21 additions & 44 deletions internal/contour/cachehandler.go
@@ -17,55 +17,32 @@
package contour

import (
"time"

"github.com/projectcontour/contour/internal/dag"
"github.com/projectcontour/contour/internal/metrics"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"github.com/projectcontour/contour/internal/grpc"
)

// CacheHandler manages the state of xDS caches.
type CacheHandler struct {
ListenerVisitorConfig
ListenerCache
RouteCache
ClusterCache
SecretCache

*metrics.Metrics

logrus.FieldLogger
}

func (ch *CacheHandler) OnChange(dag *dag.DAG) {
timer := prometheus.NewTimer(ch.CacheHandlerOnUpdateSummary)
defer timer.ObserveDuration()

ch.updateSecrets(dag)
ch.updateListeners(dag)
ch.updateRoutes(dag)
ch.updateClusters(dag)

ch.SetDAGLastRebuilt(time.Now())
}

func (ch *CacheHandler) updateSecrets(root dag.Visitable) {
secrets := visitSecrets(root)
ch.SecretCache.Update(secrets)
}

func (ch *CacheHandler) updateListeners(root dag.Visitable) {
listeners := visitListeners(root, &ch.ListenerVisitorConfig)
ch.ListenerCache.Update(listeners)
// ResourceCache is a store of an xDS resource type. It is able to
// visit the dag.DAG to update its resource collection, then
// serve those resources over xDS.
type ResourceCache interface {
dag.Observer
grpc.Resource
}

func (ch *CacheHandler) updateRoutes(root dag.Visitable) {
routes := visitRoutes(root)
ch.RouteCache.Update(routes)
// ResourcesOf transliterates a slice of ResourceCache into a slice of grpc.Resource.
func ResourcesOf(in []ResourceCache) []grpc.Resource {
out := make([]grpc.Resource, len(in))
for i := range in {
out[i] = in[i]
}
return out
}

func (ch *CacheHandler) updateClusters(root dag.Visitable) {
clusters := visitClusters(root)
ch.ClusterCache.Update(clusters)
// ObserversOf transliterates a slice of ResourceCache into a slice of dag.Observer.
func ObserversOf(in []ResourceCache) []dag.Observer {
out := make([]dag.Observer, len(in))
for i := range in {
out[i] = in[i]
}
return out
}
9 changes: 9 additions & 0 deletions internal/contour/cluster.go
@@ -18,6 +18,7 @@ import (
"sync"

resource "github.com/envoyproxy/go-control-plane/pkg/resource/v2"
"github.com/projectcontour/contour/internal/grpc"

v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2"
"github.com/golang/protobuf/proto"
Expand All @@ -34,6 +35,9 @@ type ClusterCache struct {
Cond
}

var _ dag.Observer = &ClusterCache{}
var _ grpc.Resource = &ClusterCache{}

// Update replaces the contents of the cache with the supplied map.
func (c *ClusterCache) Update(v map[string]*v2.Cluster) {
c.mu.Lock()
@@ -75,6 +79,11 @@ func (c *ClusterCache) Query(names []string) []proto.Message {

func (*ClusterCache) TypeURL() string { return resource.ClusterType }

func (c *ClusterCache) OnChange(root *dag.DAG) {
clusters := visitClusters(root)
c.Update(clusters)
}

type clusterVisitor struct {
clusters map[string]*v2.Cluster
}
9 changes: 9 additions & 0 deletions internal/contour/endpointstranslator.go
@@ -21,6 +21,7 @@ import (
envoy_api_v2_endpoint "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
resource "github.com/envoyproxy/go-control-plane/pkg/resource/v2"
"github.com/golang/protobuf/proto"
"github.com/projectcontour/contour/internal/dag"
"github.com/projectcontour/contour/internal/envoy"
"github.com/projectcontour/contour/internal/k8s"
"github.com/projectcontour/contour/internal/protobuf"
@@ -40,6 +41,14 @@ type EndpointsTranslator struct {
entries map[string]*v2.ClusterLoadAssignment
}

var _ k8scache.ResourceEventHandler = &EndpointsTranslator{}
var _ dag.Observer = &EndpointsTranslator{}

func (e *EndpointsTranslator) OnChange(d *dag.DAG) {
// TODO(jpeach) Update the internal model to map which
// services are targets of which cluster load assignments.
}

func (e *EndpointsTranslator) OnAdd(obj interface{}) {
switch obj := obj.(type) {
case *v1.Endpoints: