diff --git a/plugins/outputs/influxdb_v2/README.md b/plugins/outputs/influxdb_v2/README.md index fee1200afbe46..4cb8b7db8c1dc 100644 --- a/plugins/outputs/influxdb_v2/README.md +++ b/plugins/outputs/influxdb_v2/README.md @@ -31,6 +31,10 @@ to use them. ## ex: urls = ["https://us-west-2-1.aws.cloud2.influxdata.com"] urls = ["http://127.0.0.1:8086"] + ## Local address to bind when connecting to the server + ## If empty or not set, the local address is automatically chosen. + # local_address = "" + ## Token for authentication. token = "" diff --git a/plugins/outputs/influxdb_v2/http.go b/plugins/outputs/influxdb_v2/http.go index 2eade2b6872cb..4cfd19727e90b 100644 --- a/plugins/outputs/influxdb_v2/http.go +++ b/plugins/outputs/influxdb_v2/http.go @@ -44,6 +44,7 @@ const ( type HTTPConfig struct { URL *url.URL + LocalAddr *net.TCPAddr Token config.Secret Organization string Bucket string @@ -125,9 +126,15 @@ func NewHTTPClient(cfg *HTTPConfig) (*httpClient, error) { var transport *http.Transport switch cfg.URL.Scheme { case "http", "https": + var dialerFunc func(ctx context.Context, network, addr string) (net.Conn, error) + if cfg.LocalAddr != nil { + dialer := &net.Dialer{LocalAddr: cfg.LocalAddr} + dialerFunc = dialer.DialContext + } transport = &http.Transport{ Proxy: proxy, TLSClientConfig: cfg.TLSConfig, + DialContext: dialerFunc, } if cfg.ReadIdleTimeout != 0 || cfg.PingTimeout != 0 { http2Trans, err := http2.ConfigureTransports(transport) diff --git a/plugins/outputs/influxdb_v2/influxdb_v2.go b/plugins/outputs/influxdb_v2/influxdb_v2.go index 23ccdbb7bb766..bb9f023fe75b7 100644 --- a/plugins/outputs/influxdb_v2/influxdb_v2.go +++ b/plugins/outputs/influxdb_v2/influxdb_v2.go @@ -7,7 +7,10 @@ import ( "errors" "fmt" "math/rand" + "net" "net/url" + "strconv" + "strings" "time" "github.com/influxdata/telegraf" @@ -35,6 +38,7 @@ type Client interface { type InfluxDB struct { URLs []string `toml:"urls"` + LocalAddr string `toml:"local_address"` Token config.Secret `toml:"token"` Organization string `toml:"organization"` Bucket string `toml:"bucket"` @@ -78,9 +82,36 @@ func (i *InfluxDB) Connect() error { } } + var localAddr *net.TCPAddr + if i.LocalAddr != "" { + // Resolve the local address into IP address and the given port if any + addr, sPort, err := net.SplitHostPort(i.LocalAddr) + if err != nil { + if !strings.Contains(err.Error(), "missing port") { + return fmt.Errorf("invalid local address: %w", err) + } + addr = i.LocalAddr + } + local, err := net.ResolveIPAddr("ip", addr) + if err != nil { + return fmt.Errorf("cannot resolve local address: %w", err) + } + + var port int + if sPort != "" { + p, err := strconv.ParseUint(sPort, 10, 16) + if err != nil { + return fmt.Errorf("invalid port: %w", err) + } + port = int(p) + } + + localAddr = &net.TCPAddr{IP: local.IP, Port: port, Zone: local.Zone} + } + switch parts.Scheme { case "http", "https", "unix": - c, err := i.getHTTPClient(parts, proxy) + c, err := i.getHTTPClient(parts, localAddr, proxy) if err != nil { return err } @@ -121,7 +152,7 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error { return errors.New("failed to send metrics to any configured server(s)") } -func (i *InfluxDB) getHTTPClient(address *url.URL, proxy *url.URL) (Client, error) { +func (i *InfluxDB) getHTTPClient(address *url.URL, localAddr *net.TCPAddr, proxy *url.URL) (Client, error) { tlsConfig, err := i.ClientConfig.TLSConfig() if err != nil { return nil, err @@ -134,6 +165,7 @@ func (i *InfluxDB) getHTTPClient(address *url.URL, proxy *url.URL) (Client, erro httpConfig := &HTTPConfig{ URL: address, + LocalAddr: localAddr, Token: i.Token, Organization: i.Organization, Bucket: i.Bucket, diff --git a/plugins/outputs/influxdb_v2/influxdb_v2_test.go b/plugins/outputs/influxdb_v2/influxdb_v2_test.go index 51a810e0c564e..d4aa80796eaad 100644 --- a/plugins/outputs/influxdb_v2/influxdb_v2_test.go +++ b/plugins/outputs/influxdb_v2/influxdb_v2_test.go @@ -1,6 +1,7 @@ package influxdb_v2_test import ( + "net" "testing" "github.com/influxdata/telegraf/plugins/common/tls" @@ -100,3 +101,14 @@ func TestUnused(_ *testing.T) { thing.SampleConfig() outputs.Outputs["influxdb_v2"]() } + +func TestInfluxDBLocalAddress(t *testing.T) { + t.Log("Starting server") + server, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + defer server.Close() + + output := influxdb.InfluxDB{LocalAddr: "localhost"} + require.NoError(t, output.Connect()) + require.NoError(t, output.Close()) +} diff --git a/plugins/outputs/influxdb_v2/sample.conf b/plugins/outputs/influxdb_v2/sample.conf index 93acf0dae0651..c62c888d76d59 100644 --- a/plugins/outputs/influxdb_v2/sample.conf +++ b/plugins/outputs/influxdb_v2/sample.conf @@ -7,6 +7,10 @@ ## ex: urls = ["https://us-west-2-1.aws.cloud2.influxdata.com"] urls = ["http://127.0.0.1:8086"] + ## Local address to bind when connecting to the server + ## If empty or not set, the local address is automatically chosen. + # local_address = "" + ## Token for authentication. token = ""