Skip to content

Commit

Permalink
feat(inputs.http_listener_v2): Add unix socket mode (#15764)
Browse files Browse the repository at this point in the history
  • Loading branch information
bazko1 authored Sep 4, 2024
1 parent b00de66 commit 0b4f77d
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 15 deletions.
15 changes: 13 additions & 2 deletions plugins/inputs/http_listener_v2/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,19 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
```toml @sample.conf
# Generic HTTP write listener
[[inputs.http_listener_v2]]
## Address and port to host HTTP listener on
service_address = ":8080"
## Address to host HTTP listener on
## can be prefixed by protocol tcp, or unix if not provided defaults to tcp
## if unix network type provided it should be followed by absolute path for unix socket
service_address = "tcp://:8080"
## service_address = "tcp://:8443"
## service_address = "unix:///tmp/telegraf.sock"

## Permission for unix sockets (only available for unix sockets)
## This setting may not be respected by some platforms. To safely restrict
## permissions it is recommended to place the socket into a previously
## created directory with the desired permissions.
## ex: socket_mode = "777"
# socket_mode = ""

## Paths to listen to.
# paths = ["/telegraf"]
Expand Down
69 changes: 60 additions & 9 deletions plugins/inputs/http_listener_v2/http_listener_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,16 @@ import (
"crypto/tls"
_ "embed"
"errors"
"fmt"
"io"
"net"
"net/http"
"net/url"
"os"
"path/filepath"
"regexp"
"runtime"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -47,6 +53,7 @@ type TimeFunc func() time.Time
// HTTPListenerV2 is an input plugin that collects external metrics sent via HTTP
type HTTPListenerV2 struct {
ServiceAddress string `toml:"service_address"`
SocketMode string `toml:"socket_mode"`
Path string `toml:"path" deprecated:"1.20.0;1.35.0;use 'paths' instead"`
Paths []string `toml:"paths"`
PathTag bool `toml:"path_tag"`
Expand All @@ -56,7 +63,7 @@ type HTTPListenerV2 struct {
ReadTimeout config.Duration `toml:"read_timeout"`
WriteTimeout config.Duration `toml:"write_timeout"`
MaxBodySize config.Size `toml:"max_body_size"`
Port int `toml:"port"`
Port int `toml:"port" deprecated:"1.32.0;1.35.0;use 'service_address' instead"`
SuccessCode int `toml:"http_success_code"`
BasicUsername string `toml:"basic_username"`
BasicPassword string `toml:"basic_password"`
Expand All @@ -72,6 +79,7 @@ type HTTPListenerV2 struct {
close chan struct{}

listener net.Listener
url *url.URL

telegraf.Parser
acc telegraf.Accumulator
Expand All @@ -91,6 +99,49 @@ func (h *HTTPListenerV2) SetParser(parser telegraf.Parser) {

// Start starts the http listener service.
func (h *HTTPListenerV2) Start(acc telegraf.Accumulator) error {
u := h.url
address := u.Host
switch u.Scheme {
case "tcp":
case "unix":
path := filepath.FromSlash(u.Path)
if runtime.GOOS == "windows" && strings.Contains(path, ":") {
path = strings.TrimPrefix(path, `\`)
}
if err := os.Remove(path); err != nil && !errors.Is(err, os.ErrNotExist) {
return fmt.Errorf("removing socket failed: %w", err)
}
address = path
default:
return fmt.Errorf("unknown protocol %q", u.Scheme)
}

var listener net.Listener
var err error
if h.tlsConf != nil {
listener, err = tls.Listen(u.Scheme, address, h.tlsConf)
} else {
listener, err = net.Listen(u.Scheme, address)
}
if err != nil {
return err
}
h.listener = listener

if u.Scheme == "unix" && h.SocketMode != "" {
// Set permissions on socket
// Convert from octal in string to int
i, err := strconv.ParseUint(h.SocketMode, 8, 32)
if err != nil {
return fmt.Errorf("converting socket mode failed: %w", err)
}

perm := os.FileMode(uint32(i))
if err := os.Chmod(address, perm); err != nil {
return fmt.Errorf("changing socket permissions failed: %w", err)
}
}

if h.MaxBodySize == 0 {
h.MaxBodySize = config.Size(defaultMaxBodySize)
}
Expand Down Expand Up @@ -151,18 +202,18 @@ func (h *HTTPListenerV2) Init() error {
return err
}

var listener net.Listener
if tlsConf != nil {
listener, err = tls.Listen("tcp", h.ServiceAddress, tlsConf)
} else {
listener, err = net.Listen("tcp", h.ServiceAddress)
protoRegex := regexp.MustCompile(`\w://`)
if !protoRegex.MatchString(h.ServiceAddress) {
h.ServiceAddress = "tcp://" + h.ServiceAddress
}

u, err := url.Parse(h.ServiceAddress)
if err != nil {
return err
return fmt.Errorf("parsing address failed: %w", err)
}

h.url = u
h.tlsConf = tlsConf
h.listener = listener
h.Port = listener.Addr().(*net.TCPAddr).Port

if h.SuccessCode == 0 {
h.SuccessCode = http.StatusNoContent
Expand Down
44 changes: 42 additions & 2 deletions plugins/inputs/http_listener_v2/http_listener_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@ package http_listener_v2

import (
"bytes"
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"net"
"net/http"
"net/url"
"os"
"runtime"
"strconv"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -109,9 +112,13 @@ func getHTTPSClient() *http.Client {
}

func createURL(listener *HTTPListenerV2, scheme string, path string, rawquery string) string {
var port int
if strings.HasPrefix(listener.ServiceAddress, "tcp://") {
port = listener.listener.Addr().(*net.TCPAddr).Port
}
u := url.URL{
Scheme: scheme,
Host: "localhost:" + strconv.Itoa(listener.Port),
Host: "localhost:" + strconv.Itoa(port),
Path: path,
RawQuery: rawquery,
}
Expand All @@ -134,7 +141,9 @@ func TestInvalidListenerConfig(t *testing.T) {
close: make(chan struct{}),
}

require.Error(t, listener.Init())
require.NoError(t, listener.Init())
acc := &testutil.Accumulator{}
require.Error(t, listener.Start(acc))

// Stop is called when any ServiceInput fails to start; it must succeed regardless of state
listener.Stop()
Expand Down Expand Up @@ -724,6 +733,37 @@ func TestServerHeaders(t *testing.T) {
require.Equal(t, "value", resp.Header.Get("key"))
}

func TestUnixSocket(t *testing.T) {
listener, err := newTestHTTPListenerV2()
require.NoError(t, err)
file, err := os.CreateTemp("", "*.socket")
require.NoError(t, err)
require.NoError(t, file.Close())
defer os.Remove(file.Name())
socketName := file.Name()
if runtime.GOOS == "windows" {
listener.ServiceAddress = "unix:///" + socketName
} else {
listener.ServiceAddress = "unix://" + socketName
}
listener.SocketMode = "777"
acc := &testutil.Accumulator{}
require.NoError(t, listener.Init())
require.NoError(t, listener.Start(acc))
defer listener.Stop()
httpc := http.Client{
Transport: &http.Transport{
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
return net.Dial("unix", socketName)
},
},
}
resp, err := httpc.Post(createURL(listener, "http", "/write", "db=mydb"), "", bytes.NewBufferString(testMsg))
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
require.EqualValues(t, 204, resp.StatusCode)
}

func mustReadHugeMetric() []byte {
filePath := "testdata/huge_metric"
data, err := os.ReadFile(filePath)
Expand Down
15 changes: 13 additions & 2 deletions plugins/inputs/http_listener_v2/sample.conf
Original file line number Diff line number Diff line change
@@ -1,7 +1,18 @@
# Generic HTTP write listener
[[inputs.http_listener_v2]]
## Address and port to host HTTP listener on
service_address = ":8080"
## Address to host HTTP listener on
## can be prefixed by protocol tcp, or unix if not provided defaults to tcp
## if unix network type provided it should be followed by absolute path for unix socket
service_address = "tcp://:8080"
## service_address = "tcp://:8443"
## service_address = "unix:///tmp/telegraf.sock"

## Permission for unix sockets (only available for unix sockets)
## This setting may not be respected by some platforms. To safely restrict
## permissions it is recommended to place the socket into a previously
## created directory with the desired permissions.
## ex: socket_mode = "777"
# socket_mode = ""

## Paths to listen to.
# paths = ["/telegraf"]
Expand Down

0 comments on commit 0b4f77d

Please sign in to comment.