From 085acb23a9a11c34bf6fd941ac94d2b9d2e137ab Mon Sep 17 00:00:00 2001 From: Sven Rebhan <36194019+srebhan@users.noreply.github.com> Date: Thu, 25 Jul 2024 21:11:37 +0200 Subject: [PATCH] feat(inputs.s7comm): Implement startup-error behavior settings (#15655) --- plugins/inputs/s7comm/README.md | 14 ++ plugins/inputs/s7comm/s7comm.go | 8 +- plugins/inputs/s7comm/s7comm_test.go | 241 +++++++++++++++++++++++---- 3 files changed, 230 insertions(+), 33 deletions(-) diff --git a/plugins/inputs/s7comm/README.md b/plugins/inputs/s7comm/README.md index 0c3fc43bc58c4..d15065bdf7bcd 100644 --- a/plugins/inputs/s7comm/README.md +++ b/plugins/inputs/s7comm/README.md @@ -11,6 +11,20 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. [CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins +## Startup error behavior options + +In addition to the plugin-specific and global configuration settings the plugin +supports options for specifying the behavior when experiencing startup errors +using the `startup_error_behavior` setting. Available values are: + +- `error`: Telegraf with stop and exit in case of startup errors. This is the + default behavior. +- `ignore`: Telegraf will ignore startup errors for this plugin and disables it + but continues processing for all other plugins. +- `retry`: Telegraf will try to startup the plugin in every gather or write + cycle in case of startup errors. The plugin is disabled until + the startup succeeds. + ## Configuration ```toml @sample.conf diff --git a/plugins/inputs/s7comm/s7comm.go b/plugins/inputs/s7comm/s7comm.go index bb40e33f98724..f11d27cdca1e6 100644 --- a/plugins/inputs/s7comm/s7comm.go +++ b/plugins/inputs/s7comm/s7comm.go @@ -18,6 +18,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -143,7 +144,7 @@ func (s *S7comm) Init() error { s.handler = gos7.NewTCPClientHandlerWithConnectType(s.Server, s.Rack, s.Slot, connectionTypeMap[s.ConnectionType]) s.handler.Timeout = time.Duration(s.Timeout) if s.DebugConnection { - s.handler.Logger = log.New(os.Stderr, "D! [inputs.s7comm]", log.LstdFlags) + s.handler.Logger = log.New(os.Stderr, "D! [inputs.s7comm] ", log.LstdFlags) } // Create the requests @@ -154,7 +155,10 @@ func (s *S7comm) Init() error { func (s *S7comm) Start(_ telegraf.Accumulator) error { s.Log.Debugf("Connecting to %q...", s.Server) if err := s.handler.Connect(); err != nil { - return fmt.Errorf("connecting to %q failed: %w", s.Server, err) + return &internal.StartupError{ + Err: fmt.Errorf("connecting to %q failed: %w", s.Server, err), + Retry: true, + } } s.client = gos7.NewClient(s.handler) diff --git a/plugins/inputs/s7comm/s7comm_test.go b/plugins/inputs/s7comm/s7comm_test.go index 438a02c6dd9c1..32109602d5c6c 100644 --- a/plugins/inputs/s7comm/s7comm_test.go +++ b/plugins/inputs/s7comm/s7comm_test.go @@ -7,10 +7,14 @@ import ( "net" "sync/atomic" "testing" + "time" "github.com/robinson/gos7" "github.com/stretchr/testify/require" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/models" "github.com/influxdata/telegraf/testutil" ) @@ -707,20 +711,220 @@ func TestMetricCollisions(t *testing.T) { func TestConnectionLoss(t *testing.T) { // Create fake S7 comm server that can accept connects - listener, err := net.Listen("tcp", "127.0.0.1:0") + server, err := NewMockServer("127.0.0.1:0") require.NoError(t, err) - defer listener.Close() + defer server.Close() + require.NoError(t, server.Start()) - var connectionAttempts uint32 + // Create the plugin and attempt a connection + plugin := &S7comm{ + Server: server.Addr(), + Rack: 0, + Slot: 2, + DebugConnection: true, + Timeout: config.Duration(100 * time.Millisecond), + Configs: []metricDefinition{ + { + Fields: []metricFieldDefinition{ + { + Name: "foo", + Address: "DB1.W2", + }, + }, + }, + }, + Log: &testutil.Logger{}, + } + require.NoError(t, plugin.Init()) + + var acc testutil.Accumulator + require.NoError(t, plugin.Start(&acc)) + require.NoError(t, plugin.Gather(&acc)) + require.NoError(t, plugin.Gather(&acc)) + plugin.Stop() + server.Close() + + require.Equal(t, uint32(3), server.ConnectionAttempts.Load()) +} + +func TestStartupErrorBehaviorError(t *testing.T) { + // Create fake S7 comm server that can accept connects + server, err := NewMockServer("127.0.0.1:0") + require.NoError(t, err) + defer server.Close() + + // Setup the plugin and the model to be able to use the startup retry strategy + plugin := &S7comm{ + Server: server.Addr(), + Rack: 0, + Slot: 2, + DebugConnection: true, + Timeout: config.Duration(100 * time.Millisecond), + Configs: []metricDefinition{ + { + Fields: []metricFieldDefinition{ + { + Name: "foo", + Address: "DB1.W2", + }, + }, + }, + }, + Log: &testutil.Logger{}, + } + model := models.NewRunningInput( + plugin, + &models.InputConfig{ + Name: "s7comm", + Alias: "error-test", // required to get a unique error stats instance + }, + ) + model.StartupErrors.Set(0) + require.NoError(t, model.Init()) + + // Starting the plugin will fail with an error because the server does not listen + var acc testutil.Accumulator + require.ErrorContains(t, model.Start(&acc), "connecting to \""+server.Addr()+"\" failed") +} + +func TestStartupErrorBehaviorIgnore(t *testing.T) { + // Create fake S7 comm server that can accept connects + server, err := NewMockServer("127.0.0.1:0") + require.NoError(t, err) + defer server.Close() + + // Setup the plugin and the model to be able to use the startup retry strategy + plugin := &S7comm{ + Server: server.Addr(), + Rack: 0, + Slot: 2, + DebugConnection: true, + Timeout: config.Duration(100 * time.Millisecond), + Configs: []metricDefinition{ + { + Fields: []metricFieldDefinition{ + { + Name: "foo", + Address: "DB1.W2", + }, + }, + }, + }, + Log: &testutil.Logger{}, + } + model := models.NewRunningInput( + plugin, + &models.InputConfig{ + Name: "s7comm", + Alias: "ignore-test", // required to get a unique error stats instance + StartupErrorBehavior: "ignore", + }, + ) + model.StartupErrors.Set(0) + require.NoError(t, model.Init()) + + // Starting the plugin will fail because the server does not accept connections. + // The model code should convert it to a fatal error for the agent to remove + // the plugin. + var acc testutil.Accumulator + err = model.Start(&acc) + require.ErrorContains(t, err, "connecting to \""+server.Addr()+"\" failed") + var fatalErr *internal.FatalError + require.ErrorAs(t, err, &fatalErr) +} + +func TestStartupErrorBehaviorRetry(t *testing.T) { + // Create fake S7 comm server that can accept connects + server, err := NewMockServer("127.0.0.1:0") + require.NoError(t, err) + defer server.Close() + + // Setup the plugin and the model to be able to use the startup retry strategy + plugin := &S7comm{ + Server: server.Addr(), + Rack: 0, + Slot: 2, + DebugConnection: true, + Timeout: config.Duration(100 * time.Millisecond), + Configs: []metricDefinition{ + { + Fields: []metricFieldDefinition{ + { + Name: "foo", + Address: "DB1.W2", + }, + }, + }, + }, + Log: &testutil.Logger{}, + } + model := models.NewRunningInput( + plugin, + &models.InputConfig{ + Name: "s7comm", + Alias: "retry-test", // required to get a unique error stats instance + StartupErrorBehavior: "retry", + }, + ) + model.StartupErrors.Set(0) + require.NoError(t, model.Init()) + + // Starting the plugin will return no error because the plugin will + // retry to connect in every gather cycle. + var acc testutil.Accumulator + require.NoError(t, model.Start(&acc)) + + // The gather should fail as the server does not accept connections (yet) + require.Empty(t, acc.GetTelegrafMetrics()) + require.ErrorIs(t, model.Gather(&acc), internal.ErrNotConnected) + require.Equal(t, int64(2), model.StartupErrors.Get()) + + // Allow connection in the server, now the connection should succeed + require.NoError(t, server.Start()) + defer model.Stop() + require.NoError(t, model.Gather(&acc)) +} + +type MockServer struct { + ConnectionAttempts atomic.Uint32 + + listener net.Listener +} + +func NewMockServer(addr string) (*MockServer, error) { + l, err := net.Listen("tcp", addr) + if err != nil { + return nil, err + } + return &MockServer{listener: l}, nil +} + +func (s *MockServer) Addr() string { + return s.listener.Addr().String() +} + +func (s *MockServer) Close() error { + if s.listener != nil { + return s.listener.Close() + } + return nil +} + +func (s *MockServer) Start() error { go func() { + defer s.listener.Close() for { - conn, err := listener.Accept() + conn, err := s.listener.Accept() if err != nil { return } + if err := conn.SetDeadline(time.Now().Add(time.Second)); err != nil { + conn.Close() + return + } // Count the number of connection attempts - atomic.AddUint32(&connectionAttempts, 1) + s.ConnectionAttempts.Add(1) buf := make([]byte, 4096) @@ -757,31 +961,6 @@ func TestConnectionLoss(t *testing.T) { conn.Close() } }() - plugin := &S7comm{ - Server: listener.Addr().String(), - Rack: 0, - Slot: 2, - DebugConnection: true, - Configs: []metricDefinition{ - { - Fields: []metricFieldDefinition{ - { - Name: "foo", - Address: "DB1.W2", - }, - }, - }, - }, - Log: &testutil.Logger{}, - } - require.NoError(t, plugin.Init()) - - var acc testutil.Accumulator - require.NoError(t, plugin.Start(&acc)) - require.NoError(t, plugin.Gather(&acc)) - require.NoError(t, plugin.Gather(&acc)) - plugin.Stop() - listener.Close() - require.Equal(t, 3, int(atomic.LoadUint32(&connectionAttempts))) + return nil }