diff --git a/cmd/contour/serve.go b/cmd/contour/serve.go index c583d97fb36..a77d6165590 100644 --- a/cmd/contour/serve.go +++ b/cmd/contour/serve.go @@ -29,13 +29,13 @@ import ( "github.com/projectcontour/contour/internal/contour" "github.com/projectcontour/contour/internal/dag" "github.com/projectcontour/contour/internal/debug" - cgrpc "github.com/projectcontour/contour/internal/grpc" "github.com/projectcontour/contour/internal/health" "github.com/projectcontour/contour/internal/httpsvc" "github.com/projectcontour/contour/internal/k8s" "github.com/projectcontour/contour/internal/metrics" "github.com/projectcontour/contour/internal/timeout" "github.com/projectcontour/contour/internal/workgroup" + "github.com/projectcontour/contour/internal/xds" "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" "gopkg.in/alecthomas/kingpin.v2" @@ -408,7 +408,7 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error { } g.Add(func(stop <-chan struct{}) error { - log := log.WithField("context", "grpc") + log := log.WithField("context", "xds") log.Printf("waiting for informer caches to sync") if err := informerSyncList.WaitForSync(stop); err != nil { @@ -416,8 +416,11 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error { } log.Printf("informer caches synced") - opts := ctx.grpcOptions() - s := cgrpc.NewAPI(log, contour.ResourcesOf(resources), registry, opts...) + rpcServer := xds.RegisterServer( + xds.NewContourServer(log, contour.ResourcesOf(resources)...), + registry, + ctx.grpcOptions()...) + addr := net.JoinHostPort(ctx.xdsAddr, strconv.Itoa(ctx.xdsPort)) l, err := net.Listen("tcp", addr) if err != nil { @@ -434,10 +437,10 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error { go func() { <-stop - s.GracefulStop() + rpcServer.GracefulStop() }() - return s.Serve(l) + return rpcServer.Serve(l) }) // Set up SIGTERM handler for graceful shutdown. diff --git a/go.mod b/go.mod index cb7419bb7a6..e7724dc79f8 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,7 @@ require ( github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4 github.com/prometheus/common v0.6.0 github.com/sirupsen/logrus v1.4.2 + github.com/stretchr/testify v1.5.1 // indirect golang.org/x/tools v0.0.0-20190929041059-e7abfedfabcf // indirect google.golang.org/grpc v1.25.1 gopkg.in/alecthomas/kingpin.v2 v2.2.6 diff --git a/go.sum b/go.sum index e2b8f4d1c12..b29383e61fd 100644 --- a/go.sum +++ b/go.sum @@ -357,6 +357,8 @@ github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0 github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= diff --git a/internal/contour/resources.go b/internal/contour/resources.go index 9eae11d73b6..d34ce316da2 100644 --- a/internal/contour/resources.go +++ b/internal/contour/resources.go @@ -18,7 +18,7 @@ package contour import ( "github.com/projectcontour/contour/internal/dag" - "github.com/projectcontour/contour/internal/grpc" + "github.com/projectcontour/contour/internal/xds" ) // ResourceCache is a store of an xDS resource type. It is able to @@ -26,12 +26,12 @@ import ( // serve those resources over xDS. type ResourceCache interface { dag.Observer - grpc.Resource + xds.Resource } -// ResourcesOf transliterates a slice of ResourceCache into a slice of grpc.Resource. -func ResourcesOf(in []ResourceCache) []grpc.Resource { - out := make([]grpc.Resource, len(in)) +// ResourcesOf transliterates a slice of ResourceCache into a slice of xds.Resource. +func ResourcesOf(in []ResourceCache) []xds.Resource { + out := make([]xds.Resource, len(in)) for i := range in { out[i] = in[i] } diff --git a/internal/contour/server_test.go b/internal/contour/server_test.go index 6af46be17bd..e609eab9fbc 100644 --- a/internal/contour/server_test.go +++ b/internal/contour/server_test.go @@ -24,8 +24,7 @@ import ( discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2" resource "github.com/envoyproxy/go-control-plane/pkg/resource/v2" "github.com/projectcontour/contour/internal/dag" - cgrpc "github.com/projectcontour/contour/internal/grpc" - "github.com/prometheus/client_golang/prometheus" + "github.com/projectcontour/contour/internal/xds" "github.com/sirupsen/logrus" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -197,6 +196,7 @@ func TestGRPC(t *testing.T) { &SecretCache{}, &RouteCache{}, &ClusterCache{}, + et, } eh = &EventHandler{ @@ -204,8 +204,7 @@ func TestGRPC(t *testing.T) { FieldLogger: log, } - r := prometheus.NewRegistry() - srv := cgrpc.NewAPI(log, append(ResourcesOf(resources), et), r) + srv := xds.RegisterServer(xds.NewContourServer(log, ResourcesOf(resources)...), nil) l, err := net.Listen("tcp", "127.0.0.1:0") check(t, err) done := make(chan error, 1) diff --git a/internal/featuretests/featuretests.go b/internal/featuretests/featuretests.go index 3d2f6cd1507..351ca41449d 100644 --- a/internal/featuretests/featuretests.go +++ b/internal/featuretests/featuretests.go @@ -32,12 +32,12 @@ import ( "github.com/projectcontour/contour/internal/contour" "github.com/projectcontour/contour/internal/dag" "github.com/projectcontour/contour/internal/fixture" - cgrpc "github.com/projectcontour/contour/internal/grpc" "github.com/projectcontour/contour/internal/k8s" "github.com/projectcontour/contour/internal/metrics" "github.com/projectcontour/contour/internal/protobuf" "github.com/projectcontour/contour/internal/sorter" "github.com/projectcontour/contour/internal/workgroup" + "github.com/projectcontour/contour/internal/xds" "github.com/prometheus/client_golang/prometheus" "google.golang.org/grpc" v1 "k8s.io/api/core/v1" @@ -75,6 +75,7 @@ func setup(t *testing.T, opts ...interface{}) (cache.ResourceEventHandler, *Cont &contour.SecretCache{}, &contour.RouteCache{}, &contour.ClusterCache{}, + et, } r := prometheus.NewRegistry() @@ -111,8 +112,10 @@ func setup(t *testing.T, opts ...interface{}) (cache.ResourceEventHandler, *Cont l, err := net.Listen("tcp", "127.0.0.1:0") check(t, err) - // Resource types in xDS v2. - srv := cgrpc.NewAPI(log, append(contour.ResourcesOf(resources), et), r) + + srv := xds.RegisterServer( + xds.NewContourServer(log, contour.ResourcesOf(resources)...), + r /* Prometheus registry */) var g workgroup.Group diff --git a/internal/grpc/server.go b/internal/grpc/server.go deleted file mode 100644 index 754588215c0..00000000000 --- a/internal/grpc/server.go +++ /dev/null @@ -1,129 +0,0 @@ -// Copyright Project Contour Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package grpc provides a gRPC implementation of the Envoy v2 xDS API. -package grpc - -import ( - "context" - - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - - grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" - "github.com/prometheus/client_golang/prometheus" - - v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2" - discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2" - loadstats "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v2" - "github.com/sirupsen/logrus" -) - -// NewAPI returns a *grpc.Server which responds to the Envoy v2 xDS gRPC API. -func NewAPI(log logrus.FieldLogger, resources []Resource, registry *prometheus.Registry, opts ...grpc.ServerOption) *grpc.Server { - resourceMap := map[string]Resource{} - - for i, r := range resources { - resourceMap[r.TypeURL()] = resources[i] - } - - s := &grpcServer{ - xdsHandler{ - FieldLogger: log, - resources: resourceMap, - }, - grpc_prometheus.NewServerMetrics(), - } - registry.MustRegister(s.metrics) - opts = append(opts, grpc.StreamInterceptor(s.metrics.StreamServerInterceptor()), - grpc.UnaryInterceptor(s.metrics.UnaryServerInterceptor())) - g := grpc.NewServer(opts...) - v2.RegisterClusterDiscoveryServiceServer(g, s) - v2.RegisterEndpointDiscoveryServiceServer(g, s) - v2.RegisterListenerDiscoveryServiceServer(g, s) - v2.RegisterRouteDiscoveryServiceServer(g, s) - discovery.RegisterSecretDiscoveryServiceServer(g, s) - s.metrics.InitializeMetrics(g) - return g -} - -// grpcServer implements the LDS, RDS, CDS, and EDS, gRPC endpoints. -type grpcServer struct { - xdsHandler - metrics *grpc_prometheus.ServerMetrics -} - -func (s *grpcServer) FetchClusters(_ context.Context, req *v2.DiscoveryRequest) (*v2.DiscoveryResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "FetchClusters unimplemented") -} - -func (s *grpcServer) FetchEndpoints(_ context.Context, req *v2.DiscoveryRequest) (*v2.DiscoveryResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "FetchEndpoints unimplemented") -} - -func (s *grpcServer) DeltaEndpoints(v2.EndpointDiscoveryService_DeltaEndpointsServer) error { - return status.Errorf(codes.Unimplemented, "DeltaEndpoints unimplemented") -} - -func (s *grpcServer) FetchListeners(_ context.Context, req *v2.DiscoveryRequest) (*v2.DiscoveryResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "FetchListeners unimplemented") -} - -func (s *grpcServer) DeltaListeners(v2.ListenerDiscoveryService_DeltaListenersServer) error { - return status.Errorf(codes.Unimplemented, "DeltaListeners unimplemented") -} - -func (s *grpcServer) FetchRoutes(_ context.Context, req *v2.DiscoveryRequest) (*v2.DiscoveryResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "FetchRoutes unimplemented") -} - -func (s *grpcServer) FetchSecrets(_ context.Context, req *v2.DiscoveryRequest) (*v2.DiscoveryResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "FetchSecrets unimplemented") -} - -func (s *grpcServer) DeltaSecrets(discovery.SecretDiscoveryService_DeltaSecretsServer) error { - return status.Errorf(codes.Unimplemented, "DeltaSecrets unimplemented") -} - -func (s *grpcServer) StreamClusters(srv v2.ClusterDiscoveryService_StreamClustersServer) error { - return s.stream(srv) -} - -func (s *grpcServer) StreamEndpoints(srv v2.EndpointDiscoveryService_StreamEndpointsServer) error { - return s.stream(srv) -} - -func (s *grpcServer) StreamLoadStats(srv loadstats.LoadReportingService_StreamLoadStatsServer) error { - return status.Errorf(codes.Unimplemented, "StreamLoadStats unimplemented") -} - -func (s *grpcServer) DeltaClusters(v2.ClusterDiscoveryService_DeltaClustersServer) error { - return status.Errorf(codes.Unimplemented, "IncrementalClusters unimplemented") -} - -func (s *grpcServer) DeltaRoutes(v2.RouteDiscoveryService_DeltaRoutesServer) error { - return status.Errorf(codes.Unimplemented, "IncrementalRoutes unimplemented") -} - -func (s *grpcServer) StreamListeners(srv v2.ListenerDiscoveryService_StreamListenersServer) error { - return s.stream(srv) -} - -func (s *grpcServer) StreamRoutes(srv v2.RouteDiscoveryService_StreamRoutesServer) error { - return s.stream(srv) -} - -func (s *grpcServer) StreamSecrets(srv discovery.SecretDiscoveryService_StreamSecretsServer) error { - return s.stream(srv) -} diff --git a/internal/grpc/xds.go b/internal/xds/contour.go similarity index 67% rename from internal/grpc/xds.go rename to internal/xds/contour.go index 632e5fe51a0..71ef333cb8a 100644 --- a/internal/grpc/xds.go +++ b/internal/xds/contour.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package grpc +package xds import ( "context" @@ -19,7 +19,8 @@ import ( "strconv" "sync/atomic" - envoy_api_v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2" + v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2" + discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2" "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes" "github.com/golang/protobuf/ptypes/any" @@ -42,23 +43,56 @@ type Resource interface { TypeURL() string } -// xdsHandler implements the Envoy xDS gRPC protocol. -type xdsHandler struct { - logrus.FieldLogger - connections counter - resources map[string]Resource // registered resource types -} - type grpcStream interface { Context() context.Context - Send(*envoy_api_v2.DiscoveryResponse) error - Recv() (*envoy_api_v2.DiscoveryRequest, error) + Send(*v2.DiscoveryResponse) error + Recv() (*v2.DiscoveryRequest, error) +} + +// counter holds an atomically incrementing counter. +type counter uint64 + +func (c *counter) next() uint64 { + return atomic.AddUint64((*uint64)(c), 1) +} + +var connections counter + +// NewContourServer creates an internally implemented Server that streams the +// provided set of Resource objects. The returned Server implements the xDS +// State of the World (SotW) variant. +func NewContourServer(log logrus.FieldLogger, resources ...Resource) Server { + c := contourServer{ + FieldLogger: log, + resources: map[string]Resource{}, + } + + for i, r := range resources { + c.resources[r.TypeURL()] = resources[i] + } + + return &c +} + +type contourServer struct { + // Since we only implement the streaming state of the world + // protocol, embed the default null implementations to handle + // the unimplemented gRPC endpoints. + discovery.UnimplementedAggregatedDiscoveryServiceServer + discovery.UnimplementedSecretDiscoveryServiceServer + v2.UnimplementedRouteDiscoveryServiceServer + v2.UnimplementedEndpointDiscoveryServiceServer + v2.UnimplementedClusterDiscoveryServiceServer + v2.UnimplementedListenerDiscoveryServiceServer + + logrus.FieldLogger + resources map[string]Resource } // stream processes a stream of DiscoveryRequests. -func (xh *xdsHandler) stream(st grpcStream) error { - // bump connection counter and set it as a field on the logger - log := xh.WithField("connection", xh.connections.next()) +func (s *contourServer) stream(st grpcStream) error { + // Bump connection counter and set it as a field on the logger. + log := s.WithField("connection", connections.next()) // Notify whether the stream terminated on error. done := func(log *logrus.Entry, err error) error { @@ -102,7 +136,7 @@ func (xh *xdsHandler) stream(st grpcStream) error { // from the request we derive the resource to stream which have // been registered according to the typeURL. - r, ok := xh.resources[req.TypeUrl] + r, ok := s.resources[req.TypeUrl] if !ok { return done(log, fmt.Errorf("no resource registered for typeURL %q", req.TypeUrl)) } @@ -140,7 +174,7 @@ func (xh *xdsHandler) stream(st grpcStream) error { any = append(any, a) } - resp := &envoy_api_v2.DiscoveryResponse{ + resp := &v2.DiscoveryResponse{ VersionInfo: strconv.Itoa(last), Resources: any, TypeUrl: r.TypeURL(), @@ -157,9 +191,22 @@ func (xh *xdsHandler) stream(st grpcStream) error { } } -// counter holds an atomically incrementing counter. -type counter uint64 +func (s *contourServer) StreamClusters(srv v2.ClusterDiscoveryService_StreamClustersServer) error { + return s.stream(srv) +} -func (c *counter) next() uint64 { - return atomic.AddUint64((*uint64)(c), 1) +func (s *contourServer) StreamEndpoints(srv v2.EndpointDiscoveryService_StreamEndpointsServer) error { + return s.stream(srv) +} + +func (s *contourServer) StreamListeners(srv v2.ListenerDiscoveryService_StreamListenersServer) error { + return s.stream(srv) +} + +func (s *contourServer) StreamRoutes(srv v2.RouteDiscoveryService_StreamRoutesServer) error { + return s.stream(srv) +} + +func (s *contourServer) StreamSecrets(srv discovery.SecretDiscoveryService_StreamSecretsServer) error { + return s.stream(srv) } diff --git a/internal/grpc/xds_test.go b/internal/xds/contour_test.go similarity index 96% rename from internal/grpc/xds_test.go rename to internal/xds/contour_test.go index d0cfb662335..109c8cc3536 100644 --- a/internal/grpc/xds_test.go +++ b/internal/xds/contour_test.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package grpc +package xds import ( "context" @@ -29,12 +29,12 @@ func TestXDSHandlerStream(t *testing.T) { log := logrus.New() log.SetOutput(ioutil.Discard) tests := map[string]struct { - xh xdsHandler + xh contourServer stream grpcStream want error }{ "recv returns error immediately": { - xh: xdsHandler{FieldLogger: log}, + xh: contourServer{FieldLogger: log}, stream: &mockStream{ context: context.Background, recv: func() (*v2.DiscoveryRequest, error) { @@ -44,7 +44,7 @@ func TestXDSHandlerStream(t *testing.T) { want: io.EOF, }, "no registered typeURL": { - xh: xdsHandler{FieldLogger: log}, + xh: contourServer{FieldLogger: log}, stream: &mockStream{ context: context.Background, recv: func() (*v2.DiscoveryRequest, error) { @@ -56,7 +56,7 @@ func TestXDSHandlerStream(t *testing.T) { want: fmt.Errorf("no resource registered for typeURL %q", "com.heptio.potato"), }, "failed to convert values to any": { - xh: xdsHandler{ + xh: contourServer{ FieldLogger: log, resources: map[string]Resource{ "com.heptio.potato": &mockResource{ @@ -81,7 +81,7 @@ func TestXDSHandlerStream(t *testing.T) { want: fmt.Errorf("proto: Marshal called with nil"), }, "failed to send": { - xh: xdsHandler{ + xh: contourServer{ FieldLogger: log, resources: map[string]Resource{ "com.heptio.potato": &mockResource{ @@ -109,7 +109,7 @@ func TestXDSHandlerStream(t *testing.T) { want: io.EOF, }, "context canceled": { - xh: xdsHandler{ + xh: contourServer{ FieldLogger: log, resources: map[string]Resource{ "com.heptio.potato": &mockResource{ diff --git a/internal/xds/server.go b/internal/xds/server.go new file mode 100644 index 00000000000..8cab5a7fcba --- /dev/null +++ b/internal/xds/server.go @@ -0,0 +1,70 @@ +// Copyright Project Contour Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package xds + +import ( + clusterservice "github.com/envoyproxy/go-control-plane/envoy/api/v2" + endpointservice "github.com/envoyproxy/go-control-plane/envoy/api/v2" + listenerservice "github.com/envoyproxy/go-control-plane/envoy/api/v2" + routeservice "github.com/envoyproxy/go-control-plane/envoy/api/v2" + v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2" + discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2" + grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "github.com/prometheus/client_golang/prometheus" + "google.golang.org/grpc" +) + +// Server is a collection of handlers for streaming discovery requests. +type Server interface { + clusterservice.ClusterDiscoveryServiceServer + discovery.AggregatedDiscoveryServiceServer + discovery.SecretDiscoveryServiceServer + endpointservice.EndpointDiscoveryServiceServer + listenerservice.ListenerDiscoveryServiceServer + routeservice.RouteDiscoveryServiceServer +} + +// RegisterServer registers the given xDS protocol Server with the gRPC +// runtime. If registry is non-nil gRPC server metrics will be automatically +// configured and enabled. +func RegisterServer(srv Server, registry *prometheus.Registry, opts ...grpc.ServerOption) *grpc.Server { + var metrics *grpc_prometheus.ServerMetrics + + // TODO(jpeach) Figure out how to decouple this. + if registry != nil { + metrics = grpc_prometheus.NewServerMetrics() + registry.MustRegister(metrics) + + opts = append(opts, + grpc.StreamInterceptor(metrics.StreamServerInterceptor()), + grpc.UnaryInterceptor(metrics.UnaryServerInterceptor()), + ) + + } + + g := grpc.NewServer(opts...) + + discovery.RegisterAggregatedDiscoveryServiceServer(g, srv) + discovery.RegisterSecretDiscoveryServiceServer(g, srv) + v2.RegisterClusterDiscoveryServiceServer(g, srv) + v2.RegisterEndpointDiscoveryServiceServer(g, srv) + v2.RegisterListenerDiscoveryServiceServer(g, srv) + v2.RegisterRouteDiscoveryServiceServer(g, srv) + + if metrics != nil { + metrics.InitializeMetrics(g) + } + + return g +}