Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Modbus add per-request tags #10231

Merged
merged 5 commits into from
Dec 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions plugins/inputs/modbus/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,10 @@ Registers via Modbus TCP or Modbus RTU/ASCII.
{ address=3, name="motor1_overheating"},
]

[[inputs.modbus.request.tags]]
machine = "impresser"
location = "main building"

[[inputs.modbus.request]]
## Holding example
## All of those examples will result in FLOAT64 field outputs
Expand All @@ -169,6 +173,10 @@ Registers via Modbus TCP or Modbus RTU/ASCII.
{ address=8, name="power_factor", type="INT64", scale=0.01 },
]

[[inputs.modbus.request.tags]]
machine = "impresser"
location = "main building"

[[inputs.modbus.request]]
## Input example with type conversions
slave_id = 1
Expand All @@ -181,6 +189,10 @@ Registers via Modbus TCP or Modbus RTU/ASCII.
{ address=4, name="hours", type="UINT32" }, # will result in UIN64 field
]

[[inputs.modbus.request.tags]]
machine = "impresser"
location = "main building"

## Enable workarounds required by some devices to work correctly
# [inputs.modbus.workarounds]
## Pause between read requests sent to the device. This might be necessary for (slow) serial devices.
Expand Down Expand Up @@ -320,6 +332,11 @@ This setting is ignored if the field's `omit` is set to `true` and can be omitte

When specifying `omit=true`, the corresponding field will be ignored when collecting the metric but is taken into account when constructing the modbus requests. This way, you can fill "holes" in the addresses to construct consecutive address ranges resulting in a single request. Using a single modbus request can be beneficial as the values are all collected at the same point in time.

#### Tags definitions

Each `request` can be accompanied by tags valid for this request.
__Please note:__ These tags take precedence over predefined tags such as `name`, `type` or `slave_id`.

---

## Trouble shooting
Expand Down
2 changes: 1 addition & 1 deletion plugins/inputs/modbus/configuration_register.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (c *ConfigurationOriginal) initRequests(fieldDefs []fieldDefinition, maxQua
if err != nil {
return nil, err
}
return newRequestsFromFields(fields, maxQuantity), nil
return groupFieldsToRequests(fields, nil, maxQuantity), nil
}

func (c *ConfigurationOriginal) initFields(fieldDefs []fieldDefinition) ([]field, error) {
Expand Down
21 changes: 16 additions & 5 deletions plugins/inputs/modbus/configuration_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ const sampleConfigPartPerRequest = `

## Define a request sent to the device
## Multiple of those requests can be defined. Data will be collated into metrics at the end of data collection.
[[inputs.modbus.request]]
# [[inputs.modbus.request]]
## ID of the modbus slave device to query.
## If you need to query multiple slave-devices, create several "request" definitions.
# slave_id = 0
Expand Down Expand Up @@ -57,6 +57,11 @@ const sampleConfigPartPerRequest = `
# { address=3, name="motor1_overheating"},
# ]

## Per-request tags
## These tags take precedence over predefined tags.
# [[inputs.modbus.request.tags]]
# name = "value"

## Holding / input example
## All of those examples will result in FLOAT64 field outputs
# fields = [
Expand All @@ -75,6 +80,11 @@ const sampleConfigPartPerRequest = `
# { address=2, name="force", type="INT32", output="FLOAT64" }, # will result in FLOAT64 field
# { address=4, name="hours", type="UINT32" }, # will result in UIN64 field
# ]

## Per-request tags
## These tags take precedence over predefined tags.
# [[inputs.modbus.request.tags]]
# name = "value"
`

type requestFieldDefinition struct {
Expand All @@ -93,6 +103,7 @@ type requestDefinition struct {
RegisterType string `toml:"register"`
Measurement string `toml:"measurement"`
Fields []requestFieldDefinition `toml:"fields"`
Tags map[string]string `toml:"tags"`
}

type ConfigurationPerRequest struct {
Expand Down Expand Up @@ -213,16 +224,16 @@ func (c *ConfigurationPerRequest) Process() (map[byte]requestSet, error) {

switch def.RegisterType {
case "coil":
requests := newRequestsFromFields(fields, maxQuantityCoils)
requests := groupFieldsToRequests(fields, def.Tags, maxQuantityCoils)
set.coil = append(set.coil, requests...)
case "discrete":
requests := newRequestsFromFields(fields, maxQuantityDiscreteInput)
requests := groupFieldsToRequests(fields, def.Tags, maxQuantityDiscreteInput)
set.discrete = append(set.discrete, requests...)
case "holding":
requests := newRequestsFromFields(fields, maxQuantityHoldingRegisters)
requests := groupFieldsToRequests(fields, def.Tags, maxQuantityHoldingRegisters)
set.holding = append(set.holding, requests...)
case "input":
requests := newRequestsFromFields(fields, maxQuantityInputRegisters)
requests := groupFieldsToRequests(fields, def.Tags, maxQuantityInputRegisters)
set.input = append(set.input, requests...)
default:
return nil, fmt.Errorf("unknown register type %q", def.RegisterType)
Expand Down
11 changes: 10 additions & 1 deletion plugins/inputs/modbus/modbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,15 @@ func (m *Modbus) gatherRequestsInput(requests []request) error {
func (m *Modbus) collectFields(acc telegraf.Accumulator, timestamp time.Time, tags map[string]string, requests []request) {
grouper := metric.NewSeriesGrouper()
for _, request := range requests {
// Collect tags from global and per-request
rtags := map[string]string{}
for k, v := range tags {
rtags[k] = v
}
for k, v := range request.tags {
rtags[k] = v
}

for _, field := range request.fields {
// In case no measurement was specified we use "modbus" as default
measurement := "modbus"
Expand All @@ -469,7 +478,7 @@ func (m *Modbus) collectFields(acc telegraf.Accumulator, timestamp time.Time, ta
}

// Group the data by series
if err := grouper.Add(measurement, tags, timestamp, field.name, field.value); err != nil {
if err := grouper.Add(measurement, rtags, timestamp, field.name, field.value); err != nil {
acc.AddError(fmt.Errorf("cannot add field %q for measurement %q: %v", field.name, measurement, err))
continue
}
Expand Down
178 changes: 178 additions & 0 deletions plugins/inputs/modbus/modbus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1290,6 +1290,184 @@ func TestConfigurationPerRequest(t *testing.T) {
require.Len(t, modbus.requests[1].input, 1)
}

func TestConfigurationPerRequestWithTags(t *testing.T) {
modbus := Modbus{
Name: "Test",
Controller: "tcp://localhost:1502",
ConfigurationType: "request",
Log: testutil.Logger{},
}
modbus.Requests = []requestDefinition{
{
SlaveID: 1,
ByteOrder: "ABCD",
RegisterType: "coil",
Fields: []requestFieldDefinition{
{
Name: "coil-0",
Address: uint16(0),
},
{
Name: "coil-1",
Address: uint16(1),
Omit: true,
},
{
Name: "coil-2",
Address: uint16(2),
InputType: "INT64",
Scale: 1.2,
OutputType: "FLOAT64",
Measurement: "modbus",
},
},
Tags: map[string]string{
"first": "a",
"second": "bb",
"third": "ccc",
},
},
{
SlaveID: 1,
RegisterType: "coil",
Fields: []requestFieldDefinition{
{
Name: "coil-3",
Address: uint16(6),
},
{
Name: "coil-4",
Address: uint16(7),
Omit: true,
},
{
Name: "coil-5",
Address: uint16(8),
InputType: "INT64",
Scale: 1.2,
OutputType: "FLOAT64",
Measurement: "modbus",
},
},
Tags: map[string]string{
"first": "a",
"second": "bb",
"third": "ccc",
},
},
{
SlaveID: 1,
ByteOrder: "ABCD",
RegisterType: "discrete",
Fields: []requestFieldDefinition{
{
Name: "discrete-0",
Address: uint16(0),
},
{
Name: "discrete-1",
Address: uint16(1),
Omit: true,
},
{
Name: "discrete-2",
Address: uint16(2),
InputType: "INT64",
Scale: 1.2,
OutputType: "FLOAT64",
Measurement: "modbus",
},
},
Tags: map[string]string{
"first": "a",
"second": "bb",
"third": "ccc",
},
},
{
SlaveID: 1,
ByteOrder: "ABCD",
RegisterType: "holding",
Fields: []requestFieldDefinition{
{
Name: "holding-0",
Address: uint16(0),
InputType: "INT16",
},
{
Name: "holding-1",
Address: uint16(1),
InputType: "UINT16",
Omit: true,
},
{
Name: "holding-2",
Address: uint16(2),
InputType: "INT64",
Scale: 1.2,
OutputType: "FLOAT64",
Measurement: "modbus",
},
},
Tags: map[string]string{
"first": "a",
"second": "bb",
"third": "ccc",
},
},
{
SlaveID: 1,
ByteOrder: "ABCD",
RegisterType: "input",
Fields: []requestFieldDefinition{
{
Name: "input-0",
Address: uint16(0),
InputType: "INT16",
},
{
Name: "input-1",
Address: uint16(1),
InputType: "UINT16",
Omit: true,
},
{
Name: "input-2",
Address: uint16(2),
InputType: "INT64",
Scale: 1.2,
OutputType: "FLOAT64",
Measurement: "modbus",
},
},
Tags: map[string]string{
"first": "a",
"second": "bb",
"third": "ccc",
},
},
}

require.NoError(t, modbus.Init())
require.NotEmpty(t, modbus.requests)
require.NotNil(t, modbus.requests[1])
require.Len(t, modbus.requests[1].coil, 2)
require.Len(t, modbus.requests[1].discrete, 1)
require.Len(t, modbus.requests[1].holding, 1)
require.Len(t, modbus.requests[1].input, 1)

expectedTags := map[string]string{
"first": "a",
"second": "bb",
"third": "ccc",
}
require.Equal(t, expectedTags, modbus.requests[1].coil[0].tags)
require.Equal(t, expectedTags, modbus.requests[1].coil[1].tags)
require.Equal(t, expectedTags, modbus.requests[1].discrete[0].tags)
require.Equal(t, expectedTags, modbus.requests[1].holding[0].tags)
require.Equal(t, expectedTags, modbus.requests[1].input[0].tags)
}

func TestConfigurationPerRequestFail(t *testing.T) {
tests := []struct {
name string
Expand Down
31 changes: 19 additions & 12 deletions plugins/inputs/modbus/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,25 @@ type request struct {
address uint16
length uint16
fields []field
tags map[string]string
}

func newRequestsFromFields(fields []field, maxBatchSize uint16) []request {
func newRequest(f field, tags map[string]string) request {
r := request{
address: f.address,
length: f.length,
fields: []field{f},
tags: map[string]string{},
}

// Copy the tags
for k, v := range tags {
r.tags[k] = v
}
return r
}

func groupFieldsToRequests(fields []field, tags map[string]string, maxBatchSize uint16) []request {
if len(fields) == 0 {
return nil
}
Expand All @@ -28,12 +44,7 @@ func newRequestsFromFields(fields []field, maxBatchSize uint16) []request {
// and the given maximum chunk sizes.
var requests []request

current := request{
address: fields[0].address,
length: fields[0].length,
fields: []field{fields[0]},
}

current := newRequest(fields[0], tags)
for _, f := range fields[1:] {
// Check if we need to interrupt the current chunk and require a new one
needInterrupt := f.address != current.address+current.length // not consecutive
Expand All @@ -51,11 +62,7 @@ func newRequestsFromFields(fields []field, maxBatchSize uint16) []request {

// Finish the current request, add it to the list and construct a new one
requests = append(requests, current)
current = request{
address: f.address,
length: f.length,
fields: []field{f},
}
current = newRequest(f, tags)
}
requests = append(requests, current)

Expand Down