From 73d4d363fc47508107a7cdf65f03917266e4c63e Mon Sep 17 00:00:00 2001 From: Sven Rebhan <36194019+srebhan@users.noreply.github.com> Date: Mon, 18 Mar 2024 15:57:33 +0100 Subject: [PATCH] fix(inputs.knx_listener): Reconnect after connection loss (#14959) (cherry picked from commit 6afa18fd52205e480d91b40cec5fc7bd61ac871c) --- plugins/inputs/knx_listener/knx_listener.go | 36 ++++++--- .../inputs/knx_listener/knx_listener_test.go | 76 +++++++++++++++++-- 2 files changed, 95 insertions(+), 17 deletions(-) diff --git a/plugins/inputs/knx_listener/knx_listener.go b/plugins/inputs/knx_listener/knx_listener.go index 62a143af5e7f9..6dadb60856f47 100644 --- a/plugins/inputs/knx_listener/knx_listener.go +++ b/plugins/inputs/knx_listener/knx_listener.go @@ -3,9 +3,11 @@ package knx_listener import ( _ "embed" + "errors" "fmt" "reflect" "sync" + "sync/atomic" "github.com/vapourismo/knx-go/knx" "github.com/vapourismo/knx-go/knx/dpt" @@ -43,22 +45,27 @@ type KNXListener struct { gaTargetMap map[string]addressTarget gaLogbook map[string]bool - acc telegraf.Accumulator - wg sync.WaitGroup + wg sync.WaitGroup + connected atomic.Bool } func (*KNXListener) SampleConfig() string { return sampleConfig } -func (kl *KNXListener) Gather(_ telegraf.Accumulator) error { +func (kl *KNXListener) Gather(acc telegraf.Accumulator) error { + if !kl.connected.Load() { + // We got disconnected for some reason, so try to reconnect in every + // gather cycle until we are reconnected + if err := kl.Start(acc); err != nil { + return fmt.Errorf("reconnecting to bus failed: %w", err) + } + } + return nil } -func (kl *KNXListener) Start(acc telegraf.Accumulator) error { - // Store the accumulator for later use - kl.acc = acc - +func (kl *KNXListener) Init() error { // Setup a logbook to track unknown GAs to avoid log-spamming kl.gaLogbook = make(map[string]bool) @@ -80,6 +87,10 @@ func (kl *KNXListener) Start(acc telegraf.Accumulator) error { } } + return nil +} + +func (kl *KNXListener) Start(acc telegraf.Accumulator) error { // Connect to the KNX-IP interface kl.Log.Infof("Trying to connect to %q at %q", kl.ServiceType, kl.ServiceAddress) switch kl.ServiceType { @@ -112,12 +123,15 @@ func (kl *KNXListener) Start(acc telegraf.Accumulator) error { return fmt.Errorf("invalid interface type: %s", kl.ServiceAddress) } kl.Log.Infof("Connected!") + kl.connected.Store(true) // Listen to the KNX bus kl.wg.Add(1) go func() { - kl.wg.Done() - kl.listen() + defer kl.wg.Done() + kl.listen(acc) + kl.connected.Store(false) + acc.AddError(errors.New("disconnected from bus")) }() return nil @@ -130,7 +144,7 @@ func (kl *KNXListener) Stop() { } } -func (kl *KNXListener) listen() { +func (kl *KNXListener) listen(acc telegraf.Accumulator) { for msg := range kl.client.Inbound() { // Match GA to DataPointType and measurement name ga := msg.Destination.String() @@ -177,7 +191,7 @@ func (kl *KNXListener) listen() { "unit": target.datapoint.(dpt.DatapointMeta).Unit(), "source": msg.Source.String(), } - kl.acc.AddFields(target.measurement, fields, tags) + acc.AddFields(target.measurement, fields, tags) } } diff --git a/plugins/inputs/knx_listener/knx_listener_test.go b/plugins/inputs/knx_listener/knx_listener_test.go index ae82a985da157..afe52f8c9f835 100644 --- a/plugins/inputs/knx_listener/knx_listener_test.go +++ b/plugins/inputs/knx_listener/knx_listener_test.go @@ -36,13 +36,13 @@ func setValue(data dpt.DatapointValue, value interface{}) error { return nil } -type TestMessage struct { +type message struct { address string dpt string value interface{} } -func ProduceKnxEvent(t *testing.T, address string, datapoint string, value interface{}) *knx.GroupEvent { +func produceKnxEvent(t *testing.T, address string, datapoint string, value interface{}) *knx.GroupEvent { addr, err := cemi.NewGroupAddrString(address) require.NoError(t, err) @@ -60,7 +60,7 @@ func ProduceKnxEvent(t *testing.T, address string, datapoint string, value inter func TestRegularReceives_DPT(t *testing.T) { // Define the test-cases - var testcases = []TestMessage{ + var testcases = []message{ {"1/0/1", "1.001", true}, {"1/0/2", "1.002", false}, {"1/0/3", "1.003", true}, @@ -101,6 +101,7 @@ func TestRegularReceives_DPT(t *testing.T) { Measurements: measurements, Log: testutil.Logger{Name: "knx_listener"}, } + require.NoError(t, listener.Init()) // Setup the listener to test err := listener.Start(acc) @@ -111,7 +112,7 @@ func TestRegularReceives_DPT(t *testing.T) { // Send the defined test data for _, testcase := range testcases { - event := ProduceKnxEvent(t, testcase.address, testcase.dpt, testcase.value) + event := produceKnxEvent(t, testcase.address, testcase.dpt, testcase.value) client.Send(*event) } @@ -147,6 +148,7 @@ func TestRegularReceives_MultipleMessages(t *testing.T) { }, Log: testutil.Logger{Name: "knx_listener"}, } + require.NoError(t, listener.Init()) acc := &testutil.Accumulator{} @@ -155,7 +157,7 @@ func TestRegularReceives_MultipleMessages(t *testing.T) { require.NoError(t, err) client := listener.client.(*KNXDummyInterface) - testMessages := []TestMessage{ + testMessages := []message{ {"1/1/1", "1.001", true}, {"1/1/1", "1.001", false}, {"1/1/2", "1.001", false}, @@ -163,7 +165,7 @@ func TestRegularReceives_MultipleMessages(t *testing.T) { } for _, testcase := range testMessages { - event := ProduceKnxEvent(t, testcase.address, testcase.dpt, testcase.value) + event := produceKnxEvent(t, testcase.address, testcase.dpt, testcase.value) client.Send(*event) } @@ -190,3 +192,65 @@ func TestRegularReceives_MultipleMessages(t *testing.T) { require.Truef(t, ok, "bool type expected, got '%T' with '%v' value instead", acc.Metrics[1].Fields["value"], acc.Metrics[1].Fields["value"]) require.False(t, v) } + +func TestReconnect(t *testing.T) { + listener := KNXListener{ + ServiceType: "dummy", + Measurements: []Measurement{ + {"temperature", "1.001", []string{"1/1/1"}}, + }, + Log: testutil.Logger{Name: "knx_listener"}, + } + require.NoError(t, listener.Init()) + + var acc testutil.Accumulator + + // Setup the listener to test + require.NoError(t, listener.Start(&acc)) + defer listener.Stop() + client := listener.client.(*KNXDummyInterface) + + testMessages := []message{ + {"1/1/1", "1.001", true}, + {"1/1/1", "1.001", false}, + {"1/1/2", "1.001", false}, + {"1/1/2", "1.001", true}, + } + + for _, testcase := range testMessages { + event := produceKnxEvent(t, testcase.address, testcase.dpt, testcase.value) + client.Send(*event) + } + + // Give the accumulator some time to collect the data + require.Eventuallyf(t, func() bool { + return acc.NMetrics() >= 2 + }, 3*time.Second, 100*time.Millisecond, "expected 2 metric but got %d", acc.NMetrics()) + require.True(t, listener.connected.Load()) + + client.Close() + + require.Eventually(t, func() bool { + return !listener.connected.Load() + }, 3*time.Second, 100*time.Millisecond, "no disconnect") + acc.Lock() + err := acc.FirstError() + acc.Unlock() + require.ErrorContains(t, err, "disconnected from bus") + + require.NoError(t, listener.Gather(&acc)) + require.Eventually(t, func() bool { + return listener.connected.Load() + }, 3*time.Second, 100*time.Millisecond, "no reconnect") + client = listener.client.(*KNXDummyInterface) + + for _, testcase := range testMessages { + event := produceKnxEvent(t, testcase.address, testcase.dpt, testcase.value) + client.Send(*event) + } + + // Give the accumulator some time to collect the data + require.Eventuallyf(t, func() bool { + return acc.NMetrics() >= 2 + }, 3*time.Second, 100*time.Millisecond, "expected 2 metric but got %d", acc.NMetrics()) +}