internal/contour: decouple the xDS cache handlers (projectcontour#2753)
The `contour.CacheHandler` struct contains the xDS resource caches as
embedded members. If we extract a DAG observer interface, however, we
can remove knowledge of specific resources from `CacheHandler`.
All it really needs is a set of objects that can visit the DAG when it
changes to generate any relevant xDS resources, and that can be called
on to publish those resources over the xDS gRPC interface.

Separating out the new `contour.ResourceCache` interface gives us the
opportunity to add new DAG consumers by modifying only the code that
wires up the Contour server at startup. There's no longer any cache
handler plumbing needed.
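
A rough sketch of the shape this takes — the exact definitions live in
internal/contour and internal/dag; the method set and return types below are
inferred from the callers in the diff rather than copied from it:

package contour

import (
	"github.com/golang/protobuf/proto"

	"github.com/projectcontour/contour/internal/dag"
)

// ResourceCache is a cache of one xDS resource type: it can rebuild its
// contents from a DAG and serve them to the xDS gRPC layer.
type ResourceCache interface {
	dag.Observer // assumed: OnChange(*dag.DAG) regenerates the cached resources

	// The xDS-serving half, reduced here to the two methods this diff exercises.
	TypeURL() string
	Query(names []string) []proto.Message
}

// ObserversOf returns the DAG observer half of each cache, ready to be
// composed with dag.ComposeObservers.
func ObserversOf(caches []ResourceCache) []dag.Observer {
	observers := make([]dag.Observer, 0, len(caches))
	for _, c := range caches {
		observers = append(observers, c)
	}
	return observers
}

// ResourcesOf indexes the caches by xDS type URL so the gRPC server can
// route discovery requests to the matching cache (the real helper hands back
// the gRPC server's Resource type rather than ResourceCache).
func ResourcesOf(caches []ResourceCache) map[string]ResourceCache {
	m := make(map[string]ResourceCache, len(caches))
	for _, c := range caches {
		m[c.TypeURL()] = c
	}
	return m
}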

Once we can compose the xDS caches under the `Observer` interface, the
need for a separate cache handler struct goes away, and we can attach
everything we need directly to the event handler.
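
The composition itself is a simple fan-out. A minimal sketch of
dag.ComposeObservers, assuming Observer is the single-method OnChange
interface (the adapter type name here is illustrative):

package dag

// Observer is notified each time a new DAG has been built.
type Observer interface {
	OnChange(*DAG)
}

// observerFunc adapts a plain function to the Observer interface
// (illustrative helper for this sketch).
type observerFunc func(*DAG)

func (f observerFunc) OnChange(d *DAG) { f(d) }

// ComposeObservers returns an Observer that hands each new DAG to every
// member of the list, in order.
func ComposeObservers(observers ...Observer) Observer {
	return observerFunc(func(d *DAG) {
		for _, o := range observers {
			o.OnChange(d)
		}
	})
}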

This updates projectcontour#2775.

Signed-off-by: James Peach <jpeach@vmware.com>
jpeach authored Aug 10, 2020
1 parent 7c89eaf commit ef09d9d
Showing 22 changed files with 369 additions and 286 deletions.
87 changes: 45 additions & 42 deletions cmd/contour/serve.go
@@ -131,15 +131,12 @@ func registerServe(app *kingpin.Application) (*kingpin.CmdClause, *serveContext)

// doServe runs the contour serve subcommand.
func doServe(log logrus.FieldLogger, ctx *serveContext) error {

// step 1. establish k8s core & dynamic client connections
// Establish k8s core & dynamic client connections.
clients, err := k8s.NewClients(ctx.Kubeconfig, ctx.InCluster)
if err != nil {
return fmt.Errorf("failed to create Kubernetes clients: %w", err)
}

// step 2. create informer factories

// Factory for cluster-wide informers.
clusterInformerFactory := clients.NewInformerFactory()

@@ -166,7 +163,7 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
}
}

// setup prometheus registry and register base metrics.
// Set up Prometheus registry and register base metrics.
registry := prometheus.NewRegistry()
registry.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}))
registry.MustRegister(prometheus.NewGoCollector())
@@ -179,7 +176,7 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
return err
}

listenerConfig := contour.ListenerVisitorConfig{
listenerConfig := contour.ListenerConfig{
UseProxyProto: ctx.useProxyProto,
HTTPAddress: ctx.httpAddr,
HTTPPort: ctx.httpPort,
@@ -204,16 +201,27 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {

listenerConfig.DefaultHTTPVersions = defaultHTTPVersions

// step 3. build our mammoth Kubernetes event handler.
contourMetrics := metrics.NewMetrics(registry)

// Endpoints updates are handled directly by the EndpointsTranslator
// due to their high update rate and their orthogonal nature.
endpointHandler := &contour.EndpointsTranslator{
FieldLogger: log.WithField("context", "endpointstranslator"),
}

resources := []contour.ResourceCache{
contour.NewListenerCache(listenerConfig, ctx.statsAddr, ctx.statsPort),
&contour.SecretCache{},
&contour.RouteCache{},
&contour.ClusterCache{},
endpointHandler,
}

// Build the core Kubernetes event handler.
eventHandler := &contour.EventHandler{
CacheHandler: &contour.CacheHandler{
ListenerVisitorConfig: listenerConfig,
ListenerCache: contour.NewListenerCache(ctx.statsAddr, ctx.statsPort),
FieldLogger: log.WithField("context", "CacheHandler"),
Metrics: metrics.NewMetrics(registry),
},
HoldoffDelay: 100 * time.Millisecond,
HoldoffMaxDelay: 500 * time.Millisecond,
Observer: dag.ComposeObservers(contour.ObserversOf(resources)...),
Builder: dag.Builder{
Source: dag.KubernetesCache{
RootNamespaces: ctx.proxyRootNamespaces(),
@@ -228,21 +236,21 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
// Set the fallback certificate if configured.
if fallbackCert != nil {
log.WithField("context", "fallback-certificate").Infof("enabled fallback certificate with secret: %q", fallbackCert)
eventHandler.FallbackCertificate = fallbackCert
eventHandler.Builder.FallbackCertificate = fallbackCert
}

// wrap eventHandler in a converter for objects from the dynamic client.
// Wrap eventHandler in a converter for objects from the dynamic client.
// and an EventRecorder which tracks API server events.
dynamicHandler := &k8s.DynamicClientHandler{
Next: &contour.EventRecorder{
Next: eventHandler,
Counter: eventHandler.Metrics.EventHandlerOperations,
Counter: contourMetrics.EventHandlerOperations,
},
Converter: converter,
Logger: log.WithField("context", "dynamicHandler"),
}

// step 4. register our resource event handler with the k8s informers,
// Register our resource event handler with the k8s informers,
// using the SyncList to keep track of what to sync later.
var informerSyncList k8s.InformerSyncList

@@ -274,34 +282,28 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
informerSyncList.InformOnResources(clusterInformerFactory, dynamicHandler, k8s.SecretsResources()...)
}

// step 5. endpoints updates are handled directly by the EndpointsTranslator
// due to their high update rate and their orthogonal nature.
et := &contour.EndpointsTranslator{
FieldLogger: log.WithField("context", "endpointstranslator"),
}

informerSyncList.InformOnResources(clusterInformerFactory,
&k8s.DynamicClientHandler{
Next: &contour.EventRecorder{
Next: et,
Counter: eventHandler.Metrics.EventHandlerOperations,
Next: endpointHandler,
Counter: contourMetrics.EventHandlerOperations,
},
Converter: converter,
Logger: log.WithField("context", "endpointstranslator"),
}, k8s.EndpointsResources()...)

// step 6. setup workgroup runner and register informers.
// Set up workgroup runner and register informers.
var g workgroup.Group
g.Add(startInformer(clusterInformerFactory, log.WithField("context", "contourinformers")))

for ns, factory := range namespacedInformerFactories {
g.Add(startInformer(factory, log.WithField("context", "corenamespacedinformers").WithField("namespace", ns)))
}

// step 7. register our event handler with the workgroup
// Register our event handler with the workgroup.
g.Add(eventHandler.Start())

// step 8. create metrics service and register with workgroup.
// Create metrics service and register with workgroup.
metricsvc := httpsvc.Service{
Addr: ctx.metricsAddr,
Port: ctx.metricsPort,
@@ -319,7 +321,7 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {

g.Add(metricsvc.Start)

// step 9. create a separate health service if required.
// Create a separate health service if required.
if ctx.healthAddr != ctx.metricsAddr || ctx.healthPort != ctx.metricsPort {
healthsvc := httpsvc.Service{
Addr: ctx.healthAddr,
@@ -334,7 +336,7 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
g.Add(healthsvc.Start)
}

// step 10. create debug service and register with workgroup.
// Create debug service and register with workgroup.
debugsvc := debug.Service{
Service: httpsvc.Service{
Addr: ctx.debugAddr,
@@ -345,9 +347,17 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
}
g.Add(debugsvc.Start)

// step 11. register leadership election.
// Register leadership election.
eventHandler.IsLeader = setupLeadershipElection(&g, log, ctx, clients, eventHandler.UpdateNow)

// Once we have the leadership detection channel, we can
// push DAG rebuild metrics onto the observer stack.
eventHandler.Observer = &contour.RebuildMetricsObserver{
Metrics: contourMetrics,
IsLeader: eventHandler.IsLeader,
NextObserver: eventHandler.Observer,
}
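
For context, RebuildMetricsObserver is a decorator over the composed
observer. A rough sketch of the pattern, assuming IsLeader is a channel that
is closed once this instance wins the election, and noting that the metrics
setter named below is hypothetical:

// RebuildMetricsObserver records DAG rebuild metrics once this instance
// holds leadership, then forwards the DAG unchanged to the next observer.
type RebuildMetricsObserver struct {
	Metrics      *metrics.Metrics
	IsLeader     chan struct{} // assumed: closed when this instance becomes leader
	NextObserver dag.Observer
}

func (m *RebuildMetricsObserver) OnChange(d *dag.DAG) {
	select {
	case <-m.IsLeader:
		// Hypothetical metrics call; the real setter lives in internal/metrics.
		m.Metrics.SetDAGLastRebuilt(time.Now())
	default:
		// Not (yet) the leader: skip metrics but still refresh the xDS caches.
	}
	m.NextObserver.OnChange(d)
}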

sh := k8s.StatusUpdateHandler{
Log: log.WithField("context", "StatusUpdateWriter"),
Clients: clients,
@@ -362,7 +372,7 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
Updater: sh.Writer(),
}

// step 11. set up ingress load balancer status writer
// Set up ingress load balancer status writer.
lbsw := loadBalancerStatusWriter{
log: log.WithField("context", "loadBalancerStatusWriter"),
clients: clients,
@@ -374,7 +384,7 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
}
g.Add(lbsw.Start)

// step 12. register an informer to watch envoy's service if we haven't been given static details.
// Register an informer to watch envoy's service if we haven't been given static details.
if ctx.IngressStatusAddress == "" {
dynamicServiceHandler := &k8s.DynamicClientHandler{
Next: &k8s.ServiceStatusLoadBalancerWatcher{
@@ -406,15 +416,8 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
}
log.Printf("informer caches synced")

resources := map[string]cgrpc.Resource{
eventHandler.CacheHandler.ClusterCache.TypeURL(): &eventHandler.CacheHandler.ClusterCache,
eventHandler.CacheHandler.RouteCache.TypeURL(): &eventHandler.CacheHandler.RouteCache,
eventHandler.CacheHandler.ListenerCache.TypeURL(): &eventHandler.CacheHandler.ListenerCache,
eventHandler.CacheHandler.SecretCache.TypeURL(): &eventHandler.CacheHandler.SecretCache,
et.TypeURL(): et,
}
opts := ctx.grpcOptions()
s := cgrpc.NewAPI(log, resources, registry, opts...)
s := cgrpc.NewAPI(log, contour.ResourcesOf(resources), registry, opts...)
addr := net.JoinHostPort(ctx.xdsAddr, strconv.Itoa(ctx.xdsPort))
l, err := net.Listen("tcp", addr)
if err != nil {
@@ -437,7 +440,7 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
return s.Serve(l)
})

// step 14. Setup SIGTERM handler
// Set up SIGTERM handler for graceful shutdown.
g.Add(func(stop <-chan struct{}) error {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGTERM, syscall.SIGINT)
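
The rest of this handler is collapsed in the diff view. For context only, a
typical completion of such a workgroup shutdown function looks something like
the following; this is a sketch, not the literal code from the commit:

g.Add(func(stop <-chan struct{}) error {
	c := make(chan os.Signal, 1)
	signal.Notify(c, syscall.SIGTERM, syscall.SIGINT)

	select {
	case sig := <-c:
		// A termination signal arrived: log it and let the workgroup wind down.
		log.WithField("context", "signal").Infof("received signal %v, shutting down", sig)
	case <-stop:
		// The workgroup is stopping for another reason.
	}
	return nil
})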
71 changes: 0 additions & 71 deletions internal/contour/cachehandler.go

This file was deleted.

5 changes: 5 additions & 0 deletions internal/contour/cluster.go
@@ -75,6 +75,11 @@ func (c *ClusterCache) Query(names []string) []proto.Message {

func (*ClusterCache) TypeURL() string { return resource.ClusterType }

func (c *ClusterCache) OnChange(root *dag.DAG) {
clusters := visitClusters(root)
c.Update(clusters)
}

type clusterVisitor struct {
clusters map[string]*v2.Cluster
}
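
The other caches touched by this commit presumably gain the same two-line
OnChange; for example, a route cache version would plausibly read as follows,
by symmetry with the cluster cache above and assuming route.go has matching
visitRoutes and Update helpers:

func (c *RouteCache) OnChange(root *dag.DAG) {
	// Rebuild the route configuration from the new DAG and swap it in.
	routes := visitRoutes(root)
	c.Update(routes)
}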
6 changes: 6 additions & 0 deletions internal/contour/endpointstranslator.go
@@ -21,6 +21,7 @@ import (
envoy_api_v2_endpoint "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
resource "github.com/envoyproxy/go-control-plane/pkg/resource/v2"
"github.com/golang/protobuf/proto"
"github.com/projectcontour/contour/internal/dag"
"github.com/projectcontour/contour/internal/envoy"
"github.com/projectcontour/contour/internal/k8s"
"github.com/projectcontour/contour/internal/protobuf"
@@ -40,6 +41,11 @@ type EndpointsTranslator struct {
entries map[string]*v2.ClusterLoadAssignment
}

func (e *EndpointsTranslator) OnChange(d *dag.DAG) {
// TODO(jpeach) Update the internal model to map which
// services are targets of which cluster load assignments.
}

func (e *EndpointsTranslator) OnAdd(obj interface{}) {
switch obj := obj.(type) {
case *v1.Endpoints:
