diff --git a/cmd/thanos/flags.go b/cmd/thanos/flags.go index cb8c2a36884..dfb8f6bc38c 100644 --- a/cmd/thanos/flags.go +++ b/cmd/thanos/flags.go @@ -12,13 +12,17 @@ import ( "gopkg.in/alecthomas/kingpin.v2" ) -func regCommonServerFlags(cmd *kingpin.CmdClause) (*string, *string, func(log.Logger, *prometheus.Registry, bool, string, bool) (*cluster.Peer, error)) { +func regCommonServerFlags(cmd *kingpin.CmdClause) (*string, *string, *bool, *string, *string, func(log.Logger, *prometheus.Registry, bool, string, bool) (*cluster.Peer, error)) { grpcBindAddr := cmd.Flag("grpc-address", "Listen ip:port address for gRPC endpoints (StoreAPI). Make sure this address is routable from other components if you use gossip, 'grpc-advertise-address' is empty and you require cross-node connection."). Default("0.0.0.0:10901").String() grpcAdvertiseAddr := cmd.Flag("grpc-advertise-address", "Explicit (external) host:port address to advertise for gRPC StoreAPI in gossip cluster. If empty, 'grpc-address' will be used."). String() + grpcTLSInsecure := cmd.Flag("grpc-tls-insecure", "Do not use gRPC transport credentials to verify connections").Default("true").Bool() + grpcTLSSrvKey := cmd.Flag("grpc-server-tls-key", "TLS Key for the fRPC server").Default("tls.key").String() + grpcTLSSrvCert := cmd.Flag("grpc-server-tls-cert", "TLS Certificate for gRPC server").Default("tls.crt").String() + httpBindAddr := regHTTPAddrFlag(cmd) clusterBindAddr := cmd.Flag("cluster.address", "Listen ip:port address for gossip cluster."). @@ -49,6 +53,9 @@ func regCommonServerFlags(cmd *kingpin.CmdClause) (*string, *string, func(log.Lo return grpcBindAddr, httpBindAddr, + grpcTLSInsecure, + grpcTLSSrvCert, + grpcTLSSrvKey, func(logger log.Logger, reg *prometheus.Registry, waitIfEmpty bool, httpAdvertiseAddr string, queryAPIEnabled bool) (*cluster.Peer, error) { host, port, err := cluster.CalculateAdvertiseAddress(*grpcBindAddr, *grpcAdvertiseAddr) if err != nil { diff --git a/cmd/thanos/main.go b/cmd/thanos/main.go index e0a78387249..48d66772288 100644 --- a/cmd/thanos/main.go +++ b/cmd/thanos/main.go @@ -32,6 +32,7 @@ import ( "github.com/prometheus/common/version" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials" "google.golang.org/grpc/status" kingpin "gopkg.in/alecthomas/kingpin.v2" ) @@ -196,7 +197,7 @@ func registerMetrics(mux *http.ServeMux, g prometheus.Gatherer) { // - request histogram // - tracing // - panic recovery with panic counter -func defaultGRPCServerOpts(logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer) []grpc.ServerOption { +func defaultGRPCServerOpts(logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, insecure bool, cert, key string) ([]grpc.ServerOption, error) { met := grpc_prometheus.NewServerMetrics() met.EnableHandlingTimeHistogram( grpc_prometheus.WithHistogramBuckets([]float64{ @@ -214,7 +215,7 @@ func defaultGRPCServerOpts(logger log.Logger, reg *prometheus.Registry, tracer o return status.Errorf(codes.Internal, "%s", p) } reg.MustRegister(met, panicsTotal) - return []grpc.ServerOption{ + opts := []grpc.ServerOption{ grpc.MaxSendMsgSize(math.MaxInt32), grpc_middleware.WithUnaryServerChain( met.UnaryServerInterceptor(), @@ -227,6 +228,19 @@ func defaultGRPCServerOpts(logger log.Logger, reg *prometheus.Registry, tracer o grpc_recovery.StreamServerInterceptor(grpc_recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)), ), } + + if insecure { + return opts, nil + } + + tcreds, err := credentials.NewServerTLSFromFile(cert, key) + if err != nil { + return nil, errors.Wrap(err, "server transport credentials") + } + + opts = append(opts, grpc.Creds(tcreds)) + + return opts, nil } // metricHTTPListenGroup is a run.Group that servers HTTP endpoint with only Prometheus metrics. diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 97290c7562f..a20aef127e8 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -28,6 +28,7 @@ import ( "github.com/prometheus/prometheus/promql" "github.com/prometheus/tsdb/labels" "google.golang.org/grpc" + "google.golang.org/grpc/credentials" "gopkg.in/alecthomas/kingpin.v2" ) @@ -35,11 +36,13 @@ import ( func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string) { cmd := app.Command(name, "query node exposing PromQL enabled Query API with data retrieved from multiple store nodes") - grpcBindAddr, httpBindAddr, newPeerFn := regCommonServerFlags(cmd) + grpcBindAddr, httpBindAddr, insecure, cert, key, newPeerFn := regCommonServerFlags(cmd) httpAdvertiseAddr := cmd.Flag("http-advertise-address", "Explicit (external) host:port address to advertise for HTTP QueryAPI in gossip cluster. If empty, 'http-address' will be used."). String() + cacert := cmd.Flag("grpc-client-tls-ca", "TLS CA Certificates to use for verifying gRPC servers").Default("ca.crt").String() + queryTimeout := cmd.Flag("query.timeout", "Maximum time to process query by query node."). Default("2m").Duration() @@ -83,6 +86,10 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string reg, tracer, *grpcBindAddr, + *insecure, + *cacert, + *cert, + *key, *httpBindAddr, *maxConcurrentQueries, *queryTimeout, @@ -95,7 +102,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string } } -func storeClientGRPCOpts(reg *prometheus.Registry, tracer opentracing.Tracer) []grpc.DialOption { +func storeClientGRPCOpts(reg *prometheus.Registry, tracer opentracing.Tracer, insecure bool, cacert string) ([]grpc.DialOption, error) { grpcMets := grpc_prometheus.NewClientMetrics() grpcMets.EnableClientHandlingTimeHistogram( grpc_prometheus.WithHistogramBuckets([]float64{ @@ -108,7 +115,6 @@ func storeClientGRPCOpts(reg *prometheus.Registry, tracer opentracing.Tracer) [] // Current limit is ~2GB. // TODO(bplotka): Split sent chunks on store node per max 4MB chunks if needed. grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32)), - grpc.WithInsecure(), grpc.WithUnaryInterceptor( grpc_middleware.ChainUnaryClient( grpcMets.UnaryClientInterceptor(), @@ -127,7 +133,19 @@ func storeClientGRPCOpts(reg *prometheus.Registry, tracer opentracing.Tracer) [] reg.MustRegister(grpcMets) } - return dialOpts + if insecure { + dialOpts = append(dialOpts, grpc.WithInsecure()) + return dialOpts, nil + } + + tcreds, err := credentials.NewClientTLSFromFile(cacert, "") + if err != nil { + return nil, errors.Wrap(err, "client transport credentials") + } + + dialOpts = append(dialOpts, grpc.WithTransportCredentials(tcreds)) + + return dialOpts, nil } // runQuery starts a server that exposes PromQL Query API. It is responsible for querying configured @@ -138,6 +156,10 @@ func runQuery( reg *prometheus.Registry, tracer opentracing.Tracer, grpcBindAddr string, + insecure bool, + cacert string, + cert string, + key string, httpBindAddr string, maxConcurrentQueries int, queryTimeout time.Duration, @@ -155,6 +177,12 @@ func runQuery( staticSpecs = append(staticSpecs, query.NewGRPCStoreSpec(addr)) } + + dialOpts, err := storeClientGRPCOpts(reg, tracer, insecure, cacert) + if err != nil { + return errors.Wrap(err, "building gRPC client") + } + var ( stores = query.NewStoreSet( logger, @@ -172,7 +200,7 @@ func runQuery( } return specs }, - storeClientGRPCOpts(reg, tracer), + dialOpts, ) proxy = store.NewProxyStore(logger, func(context.Context) ([]store.Client, error) { return stores.Get(), nil @@ -241,7 +269,12 @@ func runQuery( } logger := log.With(logger, "component", "query") - s := grpc.NewServer(defaultGRPCServerOpts(logger, reg, tracer)...) + opts, err := defaultGRPCServerOpts(logger, reg, tracer, insecure, cert, key) + if err != nil { + return errors.Wrapf(err, "build gRPC server") + } + + s := grpc.NewServer(opts...) storepb.RegisterStoreServer(s, proxy) g.Add(func() error { diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index ba5d0c25d68..fcdcdc738ef 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -52,7 +52,7 @@ import ( func registerRule(m map[string]setupFunc, app *kingpin.Application, name string) { cmd := app.Command(name, "ruler evaluating Prometheus rules against given Query nodes, exposing Store API and storing old blocks in bucket") - grpcBindAddr, httpBindAddr, newPeerFn := regCommonServerFlags(cmd) + grpcBindAddr, httpBindAddr, insecure, cert, key, newPeerFn := regCommonServerFlags(cmd) labelStrs := cmd.Flag("label", "Labels to be applied to all generated metrics (repeated)."). PlaceHolder("=\"\"").Strings() @@ -100,7 +100,7 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application, name string) NoLockfile: true, WALFlushInterval: 30 * time.Second, } - return runRule(g, logger, reg, tracer, lset, *alertmgrs, *grpcBindAddr, *httpBindAddr, *evalInterval, *dataDir, *ruleFiles, peer, *gcsBucket, s3Config, tsdbOpts, name, alertQueryURL) + return runRule(g, logger, reg, tracer, lset, *alertmgrs, *grpcBindAddr, *insecure, *cert, *key, *httpBindAddr, *evalInterval, *dataDir, *ruleFiles, peer, *gcsBucket, s3Config, tsdbOpts, name, alertQueryURL) } } @@ -114,6 +114,9 @@ func runRule( lset labels.Labels, alertmgrURLs []string, grpcBindAddr string, + insecure bool, + cert string, + key string, httpBindAddr string, evalInterval time.Duration, dataDir string, @@ -357,7 +360,11 @@ func runRule( store := store.NewTSDBStore(logger, reg, db, lset) - s := grpc.NewServer(defaultGRPCServerOpts(logger, reg, tracer)...) + opts, err := defaultGRPCServerOpts(logger, reg, tracer, insecure, cert, key) + if err != nil { + return errors.Wrap(err, "setup gRPC options") + } + s := grpc.NewServer(opts...) storepb.RegisterStoreServer(s, store) g.Add(func() error { diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index 284c2fc6510..9eaa5183a2c 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -35,7 +35,7 @@ import ( func registerSidecar(m map[string]setupFunc, app *kingpin.Application, name string) { cmd := app.Command(name, "sidecar for Prometheus server") - grpcBindAddr, httpBindAddr, newPeerFn := regCommonServerFlags(cmd) + grpcBindAddr, httpBindAddr, insecure, cert, key, newPeerFn := regCommonServerFlags(cmd) promURL := cmd.Flag("prometheus.url", "URL at which to reach Prometheus's API. For better performance use local network."). Default("http://localhost:9090").URL() @@ -74,6 +74,9 @@ func registerSidecar(m map[string]setupFunc, app *kingpin.Application, name stri reg, tracer, *grpcBindAddr, + *insecure, + *cert, + *key, *httpBindAddr, *promURL, *dataDir, @@ -92,6 +95,9 @@ func runSidecar( reg *prometheus.Registry, tracer opentracing.Tracer, grpcBindAddr string, + insecure bool, + cert string, + key string, httpBindAddr string, promURL *url.URL, dataDir string, @@ -208,7 +214,11 @@ func runSidecar( return errors.Wrap(err, "create Prometheus store") } - s := grpc.NewServer(defaultGRPCServerOpts(logger, reg, tracer)...) + opts, err := defaultGRPCServerOpts(logger, reg, tracer, insecure, cert, key) + if err != nil { + return errors.Wrap(err, "setup gRPC server") + } + s := grpc.NewServer(opts...) storepb.RegisterStoreServer(s, promStore) g.Add(func() error { diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 0e92c8c7746..b906dcde1ea 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -26,7 +26,7 @@ import ( func registerStore(m map[string]setupFunc, app *kingpin.Application, name string) { cmd := app.Command(name, "store node giving access to blocks in a GCS bucket") - grpcBindAddr, httpBindAddr, newPeerFn := regCommonServerFlags(cmd) + grpcBindAddr, httpBindAddr, insecure, cert, key, newPeerFn := regCommonServerFlags(cmd) dataDir := cmd.Flag("data-dir", "Data directory in which to cache remote blocks."). Default("./data").String() @@ -55,6 +55,9 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string s3Config, *dataDir, *grpcBindAddr, + *insecure, + *cert, + *key, *httpBindAddr, peer, uint64(*indexCacheSize), @@ -75,6 +78,9 @@ func runStore( s3Config *s3.Config, dataDir string, grpcBindAddr string, + insecure bool, + cert string, + key string, httpBindAddr string, peer *cluster.Peer, indexCacheSizeBytes uint64, @@ -138,7 +144,12 @@ func runStore( return errors.Wrap(err, "listen API address") } - s := grpc.NewServer(defaultGRPCServerOpts(logger, reg, tracer)...) + opts, err := defaultGRPCServerOpts(logger, reg, tracer, insecure, key, cert) + if err != nil { + return errors.Wrap(err, "grpc server options") + } + + s := grpc.NewServer(opts...) storepb.RegisterStoreServer(s, bs) g.Add(func() error {