Skip to content

Commit

Permalink
fix(inputs.knx_listener): Reconnect after connection loss (#14959)
Browse files Browse the repository at this point in the history
(cherry picked from commit 6afa18f)
  • Loading branch information
srebhan authored and powersj committed Apr 1, 2024
1 parent 2198b55 commit 73d4d36
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 17 deletions.
36 changes: 25 additions & 11 deletions plugins/inputs/knx_listener/knx_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package knx_listener

import (
_ "embed"
"errors"
"fmt"
"reflect"
"sync"
"sync/atomic"

"github.com/vapourismo/knx-go/knx"
"github.com/vapourismo/knx-go/knx/dpt"
Expand Down Expand Up @@ -43,22 +45,27 @@ type KNXListener struct {
gaTargetMap map[string]addressTarget
gaLogbook map[string]bool

acc telegraf.Accumulator
wg sync.WaitGroup
wg sync.WaitGroup
connected atomic.Bool
}

func (*KNXListener) SampleConfig() string {
return sampleConfig
}

func (kl *KNXListener) Gather(_ telegraf.Accumulator) error {
func (kl *KNXListener) Gather(acc telegraf.Accumulator) error {
if !kl.connected.Load() {
// We got disconnected for some reason, so try to reconnect in every
// gather cycle until we are reconnected
if err := kl.Start(acc); err != nil {
return fmt.Errorf("reconnecting to bus failed: %w", err)
}
}

return nil
}

func (kl *KNXListener) Start(acc telegraf.Accumulator) error {
// Store the accumulator for later use
kl.acc = acc

func (kl *KNXListener) Init() error {
// Setup a logbook to track unknown GAs to avoid log-spamming
kl.gaLogbook = make(map[string]bool)

Expand All @@ -80,6 +87,10 @@ func (kl *KNXListener) Start(acc telegraf.Accumulator) error {
}
}

return nil
}

func (kl *KNXListener) Start(acc telegraf.Accumulator) error {
// Connect to the KNX-IP interface
kl.Log.Infof("Trying to connect to %q at %q", kl.ServiceType, kl.ServiceAddress)
switch kl.ServiceType {
Expand Down Expand Up @@ -112,12 +123,15 @@ func (kl *KNXListener) Start(acc telegraf.Accumulator) error {
return fmt.Errorf("invalid interface type: %s", kl.ServiceAddress)
}
kl.Log.Infof("Connected!")
kl.connected.Store(true)

// Listen to the KNX bus
kl.wg.Add(1)
go func() {
kl.wg.Done()
kl.listen()
defer kl.wg.Done()
kl.listen(acc)
kl.connected.Store(false)
acc.AddError(errors.New("disconnected from bus"))
}()

return nil
Expand All @@ -130,7 +144,7 @@ func (kl *KNXListener) Stop() {
}
}

func (kl *KNXListener) listen() {
func (kl *KNXListener) listen(acc telegraf.Accumulator) {
for msg := range kl.client.Inbound() {
// Match GA to DataPointType and measurement name
ga := msg.Destination.String()
Expand Down Expand Up @@ -177,7 +191,7 @@ func (kl *KNXListener) listen() {
"unit": target.datapoint.(dpt.DatapointMeta).Unit(),
"source": msg.Source.String(),
}
kl.acc.AddFields(target.measurement, fields, tags)
acc.AddFields(target.measurement, fields, tags)
}
}

Expand Down
76 changes: 70 additions & 6 deletions plugins/inputs/knx_listener/knx_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ func setValue(data dpt.DatapointValue, value interface{}) error {
return nil
}

type TestMessage struct {
type message struct {
address string
dpt string
value interface{}
}

func ProduceKnxEvent(t *testing.T, address string, datapoint string, value interface{}) *knx.GroupEvent {
func produceKnxEvent(t *testing.T, address string, datapoint string, value interface{}) *knx.GroupEvent {
addr, err := cemi.NewGroupAddrString(address)
require.NoError(t, err)

Expand All @@ -60,7 +60,7 @@ func ProduceKnxEvent(t *testing.T, address string, datapoint string, value inter

func TestRegularReceives_DPT(t *testing.T) {
// Define the test-cases
var testcases = []TestMessage{
var testcases = []message{
{"1/0/1", "1.001", true},
{"1/0/2", "1.002", false},
{"1/0/3", "1.003", true},
Expand Down Expand Up @@ -101,6 +101,7 @@ func TestRegularReceives_DPT(t *testing.T) {
Measurements: measurements,
Log: testutil.Logger{Name: "knx_listener"},
}
require.NoError(t, listener.Init())

// Setup the listener to test
err := listener.Start(acc)
Expand All @@ -111,7 +112,7 @@ func TestRegularReceives_DPT(t *testing.T) {

// Send the defined test data
for _, testcase := range testcases {
event := ProduceKnxEvent(t, testcase.address, testcase.dpt, testcase.value)
event := produceKnxEvent(t, testcase.address, testcase.dpt, testcase.value)
client.Send(*event)
}

Expand Down Expand Up @@ -147,6 +148,7 @@ func TestRegularReceives_MultipleMessages(t *testing.T) {
},
Log: testutil.Logger{Name: "knx_listener"},
}
require.NoError(t, listener.Init())

acc := &testutil.Accumulator{}

Expand All @@ -155,15 +157,15 @@ func TestRegularReceives_MultipleMessages(t *testing.T) {
require.NoError(t, err)
client := listener.client.(*KNXDummyInterface)

testMessages := []TestMessage{
testMessages := []message{
{"1/1/1", "1.001", true},
{"1/1/1", "1.001", false},
{"1/1/2", "1.001", false},
{"1/1/2", "1.001", true},
}

for _, testcase := range testMessages {
event := ProduceKnxEvent(t, testcase.address, testcase.dpt, testcase.value)
event := produceKnxEvent(t, testcase.address, testcase.dpt, testcase.value)
client.Send(*event)
}

Expand All @@ -190,3 +192,65 @@ func TestRegularReceives_MultipleMessages(t *testing.T) {
require.Truef(t, ok, "bool type expected, got '%T' with '%v' value instead", acc.Metrics[1].Fields["value"], acc.Metrics[1].Fields["value"])
require.False(t, v)
}

func TestReconnect(t *testing.T) {
listener := KNXListener{
ServiceType: "dummy",
Measurements: []Measurement{
{"temperature", "1.001", []string{"1/1/1"}},
},
Log: testutil.Logger{Name: "knx_listener"},
}
require.NoError(t, listener.Init())

var acc testutil.Accumulator

// Setup the listener to test
require.NoError(t, listener.Start(&acc))
defer listener.Stop()
client := listener.client.(*KNXDummyInterface)

testMessages := []message{
{"1/1/1", "1.001", true},
{"1/1/1", "1.001", false},
{"1/1/2", "1.001", false},
{"1/1/2", "1.001", true},
}

for _, testcase := range testMessages {
event := produceKnxEvent(t, testcase.address, testcase.dpt, testcase.value)
client.Send(*event)
}

// Give the accumulator some time to collect the data
require.Eventuallyf(t, func() bool {
return acc.NMetrics() >= 2
}, 3*time.Second, 100*time.Millisecond, "expected 2 metric but got %d", acc.NMetrics())
require.True(t, listener.connected.Load())

client.Close()

require.Eventually(t, func() bool {
return !listener.connected.Load()
}, 3*time.Second, 100*time.Millisecond, "no disconnect")
acc.Lock()
err := acc.FirstError()
acc.Unlock()
require.ErrorContains(t, err, "disconnected from bus")

require.NoError(t, listener.Gather(&acc))
require.Eventually(t, func() bool {
return listener.connected.Load()
}, 3*time.Second, 100*time.Millisecond, "no reconnect")
client = listener.client.(*KNXDummyInterface)

for _, testcase := range testMessages {
event := produceKnxEvent(t, testcase.address, testcase.dpt, testcase.value)
client.Send(*event)
}

// Give the accumulator some time to collect the data
require.Eventuallyf(t, func() bool {
return acc.NMetrics() >= 2
}, 3*time.Second, 100*time.Millisecond, "expected 2 metric but got %d", acc.NMetrics())
}

0 comments on commit 73d4d36

Please sign in to comment.