diff --git a/filebeat/docs/inputs/input-common-unix-options.asciidoc b/filebeat/docs/inputs/input-common-unix-options.asciidoc index 443fe761274..f73278944a6 100644 --- a/filebeat/docs/inputs/input-common-unix-options.asciidoc +++ b/filebeat/docs/inputs/input-common-unix-options.asciidoc @@ -16,6 +16,22 @@ The maximum size of the message received over the socket. The default is `20MiB` The path to the Unix socket that will receive event streams. +[float] +[id="{beatname_lc}-input-{type}-unix-group"] +==== `group` + +The group ownership of the Unix socket that will be created by Filebeat. +The default is the primary group name for the user Filebeat is running as. +This option is ignored on Windows. + +[float] +[id="{beatname_lc}-input-{type}-unix-mode"] +==== `mode` + +The file mode of the Unix socket that will be created by Filebeat. This is +expected to be a file mode as an octal string. The default value is the system +default (generally `0755`). + [float] [id="{beatname_lc}-input-{type}-unix-line-delimiter"] ==== `line_delimiter` diff --git a/filebeat/docs/inputs/input-syslog.asciidoc b/filebeat/docs/inputs/input-syslog.asciidoc index 0c360a03f7f..f9a24c04b81 100644 --- a/filebeat/docs/inputs/input-syslog.asciidoc +++ b/filebeat/docs/inputs/input-syslog.asciidoc @@ -51,6 +51,8 @@ include::../inputs/input-common-tcp-options.asciidoc[] ===== Protocol `unix`: +beta[] + include::../inputs/input-common-unix-options.asciidoc[] [id="{beatname_lc}-input-{type}-common-options"] diff --git a/filebeat/docs/inputs/input-unix.asciidoc b/filebeat/docs/inputs/input-unix.asciidoc new file mode 100644 index 00000000000..a2f445159b7 --- /dev/null +++ b/filebeat/docs/inputs/input-unix.asciidoc @@ -0,0 +1,35 @@ +:type: unix + +[id="{beatname_lc}-input-{type}"] +=== Unix input + +beta[] + +++++ +Unix +++++ + +Use the `unix` input to read events over a stream-oriented Unix domain socket. + +Example configuration: + +["source","yaml",subs="attributes"] +---- +{beatname_lc}.inputs: +- type: unix + max_message_size: 10MiB + path: "/var/run/filebeat.sock" +---- + + +==== Configuration options + +The `unix` input supports the following configuration options plus the +<<{beatname_lc}-input-{type}-common-options>> described later. + +include::../inputs/input-common-unix-options.asciidoc[] + +[id="{beatname_lc}-input-{type}-common-options"] +include::../inputs/input-common-options.asciidoc[] + +:type!: diff --git a/filebeat/input/syslog/config.go b/filebeat/input/syslog/config.go index 5b6ac1452b4..ff009bfb1dd 100644 --- a/filebeat/input/syslog/config.go +++ b/filebeat/input/syslog/config.go @@ -30,6 +30,7 @@ import ( "github.com/elastic/beats/v7/filebeat/inputsource/udp" "github.com/elastic/beats/v7/filebeat/inputsource/unix" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/cfgwarn" "github.com/elastic/beats/v7/libbeat/logp" ) @@ -98,6 +99,8 @@ func factory( return tcp.New(&config.Config, factory) case unix.Name: + cfgwarn.Beta("Syslog Unix socket support is beta.") + config := defaultUnix if err := cfg.Unpack(&config); err != nil { return nil, err diff --git a/filebeat/input/unix/input.go b/filebeat/input/unix/input.go index 12c091f00da..19609cb5ab8 100644 --- a/filebeat/input/unix/input.go +++ b/filebeat/input/unix/input.go @@ -30,6 +30,7 @@ import ( "github.com/elastic/beats/v7/filebeat/inputsource/unix" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/cfgwarn" "github.com/elastic/beats/v7/libbeat/logp" ) @@ -56,6 +57,7 @@ func NewInput( connector channel.Connector, context input.Context, ) (input.Input, error) { + cfgwarn.Beta("Unix socket support is beta.") out, err := connector.ConnectWith(cfg, beat.ClientConfig{ Processing: beat.ProcessingConfig{ diff --git a/filebeat/inputsource/unix/config.go b/filebeat/inputsource/unix/config.go index 79b2a43dd08..5051ab86e75 100644 --- a/filebeat/inputsource/unix/config.go +++ b/filebeat/inputsource/unix/config.go @@ -30,6 +30,8 @@ const Name = "unix" // Config exposes the unix configuration. type Config struct { Path string `config:"path"` + Group *string `config:"group"` + Mode *string `config:"mode"` Timeout time.Duration `config:"timeout" validate:"nonzero,positive"` MaxMessageSize cfgtype.ByteSize `config:"max_message_size" validate:"nonzero,positive"` MaxConnections int `config:"max_connections"` diff --git a/filebeat/inputsource/unix/server.go b/filebeat/inputsource/unix/server.go index 965a3300282..ee9a0f4564d 100644 --- a/filebeat/inputsource/unix/server.go +++ b/filebeat/inputsource/unix/server.go @@ -20,10 +20,16 @@ package unix import ( "fmt" "net" + "os" + "os/user" + "runtime" + "strconv" + "github.com/pkg/errors" "golang.org/x/net/netutil" "github.com/elastic/beats/v7/filebeat/inputsource/common" + "github.com/elastic/beats/v7/libbeat/logp" ) // Server represent a unix server @@ -55,13 +61,91 @@ func New( } func (s *Server) createServer() (net.Listener, error) { + if err := s.cleanupStaleSocket(); err != nil { + return nil, err + } + l, err := net.Listen("unix", s.config.Path) if err != nil { return nil, err } + if err := s.setSocketOwnership(); err != nil { + return nil, err + } + + if err := s.setSocketMode(); err != nil { + return nil, err + } + if s.config.MaxConnections > 0 { return netutil.LimitListener(l, s.config.MaxConnections), nil } return l, nil } + +func (s *Server) cleanupStaleSocket() error { + path := s.config.Path + info, err := os.Lstat(path) + if err != nil { + // If the file does not exist, then the cleanup can be considered successful. + if os.IsNotExist(err) { + return nil + } + return errors.Wrapf(err, "cannot lstat unix socket file at location %s", path) + } + + if runtime.GOOS != "windows" { + // see https://github.com/golang/go/issues/33357 for context on Windows socket file attributes bug + if info.Mode()&os.ModeSocket == 0 { + return fmt.Errorf("refusing to remove file at location %s, it is not a socket", path) + } + } + + if err := os.Remove(path); err != nil { + return errors.Wrapf(err, "cannot remove existing unix socket file at location %s", path) + } + + return nil +} + +func (s *Server) setSocketOwnership() error { + if s.config.Group != nil { + if runtime.GOOS == "windows" { + logp.NewLogger("unix").Warn("windows does not support the 'group' configuration option, ignoring") + return nil + } + g, err := user.LookupGroup(*s.config.Group) + if err != nil { + return err + } + gid, err := strconv.Atoi(g.Gid) + if err != nil { + return err + } + return os.Chown(s.config.Path, -1, gid) + } + return nil +} + +func (s *Server) setSocketMode() error { + if s.config.Mode != nil { + mode, err := parseFileMode(*s.config.Mode) + if err != nil { + return err + } + return os.Chmod(s.config.Path, mode) + } + return nil +} + +func parseFileMode(mode string) (os.FileMode, error) { + parsed, err := strconv.ParseUint(mode, 8, 32) + if err != nil { + return 0, err + } + if parsed > 0777 { + return 0, errors.New("invalid file mode") + } + return os.FileMode(parsed), nil +} diff --git a/filebeat/inputsource/unix/server_test.go b/filebeat/inputsource/unix/server_test.go index 36e75c757e9..a9043d14a8e 100644 --- a/filebeat/inputsource/unix/server_test.go +++ b/filebeat/inputsource/unix/server_test.go @@ -23,7 +23,10 @@ import ( "math/rand" "net" "os" + "os/user" "path/filepath" + "runtime" + "strconv" "strings" "testing" "time" @@ -35,6 +38,7 @@ import ( "github.com/elastic/beats/v7/filebeat/inputsource" netcommon "github.com/elastic/beats/v7/filebeat/inputsource/common" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/file" "github.com/elastic/beats/v7/libbeat/logp" ) @@ -207,6 +211,92 @@ func TestReceiveEventsAndMetadata(t *testing.T) { } } +func TestSocketOwnershipAndMode(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("changing socket ownership is only supported on non-windows") + return + } + + groups, err := os.Getgroups() + require.NoError(t, err) + + if len(groups) <= 1 { + t.Skip("no group that we can change to") + return + } + + group, err := user.LookupGroupId(strconv.Itoa(groups[1])) + require.NoError(t, err) + + path := filepath.Join(os.TempDir(), "test.sock") + cfg, _ := common.NewConfigFrom(map[string]interface{}{ + "path": path, + "group": group.Name, + "mode": "0740", + }) + config := defaultConfig + err = cfg.Unpack(&config) + require.NoError(t, err) + + factory := netcommon.SplitHandlerFactory(netcommon.FamilyUnix, logp.NewLogger("test"), MetadataCallback, nil, netcommon.SplitFunc([]byte("\n"))) + server, err := New(&config, factory) + require.NoError(t, err) + err = server.Start() + require.NoError(t, err) + defer server.Stop() + + info, err := file.Lstat(path) + require.NoError(t, err) + require.NotEqual(t, 0, info.Mode()&os.ModeSocket) + require.Equal(t, os.FileMode(0740), info.Mode().Perm()) + gid, err := info.GID() + require.NoError(t, err) + require.Equal(t, group.Gid, strconv.Itoa(gid)) +} + +func TestSocketCleanup(t *testing.T) { + path := filepath.Join(os.TempDir(), "test.sock") + mockStaleSocket, err := net.Listen("unix", path) + require.NoError(t, err) + defer mockStaleSocket.Close() + + cfg, _ := common.NewConfigFrom(map[string]interface{}{ + "path": path, + }) + config := defaultConfig + require.NoError(t, cfg.Unpack(&config)) + factory := netcommon.SplitHandlerFactory(netcommon.FamilyUnix, logp.NewLogger("test"), MetadataCallback, nil, netcommon.SplitFunc([]byte("\n"))) + server, err := New(&config, factory) + require.NoError(t, err) + err = server.Start() + require.NoError(t, err) + server.Stop() +} + +func TestSocketCleanupRefusal(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("skipping due to windows FileAttributes bug https://github.com/golang/go/issues/33357") + return + } + path := filepath.Join(os.TempDir(), "test.sock") + f, err := os.Create(path) + require.NoError(t, err) + require.NoError(t, f.Close()) + defer os.Remove(path) + + cfg, _ := common.NewConfigFrom(map[string]interface{}{ + "path": path, + }) + config := defaultConfig + require.NoError(t, cfg.Unpack(&config)) + factory := netcommon.SplitHandlerFactory(netcommon.FamilyUnix, logp.NewLogger("test"), MetadataCallback, nil, netcommon.SplitFunc([]byte("\n"))) + server, err := New(&config, factory) + require.NoError(t, err) + err = server.Start() + require.Error(t, err) + require.Contains(t, err.Error(), "refusing to remove file at location") +} + func TestReceiveNewEventsConcurrently(t *testing.T) { workers := 4 eventsCount := 100 diff --git a/libbeat/common/seccomp/policy_linux_386.go b/libbeat/common/seccomp/policy_linux_386.go index 76b24714cac..acbc69ddd1f 100644 --- a/libbeat/common/seccomp/policy_linux_386.go +++ b/libbeat/common/seccomp/policy_linux_386.go @@ -32,6 +32,7 @@ func init() { "access", "brk", "chmod", + "chown", "clock_gettime", "clone", "close", diff --git a/libbeat/common/seccomp/policy_linux_amd64.go b/libbeat/common/seccomp/policy_linux_amd64.go index 92b5fbe488a..bf1e4bc31c5 100644 --- a/libbeat/common/seccomp/policy_linux_amd64.go +++ b/libbeat/common/seccomp/policy_linux_amd64.go @@ -35,6 +35,7 @@ func init() { "bind", "brk", "chmod", + "chown", "clock_gettime", "clone", "close",