Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow specifying simple gRPC transport credentials #508

Merged
merged 2 commits into from
Sep 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 17 additions & 3 deletions cmd/thanos/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,25 @@ import (
"gopkg.in/alecthomas/kingpin.v2"
)

func regCommonServerFlags(cmd *kingpin.CmdClause) (*string, *string, func(log.Logger, *prometheus.Registry, bool, string, bool) (*cluster.Peer, error)) {
grpcBindAddr := cmd.Flag("grpc-address", "Listen ip:port address for gRPC endpoints (StoreAPI). Make sure this address is routable from other components if you use gossip, 'grpc-advertise-address' is empty and you require cross-node connection.").
func regCommonServerFlags(cmd *kingpin.CmdClause) (
grpcBindAddr *string,
httpBindAddr *string,
grpcTLSSrvCert *string,
grpcTLSSrvKey *string,
grpcTLSSrvClientCA *string,
peerFunc func(log.Logger, *prometheus.Registry, bool, string, bool) (*cluster.Peer, error)) {

grpcBindAddr = cmd.Flag("grpc-address", "Listen ip:port address for gRPC endpoints (StoreAPI). Make sure this address is routable from other components if you use gossip, 'grpc-advertise-address' is empty and you require cross-node connection.").
Default("0.0.0.0:10901").String()

grpcAdvertiseAddr := cmd.Flag("grpc-advertise-address", "Explicit (external) host:port address to advertise for gRPC StoreAPI in gossip cluster. If empty, 'grpc-address' will be used.").
String()

httpBindAddr := regHTTPAddrFlag(cmd)
grpcTLSSrvCert = cmd.Flag("grpc-server-tls-cert", "TLS Certificate for gRPC server, leave blank to disable TLS").Default("").String()
grpcTLSSrvKey = cmd.Flag("grpc-server-tls-key", "TLS Key for the gRPC server, leave blank to disable TLS").Default("").String()
grpcTLSSrvClientCA = cmd.Flag("grpc-server-tls-client-ca", "TLS CA to verify clients against. If no client CA is specified, there is no client verification on server side. (tls.NoClientCert)").Default("").String()

httpBindAddr = regHTTPAddrFlag(cmd)

clusterBindAddr := cmd.Flag("cluster.address", "Listen ip:port address for gossip cluster.").
Default("0.0.0.0:10900").String()
Expand Down Expand Up @@ -53,6 +64,9 @@ func regCommonServerFlags(cmd *kingpin.CmdClause) (*string, *string, func(log.Lo

return grpcBindAddr,
httpBindAddr,
grpcTLSSrvCert,
grpcTLSSrvKey,
grpcTLSSrvClientCA,
func(logger log.Logger, reg *prometheus.Registry, waitIfEmpty bool, httpAdvertiseAddr string, queryAPIEnabled bool) (*cluster.Peer, error) {
host, port, err := cluster.CalculateAdvertiseAddress(*grpcBindAddr, *grpcAdvertiseAddr)
if err != nil {
Expand Down
52 changes: 50 additions & 2 deletions cmd/thanos/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package main

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"math"
"net"
"net/http"
Expand Down Expand Up @@ -32,6 +35,7 @@ import (
"github.com/prometheus/common/version"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/status"
kingpin "gopkg.in/alecthomas/kingpin.v2"
)
Expand Down Expand Up @@ -196,7 +200,7 @@ func registerMetrics(mux *http.ServeMux, g prometheus.Gatherer) {
// - request histogram
// - tracing
// - panic recovery with panic counter
func defaultGRPCServerOpts(logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer) []grpc.ServerOption {
func defaultGRPCServerOpts(logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, cert, key, clientCA string) ([]grpc.ServerOption, error) {
met := grpc_prometheus.NewServerMetrics()
met.EnableHandlingTimeHistogram(
grpc_prometheus.WithHistogramBuckets([]float64{
Expand All @@ -214,7 +218,7 @@ func defaultGRPCServerOpts(logger log.Logger, reg *prometheus.Registry, tracer o
return status.Errorf(codes.Internal, "%s", p)
}
reg.MustRegister(met, panicsTotal)
return []grpc.ServerOption{
opts := []grpc.ServerOption{
grpc.MaxSendMsgSize(math.MaxInt32),
grpc_middleware.WithUnaryServerChain(
met.UnaryServerInterceptor(),
Expand All @@ -227,6 +231,50 @@ func defaultGRPCServerOpts(logger log.Logger, reg *prometheus.Registry, tracer o
grpc_recovery.StreamServerInterceptor(grpc_recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)),
),
}

tcolgate marked this conversation as resolved.
Show resolved Hide resolved
if key == "" && cert == "" {
if clientCA != "" {
return nil, errors.New("when a client CA is used a server key and certificate must also be provided")
}

level.Info(logger).Log("msg", "disabled TLS, key and cert must be set to enable")
return opts, nil
}

if key == "" || cert == "" {
return nil, errors.New("both server key and certificate must be provided")
}

tlsCfg := &tls.Config{
MinVersion: tls.VersionTLS12,
}

tlsCert, err := tls.LoadX509KeyPair(cert, key)
if err != nil {
return nil, errors.Wrap(err, "server credentials")
}

level.Info(logger).Log("msg", "enabled gRPC server side TLS")

tlsCfg.Certificates = []tls.Certificate{tlsCert}
tcolgate marked this conversation as resolved.
Show resolved Hide resolved

if clientCA != "" {
caPEM, err := ioutil.ReadFile(clientCA)
if err != nil {
return nil, errors.Wrap(err, "reading client CA")
}

certPool := x509.NewCertPool()
if !certPool.AppendCertsFromPEM(caPEM) {
return nil, errors.Wrap(err, "building client CA")
}
tlsCfg.ClientCAs = certPool
tlsCfg.ClientAuth = tls.RequireAndVerifyClientCert

level.Info(logger).Log("msg", "gRPC server TLS client verification enabled")
}

return append(opts, grpc.Creds(credentials.NewTLS(tlsCfg))), nil
}

// metricHTTPListenGroup is a run.Group that servers HTTP endpoint with only Prometheus metrics.
Expand Down
88 changes: 82 additions & 6 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package main

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"math"
"net"
"net/http"
Expand All @@ -28,18 +31,24 @@ import (
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/tsdb/labels"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"gopkg.in/alecthomas/kingpin.v2"
)

// registerQuery registers a query command.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW what happened to GitHub - coloring for golang sucks ... now there is some red mark on go comments??

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or is it only my browser?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's doing it for me too, I've no idea what is going on there.

func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string) {
cmd := app.Command(name, "query node exposing PromQL enabled Query API with data retrieved from multiple store nodes")

grpcBindAddr, httpBindAddr, newPeerFn := regCommonServerFlags(cmd)
grpcBindAddr, httpBindAddr, srvCert, srvKey, srvClientCA, newPeerFn := regCommonServerFlags(cmd)

httpAdvertiseAddr := cmd.Flag("http-advertise-address", "Explicit (external) host:port address to advertise for HTTP QueryAPI in gossip cluster. If empty, 'http-address' will be used.").
String()

secure := cmd.Flag("grpc-client-tls-secure", "Use TLS when talking to the gRPC server").Default("false").Bool()
cert := cmd.Flag("grpc-client-tls-cert", "TLS Certificates to use to identify this client to the server").Default("").String()
tcolgate marked this conversation as resolved.
Show resolved Hide resolved
key := cmd.Flag("grpc-client-tls-key", "TLS Key for the client's certificate").Default("").String()
caCert := cmd.Flag("grpc-client-tls-ca", "TLS CA Certificates to use to verify gRPC servers").Default("").String()

queryTimeout := modelDuration(cmd.Flag("query.timeout", "Maximum time to process query by query node.").
Default("2m"))

Expand Down Expand Up @@ -83,6 +92,13 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string
reg,
tracer,
*grpcBindAddr,
*srvCert,
*srvKey,
*srvClientCA,
*secure,
*cert,
*key,
*caCert,
*httpBindAddr,
*maxConcurrentQueries,
time.Duration(*queryTimeout),
Expand All @@ -95,7 +111,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string
}
}

func storeClientGRPCOpts(reg *prometheus.Registry, tracer opentracing.Tracer) []grpc.DialOption {
func storeClientGRPCOpts(logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, secure bool, cert, key, caCert string) ([]grpc.DialOption, error) {
grpcMets := grpc_prometheus.NewClientMetrics()
grpcMets.EnableClientHandlingTimeHistogram(
grpc_prometheus.WithHistogramBuckets([]float64{
Expand All @@ -108,7 +124,6 @@ func storeClientGRPCOpts(reg *prometheus.Registry, tracer opentracing.Tracer) []
// Current limit is ~2GB.
// TODO(bplotka): Split sent chunks on store node per max 4MB chunks if needed.
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32)),
grpc.WithInsecure(),
grpc.WithUnaryInterceptor(
grpc_middleware.ChainUnaryClient(
grpcMets.UnaryClientInterceptor(),
Expand All @@ -127,7 +142,50 @@ func storeClientGRPCOpts(reg *prometheus.Registry, tracer opentracing.Tracer) []
reg.MustRegister(grpcMets)
}

return dialOpts
if !secure {
return append(dialOpts, grpc.WithInsecure()), nil
}

level.Info(logger).Log("msg", "Enabling client to server TLS")

var certPool *x509.CertPool

if caCert != "" {
caPEM, err := ioutil.ReadFile(caCert)
if err != nil {
return nil, errors.Wrap(err, "reading client CA")
}

certPool = x509.NewCertPool()
if !certPool.AppendCertsFromPEM(caPEM) {
return nil, errors.Wrap(err, "building client CA")
}
level.Info(logger).Log("msg", "TLS Client using provided certificate pool")
} else {
var err error
certPool, err = x509.SystemCertPool()
tcolgate marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, errors.Wrap(err, "reading system certificate pool")
}
level.Info(logger).Log("msg", "TLS Client using system certificate pool")
}

tlsCfg := &tls.Config{
RootCAs: certPool,
}

if cert != "" {
cert, err := tls.LoadX509KeyPair(cert, key)
if err != nil {
return nil, errors.Wrap(err, "client credentials")
}
tlsCfg.Certificates = []tls.Certificate{cert}
level.Info(logger).Log("msg", "TLS Client authentication enabled")
}

creds := credentials.NewTLS(tlsCfg)

return append(dialOpts, grpc.WithTransportCredentials(creds)), nil
}

// runQuery starts a server that exposes PromQL Query API. It is responsible for querying configured
Expand All @@ -138,6 +196,13 @@ func runQuery(
reg *prometheus.Registry,
tracer opentracing.Tracer,
grpcBindAddr string,
srvCert string,
srvKey string,
srvClientCA string,
secure bool,
cert string,
key string,
caCert string,
httpBindAddr string,
maxConcurrentQueries int,
queryTimeout time.Duration,
Expand All @@ -155,6 +220,12 @@ func runQuery(

staticSpecs = append(staticSpecs, query.NewGRPCStoreSpec(addr))
}

dialOpts, err := storeClientGRPCOpts(logger, reg, tracer, secure, cert, key, caCert)
if err != nil {
return errors.Wrap(err, "building gRPC client")
}

var (
stores = query.NewStoreSet(
logger,
Expand All @@ -172,7 +243,7 @@ func runQuery(
}
return specs
},
storeClientGRPCOpts(reg, tracer),
dialOpts,
)
proxy = store.NewProxyStore(logger, func(context.Context) ([]store.Client, error) {
return stores.Get(), nil
Expand Down Expand Up @@ -241,7 +312,12 @@ func runQuery(
}
logger := log.With(logger, "component", "query")

s := grpc.NewServer(defaultGRPCServerOpts(logger, reg, tracer)...)
opts, err := defaultGRPCServerOpts(logger, reg, tracer, srvCert, srvKey, srvClientCA)
if err != nil {
return errors.Wrapf(err, "build gRPC server")
}

s := grpc.NewServer(opts...)
storepb.RegisterStoreServer(s, proxy)

g.Add(func() error {
Expand Down
14 changes: 12 additions & 2 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ import (
func registerRule(m map[string]setupFunc, app *kingpin.Application, name string) {
cmd := app.Command(name, "ruler evaluating Prometheus rules against given Query nodes, exposing Store API and storing old blocks in bucket")

grpcBindAddr, httpBindAddr, newPeerFn := regCommonServerFlags(cmd)
grpcBindAddr, httpBindAddr, cert, key, clientCA, newPeerFn := regCommonServerFlags(cmd)

labelStrs := cmd.Flag("label", "Labels to be applied to all generated metrics (repeated).").
PlaceHolder("<name>=\"<value>\"").Strings()
Expand Down Expand Up @@ -104,6 +104,9 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application, name string)
lset,
*alertmgrs,
*grpcBindAddr,
*cert,
*key,
*clientCA,
*httpBindAddr,
time.Duration(*evalInterval),
*dataDir,
Expand All @@ -127,6 +130,9 @@ func runRule(
lset labels.Labels,
alertmgrURLs []string,
grpcBindAddr string,
cert string,
key string,
clientCA string,
httpBindAddr string,
evalInterval time.Duration,
dataDir string,
Expand Down Expand Up @@ -369,7 +375,11 @@ func runRule(

store := store.NewTSDBStore(logger, reg, db, lset)

s := grpc.NewServer(defaultGRPCServerOpts(logger, reg, tracer)...)
opts, err := defaultGRPCServerOpts(logger, reg, tracer, cert, key, clientCA)
if err != nil {
return errors.Wrap(err, "setup gRPC options")
}
s := grpc.NewServer(opts...)
storepb.RegisterStoreServer(s, store)

g.Add(func() error {
Expand Down
14 changes: 12 additions & 2 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
func registerSidecar(m map[string]setupFunc, app *kingpin.Application, name string) {
cmd := app.Command(name, "sidecar for Prometheus server")

grpcBindAddr, httpBindAddr, newPeerFn := regCommonServerFlags(cmd)
grpcBindAddr, httpBindAddr, cert, key, clientCA, newPeerFn := regCommonServerFlags(cmd)

promURL := cmd.Flag("prometheus.url", "URL at which to reach Prometheus's API. For better performance use local network.").
Default("http://localhost:9090").URL()
Expand Down Expand Up @@ -71,6 +71,9 @@ func registerSidecar(m map[string]setupFunc, app *kingpin.Application, name stri
reg,
tracer,
*grpcBindAddr,
*cert,
*key,
*clientCA,
*httpBindAddr,
*promURL,
*dataDir,
Expand All @@ -88,6 +91,9 @@ func runSidecar(
reg *prometheus.Registry,
tracer opentracing.Tracer,
grpcBindAddr string,
cert string,
key string,
clientCA string,
httpBindAddr string,
promURL *url.URL,
dataDir string,
Expand Down Expand Up @@ -203,7 +209,11 @@ func runSidecar(
return errors.Wrap(err, "create Prometheus store")
}

s := grpc.NewServer(defaultGRPCServerOpts(logger, reg, tracer)...)
opts, err := defaultGRPCServerOpts(logger, reg, tracer, cert, key, clientCA)
if err != nil {
return errors.Wrap(err, "setup gRPC server")
}
s := grpc.NewServer(opts...)
storepb.RegisterStoreServer(s, promStore)

g.Add(func() error {
Expand Down
Loading