Skip to content

Commit

Permalink
feat(outputs.influxdb_v2): Add option to set local address (#15228)
Browse files Browse the repository at this point in the history
  • Loading branch information
momoson authored Apr 26, 2024
1 parent bf55d78 commit c7446f7
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 2 deletions.
4 changes: 4 additions & 0 deletions plugins/outputs/influxdb_v2/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ to use them.
## ex: urls = ["https://us-west-2-1.aws.cloud2.influxdata.com"]
urls = ["http://127.0.0.1:8086"]

## Local address to bind when connecting to the server
## If empty or not set, the local address is automatically chosen.
# local_address = ""

## Token for authentication.
token = ""

Expand Down
7 changes: 7 additions & 0 deletions plugins/outputs/influxdb_v2/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ const (

type HTTPConfig struct {
URL *url.URL
LocalAddr *net.TCPAddr
Token config.Secret
Organization string
Bucket string
Expand Down Expand Up @@ -125,9 +126,15 @@ func NewHTTPClient(cfg *HTTPConfig) (*httpClient, error) {
var transport *http.Transport
switch cfg.URL.Scheme {
case "http", "https":
var dialerFunc func(ctx context.Context, network, addr string) (net.Conn, error)
if cfg.LocalAddr != nil {
dialer := &net.Dialer{LocalAddr: cfg.LocalAddr}
dialerFunc = dialer.DialContext
}
transport = &http.Transport{
Proxy: proxy,
TLSClientConfig: cfg.TLSConfig,
DialContext: dialerFunc,
}
if cfg.ReadIdleTimeout != 0 || cfg.PingTimeout != 0 {
http2Trans, err := http2.ConfigureTransports(transport)
Expand Down
36 changes: 34 additions & 2 deletions plugins/outputs/influxdb_v2/influxdb_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ import (
"errors"
"fmt"
"math/rand"
"net"
"net/url"
"strconv"
"strings"
"time"

"github.com/influxdata/telegraf"
Expand Down Expand Up @@ -35,6 +38,7 @@ type Client interface {

type InfluxDB struct {
URLs []string `toml:"urls"`
LocalAddr string `toml:"local_address"`
Token config.Secret `toml:"token"`
Organization string `toml:"organization"`
Bucket string `toml:"bucket"`
Expand Down Expand Up @@ -78,9 +82,36 @@ func (i *InfluxDB) Connect() error {
}
}

var localAddr *net.TCPAddr
if i.LocalAddr != "" {
// Resolve the local address into IP address and the given port if any
addr, sPort, err := net.SplitHostPort(i.LocalAddr)
if err != nil {
if !strings.Contains(err.Error(), "missing port") {
return fmt.Errorf("invalid local address: %w", err)
}
addr = i.LocalAddr
}
local, err := net.ResolveIPAddr("ip", addr)
if err != nil {
return fmt.Errorf("cannot resolve local address: %w", err)
}

var port int
if sPort != "" {
p, err := strconv.ParseUint(sPort, 10, 16)
if err != nil {
return fmt.Errorf("invalid port: %w", err)
}
port = int(p)
}

localAddr = &net.TCPAddr{IP: local.IP, Port: port, Zone: local.Zone}
}

switch parts.Scheme {
case "http", "https", "unix":
c, err := i.getHTTPClient(parts, proxy)
c, err := i.getHTTPClient(parts, localAddr, proxy)
if err != nil {
return err
}
Expand Down Expand Up @@ -121,7 +152,7 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
return errors.New("failed to send metrics to any configured server(s)")
}

func (i *InfluxDB) getHTTPClient(address *url.URL, proxy *url.URL) (Client, error) {
func (i *InfluxDB) getHTTPClient(address *url.URL, localAddr *net.TCPAddr, proxy *url.URL) (Client, error) {
tlsConfig, err := i.ClientConfig.TLSConfig()
if err != nil {
return nil, err
Expand All @@ -134,6 +165,7 @@ func (i *InfluxDB) getHTTPClient(address *url.URL, proxy *url.URL) (Client, erro

httpConfig := &HTTPConfig{
URL: address,
LocalAddr: localAddr,
Token: i.Token,
Organization: i.Organization,
Bucket: i.Bucket,
Expand Down
12 changes: 12 additions & 0 deletions plugins/outputs/influxdb_v2/influxdb_v2_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package influxdb_v2_test

import (
"net"
"testing"

"github.com/influxdata/telegraf/plugins/common/tls"
Expand Down Expand Up @@ -100,3 +101,14 @@ func TestUnused(_ *testing.T) {
thing.SampleConfig()
outputs.Outputs["influxdb_v2"]()
}

func TestInfluxDBLocalAddress(t *testing.T) {
t.Log("Starting server")
server, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
defer server.Close()

output := influxdb.InfluxDB{LocalAddr: "localhost"}
require.NoError(t, output.Connect())
require.NoError(t, output.Close())
}
4 changes: 4 additions & 0 deletions plugins/outputs/influxdb_v2/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
## ex: urls = ["https://us-west-2-1.aws.cloud2.influxdata.com"]
urls = ["http://127.0.0.1:8086"]

## Local address to bind when connecting to the server
## If empty or not set, the local address is automatically chosen.
# local_address = ""

## Token for authentication.
token = ""

Expand Down

0 comments on commit c7446f7

Please sign in to comment.