diff --git a/plugins/inputs/nsq/nsq.go b/plugins/inputs/nsq/nsq.go index e53c0ca04d078..bf884f6adf157 100644 --- a/plugins/inputs/nsq/nsq.go +++ b/plugins/inputs/nsq/nsq.go @@ -23,9 +23,6 @@ package nsq import ( - "crypto/rand" - "crypto/tls" - "crypto/x509" "encoding/json" "fmt" "io/ioutil" @@ -36,19 +33,15 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal/tls" "github.com/influxdata/telegraf/plugins/inputs" ) // Might add Lookupd endpoints for cluster discovery type NSQ struct { - Endpoints []string - TlsCert string - TlsKey string - TlsCacert string - httpClient *http.Client - httpClientOnce *sync.Once - tlsConfig *tls.Config - tlsConfigOnce *sync.Once + Endpoints []string + tls.ClientConfig + httpClient *http.Client } var sampleConfig = ` @@ -56,10 +49,11 @@ var sampleConfig = ` endpoints = ["http://localhost:4151"] ## Or using HTTPS endpoint - endpoints = ["https://localhost:4152"] - tls_cert = "/path/to/client-cert.pem" - tls_key = "/path/to/client-key.pem" - tls_cacert = "/path/to/ca.pem" + endpoints = ["https://localhost:4152"] + tls_cert = "/path/to/client-cert.pem" + tls_key = "/path/to/client-key.pem" + tls_cacert = "/path/to/ca.pem" + insecure_skip_verify = false ` const ( @@ -73,10 +67,7 @@ func init() { } func New() *NSQ { - return &NSQ{ - httpClientOnce: &sync.Once{}, - tlsConfigOnce: &sync.Once{}, - } + return &NSQ{} } func (n *NSQ) SampleConfig() string { @@ -90,11 +81,11 @@ func (n *NSQ) Description() string { func (n *NSQ) Gather(acc telegraf.Accumulator) error { var err error - n.tlsConfigOnce.Do(func() { - n.tlsConfig, err = n.buildTLSConfig() - }) - if err != nil { - return fmt.Errorf("fail to build tls config: %v", err) + if n.httpClient == nil { + n.httpClient, err = n.getHttpClient() + if err != nil { + return err + } } var wg sync.WaitGroup @@ -110,46 +101,19 @@ func (n *NSQ) Gather(acc telegraf.Accumulator) error { return nil } -func (n *NSQ) buildTLSConfig() (*tls.Config, error) { - if n.TlsCert == "" || n.TlsKey == "" || n.TlsCacert == "" { - return nil, nil - } - - caCertBytes, err := ioutil.ReadFile(n.TlsCacert) +func (n *NSQ) getHttpClient() (*http.Client, error) { + tlsConfig, err := n.ClientConfig.TLSConfig() if err != nil { - return nil, fmt.Errorf("fail to read CA cert file %v: %v", n.TlsCacert, err) + return nil, err } - - cert, err := tls.LoadX509KeyPair(n.TlsCert, n.TlsKey) - if err != nil { - return nil, fmt.Errorf("fail to load certificate %v: %v", n.TlsCert, err) + tr := &http.Transport{ + TLSClientConfig: tlsConfig, } - - pool := x509.NewCertPool() - pool.AppendCertsFromPEM(caCertBytes) - - config := &tls.Config{ - Certificates: []tls.Certificate{cert}, - ClientCAs: pool, - RootCAs: pool, + httpClient := &http.Client{ + Transport: tr, + Timeout: time.Duration(4 * time.Second), } - - config.Rand = rand.Reader - return config, nil -} - -func (n *NSQ) getHttpClient() *http.Client { - n.httpClientOnce.Do(func() { - tr := &http.Transport{ - ResponseHeaderTimeout: time.Duration(3 * time.Second), - TLSClientConfig: n.tlsConfig, - } - n.httpClient = &http.Client{ - Transport: tr, - Timeout: time.Duration(4 * time.Second), - } - }) - return n.httpClient + return httpClient, nil } func (n *NSQ) gatherEndpoint(e string, acc telegraf.Accumulator) error { @@ -157,7 +121,7 @@ func (n *NSQ) gatherEndpoint(e string, acc telegraf.Accumulator) error { if err != nil { return err } - r, err := n.getHttpClient().Get(u.String()) + r, err := n.httpClient.Get(u.String()) if err != nil { return fmt.Errorf("Error while polling %s: %s", u.String(), err) }