Skip to content

Commit

Permalink
Allow specifying simple gRPC transport credentials
Browse files Browse the repository at this point in the history
  • Loading branch information
Tristan Colgate committed Sep 7, 2018
1 parent 82a6211 commit c1ff3db
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 18 deletions.
20 changes: 17 additions & 3 deletions cmd/thanos/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,25 @@ import (
"gopkg.in/alecthomas/kingpin.v2"
)

func regCommonServerFlags(cmd *kingpin.CmdClause) (*string, *string, func(log.Logger, *prometheus.Registry, bool, string, bool) (*cluster.Peer, error)) {
grpcBindAddr := cmd.Flag("grpc-address", "Listen ip:port address for gRPC endpoints (StoreAPI). Make sure this address is routable from other components if you use gossip, 'grpc-advertise-address' is empty and you require cross-node connection.").
func regCommonServerFlags(cmd *kingpin.CmdClause) (
grpcBindAddr *string,
httpBindAddr *string,
grpcTLSInsecure *bool,
grpcTLSSrvCert *string,
grpcTLSSrvKey *string,
peerFunc func(log.Logger, *prometheus.Registry, bool, string, bool) (*cluster.Peer, error)) {

grpcBindAddr = cmd.Flag("grpc-address", "Listen ip:port address for gRPC endpoints (StoreAPI). Make sure this address is routable from other components if you use gossip, 'grpc-advertise-address' is empty and you require cross-node connection.").
Default("0.0.0.0:10901").String()

grpcAdvertiseAddr := cmd.Flag("grpc-advertise-address", "Explicit (external) host:port address to advertise for gRPC StoreAPI in gossip cluster. If empty, 'grpc-address' will be used.").
String()

httpBindAddr := regHTTPAddrFlag(cmd)
grpcTLSInsecure = cmd.Flag("grpc-tls-insecure", "Do not use gRPC transport credentials to verify connections").Default("true").Bool()
grpcTLSSrvKey = cmd.Flag("grpc-server-tls-key", "TLS Key for the fRPC server").Default("tls.key").String()
grpcTLSSrvCert = cmd.Flag("grpc-server-tls-cert", "TLS Certificate for gRPC server").Default("tls.crt").String()

httpBindAddr = regHTTPAddrFlag(cmd)

clusterBindAddr := cmd.Flag("cluster.address", "Listen ip:port address for gossip cluster.").
Default("0.0.0.0:10900").String()
Expand Down Expand Up @@ -49,6 +60,9 @@ func regCommonServerFlags(cmd *kingpin.CmdClause) (*string, *string, func(log.Lo

return grpcBindAddr,
httpBindAddr,
grpcTLSInsecure,
grpcTLSSrvCert,
grpcTLSSrvKey,
func(logger log.Logger, reg *prometheus.Registry, waitIfEmpty bool, httpAdvertiseAddr string, queryAPIEnabled bool) (*cluster.Peer, error) {
host, port, err := cluster.CalculateAdvertiseAddress(*grpcBindAddr, *grpcAdvertiseAddr)
if err != nil {
Expand Down
18 changes: 16 additions & 2 deletions cmd/thanos/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/prometheus/common/version"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/status"
kingpin "gopkg.in/alecthomas/kingpin.v2"
)
Expand Down Expand Up @@ -196,7 +197,7 @@ func registerMetrics(mux *http.ServeMux, g prometheus.Gatherer) {
// - request histogram
// - tracing
// - panic recovery with panic counter
func defaultGRPCServerOpts(logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer) []grpc.ServerOption {
func defaultGRPCServerOpts(logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, insecure bool, cert, key string) ([]grpc.ServerOption, error) {
met := grpc_prometheus.NewServerMetrics()
met.EnableHandlingTimeHistogram(
grpc_prometheus.WithHistogramBuckets([]float64{
Expand All @@ -214,7 +215,7 @@ func defaultGRPCServerOpts(logger log.Logger, reg *prometheus.Registry, tracer o
return status.Errorf(codes.Internal, "%s", p)
}
reg.MustRegister(met, panicsTotal)
return []grpc.ServerOption{
opts := []grpc.ServerOption{
grpc.MaxSendMsgSize(math.MaxInt32),
grpc_middleware.WithUnaryServerChain(
met.UnaryServerInterceptor(),
Expand All @@ -227,6 +228,19 @@ func defaultGRPCServerOpts(logger log.Logger, reg *prometheus.Registry, tracer o
grpc_recovery.StreamServerInterceptor(grpc_recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)),
),
}

if insecure {
return opts, nil
}

creds, err := credentials.NewServerTLSFromFile(cert, key)
if err != nil {
return nil, errors.Wrap(err, "server transport credentials")
}

opts = append(opts, grpc.Creds(creds))

return opts, nil
}

// metricHTTPListenGroup is a run.Group that servers HTTP endpoint with only Prometheus metrics.
Expand Down
45 changes: 39 additions & 6 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,21 @@ import (
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/tsdb/labels"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"gopkg.in/alecthomas/kingpin.v2"
)

// registerQuery registers a query command.
func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string) {
cmd := app.Command(name, "query node exposing PromQL enabled Query API with data retrieved from multiple store nodes")

grpcBindAddr, httpBindAddr, newPeerFn := regCommonServerFlags(cmd)
grpcBindAddr, httpBindAddr, insecure, cert, key, newPeerFn := regCommonServerFlags(cmd)

httpAdvertiseAddr := cmd.Flag("http-advertise-address", "Explicit (external) host:port address to advertise for HTTP QueryAPI in gossip cluster. If empty, 'http-address' will be used.").
String()

cacert := cmd.Flag("grpc-client-tls-ca", "TLS CA Certificates to use for verifying gRPC servers").Default("ca.crt").String()

queryTimeout := cmd.Flag("query.timeout", "Maximum time to process query by query node.").
Default("2m").Duration()

Expand Down Expand Up @@ -83,6 +86,10 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string
reg,
tracer,
*grpcBindAddr,
*insecure,
*cacert,
*cert,
*key,
*httpBindAddr,
*maxConcurrentQueries,
*queryTimeout,
Expand All @@ -95,7 +102,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string
}
}

func storeClientGRPCOpts(reg *prometheus.Registry, tracer opentracing.Tracer) []grpc.DialOption {
func storeClientGRPCOpts(reg *prometheus.Registry, tracer opentracing.Tracer, insecure bool, cacert string) ([]grpc.DialOption, error) {
grpcMets := grpc_prometheus.NewClientMetrics()
grpcMets.EnableClientHandlingTimeHistogram(
grpc_prometheus.WithHistogramBuckets([]float64{
Expand All @@ -108,7 +115,6 @@ func storeClientGRPCOpts(reg *prometheus.Registry, tracer opentracing.Tracer) []
// Current limit is ~2GB.
// TODO(bplotka): Split sent chunks on store node per max 4MB chunks if needed.
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32)),
grpc.WithInsecure(),
grpc.WithUnaryInterceptor(
grpc_middleware.ChainUnaryClient(
grpcMets.UnaryClientInterceptor(),
Expand All @@ -127,7 +133,19 @@ func storeClientGRPCOpts(reg *prometheus.Registry, tracer opentracing.Tracer) []
reg.MustRegister(grpcMets)
}

return dialOpts
if insecure {
dialOpts = append(dialOpts, grpc.WithInsecure())
return dialOpts, nil
}

creds, err := credentials.NewClientTLSFromFile(cacert, "")
if err != nil {
return nil, errors.Wrap(err, "client transport credentials")
}

dialOpts = append(dialOpts, grpc.WithTransportCredentials(creds))

return dialOpts, nil
}

// runQuery starts a server that exposes PromQL Query API. It is responsible for querying configured
Expand All @@ -138,6 +156,10 @@ func runQuery(
reg *prometheus.Registry,
tracer opentracing.Tracer,
grpcBindAddr string,
insecure bool,
cacert string,
cert string,
key string,
httpBindAddr string,
maxConcurrentQueries int,
queryTimeout time.Duration,
Expand All @@ -155,6 +177,12 @@ func runQuery(

staticSpecs = append(staticSpecs, query.NewGRPCStoreSpec(addr))
}

dialOpts, err := storeClientGRPCOpts(reg, tracer, insecure, cacert)
if err != nil {
return errors.Wrap(err, "building gRPC client")
}

var (
stores = query.NewStoreSet(
logger,
Expand All @@ -172,7 +200,7 @@ func runQuery(
}
return specs
},
storeClientGRPCOpts(reg, tracer),
dialOpts,
)
proxy = store.NewProxyStore(logger, func(context.Context) ([]store.Client, error) {
return stores.Get(), nil
Expand Down Expand Up @@ -241,7 +269,12 @@ func runQuery(
}
logger := log.With(logger, "component", "query")

s := grpc.NewServer(defaultGRPCServerOpts(logger, reg, tracer)...)
opts, err := defaultGRPCServerOpts(logger, reg, tracer, insecure, cert, key)
if err != nil {
return errors.Wrapf(err, "build gRPC server")
}

s := grpc.NewServer(opts...)
storepb.RegisterStoreServer(s, proxy)

g.Add(func() error {
Expand Down
13 changes: 10 additions & 3 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ import (
func registerRule(m map[string]setupFunc, app *kingpin.Application, name string) {
cmd := app.Command(name, "ruler evaluating Prometheus rules against given Query nodes, exposing Store API and storing old blocks in bucket")

grpcBindAddr, httpBindAddr, newPeerFn := regCommonServerFlags(cmd)
grpcBindAddr, httpBindAddr, insecure, cert, key, newPeerFn := regCommonServerFlags(cmd)

labelStrs := cmd.Flag("label", "Labels to be applied to all generated metrics (repeated).").
PlaceHolder("<name>=\"<value>\"").Strings()
Expand Down Expand Up @@ -100,7 +100,7 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application, name string)
NoLockfile: true,
WALFlushInterval: 30 * time.Second,
}
return runRule(g, logger, reg, tracer, lset, *alertmgrs, *grpcBindAddr, *httpBindAddr, *evalInterval, *dataDir, *ruleFiles, peer, *gcsBucket, s3Config, tsdbOpts, name, alertQueryURL)
return runRule(g, logger, reg, tracer, lset, *alertmgrs, *grpcBindAddr, *insecure, *cert, *key, *httpBindAddr, *evalInterval, *dataDir, *ruleFiles, peer, *gcsBucket, s3Config, tsdbOpts, name, alertQueryURL)
}
}

Expand All @@ -114,6 +114,9 @@ func runRule(
lset labels.Labels,
alertmgrURLs []string,
grpcBindAddr string,
insecure bool,
cert string,
key string,
httpBindAddr string,
evalInterval time.Duration,
dataDir string,
Expand Down Expand Up @@ -357,7 +360,11 @@ func runRule(

store := store.NewTSDBStore(logger, reg, db, lset)

s := grpc.NewServer(defaultGRPCServerOpts(logger, reg, tracer)...)
opts, err := defaultGRPCServerOpts(logger, reg, tracer, insecure, cert, key)
if err != nil {
return errors.Wrap(err, "setup gRPC options")
}
s := grpc.NewServer(opts...)
storepb.RegisterStoreServer(s, store)

g.Add(func() error {
Expand Down
14 changes: 12 additions & 2 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
func registerSidecar(m map[string]setupFunc, app *kingpin.Application, name string) {
cmd := app.Command(name, "sidecar for Prometheus server")

grpcBindAddr, httpBindAddr, newPeerFn := regCommonServerFlags(cmd)
grpcBindAddr, httpBindAddr, insecure, cert, key, newPeerFn := regCommonServerFlags(cmd)

promURL := cmd.Flag("prometheus.url", "URL at which to reach Prometheus's API. For better performance use local network.").
Default("http://localhost:9090").URL()
Expand Down Expand Up @@ -74,6 +74,9 @@ func registerSidecar(m map[string]setupFunc, app *kingpin.Application, name stri
reg,
tracer,
*grpcBindAddr,
*insecure,
*cert,
*key,
*httpBindAddr,
*promURL,
*dataDir,
Expand All @@ -92,6 +95,9 @@ func runSidecar(
reg *prometheus.Registry,
tracer opentracing.Tracer,
grpcBindAddr string,
insecure bool,
cert string,
key string,
httpBindAddr string,
promURL *url.URL,
dataDir string,
Expand Down Expand Up @@ -208,7 +214,11 @@ func runSidecar(
return errors.Wrap(err, "create Prometheus store")
}

s := grpc.NewServer(defaultGRPCServerOpts(logger, reg, tracer)...)
opts, err := defaultGRPCServerOpts(logger, reg, tracer, insecure, cert, key)
if err != nil {
return errors.Wrap(err, "setup gRPC server")
}
s := grpc.NewServer(opts...)
storepb.RegisterStoreServer(s, promStore)

g.Add(func() error {
Expand Down
15 changes: 13 additions & 2 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
func registerStore(m map[string]setupFunc, app *kingpin.Application, name string) {
cmd := app.Command(name, "store node giving access to blocks in a GCS bucket")

grpcBindAddr, httpBindAddr, newPeerFn := regCommonServerFlags(cmd)
grpcBindAddr, httpBindAddr, insecure, cert, key, newPeerFn := regCommonServerFlags(cmd)

dataDir := cmd.Flag("data-dir", "Data directory in which to cache remote blocks.").
Default("./data").String()
Expand Down Expand Up @@ -55,6 +55,9 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
s3Config,
*dataDir,
*grpcBindAddr,
*insecure,
*cert,
*key,
*httpBindAddr,
peer,
uint64(*indexCacheSize),
Expand All @@ -75,6 +78,9 @@ func runStore(
s3Config *s3.Config,
dataDir string,
grpcBindAddr string,
insecure bool,
cert string,
key string,
httpBindAddr string,
peer *cluster.Peer,
indexCacheSizeBytes uint64,
Expand Down Expand Up @@ -138,7 +144,12 @@ func runStore(
return errors.Wrap(err, "listen API address")
}

s := grpc.NewServer(defaultGRPCServerOpts(logger, reg, tracer)...)
opts, err := defaultGRPCServerOpts(logger, reg, tracer, insecure, key, cert)
if err != nil {
return errors.Wrap(err, "grpc server options")
}

s := grpc.NewServer(opts...)
storepb.RegisterStoreServer(s, bs)

g.Add(func() error {
Expand Down

0 comments on commit c1ff3db

Please sign in to comment.