Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(outputs.influxdb_v2): Add option to set local address #15228

Merged
merged 1 commit into from
Apr 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions plugins/outputs/influxdb_v2/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ to use them.
## ex: urls = ["https://us-west-2-1.aws.cloud2.influxdata.com"]
urls = ["http://127.0.0.1:8086"]

## Local address to bind when connecting to the server
## If empty or not set, the local address is automatically chosen.
# local_address = ""

## Token for authentication.
token = ""

Expand Down
7 changes: 7 additions & 0 deletions plugins/outputs/influxdb_v2/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ const (

type HTTPConfig struct {
URL *url.URL
LocalAddr *net.TCPAddr
Token config.Secret
Organization string
Bucket string
Expand Down Expand Up @@ -125,9 +126,15 @@ func NewHTTPClient(cfg *HTTPConfig) (*httpClient, error) {
var transport *http.Transport
switch cfg.URL.Scheme {
case "http", "https":
var dialerFunc func(ctx context.Context, network, addr string) (net.Conn, error)
if cfg.LocalAddr != nil {
dialer := &net.Dialer{LocalAddr: cfg.LocalAddr}
dialerFunc = dialer.DialContext
}
transport = &http.Transport{
Proxy: proxy,
TLSClientConfig: cfg.TLSConfig,
DialContext: dialerFunc,
}
if cfg.ReadIdleTimeout != 0 || cfg.PingTimeout != 0 {
http2Trans, err := http2.ConfigureTransports(transport)
Expand Down
36 changes: 34 additions & 2 deletions plugins/outputs/influxdb_v2/influxdb_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ import (
"errors"
"fmt"
"math/rand"
"net"
"net/url"
"strconv"
"strings"
"time"

"github.com/influxdata/telegraf"
Expand Down Expand Up @@ -35,6 +38,7 @@ type Client interface {

type InfluxDB struct {
URLs []string `toml:"urls"`
LocalAddr string `toml:"local_address"`
Token config.Secret `toml:"token"`
Organization string `toml:"organization"`
Bucket string `toml:"bucket"`
Expand Down Expand Up @@ -78,9 +82,36 @@ func (i *InfluxDB) Connect() error {
}
}

var localAddr *net.TCPAddr
if i.LocalAddr != "" {
// Resolve the local address into IP address and the given port if any
addr, sPort, err := net.SplitHostPort(i.LocalAddr)
if err != nil {
if !strings.Contains(err.Error(), "missing port") {
return fmt.Errorf("invalid local address: %w", err)
}
addr = i.LocalAddr
}
local, err := net.ResolveIPAddr("ip", addr)
if err != nil {
return fmt.Errorf("cannot resolve local address: %w", err)
}

var port int
if sPort != "" {
p, err := strconv.ParseUint(sPort, 10, 16)
if err != nil {
return fmt.Errorf("invalid port: %w", err)
}
port = int(p)
}

localAddr = &net.TCPAddr{IP: local.IP, Port: port, Zone: local.Zone}
}

switch parts.Scheme {
case "http", "https", "unix":
c, err := i.getHTTPClient(parts, proxy)
c, err := i.getHTTPClient(parts, localAddr, proxy)
if err != nil {
return err
}
Expand Down Expand Up @@ -121,7 +152,7 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
return errors.New("failed to send metrics to any configured server(s)")
}

func (i *InfluxDB) getHTTPClient(address *url.URL, proxy *url.URL) (Client, error) {
func (i *InfluxDB) getHTTPClient(address *url.URL, localAddr *net.TCPAddr, proxy *url.URL) (Client, error) {
tlsConfig, err := i.ClientConfig.TLSConfig()
if err != nil {
return nil, err
Expand All @@ -134,6 +165,7 @@ func (i *InfluxDB) getHTTPClient(address *url.URL, proxy *url.URL) (Client, erro

httpConfig := &HTTPConfig{
URL: address,
LocalAddr: localAddr,
Token: i.Token,
Organization: i.Organization,
Bucket: i.Bucket,
Expand Down
12 changes: 12 additions & 0 deletions plugins/outputs/influxdb_v2/influxdb_v2_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package influxdb_v2_test

import (
"net"
"testing"

"github.com/influxdata/telegraf/plugins/common/tls"
Expand Down Expand Up @@ -100,3 +101,14 @@ func TestUnused(_ *testing.T) {
thing.SampleConfig()
outputs.Outputs["influxdb_v2"]()
}

func TestInfluxDBLocalAddress(t *testing.T) {
t.Log("Starting server")
server, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
defer server.Close()

output := influxdb.InfluxDB{LocalAddr: "localhost"}
require.NoError(t, output.Connect())
require.NoError(t, output.Close())
}
4 changes: 4 additions & 0 deletions plugins/outputs/influxdb_v2/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
## ex: urls = ["https://us-west-2-1.aws.cloud2.influxdata.com"]
urls = ["http://127.0.0.1:8086"]

## Local address to bind when connecting to the server
## If empty or not set, the local address is automatically chosen.
# local_address = ""

## Token for authentication.
token = ""

Expand Down
Loading