Skip to content

Commit

Permalink
Watch all certs
Browse files Browse the repository at this point in the history
Signed-off-by: Pavol Loffay <ploffay@redhat.com>
  • Loading branch information
pavolloffay committed Aug 18, 2020
1 parent b0b408e commit 3b5408d
Show file tree
Hide file tree
Showing 33 changed files with 380 additions and 236 deletions.
2 changes: 1 addition & 1 deletion cmd/agent/app/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ processors:
- model: jaeger
protocol: compact
server:
hostPort: 3.3.3.3:6831
hostPort: 3.3.3.3:6831
socketBufferSize: 16384
- model: jaeger
protocol: binary
Expand Down
2 changes: 1 addition & 1 deletion cmd/agent/app/reporter/grpc/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (b *ConnBuilder) CreateConnection(logger *zap.Logger) (*grpc.ClientConn, er
var dialTarget string
if b.TLS.Enabled { // user requested a secure connection
logger.Info("Agent requested secure grpc connection to collector(s)")
tlsConf, err := b.TLS.Config()
tlsConf, err := b.TLS.Config(logger)
if err != nil {
return nil, fmt.Errorf("failed to load TLS config: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/agent/app/reporter/grpc/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ func TestProxyClientTLS(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
var opts []grpc.ServerOption
if test.serverTLS.Enabled {
tlsCfg, err := test.serverTLS.Config()
tlsCfg, err := test.serverTLS.Config(zap.NewNop())
require.NoError(t, err)
opts = []grpc.ServerOption{grpc.Creds(credentials.NewTLS(tlsCfg))}
}
Expand Down
17 changes: 11 additions & 6 deletions cmd/agent/app/reporter/grpc/collector_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package grpc

import (
"io"

"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"
"google.golang.org/grpc"
Expand All @@ -26,9 +28,10 @@ import (

// ProxyBuilder holds objects communicating with collector
type ProxyBuilder struct {
reporter *reporter.ClientMetricsReporter
manager configmanager.ClientConfigManager
conn *grpc.ClientConn
reporter *reporter.ClientMetricsReporter
manager configmanager.ClientConfigManager
conn *grpc.ClientConn
tlsCloser io.Closer
}

// NewCollectorProxy creates ProxyBuilder
Expand All @@ -46,9 +49,10 @@ func NewCollectorProxy(builder *ConnBuilder, agentTags map[string]string, mFacto
MetricsFactory: mFactory,
})
return &ProxyBuilder{
conn: conn,
reporter: r3,
manager: configmanager.WrapWithMetrics(grpcManager.NewConfigManager(conn), grpcMetrics),
conn: conn,
reporter: r3,
manager: configmanager.WrapWithMetrics(grpcManager.NewConfigManager(conn), grpcMetrics),
tlsCloser: builder.TLS,
}, nil
}

Expand All @@ -70,5 +74,6 @@ func (b ProxyBuilder) GetManager() configmanager.ClientConfigManager {
// Close closes connections used by proxy.
func (b ProxyBuilder) Close() error {
b.reporter.Close()
b.tlsCloser.Close()
return b.conn.Close()
}
3 changes: 3 additions & 0 deletions cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,9 @@ by default uses only in-memory database.`,
logger.Error("Failed to close span writer", zap.Error(err))
}
}
if err := storageFactory.Close(); err != nil {
logger.Error("failed to close storage factory", zap.Error(err))
}
tracerCloser.Close()
})
return nil
Expand Down
7 changes: 7 additions & 0 deletions cmd/collector/app/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package app

import (
"context"
"io"
"net/http"
"time"

Expand Down Expand Up @@ -46,6 +47,7 @@ type Collector struct {
hServer *http.Server
zkServer *http.Server
grpcServer *grpc.Server
tlsCloser io.Closer
}

// CollectorParams to construct a new Jaeger Collector.
Expand Down Expand Up @@ -107,6 +109,7 @@ func (c *Collector) Start(builderOpts *CollectorOptions) error {
c.hServer = httpServer
}

c.tlsCloser = builderOpts.TLS
if zkServer, err := server.StartZipkinServer(&server.ZipkinServerParams{
HostPort: builderOpts.CollectorZipkinHTTPHostPort,
Handler: c.spanHandlers.ZipkinSpansHandler,
Expand Down Expand Up @@ -154,6 +157,10 @@ func (c *Collector) Close() error {
c.logger.Error("failed to close span processor.", zap.Error(err))
}

if err := c.tlsCloser.Close(); err != nil {
c.logger.Error("failed to close TLS certificate watcher", zap.Error(err))
}

return nil
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/collector/app/server/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func StartGRPCServer(params *GRPCServerParams) (*grpc.Server, error) {

if params.TLSConfig.Enabled {
// user requested a server with TLS, setup creds
tlsCfg, err := params.TLSConfig.Config()
tlsCfg, err := params.TLSConfig.Config(params.Logger)
if err != nil {
return nil, err
}
Expand Down
3 changes: 3 additions & 0 deletions cmd/collector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ func main() {
logger.Error("failed to close span writer", zap.Error(err))
}
}
if err := storageFactory.Close(); err != nil {
logger.Error("failed to close storage factory", zap.Error(err))
}

})
return nil
Expand Down
2 changes: 1 addition & 1 deletion cmd/ingester/app/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func CreateConsumer(logger *zap.Logger, metricsFactory metrics.Factory, spanWrit
ProtocolVersion: options.ProtocolVersion,
AuthenticationConfig: options.AuthenticationConfig,
}
saramaConsumer, err := consumerConfig.NewConsumer()
saramaConsumer, err := consumerConfig.NewConsumer(logger)
if err != nil {
return nil, err
}
Expand Down
6 changes: 6 additions & 0 deletions cmd/ingester/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ func main() {
consumer.Start()

svc.RunAndThen(func() {
if err := options.TLS.Close(); err != nil {
logger.Error("Failed to close TLS certificates wathcer", zap.Error(err))
}
if err = consumer.Close(); err != nil {
logger.Error("Failed to close consumer", zap.Error(err))
}
Expand All @@ -85,6 +88,9 @@ func main() {
logger.Error("Failed to close span writer", zap.Error(err))
}
}
if err := storageFactory.Close(); err != nil {
logger.Error("failed to close storage factory", zap.Error(err))
}
})
return nil
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package elasticsearchexporter

import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter/exporterhelper"

Expand All @@ -36,5 +38,8 @@ func new(config *Config, params component.ExporterCreateParams) (component.Trace
}
return exporterhelper.NewTraceExporter(
config,
w.WriteTraces)
w.WriteTraces,
exporterhelper.WithShutdown(func(ctx context.Context) error {
return esCfg.TLS.Close()
}))
}
23 changes: 15 additions & 8 deletions cmd/opentelemetry/cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func main() {
if exp == nil {
svc.ReportFatalError(fmt.Errorf("exporter type for storage %s not found", storageType))
}
queryServer, tracerCloser, err := startQuery(v, svc.GetLogger(), exp)
queryServer, tracerCloser, storageCloser, err := startQuery(v, svc.GetLogger(), exp)
if err != nil {
svc.ReportFatalError(err)
}
Expand All @@ -157,6 +157,9 @@ func main() {
if tracerCloser != nil {
tracerCloser.Close()
}
if storageCloser != nil {
storageCloser.Close()
}
} else if state == service.Closed {
break
}
Expand All @@ -177,18 +180,18 @@ func getStorageExporter(storageType string, exporters map[configmodels.Exporter]
return nil
}

func startQuery(v *viper.Viper, logger *zap.Logger, exporter configmodels.Exporter) (*queryApp.Server, io.Closer, error) {
func startQuery(v *viper.Viper, logger *zap.Logger, exporter configmodels.Exporter) (*queryApp.Server, io.Closer, io.Closer, error) {
storageFactory, err := getFactory(exporter, v, logger)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
spanReader, err := storageFactory.CreateSpanReader()
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
dependencyReader, err := storageFactory.CreateDependencyReader()
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
queryOpts := new(queryApp.QueryOptions).InitFromViper(v, logger)
queryServiceOptions := queryOpts.BuildQueryServiceOptions(storageFactory, logger)
Expand All @@ -200,12 +203,16 @@ func startQuery(v *viper.Viper, logger *zap.Logger, exporter configmodels.Export
tracerCloser := initTracer(logger)
server, err := queryApp.NewServer(logger, queryService, queryOpts, opentracing.GlobalTracer())
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
if err := server.Start(); err != nil {
return nil, nil, err
return nil, nil, nil, err
}
var storageCloser io.Closer
if closer, ok := storageFactory.(io.Closer); ok {
storageCloser = closer
}
return server, tracerCloser, nil
return server, tracerCloser, storageCloser, nil
}

func getFactory(exporter configmodels.Exporter, v *viper.Viper, logger *zap.Logger) (storage.Factory, error) {
Expand Down
3 changes: 2 additions & 1 deletion cmd/opentelemetry/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ replace k8s.io/client-go => k8s.io/client-go v0.0.0-20190620085101-78d2af792bab
replace github.com/jaegertracing/jaeger => ./../../

require (
github.com/Shopify/sarama v1.22.2-0.20190604114437-cd910a683f9f
github.com/DataDog/zstd v1.3.6-0.20190409195224-796139022798 // indirect
github.com/Shopify/sarama v1.26.4
github.com/elastic/go-elasticsearch/v6 v6.8.10
github.com/elastic/go-elasticsearch/v7 v7.0.0
github.com/golang/protobuf v1.4.2 // indirect
Expand Down
6 changes: 6 additions & 0 deletions cmd/opentelemetry/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdko
github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
github.com/Shopify/sarama v1.22.2-0.20190604114437-cd910a683f9f h1:SgZvxOvp9NLnAjkIiby0LQgXH0yQNTk2eDzbYPVoTA4=
github.com/Shopify/sarama v1.22.2-0.20190604114437-cd910a683f9f/go.mod h1:XLH1GYJnLVE0XCr6KdJGVJRTwY30moWNJ4sERjXX6fs=
github.com/Shopify/sarama v1.26.4 h1:+17TxUq/PJEAfZAll0T7XJjSgQWCpaQSoki/x5yN8o8=
github.com/Shopify/sarama v1.26.4/go.mod h1:NbSGBSSndYaIhRcBtY9V0U7AyH+x71bG668AuWys/yU=
github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc=
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
github.com/Songmu/retry v0.1.0 h1:hPA5xybQsksLR/ry/+t/7cFajPW+dqjmjhzZhioBILA=
Expand Down Expand Up @@ -228,6 +230,7 @@ github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4=
github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20=
github.com/frankban/quicktest v1.7.2/go.mod h1:jaStnuzAqU1AJdCO0l53JDCJrVDKcS03DbaAcR7Ks/o=
github.com/frankban/quicktest v1.7.3 h1:kV0lw0TH1j1hozahVmcpFCsbV5hcS4ZalH+U7UoeTow=
github.com/frankban/quicktest v1.7.3/go.mod h1:V1d2J5pfxYH6EjBAgSK7YNXcXlTWxUHdE1sVDXkjnig=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
Expand Down Expand Up @@ -869,7 +872,9 @@ github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQL
github.com/kisielk/gotool v1.0.0 h1:AV2c/EiW3KqPNT9ZKl07ehoAGi4C5/01Cfbblndcapg=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.9.5/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/compress v1.9.8/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/compress v1.10.4/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.10.5 h1:7q6vHIqubShURwQz8cQK6yIe/xC3IF0Vm7TGfqjewrc=
github.com/klauspost/compress v1.10.5/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/konsorten/go-windows-terminal-sequences v0.0.0-20180402223658-b729f2633dfe/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
Expand Down Expand Up @@ -1477,6 +1482,7 @@ golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20190911031432-227b76d455e7/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200117160349-530e935923ad/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200204104054-c9f3fb736b72/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200214034016-1d94cc7ab1c6 h1:Sy5bstxEqwwbYs6n0/pBuxKENqOeZUgD45Gp3Q3pqLg=
golang.org/x/crypto v0.0.0-20200214034016-1d94cc7ab1c6/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200320181102-891825fb96df h1:lDWgvUvNnaTnNBc/dwOty86cFeKoKWbwy2wQj0gIxbU=
Expand Down
3 changes: 2 additions & 1 deletion cmd/query/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func createGRPCServer(querySvc *querysvc.QueryService, options *QueryOptions, lo
var grpcOpts []grpc.ServerOption

if options.TLS.Enabled {
tlsCfg, err := options.TLS.Config()
tlsCfg, err := options.TLS.Config(logger)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -183,6 +183,7 @@ func (s *Server) Start() error {

// Close stops http, GRPC servers and closes the port listener.
func (s *Server) Close() {
s.queryOptions.TLS.Close()
s.grpcServer.Stop()
s.httpServer.Close()
s.conn.Close()
Expand Down
3 changes: 3 additions & 0 deletions cmd/query/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ func main() {

svc.RunAndThen(func() {
server.Close()
if err := storageFactory.Close(); err != nil {
logger.Error("failed to close storage factory", zap.Error(err))
}
})
return nil
},
Expand Down
27 changes: 14 additions & 13 deletions pkg/cassandra/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
package config

import (
"crypto/tls"
"fmt"
"time"

"github.com/gocql/gocql"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/cassandra"
gocqlw "github.com/jaegertracing/jaeger/pkg/cassandra/gocql"
Expand Down Expand Up @@ -90,12 +90,15 @@ func (c *Configuration) ApplyDefaults(source *Configuration) {

// SessionBuilder creates new cassandra.Session
type SessionBuilder interface {
NewSession() (cassandra.Session, error)
NewSession(logger *zap.Logger) (cassandra.Session, error)
}

// NewSession creates a new Cassandra session
func (c *Configuration) NewSession() (cassandra.Session, error) {
cluster := c.NewCluster()
func (c *Configuration) NewSession(logger *zap.Logger) (cassandra.Session, error) {
cluster, err := c.NewCluster(logger)
if err != nil {
return nil, err
}
session, err := cluster.CreateSession()
if err != nil {
return nil, err
Expand All @@ -104,7 +107,7 @@ func (c *Configuration) NewSession() (cassandra.Session, error) {
}

// NewCluster creates a new gocql cluster from the configuration
func (c *Configuration) NewCluster() *gocql.ClusterConfig {
func (c *Configuration) NewCluster(logger *zap.Logger) (*gocql.ClusterConfig, error) {
cluster := gocql.NewCluster(c.Servers...)
cluster.Keyspace = c.Keyspace
cluster.NumConns = c.ConnectionsPerHost
Expand Down Expand Up @@ -144,23 +147,21 @@ func (c *Configuration) NewCluster() *gocql.ClusterConfig {
Password: c.Authenticator.Basic.Password,
}
}
tlsCfg, err := c.TLS.Config(logger)
if err != nil {
return nil, err
}
if c.TLS.Enabled {
cluster.SslOpts = &gocql.SslOptions{
Config: &tls.Config{
ServerName: c.TLS.ServerName,
},
CertPath: c.TLS.CertPath,
KeyPath: c.TLS.KeyPath,
CaPath: c.TLS.CAPath,
EnableHostVerification: !c.TLS.SkipHostVerify,
Config: tlsCfg,
}
}
// If tunneling connection to C*, disable cluster autodiscovery features.
if c.DisableAutoDiscovery {
cluster.DisableInitialHostLookup = true
cluster.IgnorePeerAddr = true
}
return cluster
return cluster, nil
}

func (c *Configuration) String() string {
Expand Down
Loading

0 comments on commit 3b5408d

Please sign in to comment.