Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(inputs.knx_listener): Reconnect after connection loss #14959

Merged
merged 1 commit into from
Mar 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 25 additions & 11 deletions plugins/inputs/knx_listener/knx_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package knx_listener

import (
_ "embed"
"errors"
"fmt"
"reflect"
"sync"
"sync/atomic"

"github.com/vapourismo/knx-go/knx"
"github.com/vapourismo/knx-go/knx/dpt"
Expand Down Expand Up @@ -43,22 +45,27 @@ type KNXListener struct {
gaTargetMap map[string]addressTarget
gaLogbook map[string]bool

acc telegraf.Accumulator
wg sync.WaitGroup
wg sync.WaitGroup
connected atomic.Bool
}

func (*KNXListener) SampleConfig() string {
return sampleConfig
}

func (kl *KNXListener) Gather(_ telegraf.Accumulator) error {
func (kl *KNXListener) Gather(acc telegraf.Accumulator) error {
if !kl.connected.Load() {
// We got disconnected for some reason, so try to reconnect in every
// gather cycle until we are reconnected
if err := kl.Start(acc); err != nil {
return fmt.Errorf("reconnecting to bus failed: %w", err)
}
}

return nil
}

func (kl *KNXListener) Start(acc telegraf.Accumulator) error {
// Store the accumulator for later use
kl.acc = acc

func (kl *KNXListener) Init() error {
// Setup a logbook to track unknown GAs to avoid log-spamming
kl.gaLogbook = make(map[string]bool)

Expand All @@ -80,6 +87,10 @@ func (kl *KNXListener) Start(acc telegraf.Accumulator) error {
}
}

return nil
}

func (kl *KNXListener) Start(acc telegraf.Accumulator) error {
// Connect to the KNX-IP interface
kl.Log.Infof("Trying to connect to %q at %q", kl.ServiceType, kl.ServiceAddress)
switch kl.ServiceType {
Expand Down Expand Up @@ -112,12 +123,15 @@ func (kl *KNXListener) Start(acc telegraf.Accumulator) error {
return fmt.Errorf("invalid interface type: %s", kl.ServiceAddress)
}
kl.Log.Infof("Connected!")
kl.connected.Store(true)

// Listen to the KNX bus
kl.wg.Add(1)
go func() {
kl.wg.Done()
kl.listen()
defer kl.wg.Done()
kl.listen(acc)
kl.connected.Store(false)
acc.AddError(errors.New("disconnected from bus"))
}()

return nil
Expand All @@ -130,7 +144,7 @@ func (kl *KNXListener) Stop() {
}
}

func (kl *KNXListener) listen() {
func (kl *KNXListener) listen(acc telegraf.Accumulator) {
for msg := range kl.client.Inbound() {
// Match GA to DataPointType and measurement name
ga := msg.Destination.String()
Expand Down Expand Up @@ -177,7 +191,7 @@ func (kl *KNXListener) listen() {
"unit": target.datapoint.(dpt.DatapointMeta).Unit(),
"source": msg.Source.String(),
}
kl.acc.AddFields(target.measurement, fields, tags)
acc.AddFields(target.measurement, fields, tags)
}
}

Expand Down
76 changes: 70 additions & 6 deletions plugins/inputs/knx_listener/knx_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ func setValue(data dpt.DatapointValue, value interface{}) error {
return nil
}

type TestMessage struct {
type message struct {
address string
dpt string
value interface{}
}

func ProduceKnxEvent(t *testing.T, address string, datapoint string, value interface{}) *knx.GroupEvent {
func produceKnxEvent(t *testing.T, address string, datapoint string, value interface{}) *knx.GroupEvent {
addr, err := cemi.NewGroupAddrString(address)
require.NoError(t, err)

Expand All @@ -60,7 +60,7 @@ func ProduceKnxEvent(t *testing.T, address string, datapoint string, value inter

func TestRegularReceives_DPT(t *testing.T) {
// Define the test-cases
var testcases = []TestMessage{
var testcases = []message{
{"1/0/1", "1.001", true},
{"1/0/2", "1.002", false},
{"1/0/3", "1.003", true},
Expand Down Expand Up @@ -101,6 +101,7 @@ func TestRegularReceives_DPT(t *testing.T) {
Measurements: measurements,
Log: testutil.Logger{Name: "knx_listener"},
}
require.NoError(t, listener.Init())

// Setup the listener to test
err := listener.Start(acc)
Expand All @@ -111,7 +112,7 @@ func TestRegularReceives_DPT(t *testing.T) {

// Send the defined test data
for _, testcase := range testcases {
event := ProduceKnxEvent(t, testcase.address, testcase.dpt, testcase.value)
event := produceKnxEvent(t, testcase.address, testcase.dpt, testcase.value)
client.Send(*event)
}

Expand Down Expand Up @@ -147,6 +148,7 @@ func TestRegularReceives_MultipleMessages(t *testing.T) {
},
Log: testutil.Logger{Name: "knx_listener"},
}
require.NoError(t, listener.Init())

acc := &testutil.Accumulator{}

Expand All @@ -155,15 +157,15 @@ func TestRegularReceives_MultipleMessages(t *testing.T) {
require.NoError(t, err)
client := listener.client.(*KNXDummyInterface)

testMessages := []TestMessage{
testMessages := []message{
{"1/1/1", "1.001", true},
{"1/1/1", "1.001", false},
{"1/1/2", "1.001", false},
{"1/1/2", "1.001", true},
}

for _, testcase := range testMessages {
event := ProduceKnxEvent(t, testcase.address, testcase.dpt, testcase.value)
event := produceKnxEvent(t, testcase.address, testcase.dpt, testcase.value)
client.Send(*event)
}

Expand All @@ -190,3 +192,65 @@ func TestRegularReceives_MultipleMessages(t *testing.T) {
require.Truef(t, ok, "bool type expected, got '%T' with '%v' value instead", acc.Metrics[1].Fields["value"], acc.Metrics[1].Fields["value"])
require.False(t, v)
}

func TestReconnect(t *testing.T) {
listener := KNXListener{
ServiceType: "dummy",
Measurements: []Measurement{
{"temperature", "1.001", []string{"1/1/1"}},
},
Log: testutil.Logger{Name: "knx_listener"},
}
require.NoError(t, listener.Init())

var acc testutil.Accumulator

// Setup the listener to test
require.NoError(t, listener.Start(&acc))
defer listener.Stop()
client := listener.client.(*KNXDummyInterface)

testMessages := []message{
{"1/1/1", "1.001", true},
{"1/1/1", "1.001", false},
{"1/1/2", "1.001", false},
{"1/1/2", "1.001", true},
}

for _, testcase := range testMessages {
event := produceKnxEvent(t, testcase.address, testcase.dpt, testcase.value)
client.Send(*event)
}

// Give the accumulator some time to collect the data
require.Eventuallyf(t, func() bool {
return acc.NMetrics() >= 2
}, 3*time.Second, 100*time.Millisecond, "expected 2 metric but got %d", acc.NMetrics())
require.True(t, listener.connected.Load())

client.Close()

require.Eventually(t, func() bool {
return !listener.connected.Load()
}, 3*time.Second, 100*time.Millisecond, "no disconnect")
acc.Lock()
err := acc.FirstError()
acc.Unlock()
require.ErrorContains(t, err, "disconnected from bus")

require.NoError(t, listener.Gather(&acc))
require.Eventually(t, func() bool {
return listener.connected.Load()
}, 3*time.Second, 100*time.Millisecond, "no reconnect")
client = listener.client.(*KNXDummyInterface)

for _, testcase := range testMessages {
event := produceKnxEvent(t, testcase.address, testcase.dpt, testcase.value)
client.Send(*event)
}

// Give the accumulator some time to collect the data
require.Eventuallyf(t, func() bool {
return acc.NMetrics() >= 2
}, 3*time.Second, 100*time.Millisecond, "expected 2 metric but got %d", acc.NMetrics())
}
Loading