Skip to content

Commit

Permalink
feat(inputs.s7comm): Implement startup-error behavior settings (#15655)
Browse files Browse the repository at this point in the history
  • Loading branch information
srebhan committed Jul 25, 2024
1 parent 777154b commit 085acb2
Show file tree
Hide file tree
Showing 3 changed files with 230 additions and 33 deletions.
14 changes: 14 additions & 0 deletions plugins/inputs/s7comm/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,20 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.

[CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins

## Startup error behavior options <!-- @/docs/includes/startup_error_behavior.md -->

In addition to the plugin-specific and global configuration settings the plugin
supports options for specifying the behavior when experiencing startup errors
using the `startup_error_behavior` setting. Available values are:

- `error`: Telegraf with stop and exit in case of startup errors. This is the
default behavior.
- `ignore`: Telegraf will ignore startup errors for this plugin and disables it
but continues processing for all other plugins.
- `retry`: Telegraf will try to startup the plugin in every gather or write
cycle in case of startup errors. The plugin is disabled until
the startup succeeds.

## Configuration

```toml @sample.conf
Expand Down
8 changes: 6 additions & 2 deletions plugins/inputs/s7comm/s7comm.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/inputs"
)
Expand Down Expand Up @@ -143,7 +144,7 @@ func (s *S7comm) Init() error {
s.handler = gos7.NewTCPClientHandlerWithConnectType(s.Server, s.Rack, s.Slot, connectionTypeMap[s.ConnectionType])
s.handler.Timeout = time.Duration(s.Timeout)
if s.DebugConnection {
s.handler.Logger = log.New(os.Stderr, "D! [inputs.s7comm]", log.LstdFlags)
s.handler.Logger = log.New(os.Stderr, "D! [inputs.s7comm] ", log.LstdFlags)
}

// Create the requests
Expand All @@ -154,7 +155,10 @@ func (s *S7comm) Init() error {
func (s *S7comm) Start(_ telegraf.Accumulator) error {
s.Log.Debugf("Connecting to %q...", s.Server)
if err := s.handler.Connect(); err != nil {
return fmt.Errorf("connecting to %q failed: %w", s.Server, err)
return &internal.StartupError{
Err: fmt.Errorf("connecting to %q failed: %w", s.Server, err),
Retry: true,
}
}
s.client = gos7.NewClient(s.handler)

Expand Down
241 changes: 210 additions & 31 deletions plugins/inputs/s7comm/s7comm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,14 @@ import (
"net"
"sync/atomic"
"testing"
"time"

"github.com/robinson/gos7"
"github.com/stretchr/testify/require"

"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/testutil"
)

Expand Down Expand Up @@ -707,20 +711,220 @@ func TestMetricCollisions(t *testing.T) {

func TestConnectionLoss(t *testing.T) {
// Create fake S7 comm server that can accept connects
listener, err := net.Listen("tcp", "127.0.0.1:0")
server, err := NewMockServer("127.0.0.1:0")
require.NoError(t, err)
defer listener.Close()
defer server.Close()
require.NoError(t, server.Start())

var connectionAttempts uint32
// Create the plugin and attempt a connection
plugin := &S7comm{
Server: server.Addr(),
Rack: 0,
Slot: 2,
DebugConnection: true,
Timeout: config.Duration(100 * time.Millisecond),
Configs: []metricDefinition{
{
Fields: []metricFieldDefinition{
{
Name: "foo",
Address: "DB1.W2",
},
},
},
},
Log: &testutil.Logger{},
}
require.NoError(t, plugin.Init())

var acc testutil.Accumulator
require.NoError(t, plugin.Start(&acc))
require.NoError(t, plugin.Gather(&acc))
require.NoError(t, plugin.Gather(&acc))
plugin.Stop()
server.Close()

require.Equal(t, uint32(3), server.ConnectionAttempts.Load())
}

func TestStartupErrorBehaviorError(t *testing.T) {
// Create fake S7 comm server that can accept connects
server, err := NewMockServer("127.0.0.1:0")
require.NoError(t, err)
defer server.Close()

// Setup the plugin and the model to be able to use the startup retry strategy
plugin := &S7comm{
Server: server.Addr(),
Rack: 0,
Slot: 2,
DebugConnection: true,
Timeout: config.Duration(100 * time.Millisecond),
Configs: []metricDefinition{
{
Fields: []metricFieldDefinition{
{
Name: "foo",
Address: "DB1.W2",
},
},
},
},
Log: &testutil.Logger{},
}
model := models.NewRunningInput(
plugin,
&models.InputConfig{
Name: "s7comm",
Alias: "error-test", // required to get a unique error stats instance
},
)
model.StartupErrors.Set(0)
require.NoError(t, model.Init())

// Starting the plugin will fail with an error because the server does not listen
var acc testutil.Accumulator
require.ErrorContains(t, model.Start(&acc), "connecting to \""+server.Addr()+"\" failed")
}

func TestStartupErrorBehaviorIgnore(t *testing.T) {
// Create fake S7 comm server that can accept connects
server, err := NewMockServer("127.0.0.1:0")
require.NoError(t, err)
defer server.Close()

// Setup the plugin and the model to be able to use the startup retry strategy
plugin := &S7comm{
Server: server.Addr(),
Rack: 0,
Slot: 2,
DebugConnection: true,
Timeout: config.Duration(100 * time.Millisecond),
Configs: []metricDefinition{
{
Fields: []metricFieldDefinition{
{
Name: "foo",
Address: "DB1.W2",
},
},
},
},
Log: &testutil.Logger{},
}
model := models.NewRunningInput(
plugin,
&models.InputConfig{
Name: "s7comm",
Alias: "ignore-test", // required to get a unique error stats instance
StartupErrorBehavior: "ignore",
},
)
model.StartupErrors.Set(0)
require.NoError(t, model.Init())

// Starting the plugin will fail because the server does not accept connections.
// The model code should convert it to a fatal error for the agent to remove
// the plugin.
var acc testutil.Accumulator
err = model.Start(&acc)
require.ErrorContains(t, err, "connecting to \""+server.Addr()+"\" failed")
var fatalErr *internal.FatalError
require.ErrorAs(t, err, &fatalErr)
}

func TestStartupErrorBehaviorRetry(t *testing.T) {
// Create fake S7 comm server that can accept connects
server, err := NewMockServer("127.0.0.1:0")
require.NoError(t, err)
defer server.Close()

// Setup the plugin and the model to be able to use the startup retry strategy
plugin := &S7comm{
Server: server.Addr(),
Rack: 0,
Slot: 2,
DebugConnection: true,
Timeout: config.Duration(100 * time.Millisecond),
Configs: []metricDefinition{
{
Fields: []metricFieldDefinition{
{
Name: "foo",
Address: "DB1.W2",
},
},
},
},
Log: &testutil.Logger{},
}
model := models.NewRunningInput(
plugin,
&models.InputConfig{
Name: "s7comm",
Alias: "retry-test", // required to get a unique error stats instance
StartupErrorBehavior: "retry",
},
)
model.StartupErrors.Set(0)
require.NoError(t, model.Init())

// Starting the plugin will return no error because the plugin will
// retry to connect in every gather cycle.
var acc testutil.Accumulator
require.NoError(t, model.Start(&acc))

// The gather should fail as the server does not accept connections (yet)
require.Empty(t, acc.GetTelegrafMetrics())
require.ErrorIs(t, model.Gather(&acc), internal.ErrNotConnected)
require.Equal(t, int64(2), model.StartupErrors.Get())

// Allow connection in the server, now the connection should succeed
require.NoError(t, server.Start())
defer model.Stop()
require.NoError(t, model.Gather(&acc))
}

type MockServer struct {
ConnectionAttempts atomic.Uint32

listener net.Listener
}

func NewMockServer(addr string) (*MockServer, error) {
l, err := net.Listen("tcp", addr)
if err != nil {
return nil, err
}
return &MockServer{listener: l}, nil
}

func (s *MockServer) Addr() string {
return s.listener.Addr().String()
}

func (s *MockServer) Close() error {
if s.listener != nil {
return s.listener.Close()
}
return nil
}

func (s *MockServer) Start() error {
go func() {
defer s.listener.Close()
for {
conn, err := listener.Accept()
conn, err := s.listener.Accept()
if err != nil {
return
}
if err := conn.SetDeadline(time.Now().Add(time.Second)); err != nil {
conn.Close()
return
}

// Count the number of connection attempts
atomic.AddUint32(&connectionAttempts, 1)
s.ConnectionAttempts.Add(1)

buf := make([]byte, 4096)

Expand Down Expand Up @@ -757,31 +961,6 @@ func TestConnectionLoss(t *testing.T) {
conn.Close()
}
}()
plugin := &S7comm{
Server: listener.Addr().String(),
Rack: 0,
Slot: 2,
DebugConnection: true,
Configs: []metricDefinition{
{
Fields: []metricFieldDefinition{
{
Name: "foo",
Address: "DB1.W2",
},
},
},
},
Log: &testutil.Logger{},
}
require.NoError(t, plugin.Init())

var acc testutil.Accumulator
require.NoError(t, plugin.Start(&acc))
require.NoError(t, plugin.Gather(&acc))
require.NoError(t, plugin.Gather(&acc))
plugin.Stop()
listener.Close()

require.Equal(t, 3, int(atomic.LoadUint32(&connectionAttempts)))
return nil
}

0 comments on commit 085acb2

Please sign in to comment.