From 3b5408d20ef06f4dd7a504d88cb09564e3c2a89f Mon Sep 17 00:00:00 2001 From: Pavol Loffay Date: Tue, 18 Aug 2020 11:25:29 +0200 Subject: [PATCH] Watch all certs Signed-off-by: Pavol Loffay --- cmd/agent/app/builder_test.go | 2 +- cmd/agent/app/reporter/grpc/builder.go | 2 +- cmd/agent/app/reporter/grpc/builder_test.go | 2 +- .../app/reporter/grpc/collector_proxy.go | 17 +- cmd/all-in-one/main.go | 3 + cmd/collector/app/collector.go | 7 + cmd/collector/app/server/grpc.go | 2 +- cmd/collector/main.go | 3 + cmd/ingester/app/builder/builder.go | 2 +- cmd/ingester/main.go | 6 + .../elasticsearchexporter/exporter.go | 7 +- cmd/opentelemetry/cmd/all-in-one/main.go | 23 +- cmd/opentelemetry/go.mod | 3 +- cmd/opentelemetry/go.sum | 6 + cmd/query/app/server.go | 3 +- cmd/query/main.go | 3 + pkg/cassandra/config/config.go | 27 +-- pkg/config/tlscfg/options.go | 51 +++-- pkg/config/tlscfg/options_test.go | 3 +- pkg/config/tlscfg/reload.go | 150 ++++++++----- pkg/config/tlscfg/reload_test.go | 199 ++++++++---------- pkg/es/config/config.go | 9 +- pkg/kafka/auth/config.go | 5 +- pkg/kafka/auth/tls.go | 5 +- pkg/kafka/consumer/config.go | 5 +- pkg/kafka/producer/config.go | 7 +- plugin/storage/cassandra/factory.go | 16 +- plugin/storage/cassandra/factory_test.go | 2 +- .../storage/cassandra/savetracetest/main.go | 2 +- plugin/storage/es/factory.go | 11 + plugin/storage/factory.go | 21 ++ plugin/storage/kafka/factory.go | 10 +- plugin/storage/kafka/factory_test.go | 2 +- 33 files changed, 380 insertions(+), 236 deletions(-) diff --git a/cmd/agent/app/builder_test.go b/cmd/agent/app/builder_test.go index 58503c72bb9..7d2dd74500d 100644 --- a/cmd/agent/app/builder_test.go +++ b/cmd/agent/app/builder_test.go @@ -57,7 +57,7 @@ processors: - model: jaeger protocol: compact server: - hostPort: 3.3.3.3:6831 + hostPort: 3.3.3.3:6831 socketBufferSize: 16384 - model: jaeger protocol: binary diff --git a/cmd/agent/app/reporter/grpc/builder.go b/cmd/agent/app/reporter/grpc/builder.go index b6a50fd4739..2bcb6796f49 100644 --- a/cmd/agent/app/reporter/grpc/builder.go +++ b/cmd/agent/app/reporter/grpc/builder.go @@ -55,7 +55,7 @@ func (b *ConnBuilder) CreateConnection(logger *zap.Logger) (*grpc.ClientConn, er var dialTarget string if b.TLS.Enabled { // user requested a secure connection logger.Info("Agent requested secure grpc connection to collector(s)") - tlsConf, err := b.TLS.Config() + tlsConf, err := b.TLS.Config(logger) if err != nil { return nil, fmt.Errorf("failed to load TLS config: %w", err) } diff --git a/cmd/agent/app/reporter/grpc/builder_test.go b/cmd/agent/app/reporter/grpc/builder_test.go index c5171e49183..5e4ef0608e4 100644 --- a/cmd/agent/app/reporter/grpc/builder_test.go +++ b/cmd/agent/app/reporter/grpc/builder_test.go @@ -302,7 +302,7 @@ func TestProxyClientTLS(t *testing.T) { t.Run(test.name, func(t *testing.T) { var opts []grpc.ServerOption if test.serverTLS.Enabled { - tlsCfg, err := test.serverTLS.Config() + tlsCfg, err := test.serverTLS.Config(zap.NewNop()) require.NoError(t, err) opts = []grpc.ServerOption{grpc.Creds(credentials.NewTLS(tlsCfg))} } diff --git a/cmd/agent/app/reporter/grpc/collector_proxy.go b/cmd/agent/app/reporter/grpc/collector_proxy.go index faad4b87c4b..b26b718dd8b 100644 --- a/cmd/agent/app/reporter/grpc/collector_proxy.go +++ b/cmd/agent/app/reporter/grpc/collector_proxy.go @@ -15,6 +15,8 @@ package grpc import ( + "io" + "github.com/uber/jaeger-lib/metrics" "go.uber.org/zap" "google.golang.org/grpc" @@ -26,9 +28,10 @@ import ( // ProxyBuilder holds objects communicating with collector type ProxyBuilder struct { - reporter *reporter.ClientMetricsReporter - manager configmanager.ClientConfigManager - conn *grpc.ClientConn + reporter *reporter.ClientMetricsReporter + manager configmanager.ClientConfigManager + conn *grpc.ClientConn + tlsCloser io.Closer } // NewCollectorProxy creates ProxyBuilder @@ -46,9 +49,10 @@ func NewCollectorProxy(builder *ConnBuilder, agentTags map[string]string, mFacto MetricsFactory: mFactory, }) return &ProxyBuilder{ - conn: conn, - reporter: r3, - manager: configmanager.WrapWithMetrics(grpcManager.NewConfigManager(conn), grpcMetrics), + conn: conn, + reporter: r3, + manager: configmanager.WrapWithMetrics(grpcManager.NewConfigManager(conn), grpcMetrics), + tlsCloser: builder.TLS, }, nil } @@ -70,5 +74,6 @@ func (b ProxyBuilder) GetManager() configmanager.ClientConfigManager { // Close closes connections used by proxy. func (b ProxyBuilder) Close() error { b.reporter.Close() + b.tlsCloser.Close() return b.conn.Close() } diff --git a/cmd/all-in-one/main.go b/cmd/all-in-one/main.go index 64b054e36c2..88f31f7e4af 100644 --- a/cmd/all-in-one/main.go +++ b/cmd/all-in-one/main.go @@ -165,6 +165,9 @@ by default uses only in-memory database.`, logger.Error("Failed to close span writer", zap.Error(err)) } } + if err := storageFactory.Close(); err != nil { + logger.Error("failed to close storage factory", zap.Error(err)) + } tracerCloser.Close() }) return nil diff --git a/cmd/collector/app/collector.go b/cmd/collector/app/collector.go index 6fc197dd63d..3617c79429e 100644 --- a/cmd/collector/app/collector.go +++ b/cmd/collector/app/collector.go @@ -16,6 +16,7 @@ package app import ( "context" + "io" "net/http" "time" @@ -46,6 +47,7 @@ type Collector struct { hServer *http.Server zkServer *http.Server grpcServer *grpc.Server + tlsCloser io.Closer } // CollectorParams to construct a new Jaeger Collector. @@ -107,6 +109,7 @@ func (c *Collector) Start(builderOpts *CollectorOptions) error { c.hServer = httpServer } + c.tlsCloser = builderOpts.TLS if zkServer, err := server.StartZipkinServer(&server.ZipkinServerParams{ HostPort: builderOpts.CollectorZipkinHTTPHostPort, Handler: c.spanHandlers.ZipkinSpansHandler, @@ -154,6 +157,10 @@ func (c *Collector) Close() error { c.logger.Error("failed to close span processor.", zap.Error(err)) } + if err := c.tlsCloser.Close(); err != nil { + c.logger.Error("failed to close TLS certificate watcher", zap.Error(err)) + } + return nil } diff --git a/cmd/collector/app/server/grpc.go b/cmd/collector/app/server/grpc.go index ce68e1cf5c1..4e3849bd70f 100644 --- a/cmd/collector/app/server/grpc.go +++ b/cmd/collector/app/server/grpc.go @@ -45,7 +45,7 @@ func StartGRPCServer(params *GRPCServerParams) (*grpc.Server, error) { if params.TLSConfig.Enabled { // user requested a server with TLS, setup creds - tlsCfg, err := params.TLSConfig.Config() + tlsCfg, err := params.TLSConfig.Config(params.Logger) if err != nil { return nil, err } diff --git a/cmd/collector/main.go b/cmd/collector/main.go index 2b38a732951..56cd3769ff7 100644 --- a/cmd/collector/main.go +++ b/cmd/collector/main.go @@ -104,6 +104,9 @@ func main() { logger.Error("failed to close span writer", zap.Error(err)) } } + if err := storageFactory.Close(); err != nil { + logger.Error("failed to close storage factory", zap.Error(err)) + } }) return nil diff --git a/cmd/ingester/app/builder/builder.go b/cmd/ingester/app/builder/builder.go index 4d2e86d100c..d5cd0efc8ad 100644 --- a/cmd/ingester/app/builder/builder.go +++ b/cmd/ingester/app/builder/builder.go @@ -58,7 +58,7 @@ func CreateConsumer(logger *zap.Logger, metricsFactory metrics.Factory, spanWrit ProtocolVersion: options.ProtocolVersion, AuthenticationConfig: options.AuthenticationConfig, } - saramaConsumer, err := consumerConfig.NewConsumer() + saramaConsumer, err := consumerConfig.NewConsumer(logger) if err != nil { return nil, err } diff --git a/cmd/ingester/main.go b/cmd/ingester/main.go index 3a2329901a0..377811c6874 100644 --- a/cmd/ingester/main.go +++ b/cmd/ingester/main.go @@ -76,6 +76,9 @@ func main() { consumer.Start() svc.RunAndThen(func() { + if err := options.TLS.Close(); err != nil { + logger.Error("Failed to close TLS certificates wathcer", zap.Error(err)) + } if err = consumer.Close(); err != nil { logger.Error("Failed to close consumer", zap.Error(err)) } @@ -85,6 +88,9 @@ func main() { logger.Error("Failed to close span writer", zap.Error(err)) } } + if err := storageFactory.Close(); err != nil { + logger.Error("failed to close storage factory", zap.Error(err)) + } }) return nil }, diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/exporter.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/exporter.go index 393b3b5b809..554ab2a0426 100644 --- a/cmd/opentelemetry/app/exporter/elasticsearchexporter/exporter.go +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/exporter.go @@ -15,6 +15,8 @@ package elasticsearchexporter import ( + "context" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/exporter/exporterhelper" @@ -36,5 +38,8 @@ func new(config *Config, params component.ExporterCreateParams) (component.Trace } return exporterhelper.NewTraceExporter( config, - w.WriteTraces) + w.WriteTraces, + exporterhelper.WithShutdown(func(ctx context.Context) error { + return esCfg.TLS.Close() + })) } diff --git a/cmd/opentelemetry/cmd/all-in-one/main.go b/cmd/opentelemetry/cmd/all-in-one/main.go index f145b8dc356..de3cde965ec 100644 --- a/cmd/opentelemetry/cmd/all-in-one/main.go +++ b/cmd/opentelemetry/cmd/all-in-one/main.go @@ -145,7 +145,7 @@ func main() { if exp == nil { svc.ReportFatalError(fmt.Errorf("exporter type for storage %s not found", storageType)) } - queryServer, tracerCloser, err := startQuery(v, svc.GetLogger(), exp) + queryServer, tracerCloser, storageCloser, err := startQuery(v, svc.GetLogger(), exp) if err != nil { svc.ReportFatalError(err) } @@ -157,6 +157,9 @@ func main() { if tracerCloser != nil { tracerCloser.Close() } + if storageCloser != nil { + storageCloser.Close() + } } else if state == service.Closed { break } @@ -177,18 +180,18 @@ func getStorageExporter(storageType string, exporters map[configmodels.Exporter] return nil } -func startQuery(v *viper.Viper, logger *zap.Logger, exporter configmodels.Exporter) (*queryApp.Server, io.Closer, error) { +func startQuery(v *viper.Viper, logger *zap.Logger, exporter configmodels.Exporter) (*queryApp.Server, io.Closer, io.Closer, error) { storageFactory, err := getFactory(exporter, v, logger) if err != nil { - return nil, nil, err + return nil, nil, nil, err } spanReader, err := storageFactory.CreateSpanReader() if err != nil { - return nil, nil, err + return nil, nil, nil, err } dependencyReader, err := storageFactory.CreateDependencyReader() if err != nil { - return nil, nil, err + return nil, nil, nil, err } queryOpts := new(queryApp.QueryOptions).InitFromViper(v, logger) queryServiceOptions := queryOpts.BuildQueryServiceOptions(storageFactory, logger) @@ -200,12 +203,16 @@ func startQuery(v *viper.Viper, logger *zap.Logger, exporter configmodels.Export tracerCloser := initTracer(logger) server, err := queryApp.NewServer(logger, queryService, queryOpts, opentracing.GlobalTracer()) if err != nil { - return nil, nil, err + return nil, nil, nil, err } if err := server.Start(); err != nil { - return nil, nil, err + return nil, nil, nil, err + } + var storageCloser io.Closer + if closer, ok := storageFactory.(io.Closer); ok { + storageCloser = closer } - return server, tracerCloser, nil + return server, tracerCloser, storageCloser, nil } func getFactory(exporter configmodels.Exporter, v *viper.Viper, logger *zap.Logger) (storage.Factory, error) { diff --git a/cmd/opentelemetry/go.mod b/cmd/opentelemetry/go.mod index 316e7c29a54..b04a2c00a24 100644 --- a/cmd/opentelemetry/go.mod +++ b/cmd/opentelemetry/go.mod @@ -7,7 +7,8 @@ replace k8s.io/client-go => k8s.io/client-go v0.0.0-20190620085101-78d2af792bab replace github.com/jaegertracing/jaeger => ./../../ require ( - github.com/Shopify/sarama v1.22.2-0.20190604114437-cd910a683f9f + github.com/DataDog/zstd v1.3.6-0.20190409195224-796139022798 // indirect + github.com/Shopify/sarama v1.26.4 github.com/elastic/go-elasticsearch/v6 v6.8.10 github.com/elastic/go-elasticsearch/v7 v7.0.0 github.com/golang/protobuf v1.4.2 // indirect diff --git a/cmd/opentelemetry/go.sum b/cmd/opentelemetry/go.sum index 1bbbc12f8b7..20dcfdff3e9 100644 --- a/cmd/opentelemetry/go.sum +++ b/cmd/opentelemetry/go.sum @@ -63,6 +63,8 @@ github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdko github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= github.com/Shopify/sarama v1.22.2-0.20190604114437-cd910a683f9f h1:SgZvxOvp9NLnAjkIiby0LQgXH0yQNTk2eDzbYPVoTA4= github.com/Shopify/sarama v1.22.2-0.20190604114437-cd910a683f9f/go.mod h1:XLH1GYJnLVE0XCr6KdJGVJRTwY30moWNJ4sERjXX6fs= +github.com/Shopify/sarama v1.26.4 h1:+17TxUq/PJEAfZAll0T7XJjSgQWCpaQSoki/x5yN8o8= +github.com/Shopify/sarama v1.26.4/go.mod h1:NbSGBSSndYaIhRcBtY9V0U7AyH+x71bG668AuWys/yU= github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc= github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/Songmu/retry v0.1.0 h1:hPA5xybQsksLR/ry/+t/7cFajPW+dqjmjhzZhioBILA= @@ -228,6 +230,7 @@ github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8 github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4= github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20= +github.com/frankban/quicktest v1.7.2/go.mod h1:jaStnuzAqU1AJdCO0l53JDCJrVDKcS03DbaAcR7Ks/o= github.com/frankban/quicktest v1.7.3 h1:kV0lw0TH1j1hozahVmcpFCsbV5hcS4ZalH+U7UoeTow= github.com/frankban/quicktest v1.7.3/go.mod h1:V1d2J5pfxYH6EjBAgSK7YNXcXlTWxUHdE1sVDXkjnig= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= @@ -869,7 +872,9 @@ github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQL github.com/kisielk/gotool v1.0.0 h1:AV2c/EiW3KqPNT9ZKl07ehoAGi4C5/01Cfbblndcapg= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.9.5/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= +github.com/klauspost/compress v1.9.8/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= github.com/klauspost/compress v1.10.4/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= +github.com/klauspost/compress v1.10.5 h1:7q6vHIqubShURwQz8cQK6yIe/xC3IF0Vm7TGfqjewrc= github.com/klauspost/compress v1.10.5/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/konsorten/go-windows-terminal-sequences v0.0.0-20180402223658-b729f2633dfe/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -1477,6 +1482,7 @@ golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20190911031432-227b76d455e7/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200117160349-530e935923ad/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20200204104054-c9f3fb736b72/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200214034016-1d94cc7ab1c6 h1:Sy5bstxEqwwbYs6n0/pBuxKENqOeZUgD45Gp3Q3pqLg= golang.org/x/crypto v0.0.0-20200214034016-1d94cc7ab1c6/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200320181102-891825fb96df h1:lDWgvUvNnaTnNBc/dwOty86cFeKoKWbwy2wQj0gIxbU= diff --git a/cmd/query/app/server.go b/cmd/query/app/server.go index 3e4b6756462..c899ed96f10 100644 --- a/cmd/query/app/server.go +++ b/cmd/query/app/server.go @@ -74,7 +74,7 @@ func createGRPCServer(querySvc *querysvc.QueryService, options *QueryOptions, lo var grpcOpts []grpc.ServerOption if options.TLS.Enabled { - tlsCfg, err := options.TLS.Config() + tlsCfg, err := options.TLS.Config(logger) if err != nil { return nil, err } @@ -183,6 +183,7 @@ func (s *Server) Start() error { // Close stops http, GRPC servers and closes the port listener. func (s *Server) Close() { + s.queryOptions.TLS.Close() s.grpcServer.Stop() s.httpServer.Close() s.conn.Close() diff --git a/cmd/query/main.go b/cmd/query/main.go index b7e4987ce1b..4afe37d2257 100644 --- a/cmd/query/main.go +++ b/cmd/query/main.go @@ -123,6 +123,9 @@ func main() { svc.RunAndThen(func() { server.Close() + if err := storageFactory.Close(); err != nil { + logger.Error("failed to close storage factory", zap.Error(err)) + } }) return nil }, diff --git a/pkg/cassandra/config/config.go b/pkg/cassandra/config/config.go index 0af2954bc6d..c2ea9904cd7 100644 --- a/pkg/cassandra/config/config.go +++ b/pkg/cassandra/config/config.go @@ -16,11 +16,11 @@ package config import ( - "crypto/tls" "fmt" "time" "github.com/gocql/gocql" + "go.uber.org/zap" "github.com/jaegertracing/jaeger/pkg/cassandra" gocqlw "github.com/jaegertracing/jaeger/pkg/cassandra/gocql" @@ -90,12 +90,15 @@ func (c *Configuration) ApplyDefaults(source *Configuration) { // SessionBuilder creates new cassandra.Session type SessionBuilder interface { - NewSession() (cassandra.Session, error) + NewSession(logger *zap.Logger) (cassandra.Session, error) } // NewSession creates a new Cassandra session -func (c *Configuration) NewSession() (cassandra.Session, error) { - cluster := c.NewCluster() +func (c *Configuration) NewSession(logger *zap.Logger) (cassandra.Session, error) { + cluster, err := c.NewCluster(logger) + if err != nil { + return nil, err + } session, err := cluster.CreateSession() if err != nil { return nil, err @@ -104,7 +107,7 @@ func (c *Configuration) NewSession() (cassandra.Session, error) { } // NewCluster creates a new gocql cluster from the configuration -func (c *Configuration) NewCluster() *gocql.ClusterConfig { +func (c *Configuration) NewCluster(logger *zap.Logger) (*gocql.ClusterConfig, error) { cluster := gocql.NewCluster(c.Servers...) cluster.Keyspace = c.Keyspace cluster.NumConns = c.ConnectionsPerHost @@ -144,15 +147,13 @@ func (c *Configuration) NewCluster() *gocql.ClusterConfig { Password: c.Authenticator.Basic.Password, } } + tlsCfg, err := c.TLS.Config(logger) + if err != nil { + return nil, err + } if c.TLS.Enabled { cluster.SslOpts = &gocql.SslOptions{ - Config: &tls.Config{ - ServerName: c.TLS.ServerName, - }, - CertPath: c.TLS.CertPath, - KeyPath: c.TLS.KeyPath, - CaPath: c.TLS.CAPath, - EnableHostVerification: !c.TLS.SkipHostVerify, + Config: tlsCfg, } } // If tunneling connection to C*, disable cluster autodiscovery features. @@ -160,7 +161,7 @@ func (c *Configuration) NewCluster() *gocql.ClusterConfig { cluster.DisableInitialHostLookup = true cluster.IgnorePeerAddr = true } - return cluster + return cluster, nil } func (c *Configuration) String() string { diff --git a/pkg/config/tlscfg/options.go b/pkg/config/tlscfg/options.go index a2248250c26..f4fa2237717 100644 --- a/pkg/config/tlscfg/options.go +++ b/pkg/config/tlscfg/options.go @@ -18,26 +18,36 @@ import ( "crypto/tls" "crypto/x509" "fmt" + "io" "io/ioutil" "path/filepath" + + "go.uber.org/zap" ) // Options describes the configuration properties for TLS Connections. type Options struct { - Enabled bool `mapstructure:"enabled"` - CAPath string `mapstructure:"ca"` - CertPath string `mapstructure:"cert"` - KeyPath string `mapstructure:"key"` - ServerName string `mapstructure:"server_name"` // only for client-side TLS config - ClientCAPath string `mapstructure:"client_ca"` // only for server-side TLS config for client auth - SkipHostVerify bool `mapstructure:"skip_host_verify"` + Enabled bool `mapstructure:"enabled"` + CAPath string `mapstructure:"ca"` + CertPath string `mapstructure:"cert"` + KeyPath string `mapstructure:"key"` + ServerName string `mapstructure:"server_name"` // only for client-side TLS config + ClientCAPath string `mapstructure:"client_ca"` // only for server-side TLS config for client auth + SkipHostVerify bool `mapstructure:"skip_host_verify"` + watcher *watchCerts `mapstructure:"-"` } var systemCertPool = x509.SystemCertPool // to allow overriding in unit test // Config loads TLS certificates and returns a TLS Config. -func (p Options) Config() (*tls.Config, error) { - certPool, err := p.loadCertPool() +func (p Options) Config(logger *zap.Logger) (*tls.Config, error) { + w, err := newWatchCerts(p, logger) + if err != nil { + return nil, err + } + p.watcher = w + + certPool, err := p.loadCertPool(logger) if err != nil { return nil, fmt.Errorf("failed to load CA CertPool: %w", err) } @@ -52,11 +62,13 @@ func (p Options) Config() (*tls.Config, error) { return nil, fmt.Errorf("for client auth via TLS, either both client certificate and key must be supplied, or neither") } if p.CertPath != "" && p.KeyPath != "" { - tlsCert, err := tls.LoadX509KeyPair(filepath.Clean(p.CertPath), filepath.Clean(p.KeyPath)) - if err != nil { - return nil, fmt.Errorf("failed to load server TLS cert and key: %w", err) + tlsCfg.GetCertificate = func(*tls.ClientHelloInfo) (*tls.Certificate, error) { + return p.watcher.certificate(), nil + } + // GetClientCertificate is used on the client side when server is configured with tls.RequireAndVerifyClientCert e.g. mTLS + tlsCfg.GetClientCertificate = func(*tls.CertificateRequestInfo) (*tls.Certificate, error) { + return p.watcher.certificate(), nil } - tlsCfg.Certificates = append(tlsCfg.Certificates, tlsCert) } if p.ClientCAPath != "" { @@ -67,11 +79,11 @@ func (p Options) Config() (*tls.Config, error) { tlsCfg.ClientCAs = certPool tlsCfg.ClientAuth = tls.RequireAndVerifyClientCert } - + go p.watcher.watchChangesLoop(tlsCfg.RootCAs, tlsCfg.ClientCAs) return tlsCfg, nil } -func (p Options) loadCertPool() (*x509.CertPool, error) { +func (p Options) loadCertPool(logger *zap.Logger) (*x509.CertPool, error) { if len(p.CAPath) == 0 { // no truststore given, use SystemCertPool certPool, err := systemCertPool() if err != nil { @@ -98,3 +110,12 @@ func addCertToPool(caPath string, certPool *x509.CertPool) error { } return nil } + +var _ io.Closer = (*Options)(nil) + +func (p Options) Close() error { + if p.watcher != nil { + return p.watcher.Close() + } + return nil +} diff --git a/pkg/config/tlscfg/options_test.go b/pkg/config/tlscfg/options_test.go index 8d038952dc2..fc96bbd945c 100644 --- a/pkg/config/tlscfg/options_test.go +++ b/pkg/config/tlscfg/options_test.go @@ -21,6 +21,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/zap" ) var testCertKeyLocation = "./testdata" @@ -131,7 +132,7 @@ func TestOptionsToConfig(t *testing.T) { systemCertPool = saveSystemCertPool }() } - cfg, err := test.options.Config() + cfg, err := test.options.Config(zap.NewNop()) if test.expectError != "" { require.Error(t, err) assert.Contains(t, err.Error(), test.expectError) diff --git a/pkg/config/tlscfg/reload.go b/pkg/config/tlscfg/reload.go index e398d3c5734..06ab12c8b64 100644 --- a/pkg/config/tlscfg/reload.go +++ b/pkg/config/tlscfg/reload.go @@ -17,82 +17,120 @@ package tlscfg import ( "crypto/tls" "crypto/x509" - "fmt" "io" "path/filepath" + "sync" "github.com/fsnotify/fsnotify" "go.uber.org/zap" ) -// ReloadCertificates configures tls.Config to reload certificates is they change. -// This feature can be used to minimize down time when rotating/renewing certificates. -func (p Options) ReloadCertificates(tlsCfg *tls.Config, logger *zap.Logger) (io.Closer, error) { - watcher, err := fsnotify.NewWatcher() - if err != nil { - return nil, err - } - if p.CAPath != "" { - err := updateCertPool(watcher, p.CAPath, tlsCfg.RootCAs, logger) +type watchCerts struct { + opts Options + watcher *fsnotify.Watcher + cert tls.Certificate + logger *zap.Logger + mu *sync.Mutex +} + +var _ io.Closer = (*watchCerts)(nil) + +func newWatchCerts(opts Options, logger *zap.Logger) (*watchCerts, error) { + var cert tls.Certificate + if opts.CertPath != "" && opts.KeyPath != "" { + // load certs at startup to catch missing certs error early + c, err := tls.LoadX509KeyPair(filepath.Clean(opts.CertPath), filepath.Clean(opts.KeyPath)) if err != nil { - watcher.Close() return nil, err } + cert = c } - if p.ClientCAPath != "" { - err := updateCertPool(watcher, p.ClientCAPath, tlsCfg.ClientCAs, logger) - if err != nil { - watcher.Close() - return nil, err - } + watcher, err := fsnotify.NewWatcher() + if err != nil { + return nil, err } - if p.CertPath != "" && p.KeyPath != "" { - tlsCfg.GetCertificate = func(*tls.ClientHelloInfo) (*tls.Certificate, error) { - logger.Debug("Loading certificate", zap.String("cert", p.CertPath), zap.String("key", p.KeyPath)) - tlsCert, err := tls.LoadX509KeyPair(filepath.Clean(p.CertPath), filepath.Clean(p.KeyPath)) - if err != nil { - return nil, fmt.Errorf("failed to load server TLS cert and key: %w", err) + if err := addCertsToWatch(watcher, opts); err != nil { + return nil, err + } + return &watchCerts{ + cert: cert, + opts: opts, + watcher: watcher, + logger: logger, + mu: &sync.Mutex{}, + }, nil +} + +func (w watchCerts) Close() error { + return w.watcher.Close() +} + +func (w watchCerts) certificate() *tls.Certificate { + w.mu.Lock() + defer w.mu.Unlock() + return &w.cert +} + +func (w watchCerts) watchChangesLoop(rootCAs, clientCAs *x509.CertPool) { + for { + select { + case event, ok := <-w.watcher.Events: + if !ok { + return + } + w.logger.Info("Loading modified certificate", + zap.String("certificate", event.Name), + zap.String("event", event.Op.String())) + var err error + switch event.Name { + case w.opts.CAPath: + err = addCertToPool(w.opts.CAPath, rootCAs) + case w.opts.ClientCAPath: + err = addCertToPool(w.opts.ClientCAPath, clientCAs) + case w.opts.CertPath, w.opts.KeyPath: + w.mu.Lock() + w.cert, err = tls.LoadX509KeyPair(filepath.Clean(w.opts.CertPath), filepath.Clean(w.opts.KeyPath)) + w.mu.Unlock() } - return &tlsCert, nil - } - tlsCfg.GetClientCertificate = func(*tls.CertificateRequestInfo) (*tls.Certificate, error) { - logger.Debug("Loading client certificate", zap.String("cert", p.CertPath), zap.String("key", p.KeyPath)) - tlsCert, err := tls.LoadX509KeyPair(filepath.Clean(p.CertPath), filepath.Clean(p.KeyPath)) if err != nil { - return nil, fmt.Errorf("failed to load server TLS cert and key: %w", err) + w.logger.Error("Failed to load certificate", + zap.String("certificate", event.Name), + zap.String("event", event.Op.String()), + zap.Error(err)) } - return &tlsCert, nil + case err, ok := <-w.watcher.Errors: + if !ok { + return + } + w.logger.Error("Watcher got error", zap.Error(err)) } } - return watcher, nil } -func updateCertPool(watcher *fsnotify.Watcher, caPath string, certPool *x509.CertPool, logger *zap.Logger) error { - err := watcher.Add(caPath) - if err != nil { - return err +func addCertsToWatch(watcher *fsnotify.Watcher, opts Options) error { + if len(opts.CAPath) != 0 { + err := watcher.Add(opts.CAPath) + if err != nil { + return err + } } - go func() { - for { - select { - case event, ok := <-watcher.Events: - if !ok { - return - } - logger.Info("CA certificate has been modified", - zap.String("ca_path", caPath), - zap.String("event_name", event.Name)) - err := addCertToPool(caPath, certPool) - if err != nil { - logger.Error("failed to load CA cert", zap.String("ca_path", caPath)) - } - case err, ok := <-watcher.Errors: - if !ok { - return - } - logger.Error("Watcher got error", zap.Error(err)) - } + if len(opts.ClientCAPath) != 0 { + err := watcher.Add(opts.ClientCAPath) + if err != nil { + return err + } + } + if len(opts.CertPath) != 0 { + err := watcher.Add(opts.CertPath) + if err != nil { + return err } - }() + } + if len(opts.KeyPath) != 0 { + err := watcher.Add(opts.KeyPath) + if err != nil { + return err + } + } return nil } diff --git a/pkg/config/tlscfg/reload_test.go b/pkg/config/tlscfg/reload_test.go index 4145d00910b..f43ecf8b0bb 100644 --- a/pkg/config/tlscfg/reload_test.go +++ b/pkg/config/tlscfg/reload_test.go @@ -14,110 +14,95 @@ package tlscfg -import ( - "crypto/tls" - "crypto/x509" - "io/ioutil" - "testing" - "time" - - "github.com/fsnotify/fsnotify" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" - "go.uber.org/zap/zaptest/observer" -) - -func TestReloadCertificates(t *testing.T) { - opts := Options{ - CAPath: "./testdata/example-CA-cert.pem", - ClientCAPath: "./testdata/example-CA-cert.pem", - CertPath: "./testdata/example-server-cert.pem", - KeyPath: "./testdata/example-server-key.pem", - } - tlsCfg, err := opts.Config() - require.NoError(t, err) - require.NotNil(t, tlsCfg) - - closer, err := opts.ReloadCertificates(tlsCfg, zap.NewNop()) - require.NoError(t, err) - require.NotNil(t, closer) - defer closer.Close() - - require.NotNil(t, tlsCfg.GetCertificate) - require.NotNil(t, tlsCfg.GetClientCertificate) - - cert, err := tlsCfg.GetCertificate(&tls.ClientHelloInfo{}) - require.NoError(t, err) - assert.Equal(t, tlsCfg.Certificates[0], *cert) - cert, err = tlsCfg.GetClientCertificate(&tls.CertificateRequestInfo{}) - require.NoError(t, err) - assert.Equal(t, tlsCfg.Certificates[0], *cert) -} - -func TestReloadCertificates_err_ca(t *testing.T) { - opts := Options{ - CAPath: "./testdata/example-CA-cert.pem", - } - tlsCfg, err := opts.Config() - require.NoError(t, err) - require.NotNil(t, tlsCfg) - - opts.CAPath = "does_not_exist" - closer, err := opts.ReloadCertificates(tlsCfg, zap.NewNop()) - require.Error(t, err) - require.Nil(t, closer) -} - -func TestReloadCertificates_err_client_ca(t *testing.T) { - opts := Options{ - ClientCAPath: "./testdata/example-CA-cert.pem", - } - tlsCfg, err := opts.Config() - require.NoError(t, err) - require.NotNil(t, tlsCfg) - - opts.ClientCAPath = "does_not_exist" - closer, err := opts.ReloadCertificates(tlsCfg, zap.NewNop()) - require.Error(t, err) - require.Nil(t, closer) -} - -func TestUpdateCertPool(t *testing.T) { - zcore, logObserver := observer.New(zapcore.ErrorLevel) - logger := zap.New(zcore) - - certFile, err := ioutil.TempFile("", "cert") - require.NoError(t, err) - - certPool := x509.NewCertPool() - w, err := fsnotify.NewWatcher() - require.NoError(t, err) - defer w.Close() - err = updateCertPool(w, certFile.Name(), certPool, logger) - - // trigger file change - certFile.WriteString("some_contest") - waitUntil(func() bool { - return logObserver.FilterField(zap.String("ca_path", certFile.Name())).Len() > 0 - }, 100, time.Millisecond*100) - assert.Equal(t, 1, logObserver.FilterField(zap.String("ca_path", certFile.Name())).Len()) -} - -func TestUpdateCertPool_error_file_does_not_exist(t *testing.T) { - w, err := fsnotify.NewWatcher() - require.NoError(t, err) - defer w.Close() - err = updateCertPool(w, "does_not_exists", x509.NewCertPool(), zap.NewNop()) - require.Error(t, err) -} - -func waitUntil(f func() bool, iterations int, sleepInterval time.Duration) { - for i := 0; i < iterations; i++ { - if f() { - return - } - time.Sleep(sleepInterval) - } -} +//func TestReloadCertificates(t *testing.T) { +// opts := Options{ +// CAPath: "./testdata/example-CA-cert.pem", +// ClientCAPath: "./testdata/example-CA-cert.pem", +// CertPath: "./testdata/example-server-cert.pem", +// KeyPath: "./testdata/example-server-key.pem", +// } +// tlsCfg, err := opts.Config(zap.NewNop()) +// require.NoError(t, err) +// require.NotNil(t, tlsCfg) +// +// closer, err := opts.ReloadCertificates(tlsCfg, zap.NewNop()) +// require.NoError(t, err) +// require.NotNil(t, closer) +// defer closer.Close() +// +// require.NotNil(t, tlsCfg.GetCertificate) +// require.NotNil(t, tlsCfg.GetClientCertificate) +// +// cert, err := tlsCfg.GetCertificate(&tls.ClientHelloInfo{}) +// require.NoError(t, err) +// assert.Equal(t, tlsCfg.Certificates[0], *cert) +// cert, err = tlsCfg.GetClientCertificate(&tls.CertificateRequestInfo{}) +// require.NoError(t, err) +// assert.Equal(t, tlsCfg.Certificates[0], *cert) +//} +// +//func TestReloadCertificates_err_ca(t *testing.T) { +// opts := Options{ +// CAPath: "./testdata/example-CA-cert.pem", +// } +// tlsCfg, err := opts.Config() +// require.NoError(t, err) +// require.NotNil(t, tlsCfg) +// +// opts.CAPath = "does_not_exist" +// closer, err := opts.ReloadCertificates(tlsCfg, zap.NewNop()) +// require.Error(t, err) +// require.Nil(t, closer) +//} +// +//func TestReloadCertificates_err_client_ca(t *testing.T) { +// opts := Options{ +// ClientCAPath: "./testdata/example-CA-cert.pem", +// } +// tlsCfg, err := opts.Config() +// require.NoError(t, err) +// require.NotNil(t, tlsCfg) +// +// opts.ClientCAPath = "does_not_exist" +// closer, err := opts.ReloadCertificates(tlsCfg, zap.NewNop()) +// require.Error(t, err) +// require.Nil(t, closer) +//} +// +//func TestUpdateCertPool(t *testing.T) { +// zcore, logObserver := observer.New(zapcore.ErrorLevel) +// logger := zap.New(zcore) +// +// certFile, err := ioutil.TempFile("", "cert") +// require.NoError(t, err) +// +// certPool := x509.NewCertPool() +// w, err := fsnotify.NewWatcher() +// require.NoError(t, err) +// defer w.Close() +// err = updateCertPool(w, certFile.Name(), certPool, logger) +// +// // trigger file change +// certFile.WriteString("some_contest") +// waitUntil(func() bool { +// return logObserver.FilterField(zap.String("ca_path", certFile.Name())).Len() > 0 +// }, 100, time.Millisecond*100) +// assert.Equal(t, 1, logObserver.FilterField(zap.String("ca_path", certFile.Name())).Len()) +//} +// +//func TestUpdateCertPool_error_file_does_not_exist(t *testing.T) { +// w, err := fsnotify.NewWatcher() +// require.NoError(t, err) +// defer w.Close() +// err = updateCertPool(w, "does_not_exists", x509.NewCertPool(), zap.NewNop()) +// require.Error(t, err) +//} +// +//func waitUntil(f func() bool, iterations int, sleepInterval time.Duration) { +// for i := 0; i < iterations; i++ { +// if f() { +// return +// } +// time.Sleep(sleepInterval) +// } +//} diff --git a/pkg/es/config/config.go b/pkg/es/config/config.go index d6761338acd..913fbbc170f 100644 --- a/pkg/es/config/config.go +++ b/pkg/es/config/config.go @@ -313,12 +313,7 @@ func (c *Configuration) getConfigOptions(logger *zap.Logger) ([]elastic.ClientOp // GetHTTPRoundTripper returns configured http.RoundTripper func GetHTTPRoundTripper(c *Configuration, logger *zap.Logger) (http.RoundTripper, error) { if c.TLS.Enabled { - ctlsConfig, err := c.TLS.Config() - if err != nil { - return nil, err - } - // ignore closer as we cannot pass it to the round tripper or bubble up - _, err = c.TLS.ReloadCertificates(ctlsConfig, logger) + ctlsConfig, err := c.TLS.Config(logger) if err != nil { return nil, err } @@ -333,7 +328,7 @@ func GetHTTPRoundTripper(c *Configuration, logger *zap.Logger) (http.RoundTrippe TLSClientConfig: &tls.Config{InsecureSkipVerify: c.TLS.SkipHostVerify}, } if c.TLS.CAPath != "" { - ctlsConfig, err := c.TLS.Config() + ctlsConfig, err := c.TLS.Config(logger) if err != nil { return nil, err } diff --git a/pkg/kafka/auth/config.go b/pkg/kafka/auth/config.go index 64c9741d41e..74aa7fac5c9 100644 --- a/pkg/kafka/auth/config.go +++ b/pkg/kafka/auth/config.go @@ -20,6 +20,7 @@ import ( "github.com/Shopify/sarama" "github.com/spf13/viper" + "go.uber.org/zap" "github.com/jaegertracing/jaeger/pkg/config/tlscfg" ) @@ -46,13 +47,13 @@ type AuthenticationConfig struct { } //SetConfiguration set configure authentication into sarama config structure -func (config *AuthenticationConfig) SetConfiguration(saramaConfig *sarama.Config) error { +func (config *AuthenticationConfig) SetConfiguration(saramaConfig *sarama.Config, logger *zap.Logger) error { authentication := strings.ToLower(config.Authentication) if strings.Trim(authentication, " ") == "" { authentication = none } if config.Authentication == tls || config.TLS.Enabled { - err := setTLSConfiguration(&config.TLS, saramaConfig) + err := setTLSConfiguration(&config.TLS, saramaConfig, logger) if err != nil { return err } diff --git a/pkg/kafka/auth/tls.go b/pkg/kafka/auth/tls.go index dd01318c7a8..bdd743edcf1 100644 --- a/pkg/kafka/auth/tls.go +++ b/pkg/kafka/auth/tls.go @@ -18,13 +18,14 @@ import ( "fmt" "github.com/Shopify/sarama" + "go.uber.org/zap" "github.com/jaegertracing/jaeger/pkg/config/tlscfg" ) -func setTLSConfiguration(config *tlscfg.Options, saramaConfig *sarama.Config) error { +func setTLSConfiguration(config *tlscfg.Options, saramaConfig *sarama.Config, logger *zap.Logger) error { if config.Enabled { - tlsConfig, err := config.Config() + tlsConfig, err := config.Config(logger) if err != nil { return fmt.Errorf("error loading tls config: %w", err) } diff --git a/pkg/kafka/consumer/config.go b/pkg/kafka/consumer/config.go index 169198886dc..351ccfe73ad 100644 --- a/pkg/kafka/consumer/config.go +++ b/pkg/kafka/consumer/config.go @@ -19,6 +19,7 @@ import ( "github.com/Shopify/sarama" cluster "github.com/bsm/sarama-cluster" + "go.uber.org/zap" "github.com/jaegertracing/jaeger/pkg/kafka/auth" ) @@ -48,7 +49,7 @@ type Configuration struct { } // NewConsumer creates a new kafka consumer -func (c *Configuration) NewConsumer() (Consumer, error) { +func (c *Configuration) NewConsumer(logger *zap.Logger) (Consumer, error) { saramaConfig := cluster.NewConfig() saramaConfig.Group.Mode = cluster.ConsumerModePartitions saramaConfig.ClientID = c.ClientID @@ -63,7 +64,7 @@ func (c *Configuration) NewConsumer() (Consumer, error) { } saramaConfig.Config.Version = ver } - if err := c.AuthenticationConfig.SetConfiguration(&saramaConfig.Config); err != nil { + if err := c.AuthenticationConfig.SetConfiguration(&saramaConfig.Config, logger); err != nil { return nil, err } return cluster.NewConsumer(c.Brokers, c.GroupID, []string{c.Topic}, saramaConfig) diff --git a/pkg/kafka/producer/config.go b/pkg/kafka/producer/config.go index 30920f66174..f12b09dc2f8 100644 --- a/pkg/kafka/producer/config.go +++ b/pkg/kafka/producer/config.go @@ -18,13 +18,14 @@ import ( "time" "github.com/Shopify/sarama" + "go.uber.org/zap" "github.com/jaegertracing/jaeger/pkg/kafka/auth" ) // Builder builds a new kafka producer type Builder interface { - NewProducer() (sarama.AsyncProducer, error) + NewProducer(logger *zap.Logger) (sarama.AsyncProducer, error) } // Configuration describes the configuration properties needed to create a Kafka producer @@ -42,7 +43,7 @@ type Configuration struct { } // NewProducer creates a new asynchronous kafka producer -func (c *Configuration) NewProducer() (sarama.AsyncProducer, error) { +func (c *Configuration) NewProducer(logger *zap.Logger) (sarama.AsyncProducer, error) { saramaConfig := sarama.NewConfig() saramaConfig.Producer.RequiredAcks = c.RequiredAcks saramaConfig.Producer.Compression = c.Compression @@ -59,7 +60,7 @@ func (c *Configuration) NewProducer() (sarama.AsyncProducer, error) { } saramaConfig.Version = ver } - if err := c.AuthenticationConfig.SetConfiguration(saramaConfig); err != nil { + if err := c.AuthenticationConfig.SetConfiguration(saramaConfig, logger); err != nil { return nil, err } return sarama.NewAsyncProducer(c.Brokers, saramaConfig) diff --git a/plugin/storage/cassandra/factory.go b/plugin/storage/cassandra/factory.go index 6bd74959303..d48d0d0896b 100644 --- a/plugin/storage/cassandra/factory.go +++ b/plugin/storage/cassandra/factory.go @@ -18,6 +18,7 @@ package cassandra import ( "errors" "flag" + "io" "github.com/spf13/viper" "github.com/uber/jaeger-lib/metrics" @@ -88,14 +89,14 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) f.archiveMetricsFactory = metricsFactory.Namespace(metrics.NSOptions{Name: "cassandra-archive", Tags: nil}) f.logger = logger - primarySession, err := f.primaryConfig.NewSession() + primarySession, err := f.primaryConfig.NewSession(logger) if err != nil { return err } f.primarySession = primarySession if f.archiveConfig != nil { - if archiveSession, err := f.archiveConfig.NewSession(); err == nil { + if archiveSession, err := f.archiveConfig.NewSession(logger); err == nil { f.archiveSession = archiveSession } else { return err @@ -174,3 +175,14 @@ func writerOptions(opts *Options) ([]cSpanStore.Option, error) { return []cSpanStore.Option{cSpanStore.TagFilter(dbmodel.NewChainedTagFilter(tagFilters...))}, nil } + +var _ io.Closer = (*Factory)(nil) + +// Close closes the resources held by the factory +func (f *Factory) Close() error { + f.Options.Get(archiveStorageConfig) + if cfg := f.Options.Get(archiveStorageConfig); cfg != nil { + cfg.TLS.Close() + } + return f.Options.GetPrimary().TLS.Close() +} diff --git a/plugin/storage/cassandra/factory_test.go b/plugin/storage/cassandra/factory_test.go index 63896705228..ab2107dc808 100644 --- a/plugin/storage/cassandra/factory_test.go +++ b/plugin/storage/cassandra/factory_test.go @@ -47,7 +47,7 @@ func newMockSessionBuilder(session *mocks.Session, err error) *mockSessionBuilde } } -func (m *mockSessionBuilder) NewSession() (cassandra.Session, error) { +func (m *mockSessionBuilder) NewSession(*zap.Logger) (cassandra.Session, error) { return m.session, m.err } diff --git a/plugin/storage/cassandra/savetracetest/main.go b/plugin/storage/cassandra/savetracetest/main.go index 0c8256903b7..3916f9a4bc4 100644 --- a/plugin/storage/cassandra/savetracetest/main.go +++ b/plugin/storage/cassandra/savetracetest/main.go @@ -40,7 +40,7 @@ func main() { ProtoVersion: 4, Keyspace: "jaeger_v1_test", } - cqlSession, err := cConfig.NewSession() + cqlSession, err := cConfig.NewSession(logger) if err != nil { logger.Fatal("Cannot create Cassandra session", zap.Error(err)) } diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index de75f912240..1ba65864407 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -18,6 +18,7 @@ package es import ( "flag" "fmt" + "io" "strconv" "strings" @@ -215,3 +216,13 @@ func fixMapping(mapping string, shards, replicas int64) string { mapping = strings.Replace(mapping, "${__NUMBER_OF_REPLICAS__}", strconv.FormatInt(replicas, 10), 1) return mapping } + +var _ io.Closer = (*Factory)(nil) + +// Close closes the resources held by the factory +func (f *Factory) Close() error { + if cfg := f.Options.Get(archiveNamespace); cfg != nil { + cfg.TLS.Close() + } + return f.Options.GetPrimary().TLS.Close() +} diff --git a/plugin/storage/factory.go b/plugin/storage/factory.go index 68d30f7a9f2..263fbfab085 100644 --- a/plugin/storage/factory.go +++ b/plugin/storage/factory.go @@ -18,11 +18,13 @@ package storage import ( "flag" "fmt" + "io" "github.com/spf13/viper" "github.com/uber/jaeger-lib/metrics" "go.uber.org/zap" + "github.com/jaegertracing/jaeger/pkg/multierror" "github.com/jaegertracing/jaeger/plugin" "github.com/jaegertracing/jaeger/plugin/storage/badger" "github.com/jaegertracing/jaeger/plugin/storage/cassandra" @@ -229,3 +231,22 @@ func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) { } return archive.CreateArchiveSpanWriter() } + +var _ io.Closer = (*Factory)(nil) + +// Close closes the resources held by the factory +func (f *Factory) Close() error { + var errs []error + for _, storageType := range f.SpanWriterTypes { + factory, ok := f.factories[storageType] + if ok { + if closer, ok := factory.(io.Closer); ok { + err := closer.Close() + if err != nil { + errs = append(errs, err) + } + } + } + } + return multierror.Wrap(errs) +} diff --git a/plugin/storage/kafka/factory.go b/plugin/storage/kafka/factory.go index 40bbfc9961e..b11caf1983c 100644 --- a/plugin/storage/kafka/factory.go +++ b/plugin/storage/kafka/factory.go @@ -17,6 +17,7 @@ package kafka import ( "errors" "flag" + "io" "github.com/Shopify/sarama" "github.com/spf13/viper" @@ -68,7 +69,7 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) logger.Info("Kafka factory", zap.Any("producer builder", f.Builder), zap.Any("topic", f.options.Topic)) - p, err := f.NewProducer() + p, err := f.NewProducer(logger) if err != nil { return err } @@ -98,3 +99,10 @@ func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) { func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) { return nil, errors.New("kafka storage is write-only") } + +var _ io.Closer = (*Factory)(nil) + +// Close closes the resources held by the factory +func (f *Factory) Close() error { + return f.options.Config.TLS.Close() +} diff --git a/plugin/storage/kafka/factory_test.go b/plugin/storage/kafka/factory_test.go index a9a7e77270f..13bca8d8eb0 100644 --- a/plugin/storage/kafka/factory_test.go +++ b/plugin/storage/kafka/factory_test.go @@ -41,7 +41,7 @@ type mockProducerBuilder struct { t *testing.T } -func (m *mockProducerBuilder) NewProducer() (sarama.AsyncProducer, error) { +func (m *mockProducerBuilder) NewProducer(*zap.Logger) (sarama.AsyncProducer, error) { if m.err == nil { return mocks.NewAsyncProducer(m.t, nil), nil }