Skip to content

Commit

Permalink
Add support for TLS configuration in NSQ input to reach HTTPS nsqd en…
Browse files Browse the repository at this point in the history
…dpoints

Goal: A completely secure NSQ doesn't have any HTTP endpoint, but only
HTTPS, often with x509 client authentication. This PR aims at
configuring the HTTP client to correctly connects to nsqd

Choices: TLS configuration build is only done once, and the first
`Gather()` will return an error, if configuration is invalid, then it
will be silent. The HTTP client is only built once, to limit allocations
  • Loading branch information
Soulou committed Mar 19, 2018
1 parent 76ce71f commit 99d5a57
Showing 1 changed file with 70 additions and 9 deletions.
79 changes: 70 additions & 9 deletions plugins/inputs/nsq/nsq.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
package nsq

import (
"crypto/rand"
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"io/ioutil"
Expand All @@ -38,12 +41,25 @@ import (

// Might add Lookupd endpoints for cluster discovery
type NSQ struct {
Endpoints []string
Endpoints []string
TlsCert string
TlsKey string
TlsCacert string
httpClient *http.Client
httpClientOnce *sync.Once
tlsConfig *tls.Config
tlsConfigOnce *sync.Once
}

var sampleConfig = `
## An array of NSQD HTTP API endpoints
endpoints = ["http://localhost:4151"]
endpoints = ["http://localhost:4151"]
## Or using HTTPS endpoint
endpoints = ["https://localhost:4152"]
tls_cert = "/path/to/client-cert.pem"
tls_key = "/path/to/client-key.pem"
tls_cacert = "/path/to/ca.pem"
`

const (
Expand All @@ -52,7 +68,10 @@ const (

func init() {
inputs.Add("nsq", func() telegraf.Input {
return &NSQ{}
return &NSQ{
httpClientOnce: &sync.Once{},
tlsConfigOnce: &sync.Once{},
}
})
}

Expand All @@ -65,6 +84,15 @@ func (n *NSQ) Description() string {
}

func (n *NSQ) Gather(acc telegraf.Accumulator) error {
var err error

n.tlsConfigOnce.Do(func() {
n.tlsConfig, err = n.buildTLSConfig()
})
if err != nil {
return fmt.Errorf("fail to build tls config: %v", err)
}

var wg sync.WaitGroup
for _, e := range n.Endpoints {
wg.Add(1)
Expand All @@ -78,21 +106,54 @@ func (n *NSQ) Gather(acc telegraf.Accumulator) error {
return nil
}

var tr = &http.Transport{
ResponseHeaderTimeout: time.Duration(3 * time.Second),
func (n *NSQ) buildTLSConfig() (*tls.Config, error) {
if n.TlsCacert == "" || n.TlsKey == "" || n.TlsCacert == "" {
return nil, nil
}

caCertBytes, err := ioutil.ReadFile(n.TlsCacert)
if err != nil {
return nil, fmt.Errorf("fail to read CA cert file %v: %v", n.TlsCacert, err)
}

cert, err := tls.LoadX509KeyPair(n.TlsCert, n.TlsKey)
if err != nil {
return nil, fmt.Errorf("fail to load certificate %v: %v", n.TlsCert, err)
}

pool := x509.NewCertPool()
pool.AppendCertsFromPEM(caCertBytes)

config := &tls.Config{
Certificates: []tls.Certificate{cert},
ClientCAs: pool,
RootCAs: pool,
}

config.Rand = rand.Reader
return config, nil
}

var client = &http.Client{
Transport: tr,
Timeout: time.Duration(4 * time.Second),
func (n *NSQ) getHttpClient() *http.Client {
n.httpClientOnce.Do(func() {
tr := &http.Transport{
ResponseHeaderTimeout: time.Duration(3 * time.Second),
TLSClientConfig: n.tlsConfig,
}
n.httpClient = &http.Client{
Transport: tr,
Timeout: time.Duration(4 * time.Second),
}
})
return n.httpClient
}

func (n *NSQ) gatherEndpoint(e string, acc telegraf.Accumulator) error {
u, err := buildURL(e)
if err != nil {
return err
}
r, err := client.Get(u.String())
r, err := n.getHttpClient().Get(u.String())
if err != nil {
return fmt.Errorf("Error while polling %s: %s", u.String(), err)
}
Expand Down

0 comments on commit 99d5a57

Please sign in to comment.