Skip to content

Commit

Permalink
Allow specifying simple gRPC transport credentials
Browse files Browse the repository at this point in the history
  • Loading branch information
Tristan Colgate committed Sep 19, 2018
1 parent 984f42e commit ec28508
Show file tree
Hide file tree
Showing 11 changed files with 350 additions and 338 deletions.
236 changes: 27 additions & 209 deletions Gopkg.lock

Large diffs are not rendered by default.

20 changes: 17 additions & 3 deletions cmd/thanos/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,25 @@ import (
"gopkg.in/alecthomas/kingpin.v2"
)

func regCommonServerFlags(cmd *kingpin.CmdClause) (*string, *string, func(log.Logger, *prometheus.Registry, bool, string, bool) (*cluster.Peer, error)) {
grpcBindAddr := cmd.Flag("grpc-address", "Listen ip:port address for gRPC endpoints (StoreAPI). Make sure this address is routable from other components if you use gossip, 'grpc-advertise-address' is empty and you require cross-node connection.").
func regCommonServerFlags(cmd *kingpin.CmdClause) (
grpcBindAddr *string,
httpBindAddr *string,
grpcTLSSrvCert *string,
grpcTLSSrvKey *string,
grpcTLSSrvClientCA *string,
peerFunc func(log.Logger, *prometheus.Registry, bool, string, bool) (*cluster.Peer, error)) {

grpcBindAddr = cmd.Flag("grpc-address", "Listen ip:port address for gRPC endpoints (StoreAPI). Make sure this address is routable from other components if you use gossip, 'grpc-advertise-address' is empty and you require cross-node connection.").
Default("0.0.0.0:10901").String()

grpcAdvertiseAddr := cmd.Flag("grpc-advertise-address", "Explicit (external) host:port address to advertise for gRPC StoreAPI in gossip cluster. If empty, 'grpc-address' will be used.").
String()

httpBindAddr := regHTTPAddrFlag(cmd)
grpcTLSSrvKey = cmd.Flag("grpc-server-tls-key", "TLS Key for the gRPC server, leave blank to disable TLS").Default("").String()
grpcTLSSrvCert = cmd.Flag("grpc-server-tls-cert", "TLS Certificate for gRPC server, leave blank to disable TLS").Default("").String()
grpcTLSSrvClientCA = cmd.Flag("grpc-server-tls-client-ca", "TLS CA to verify clients against").Default("").String()

httpBindAddr = regHTTPAddrFlag(cmd)

clusterBindAddr := cmd.Flag("cluster.address", "Listen ip:port address for gossip cluster.").
Default("0.0.0.0:10900").String()
Expand Down Expand Up @@ -53,6 +64,9 @@ func regCommonServerFlags(cmd *kingpin.CmdClause) (*string, *string, func(log.Lo

return grpcBindAddr,
httpBindAddr,
grpcTLSSrvCert,
grpcTLSSrvKey,
grpcTLSSrvClientCA,
func(logger log.Logger, reg *prometheus.Registry, waitIfEmpty bool, httpAdvertiseAddr string, queryAPIEnabled bool) (*cluster.Peer, error) {
host, port, err := cluster.CalculateAdvertiseAddress(*grpcBindAddr, *grpcAdvertiseAddr)
if err != nil {
Expand Down
45 changes: 43 additions & 2 deletions cmd/thanos/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package main

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"math"
"net"
"net/http"
Expand Down Expand Up @@ -32,6 +35,7 @@ import (
"github.com/prometheus/common/version"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/status"
kingpin "gopkg.in/alecthomas/kingpin.v2"
)
Expand Down Expand Up @@ -196,7 +200,7 @@ func registerMetrics(mux *http.ServeMux, g prometheus.Gatherer) {
// - request histogram
// - tracing
// - panic recovery with panic counter
func defaultGRPCServerOpts(logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer) []grpc.ServerOption {
func defaultGRPCServerOpts(logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, cert, key, clientCA string) ([]grpc.ServerOption, error) {
met := grpc_prometheus.NewServerMetrics()
met.EnableHandlingTimeHistogram(
grpc_prometheus.WithHistogramBuckets([]float64{
Expand All @@ -214,7 +218,7 @@ func defaultGRPCServerOpts(logger log.Logger, reg *prometheus.Registry, tracer o
return status.Errorf(codes.Internal, "%s", p)
}
reg.MustRegister(met, panicsTotal)
return []grpc.ServerOption{
opts := []grpc.ServerOption{
grpc.MaxSendMsgSize(math.MaxInt32),
grpc_middleware.WithUnaryServerChain(
met.UnaryServerInterceptor(),
Expand All @@ -227,6 +231,43 @@ func defaultGRPCServerOpts(logger log.Logger, reg *prometheus.Registry, tracer o
grpc_recovery.StreamServerInterceptor(grpc_recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)),
),
}

if key == "" || cert == "" {
level.Info(logger).Log("msg", "disabled TLS, key and cert must be set to enable")
return opts, nil
}

tlsCfg := &tls.Config{}

tlsCert, err := tls.LoadX509KeyPair(cert, key)
if err != nil {
return nil, errors.Wrap(err, "server credentials")
}

level.Info(logger).Log("msg", "enabled gRPC server side TLS")

tlsCfg.Certificates = []tls.Certificate{tlsCert}

if clientCA != "" {
caPEM, err := ioutil.ReadFile(clientCA)
if err != nil {
return nil, errors.Wrap(err, "reading client CA")
}

certPool := x509.NewCertPool()
if !certPool.AppendCertsFromPEM(caPEM) {
return nil, errors.Wrap(err, "building client CA")
}
tlsCfg.ClientCAs = certPool
tlsCfg.ClientAuth = tls.RequireAndVerifyClientCert

level.Info(logger).Log("msg", "gRPC server TLS client verification enabled")
}

creds := credentials.NewTLS(tlsCfg)
opts = append(opts, grpc.Creds(creds))

return opts, nil
}

// metricHTTPListenGroup is a run.Group that servers HTTP endpoint with only Prometheus metrics.
Expand Down
90 changes: 84 additions & 6 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package main

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"math"
"net"
"net/http"
Expand All @@ -28,18 +31,24 @@ import (
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/tsdb/labels"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"gopkg.in/alecthomas/kingpin.v2"
)

// registerQuery registers a query command.
func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string) {
cmd := app.Command(name, "query node exposing PromQL enabled Query API with data retrieved from multiple store nodes")

grpcBindAddr, httpBindAddr, newPeerFn := regCommonServerFlags(cmd)
grpcBindAddr, httpBindAddr, srvCert, srvKey, srvClientCA, newPeerFn := regCommonServerFlags(cmd)

httpAdvertiseAddr := cmd.Flag("http-advertise-address", "Explicit (external) host:port address to advertise for HTTP QueryAPI in gossip cluster. If empty, 'http-address' will be used.").
String()

secure := cmd.Flag("grpc-client-tls-secure", "Use TLS when talking to the gRPC server").Default("false").Bool()
cert := cmd.Flag("grpc-client-tls-cert", "TLS Certificates to use to identify this client to the server").Default("").String()
key := cmd.Flag("grpc-client-tls-key", "TLS Key for the client's certificate").Default("").String()
caCert := cmd.Flag("grpc-client-tls-ca", "TLS CA Certificates to use to verify gRPC servers").Default("").String()

queryTimeout := modelDuration(cmd.Flag("query.timeout", "Maximum time to process query by query node.").
Default("2m"))

Expand Down Expand Up @@ -83,6 +92,13 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string
reg,
tracer,
*grpcBindAddr,
*srvCert,
*srvKey,
*srvClientCA,
*secure,
*cert,
*key,
*caCert,
*httpBindAddr,
*maxConcurrentQueries,
time.Duration(*queryTimeout),
Expand All @@ -95,7 +111,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string
}
}

func storeClientGRPCOpts(reg *prometheus.Registry, tracer opentracing.Tracer) []grpc.DialOption {
func storeClientGRPCOpts(logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, secure bool, cert, key, caCert string) ([]grpc.DialOption, error) {
grpcMets := grpc_prometheus.NewClientMetrics()
grpcMets.EnableClientHandlingTimeHistogram(
grpc_prometheus.WithHistogramBuckets([]float64{
Expand All @@ -108,7 +124,6 @@ func storeClientGRPCOpts(reg *prometheus.Registry, tracer opentracing.Tracer) []
// Current limit is ~2GB.
// TODO(bplotka): Split sent chunks on store node per max 4MB chunks if needed.
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32)),
grpc.WithInsecure(),
grpc.WithUnaryInterceptor(
grpc_middleware.ChainUnaryClient(
grpcMets.UnaryClientInterceptor(),
Expand All @@ -127,7 +142,52 @@ func storeClientGRPCOpts(reg *prometheus.Registry, tracer opentracing.Tracer) []
reg.MustRegister(grpcMets)
}

return dialOpts
if !secure {
dialOpts = append(dialOpts, grpc.WithInsecure())
return dialOpts, nil
}

level.Info(logger).Log("msg", "Enabling client to server TLS")

var certPool *x509.CertPool

if caCert != "" {
caPEM, err := ioutil.ReadFile(caCert)
if err != nil {
return nil, errors.Wrap(err, "reading client CA")
}

certPool = x509.NewCertPool()
if !certPool.AppendCertsFromPEM(caPEM) {
return nil, errors.Wrap(err, "building client CA")
}
level.Info(logger).Log("msg", "TLS Client using provided certificate pool")
} else {
var err error
certPool, err = x509.SystemCertPool()
if err != nil {
return nil, errors.Wrap(err, "reading system certificate pool")
}
level.Info(logger).Log("msg", "TLS Client using system certificate pool")
}

tlsCfg := &tls.Config{
RootCAs: certPool,
}

if cert != "" {
cert, err := tls.LoadX509KeyPair(cert, key)
if err != nil {
return nil, errors.Wrap(err, "client credentials")
}
tlsCfg.Certificates = []tls.Certificate{cert}
level.Info(logger).Log("msg", "TLS Client authentication enabled")
}

creds := credentials.NewTLS(tlsCfg)
dialOpts = append(dialOpts, grpc.WithTransportCredentials(creds))

return dialOpts, nil
}

// runQuery starts a server that exposes PromQL Query API. It is responsible for querying configured
Expand All @@ -138,6 +198,13 @@ func runQuery(
reg *prometheus.Registry,
tracer opentracing.Tracer,
grpcBindAddr string,
srvCert string,
srvKey string,
srvClientCA string,
secure bool,
cert string,
key string,
caCert string,
httpBindAddr string,
maxConcurrentQueries int,
queryTimeout time.Duration,
Expand All @@ -155,6 +222,12 @@ func runQuery(

staticSpecs = append(staticSpecs, query.NewGRPCStoreSpec(addr))
}

dialOpts, err := storeClientGRPCOpts(logger, reg, tracer, secure, cert, key, caCert)
if err != nil {
return errors.Wrap(err, "building gRPC client")
}

var (
stores = query.NewStoreSet(
logger,
Expand All @@ -172,7 +245,7 @@ func runQuery(
}
return specs
},
storeClientGRPCOpts(reg, tracer),
dialOpts,
)
proxy = store.NewProxyStore(logger, func(context.Context) ([]store.Client, error) {
return stores.Get(), nil
Expand Down Expand Up @@ -241,7 +314,12 @@ func runQuery(
}
logger := log.With(logger, "component", "query")

s := grpc.NewServer(defaultGRPCServerOpts(logger, reg, tracer)...)
opts, err := defaultGRPCServerOpts(logger, reg, tracer, srvCert, srvKey, srvClientCA)
if err != nil {
return errors.Wrapf(err, "build gRPC server")
}

s := grpc.NewServer(opts...)
storepb.RegisterStoreServer(s, proxy)

g.Add(func() error {
Expand Down
14 changes: 12 additions & 2 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ import (
func registerRule(m map[string]setupFunc, app *kingpin.Application, name string) {
cmd := app.Command(name, "ruler evaluating Prometheus rules against given Query nodes, exposing Store API and storing old blocks in bucket")

grpcBindAddr, httpBindAddr, newPeerFn := regCommonServerFlags(cmd)
grpcBindAddr, httpBindAddr, cert, key, clientCA, newPeerFn := regCommonServerFlags(cmd)

labelStrs := cmd.Flag("label", "Labels to be applied to all generated metrics (repeated).").
PlaceHolder("<name>=\"<value>\"").Strings()
Expand Down Expand Up @@ -104,6 +104,9 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application, name string)
lset,
*alertmgrs,
*grpcBindAddr,
*cert,
*key,
*clientCA,
*httpBindAddr,
time.Duration(*evalInterval),
*dataDir,
Expand All @@ -127,6 +130,9 @@ func runRule(
lset labels.Labels,
alertmgrURLs []string,
grpcBindAddr string,
cert string,
key string,
clientCA string,
httpBindAddr string,
evalInterval time.Duration,
dataDir string,
Expand Down Expand Up @@ -369,7 +375,11 @@ func runRule(

store := store.NewTSDBStore(logger, reg, db, lset)

s := grpc.NewServer(defaultGRPCServerOpts(logger, reg, tracer)...)
opts, err := defaultGRPCServerOpts(logger, reg, tracer, cert, key, clientCA)
if err != nil {
return errors.Wrap(err, "setup gRPC options")
}
s := grpc.NewServer(opts...)
storepb.RegisterStoreServer(s, store)

g.Add(func() error {
Expand Down
14 changes: 12 additions & 2 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
func registerSidecar(m map[string]setupFunc, app *kingpin.Application, name string) {
cmd := app.Command(name, "sidecar for Prometheus server")

grpcBindAddr, httpBindAddr, newPeerFn := regCommonServerFlags(cmd)
grpcBindAddr, httpBindAddr, cert, key, clientCA, newPeerFn := regCommonServerFlags(cmd)

promURL := cmd.Flag("prometheus.url", "URL at which to reach Prometheus's API. For better performance use local network.").
Default("http://localhost:9090").URL()
Expand Down Expand Up @@ -71,6 +71,9 @@ func registerSidecar(m map[string]setupFunc, app *kingpin.Application, name stri
reg,
tracer,
*grpcBindAddr,
*cert,
*key,
*clientCA,
*httpBindAddr,
*promURL,
*dataDir,
Expand All @@ -88,6 +91,9 @@ func runSidecar(
reg *prometheus.Registry,
tracer opentracing.Tracer,
grpcBindAddr string,
cert string,
key string,
clientCA string,
httpBindAddr string,
promURL *url.URL,
dataDir string,
Expand Down Expand Up @@ -203,7 +209,11 @@ func runSidecar(
return errors.Wrap(err, "create Prometheus store")
}

s := grpc.NewServer(defaultGRPCServerOpts(logger, reg, tracer)...)
opts, err := defaultGRPCServerOpts(logger, reg, tracer, cert, key, clientCA)
if err != nil {
return errors.Wrap(err, "setup gRPC server")
}
s := grpc.NewServer(opts...)
storepb.RegisterStoreServer(s, promStore)

g.Add(func() error {
Expand Down
Loading

0 comments on commit ec28508

Please sign in to comment.