From e2ccb50c96ec472f4bdf003b32572c0913c68f55 Mon Sep 17 00:00:00 2001 From: Tristan Colgate Date: Fri, 7 Sep 2018 10:18:12 +0100 Subject: [PATCH] Allow specifying simple gRPC transport credentials --- cmd/thanos/flags.go | 23 ++++++++++++-- cmd/thanos/main.go | 39 +++++++++++++++++++++-- cmd/thanos/query.go | 74 +++++++++++++++++++++++++++++++++++++++---- cmd/thanos/rule.go | 14 ++++++-- cmd/thanos/sidecar.go | 16 ++++++++-- cmd/thanos/store.go | 17 ++++++++-- 6 files changed, 165 insertions(+), 18 deletions(-) diff --git a/cmd/thanos/flags.go b/cmd/thanos/flags.go index cb8c2a36884..d48572c4636 100644 --- a/cmd/thanos/flags.go +++ b/cmd/thanos/flags.go @@ -12,14 +12,27 @@ import ( "gopkg.in/alecthomas/kingpin.v2" ) -func regCommonServerFlags(cmd *kingpin.CmdClause) (*string, *string, func(log.Logger, *prometheus.Registry, bool, string, bool) (*cluster.Peer, error)) { - grpcBindAddr := cmd.Flag("grpc-address", "Listen ip:port address for gRPC endpoints (StoreAPI). Make sure this address is routable from other components if you use gossip, 'grpc-advertise-address' is empty and you require cross-node connection."). +func regCommonServerFlags(cmd *kingpin.CmdClause) ( + grpcBindAddr *string, + httpBindAddr *string, + grpcTLSInsecure *bool, + grpcTLSSrvCert *string, + grpcTLSSrvKey *string, + grpcTLSSrvClientCA *string, + peerFunc func(log.Logger, *prometheus.Registry, bool, string, bool) (*cluster.Peer, error)) { + + grpcBindAddr = cmd.Flag("grpc-address", "Listen ip:port address for gRPC endpoints (StoreAPI). Make sure this address is routable from other components if you use gossip, 'grpc-advertise-address' is empty and you require cross-node connection."). Default("0.0.0.0:10901").String() grpcAdvertiseAddr := cmd.Flag("grpc-advertise-address", "Explicit (external) host:port address to advertise for gRPC StoreAPI in gossip cluster. If empty, 'grpc-address' will be used."). String() - httpBindAddr := regHTTPAddrFlag(cmd) + grpcTLSInsecure = cmd.Flag("grpc-tls-insecure", "Do not use gRPC transport credentials to verify connections").Default("true").Bool() + grpcTLSSrvKey = cmd.Flag("grpc-server-tls-key", "TLS Key for the gRPC server").Default("tls.key").String() + grpcTLSSrvCert = cmd.Flag("grpc-server-tls-cert", "TLS Certificate for gRPC server").Default("tls.crt").String() + grpcTLSSrvClientCA = cmd.Flag("grpc-server-tls-client-ca", "TLS CA to verify clients against").Default("ca.crt").String() + + httpBindAddr = regHTTPAddrFlag(cmd) clusterBindAddr := cmd.Flag("cluster.address", "Listen ip:port address for gossip cluster."). Default("0.0.0.0:10900").String() @@ -49,6 +62,10 @@ func regCommonServerFlags(cmd *kingpin.CmdClause) (*string, *string, func(log.Lo return grpcBindAddr, httpBindAddr, + grpcTLSInsecure, + grpcTLSSrvCert, + grpcTLSSrvKey, + grpcTLSSrvClientCA, func(logger log.Logger, reg *prometheus.Registry, waitIfEmpty bool, httpAdvertiseAddr string, queryAPIEnabled bool) (*cluster.Peer, error) { host, port, err := cluster.CalculateAdvertiseAddress(*grpcBindAddr, *grpcAdvertiseAddr) if err != nil { diff --git a/cmd/thanos/main.go b/cmd/thanos/main.go index e0a78387249..1d6f8b03518 100644 --- a/cmd/thanos/main.go +++ b/cmd/thanos/main.go @@ -2,7 +2,10 @@ package main import ( "context" + "crypto/tls" + "crypto/x509" "fmt" + "io/ioutil" "math" "net" "net/http" @@ -32,6 +35,7 @@ import ( "github.com/prometheus/common/version" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials" "google.golang.org/grpc/status" kingpin "gopkg.in/alecthomas/kingpin.v2" ) @@ -196,7 +200,7 @@ func registerMetrics(mux *http.ServeMux, g prometheus.Gatherer) { // - request histogram // - tracing // - panic recovery with panic counter -func defaultGRPCServerOpts(logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer) []grpc.ServerOption { +func defaultGRPCServerOpts(logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, insecure bool, cert, key, clientca string) ([]grpc.ServerOption, error) { met := grpc_prometheus.NewServerMetrics() met.EnableHandlingTimeHistogram( grpc_prometheus.WithHistogramBuckets([]float64{ @@ -214,7 +218,7 @@ func defaultGRPCServerOpts(logger log.Logger, reg *prometheus.Registry, tracer o return status.Errorf(codes.Internal, "%s", p) } reg.MustRegister(met, panicsTotal) - return []grpc.ServerOption{ + opts := []grpc.ServerOption{ grpc.MaxSendMsgSize(math.MaxInt32), grpc_middleware.WithUnaryServerChain( met.UnaryServerInterceptor(), @@ -227,6 +231,37 @@ func defaultGRPCServerOpts(logger log.Logger, reg *prometheus.Registry, tracer o grpc_recovery.StreamServerInterceptor(grpc_recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)), ), } + + if insecure { + return opts, nil + } + + tlsCfg := &tls.Config{} + + tlsCert, err := tls.LoadX509KeyPair(cert, key) + if err != nil { + return nil, errors.Wrap(err, "client credentials") + } + tlsCfg.Certificates = []tls.Certificate{tlsCert} + + if clientca != "" { + caPem, err := ioutil.ReadFile(clientca) + if err != nil { + return nil, err + } + + certPool := x509.NewCertPool() + if !certPool.AppendCertsFromPEM(caPem) { + return nil, errors.Wrap(err, "building client CA") + } + tlsCfg.ClientCAs = certPool + tlsCfg.ClientAuth = tls.RequireAndVerifyClientCert + } + + creds := credentials.NewTLS(tlsCfg) + opts = append(opts, grpc.Creds(creds)) + + return opts, nil } // metricHTTPListenGroup is a run.Group that servers HTTP endpoint with only Prometheus metrics. diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 97290c7562f..b877f580012 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -2,7 +2,10 @@ package main import ( "context" + "crypto/tls" + "crypto/x509" "fmt" + "io/ioutil" "math" "net" "net/http" @@ -28,6 +31,7 @@ import ( "github.com/prometheus/prometheus/promql" "github.com/prometheus/tsdb/labels" "google.golang.org/grpc" + "google.golang.org/grpc/credentials" "gopkg.in/alecthomas/kingpin.v2" ) @@ -35,11 +39,15 @@ import ( func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string) { cmd := app.Command(name, "query node exposing PromQL enabled Query API with data retrieved from multiple store nodes") - grpcBindAddr, httpBindAddr, newPeerFn := regCommonServerFlags(cmd) + grpcBindAddr, httpBindAddr, insecure, srvcert, srvkey, srvclientca, newPeerFn := regCommonServerFlags(cmd) httpAdvertiseAddr := cmd.Flag("http-advertise-address", "Explicit (external) host:port address to advertise for HTTP QueryAPI in gossip cluster. If empty, 'http-address' will be used."). String() + cert := cmd.Flag("grpc-client-tls-cert", "TLS Certificates to use to identify this client to the server").Default("tls.crt").String() + key := cmd.Flag("grpc-client-tls-key", "TLS Key for the client's certificate").Default("tls.key").String() + cacert := cmd.Flag("grpc-client-tls-ca", "TLS CA Certificates to use to verify gRPC servers").Default("ca.crt").String() + queryTimeout := cmd.Flag("query.timeout", "Maximum time to process query by query node."). Default("2m").Duration() @@ -83,6 +91,13 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string reg, tracer, *grpcBindAddr, + *insecure, + *srvcert, + *srvkey, + *srvclientca, + *cert, + *key, + *cacert, *httpBindAddr, *maxConcurrentQueries, *queryTimeout, @@ -95,7 +110,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string } } -func storeClientGRPCOpts(reg *prometheus.Registry, tracer opentracing.Tracer) []grpc.DialOption { +func storeClientGRPCOpts(reg *prometheus.Registry, tracer opentracing.Tracer, insecure bool, cert, key, cacert string) ([]grpc.DialOption, error) { grpcMets := grpc_prometheus.NewClientMetrics() grpcMets.EnableClientHandlingTimeHistogram( grpc_prometheus.WithHistogramBuckets([]float64{ @@ -108,7 +123,6 @@ func storeClientGRPCOpts(reg *prometheus.Registry, tracer opentracing.Tracer) [] // Current limit is ~2GB. // TODO(bplotka): Split sent chunks on store node per max 4MB chunks if needed. grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32)), - grpc.WithInsecure(), grpc.WithUnaryInterceptor( grpc_middleware.ChainUnaryClient( grpcMets.UnaryClientInterceptor(), @@ -127,7 +141,37 @@ func storeClientGRPCOpts(reg *prometheus.Registry, tracer opentracing.Tracer) [] reg.MustRegister(grpcMets) } - return dialOpts + if insecure { + dialOpts = append(dialOpts, grpc.WithInsecure()) + return dialOpts, nil + } + + caPem, err := ioutil.ReadFile(cacert) + if err != nil { + return nil, err + } + + certPool := x509.NewCertPool() + if !certPool.AppendCertsFromPEM(caPem) { + return nil, errors.Wrap(err, "building client CA") + } + + tlsCfg := &tls.Config{ + RootCAs: certPool, + } + + if cert != "" { + cert, err := tls.LoadX509KeyPair(cert, key) + if err != nil { + return nil, errors.Wrap(err, "client credentials") + } + tlsCfg.Certificates = []tls.Certificate{cert} + } + + creds := credentials.NewTLS(tlsCfg) + dialOpts = append(dialOpts, grpc.WithTransportCredentials(creds)) + + return dialOpts, nil } // runQuery starts a server that exposes PromQL Query API. It is responsible for querying configured @@ -138,6 +182,13 @@ func runQuery( reg *prometheus.Registry, tracer opentracing.Tracer, grpcBindAddr string, + insecure bool, + srvcert string, + srvkey string, + srvclientca string, + cacert string, + cert string, + key string, httpBindAddr string, maxConcurrentQueries int, queryTimeout time.Duration, @@ -155,6 +206,12 @@ func runQuery( staticSpecs = append(staticSpecs, query.NewGRPCStoreSpec(addr)) } + + dialOpts, err := storeClientGRPCOpts(reg, tracer, insecure, cert, key, cacert) + if err != nil { + return errors.Wrap(err, "building gRPC client") + } + var ( stores = query.NewStoreSet( logger, @@ -172,7 +229,7 @@ func runQuery( } return specs }, - storeClientGRPCOpts(reg, tracer), + dialOpts, ) proxy = store.NewProxyStore(logger, func(context.Context) ([]store.Client, error) { return stores.Get(), nil @@ -241,7 +298,12 @@ func runQuery( } logger := log.With(logger, "component", "query") - s := grpc.NewServer(defaultGRPCServerOpts(logger, reg, tracer)...) + opts, err := defaultGRPCServerOpts(logger, reg, tracer, insecure, srvcert, srvkey, srvclientca) + if err != nil { + return errors.Wrapf(err, "build gRPC server") + } + + s := grpc.NewServer(opts...) storepb.RegisterStoreServer(s, proxy) g.Add(func() error { diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index ba5d0c25d68..839adadd68f 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -52,7 +52,7 @@ import ( func registerRule(m map[string]setupFunc, app *kingpin.Application, name string) { cmd := app.Command(name, "ruler evaluating Prometheus rules against given Query nodes, exposing Store API and storing old blocks in bucket") - grpcBindAddr, httpBindAddr, newPeerFn := regCommonServerFlags(cmd) + grpcBindAddr, httpBindAddr, insecure, cert, key, clientca, newPeerFn := regCommonServerFlags(cmd) labelStrs := cmd.Flag("label", "Labels to be applied to all generated metrics (repeated)."). PlaceHolder("=\"\"").Strings() @@ -100,7 +100,7 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application, name string) NoLockfile: true, WALFlushInterval: 30 * time.Second, } - return runRule(g, logger, reg, tracer, lset, *alertmgrs, *grpcBindAddr, *httpBindAddr, *evalInterval, *dataDir, *ruleFiles, peer, *gcsBucket, s3Config, tsdbOpts, name, alertQueryURL) + return runRule(g, logger, reg, tracer, lset, *alertmgrs, *grpcBindAddr, *insecure, *cert, *key, *clientca, *httpBindAddr, *evalInterval, *dataDir, *ruleFiles, peer, *gcsBucket, s3Config, tsdbOpts, name, alertQueryURL) } } @@ -114,6 +114,10 @@ func runRule( lset labels.Labels, alertmgrURLs []string, grpcBindAddr string, + insecure bool, + cert string, + key string, + clientca string, httpBindAddr string, evalInterval time.Duration, dataDir string, @@ -357,7 +361,11 @@ func runRule( store := store.NewTSDBStore(logger, reg, db, lset) - s := grpc.NewServer(defaultGRPCServerOpts(logger, reg, tracer)...) + opts, err := defaultGRPCServerOpts(logger, reg, tracer, insecure, cert, key, clientca) + if err != nil { + return errors.Wrap(err, "setup gRPC options") + } + s := grpc.NewServer(opts...) storepb.RegisterStoreServer(s, store) g.Add(func() error { diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index 284c2fc6510..1b4ff7dbde1 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -35,7 +35,7 @@ import ( func registerSidecar(m map[string]setupFunc, app *kingpin.Application, name string) { cmd := app.Command(name, "sidecar for Prometheus server") - grpcBindAddr, httpBindAddr, newPeerFn := regCommonServerFlags(cmd) + grpcBindAddr, httpBindAddr, insecure, cert, key, clientca, newPeerFn := regCommonServerFlags(cmd) promURL := cmd.Flag("prometheus.url", "URL at which to reach Prometheus's API. For better performance use local network."). Default("http://localhost:9090").URL() @@ -74,6 +74,10 @@ func registerSidecar(m map[string]setupFunc, app *kingpin.Application, name stri reg, tracer, *grpcBindAddr, + *insecure, + *cert, + *key, + *clientca, *httpBindAddr, *promURL, *dataDir, @@ -92,6 +96,10 @@ func runSidecar( reg *prometheus.Registry, tracer opentracing.Tracer, grpcBindAddr string, + insecure bool, + cert string, + key string, + clientca string, httpBindAddr string, promURL *url.URL, dataDir string, @@ -208,7 +216,11 @@ func runSidecar( return errors.Wrap(err, "create Prometheus store") } - s := grpc.NewServer(defaultGRPCServerOpts(logger, reg, tracer)...) + opts, err := defaultGRPCServerOpts(logger, reg, tracer, insecure, cert, key, clientca) + if err != nil { + return errors.Wrap(err, "setup gRPC server") + } + s := grpc.NewServer(opts...) storepb.RegisterStoreServer(s, promStore) g.Add(func() error { diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 0e92c8c7746..bccecf297ea 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -26,7 +26,7 @@ import ( func registerStore(m map[string]setupFunc, app *kingpin.Application, name string) { cmd := app.Command(name, "store node giving access to blocks in a GCS bucket") - grpcBindAddr, httpBindAddr, newPeerFn := regCommonServerFlags(cmd) + grpcBindAddr, httpBindAddr, insecure, cert, key, clientca, newPeerFn := regCommonServerFlags(cmd) dataDir := cmd.Flag("data-dir", "Data directory in which to cache remote blocks."). Default("./data").String() @@ -55,6 +55,10 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string s3Config, *dataDir, *grpcBindAddr, + *insecure, + *cert, + *key, + *clientca, *httpBindAddr, peer, uint64(*indexCacheSize), @@ -75,6 +79,10 @@ func runStore( s3Config *s3.Config, dataDir string, grpcBindAddr string, + insecure bool, + cert string, + key string, + clientca string, httpBindAddr string, peer *cluster.Peer, indexCacheSizeBytes uint64, @@ -138,7 +146,12 @@ func runStore( return errors.Wrap(err, "listen API address") } - s := grpc.NewServer(defaultGRPCServerOpts(logger, reg, tracer)...) + opts, err := defaultGRPCServerOpts(logger, reg, tracer, insecure, key, cert, clientca) + if err != nil { + return errors.Wrap(err, "grpc server options") + } + + s := grpc.NewServer(opts...) storepb.RegisterStoreServer(s, bs) g.Add(func() error {