Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(inputs.http_listener_v2): Add unix socket mode #15764

Merged
merged 13 commits into from
Sep 4, 2024
11 changes: 11 additions & 0 deletions plugins/inputs/http_listener_v2/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,17 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## Address and port to host HTTP listener on
service_address = ":8080"

## Network type to listen to default tcp
# if set to unix service_address will be interpreted as unix socket path
# network = "tcp"

## Permission for unix sockets (only available for network unix)
## This setting may not be respected by some platforms. To safely restrict
## permissions it is recommended to place the socket into a previously
## created directory with the desired permissions.
## ex: socket_mode = "777"
# socket_mode = ""

## Paths to listen to.
# paths = ["/telegraf"]

Expand Down
45 changes: 42 additions & 3 deletions plugins/inputs/http_listener_v2/http_listener_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,15 @@ import (
"crypto/tls"
_ "embed"
"errors"
"fmt"
"io"
"net"
"net/http"
"net/url"
"os"
"path/filepath"
"runtime"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -47,6 +52,8 @@ type TimeFunc func() time.Time
// HTTPListenerV2 is an input plugin that collects external metrics sent via HTTP
type HTTPListenerV2 struct {
ServiceAddress string `toml:"service_address"`
Network string `toml:"network"`
SocketMode string `toml:"socket_mode"`
Path string `toml:"path" deprecated:"1.20.0;1.35.0;use 'paths' instead"`
Paths []string `toml:"paths"`
PathTag bool `toml:"path_tag"`
Expand Down Expand Up @@ -151,18 +158,49 @@ func (h *HTTPListenerV2) Init() error {
return err
}

switch h.Network {
case "tcp":
case "unix":
path := filepath.FromSlash(h.ServiceAddress)
if runtime.GOOS == "windows" && strings.Contains(path, ":") {
path = strings.TrimPrefix(path, `\`)
}
if err := os.Remove(path); err != nil && !errors.Is(err, os.ErrNotExist) {
return fmt.Errorf("removing socket failed: %w", err)
}
default:
return fmt.Errorf("unknown protocol %q", h.Network)
}

var listener net.Listener
if tlsConf != nil {
listener, err = tls.Listen("tcp", h.ServiceAddress, tlsConf)
listener, err = tls.Listen(h.Network, h.ServiceAddress, tlsConf)
} else {
listener, err = net.Listen("tcp", h.ServiceAddress)
listener, err = net.Listen(h.Network, h.ServiceAddress)
}
if err != nil {
return err
}
h.tlsConf = tlsConf
h.listener = listener
h.Port = listener.Addr().(*net.TCPAddr).Port

if h.Network == "tcp" {
h.Port = listener.Addr().(*net.TCPAddr).Port
}

if h.Network == "unix" && h.SocketMode != "" {
// Set permissions on socket
// Convert from octal in string to int
i, err := strconv.ParseUint(h.SocketMode, 8, 32)
if err != nil {
return fmt.Errorf("converting socket mode failed: %w", err)
}

perm := os.FileMode(uint32(i))
if err := os.Chmod(h.ServiceAddress, perm); err != nil {
return fmt.Errorf("changing socket permissions failed: %w", err)
}
}

if h.SuccessCode == 0 {
h.SuccessCode = http.StatusNoContent
Expand Down Expand Up @@ -375,6 +413,7 @@ func init() {
inputs.Add("http_listener_v2", func() telegraf.Input {
return &HTTPListenerV2{
ServiceAddress: ":8080",
Network: "tcp",
TimeFunc: time.Now,
Paths: []string{"/telegraf"},
Methods: []string{"POST", "PUT"},
Expand Down
50 changes: 43 additions & 7 deletions plugins/inputs/http_listener_v2/http_listener_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package http_listener_v2

import (
"bytes"
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"net"
"net/http"
"net/url"
"os"
Expand Down Expand Up @@ -42,9 +44,7 @@ cpu_load_short,host=server06 value=12.0 1422568543702900257
basicPassword = "super-secure-password!"
)

var (
pki = testutil.NewPKI("../../../testutil/pki")
)
var pki = testutil.NewPKI("../../../testutil/pki")

func newTestHTTPListenerV2() (*HTTPListenerV2, error) {
parser := &influx.Parser{}
Expand All @@ -54,6 +54,7 @@ func newTestHTTPListenerV2() (*HTTPListenerV2, error) {

listener := &HTTPListenerV2{
Log: testutil.Logger{},
Network: "tcp",
ServiceAddress: "localhost:0",
Path: "/write",
Methods: []string{"POST"},
Expand Down Expand Up @@ -84,6 +85,7 @@ func newTestHTTPSListenerV2() (*HTTPListenerV2, error) {

listener := &HTTPListenerV2{
Log: testutil.Logger{},
Network: "tcp",
ServiceAddress: "localhost:0",
Path: "/write",
Methods: []string{"POST"},
Expand Down Expand Up @@ -124,6 +126,7 @@ func TestInvalidListenerConfig(t *testing.T) {

listener := &HTTPListenerV2{
Log: testutil.Logger{},
Network: "tcp",
ServiceAddress: "address_without_port",
Path: "/write",
Methods: []string{"POST"},
Expand Down Expand Up @@ -231,8 +234,10 @@ func TestWriteHTTP(t *testing.T) {
require.EqualValues(t, 204, resp.StatusCode)

acc.Wait(2)
hostTags := []string{"server02", "server03",
"server04", "server05", "server06"}
hostTags := []string{
"server02", "server03",
"server04", "server05", "server06",
}
srebhan marked this conversation as resolved.
Show resolved Hide resolved
for _, hostTag := range hostTags {
acc.AssertContainsTaggedFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(12)},
Expand Down Expand Up @@ -359,6 +364,7 @@ func TestWriteHTTPExactMaxBodySize(t *testing.T) {

listener := &HTTPListenerV2{
Log: testutil.Logger{},
Network: "tcp",
ServiceAddress: "localhost:0",
Path: "/write",
Methods: []string{"POST"},
Expand All @@ -385,6 +391,7 @@ func TestWriteHTTPVerySmallMaxBody(t *testing.T) {

listener := &HTTPListenerV2{
Log: testutil.Logger{},
Network: "tcp",
ServiceAddress: "localhost:0",
Path: "/write",
Methods: []string{"POST"},
Expand Down Expand Up @@ -429,8 +436,10 @@ func TestWriteHTTPGzippedData(t *testing.T) {
require.NoError(t, resp.Body.Close())
require.EqualValues(t, 204, resp.StatusCode)

hostTags := []string{"server02", "server03",
"server04", "server05", "server06"}
hostTags := []string{
"server02", "server03",
"server04", "server05", "server06",
}
srebhan marked this conversation as resolved.
Show resolved Hide resolved
acc.Wait(len(hostTags))
for _, hostTag := range hostTags {
acc.AssertContainsTaggedFields(t, "cpu_load_short",
Expand Down Expand Up @@ -724,6 +733,33 @@ func TestServerHeaders(t *testing.T) {
require.Equal(t, "value", resp.Header.Get("key"))
}

func TestUnixSocket(t *testing.T) {
listener, err := newTestHTTPListenerV2()
require.NoError(t, err)
listener.Network = "unix"
file, err := os.CreateTemp("", "*.socket")
require.NoError(t, err)
require.NoError(t, file.Close())
defer os.Remove(file.Name())
socketName := file.Name()
listener.ServiceAddress = socketName
listener.SocketMode = "777"
acc := &testutil.Accumulator{}
require.NoError(t, listener.Init())
require.NoError(t, listener.Start(acc))
httpc := http.Client{
Transport: &http.Transport{
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
return net.Dial("unix", socketName)
},
},
}
resp, err := httpc.Post(createURL(listener, "http", "/write", "db=mydb"), "", bytes.NewBufferString(testMsg))
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
require.EqualValues(t, 204, resp.StatusCode)
}

func mustReadHugeMetric() []byte {
filePath := "testdata/huge_metric"
data, err := os.ReadFile(filePath)
Expand Down
11 changes: 11 additions & 0 deletions plugins/inputs/http_listener_v2/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,17 @@
## Address and port to host HTTP listener on
service_address = ":8080"

## Network type to listen to default tcp
# if set to unix service_address will be interpreted as unix socket path
# network = "tcp"
srebhan marked this conversation as resolved.
Show resolved Hide resolved

## Permission for unix sockets (only available for network unix)
## This setting may not be respected by some platforms. To safely restrict
## permissions it is recommended to place the socket into a previously
## created directory with the desired permissions.
## ex: socket_mode = "777"
# socket_mode = ""

## Paths to listen to.
# paths = ["/telegraf"]

Expand Down
Loading