From 99ddb467e9de14d48b62ba2bff8b2f0f429ab06b Mon Sep 17 00:00:00 2001 From: Sven Rebhan <36194019+srebhan@users.noreply.github.com> Date: Wed, 8 Dec 2021 20:19:13 +0100 Subject: [PATCH] feat: Modbus add per-request tags (#10231) --- plugins/inputs/modbus/README.md | 17 ++ .../inputs/modbus/configuration_register.go | 2 +- .../inputs/modbus/configuration_request.go | 21 ++- plugins/inputs/modbus/modbus.go | 11 +- plugins/inputs/modbus/modbus_test.go | 178 ++++++++++++++++++ plugins/inputs/modbus/request.go | 31 +-- 6 files changed, 241 insertions(+), 19 deletions(-) diff --git a/plugins/inputs/modbus/README.md b/plugins/inputs/modbus/README.md index 1d0d253a7ab65..fd9b92e513178 100644 --- a/plugins/inputs/modbus/README.md +++ b/plugins/inputs/modbus/README.md @@ -154,6 +154,10 @@ Registers via Modbus TCP or Modbus RTU/ASCII. { address=3, name="motor1_overheating"}, ] + [[inputs.modbus.request.tags]] + machine = "impresser" + location = "main building" + [[inputs.modbus.request]] ## Holding example ## All of those examples will result in FLOAT64 field outputs @@ -169,6 +173,10 @@ Registers via Modbus TCP or Modbus RTU/ASCII. { address=8, name="power_factor", type="INT64", scale=0.01 }, ] + [[inputs.modbus.request.tags]] + machine = "impresser" + location = "main building" + [[inputs.modbus.request]] ## Input example with type conversions slave_id = 1 @@ -181,6 +189,10 @@ Registers via Modbus TCP or Modbus RTU/ASCII. { address=4, name="hours", type="UINT32" }, # will result in UIN64 field ] + [[inputs.modbus.request.tags]] + machine = "impresser" + location = "main building" + ## Enable workarounds required by some devices to work correctly # [inputs.modbus.workarounds] ## Pause between read requests sent to the device. This might be necessary for (slow) serial devices. @@ -320,6 +332,11 @@ This setting is ignored if the field's `omit` is set to `true` and can be omitte When specifying `omit=true`, the corresponding field will be ignored when collecting the metric but is taken into account when constructing the modbus requests. This way, you can fill "holes" in the addresses to construct consecutive address ranges resulting in a single request. Using a single modbus request can be beneficial as the values are all collected at the same point in time. +#### Tags definitions + +Each `request` can be accompanied by tags valid for this request. +__Please note:__ These tags take precedence over predefined tags such as `name`, `type` or `slave_id`. + --- ## Trouble shooting diff --git a/plugins/inputs/modbus/configuration_register.go b/plugins/inputs/modbus/configuration_register.go index b73f3c7ce0203..edb6548acbb57 100644 --- a/plugins/inputs/modbus/configuration_register.go +++ b/plugins/inputs/modbus/configuration_register.go @@ -129,7 +129,7 @@ func (c *ConfigurationOriginal) initRequests(fieldDefs []fieldDefinition, maxQua if err != nil { return nil, err } - return newRequestsFromFields(fields, maxQuantity), nil + return groupFieldsToRequests(fields, nil, maxQuantity), nil } func (c *ConfigurationOriginal) initFields(fieldDefs []fieldDefinition) ([]field, error) { diff --git a/plugins/inputs/modbus/configuration_request.go b/plugins/inputs/modbus/configuration_request.go index 2231616142e67..d64b53a827a89 100644 --- a/plugins/inputs/modbus/configuration_request.go +++ b/plugins/inputs/modbus/configuration_request.go @@ -11,7 +11,7 @@ const sampleConfigPartPerRequest = ` ## Define a request sent to the device ## Multiple of those requests can be defined. Data will be collated into metrics at the end of data collection. - [[inputs.modbus.request]] + # [[inputs.modbus.request]] ## ID of the modbus slave device to query. ## If you need to query multiple slave-devices, create several "request" definitions. # slave_id = 0 @@ -57,6 +57,11 @@ const sampleConfigPartPerRequest = ` # { address=3, name="motor1_overheating"}, # ] + ## Per-request tags + ## These tags take precedence over predefined tags. + # [[inputs.modbus.request.tags]] + # name = "value" + ## Holding / input example ## All of those examples will result in FLOAT64 field outputs # fields = [ @@ -75,6 +80,11 @@ const sampleConfigPartPerRequest = ` # { address=2, name="force", type="INT32", output="FLOAT64" }, # will result in FLOAT64 field # { address=4, name="hours", type="UINT32" }, # will result in UIN64 field # ] + + ## Per-request tags + ## These tags take precedence over predefined tags. + # [[inputs.modbus.request.tags]] + # name = "value" ` type requestFieldDefinition struct { @@ -93,6 +103,7 @@ type requestDefinition struct { RegisterType string `toml:"register"` Measurement string `toml:"measurement"` Fields []requestFieldDefinition `toml:"fields"` + Tags map[string]string `toml:"tags"` } type ConfigurationPerRequest struct { @@ -213,16 +224,16 @@ func (c *ConfigurationPerRequest) Process() (map[byte]requestSet, error) { switch def.RegisterType { case "coil": - requests := newRequestsFromFields(fields, maxQuantityCoils) + requests := groupFieldsToRequests(fields, def.Tags, maxQuantityCoils) set.coil = append(set.coil, requests...) case "discrete": - requests := newRequestsFromFields(fields, maxQuantityDiscreteInput) + requests := groupFieldsToRequests(fields, def.Tags, maxQuantityDiscreteInput) set.discrete = append(set.discrete, requests...) case "holding": - requests := newRequestsFromFields(fields, maxQuantityHoldingRegisters) + requests := groupFieldsToRequests(fields, def.Tags, maxQuantityHoldingRegisters) set.holding = append(set.holding, requests...) case "input": - requests := newRequestsFromFields(fields, maxQuantityInputRegisters) + requests := groupFieldsToRequests(fields, def.Tags, maxQuantityInputRegisters) set.input = append(set.input, requests...) default: return nil, fmt.Errorf("unknown register type %q", def.RegisterType) diff --git a/plugins/inputs/modbus/modbus.go b/plugins/inputs/modbus/modbus.go index 865a430b74929..535c5e921fb26 100644 --- a/plugins/inputs/modbus/modbus.go +++ b/plugins/inputs/modbus/modbus.go @@ -461,6 +461,15 @@ func (m *Modbus) gatherRequestsInput(requests []request) error { func (m *Modbus) collectFields(acc telegraf.Accumulator, timestamp time.Time, tags map[string]string, requests []request) { grouper := metric.NewSeriesGrouper() for _, request := range requests { + // Collect tags from global and per-request + rtags := map[string]string{} + for k, v := range tags { + rtags[k] = v + } + for k, v := range request.tags { + rtags[k] = v + } + for _, field := range request.fields { // In case no measurement was specified we use "modbus" as default measurement := "modbus" @@ -469,7 +478,7 @@ func (m *Modbus) collectFields(acc telegraf.Accumulator, timestamp time.Time, ta } // Group the data by series - if err := grouper.Add(measurement, tags, timestamp, field.name, field.value); err != nil { + if err := grouper.Add(measurement, rtags, timestamp, field.name, field.value); err != nil { acc.AddError(fmt.Errorf("cannot add field %q for measurement %q: %v", field.name, measurement, err)) continue } diff --git a/plugins/inputs/modbus/modbus_test.go b/plugins/inputs/modbus/modbus_test.go index ec1e73cd2dca3..fe155a99654d7 100644 --- a/plugins/inputs/modbus/modbus_test.go +++ b/plugins/inputs/modbus/modbus_test.go @@ -1290,6 +1290,184 @@ func TestConfigurationPerRequest(t *testing.T) { require.Len(t, modbus.requests[1].input, 1) } +func TestConfigurationPerRequestWithTags(t *testing.T) { + modbus := Modbus{ + Name: "Test", + Controller: "tcp://localhost:1502", + ConfigurationType: "request", + Log: testutil.Logger{}, + } + modbus.Requests = []requestDefinition{ + { + SlaveID: 1, + ByteOrder: "ABCD", + RegisterType: "coil", + Fields: []requestFieldDefinition{ + { + Name: "coil-0", + Address: uint16(0), + }, + { + Name: "coil-1", + Address: uint16(1), + Omit: true, + }, + { + Name: "coil-2", + Address: uint16(2), + InputType: "INT64", + Scale: 1.2, + OutputType: "FLOAT64", + Measurement: "modbus", + }, + }, + Tags: map[string]string{ + "first": "a", + "second": "bb", + "third": "ccc", + }, + }, + { + SlaveID: 1, + RegisterType: "coil", + Fields: []requestFieldDefinition{ + { + Name: "coil-3", + Address: uint16(6), + }, + { + Name: "coil-4", + Address: uint16(7), + Omit: true, + }, + { + Name: "coil-5", + Address: uint16(8), + InputType: "INT64", + Scale: 1.2, + OutputType: "FLOAT64", + Measurement: "modbus", + }, + }, + Tags: map[string]string{ + "first": "a", + "second": "bb", + "third": "ccc", + }, + }, + { + SlaveID: 1, + ByteOrder: "ABCD", + RegisterType: "discrete", + Fields: []requestFieldDefinition{ + { + Name: "discrete-0", + Address: uint16(0), + }, + { + Name: "discrete-1", + Address: uint16(1), + Omit: true, + }, + { + Name: "discrete-2", + Address: uint16(2), + InputType: "INT64", + Scale: 1.2, + OutputType: "FLOAT64", + Measurement: "modbus", + }, + }, + Tags: map[string]string{ + "first": "a", + "second": "bb", + "third": "ccc", + }, + }, + { + SlaveID: 1, + ByteOrder: "ABCD", + RegisterType: "holding", + Fields: []requestFieldDefinition{ + { + Name: "holding-0", + Address: uint16(0), + InputType: "INT16", + }, + { + Name: "holding-1", + Address: uint16(1), + InputType: "UINT16", + Omit: true, + }, + { + Name: "holding-2", + Address: uint16(2), + InputType: "INT64", + Scale: 1.2, + OutputType: "FLOAT64", + Measurement: "modbus", + }, + }, + Tags: map[string]string{ + "first": "a", + "second": "bb", + "third": "ccc", + }, + }, + { + SlaveID: 1, + ByteOrder: "ABCD", + RegisterType: "input", + Fields: []requestFieldDefinition{ + { + Name: "input-0", + Address: uint16(0), + InputType: "INT16", + }, + { + Name: "input-1", + Address: uint16(1), + InputType: "UINT16", + Omit: true, + }, + { + Name: "input-2", + Address: uint16(2), + InputType: "INT64", + Scale: 1.2, + OutputType: "FLOAT64", + Measurement: "modbus", + }, + }, + Tags: map[string]string{ + "first": "a", + "second": "bb", + "third": "ccc", + }, + }, + } + + require.NoError(t, modbus.Init()) + require.NotEmpty(t, modbus.requests) + require.NotNil(t, modbus.requests[1]) + require.Len(t, modbus.requests[1].coil, 2) + require.Len(t, modbus.requests[1].discrete, 1) + require.Len(t, modbus.requests[1].holding, 1) + require.Len(t, modbus.requests[1].input, 1) + + expectedTags := map[string]string{ + "first": "a", + "second": "bb", + "third": "ccc", + } + require.Equal(t, expectedTags, modbus.requests[1].coil[0].tags) + require.Equal(t, expectedTags, modbus.requests[1].coil[1].tags) + require.Equal(t, expectedTags, modbus.requests[1].discrete[0].tags) + require.Equal(t, expectedTags, modbus.requests[1].holding[0].tags) + require.Equal(t, expectedTags, modbus.requests[1].input[0].tags) +} + func TestConfigurationPerRequestFail(t *testing.T) { tests := []struct { name string diff --git a/plugins/inputs/modbus/request.go b/plugins/inputs/modbus/request.go index 890f730db6759..77fa392146097 100644 --- a/plugins/inputs/modbus/request.go +++ b/plugins/inputs/modbus/request.go @@ -8,9 +8,25 @@ type request struct { address uint16 length uint16 fields []field + tags map[string]string } -func newRequestsFromFields(fields []field, maxBatchSize uint16) []request { +func newRequest(f field, tags map[string]string) request { + r := request{ + address: f.address, + length: f.length, + fields: []field{f}, + tags: map[string]string{}, + } + + // Copy the tags + for k, v := range tags { + r.tags[k] = v + } + return r +} + +func groupFieldsToRequests(fields []field, tags map[string]string, maxBatchSize uint16) []request { if len(fields) == 0 { return nil } @@ -28,12 +44,7 @@ func newRequestsFromFields(fields []field, maxBatchSize uint16) []request { // and the given maximum chunk sizes. var requests []request - current := request{ - address: fields[0].address, - length: fields[0].length, - fields: []field{fields[0]}, - } - + current := newRequest(fields[0], tags) for _, f := range fields[1:] { // Check if we need to interrupt the current chunk and require a new one needInterrupt := f.address != current.address+current.length // not consecutive @@ -51,11 +62,7 @@ func newRequestsFromFields(fields []field, maxBatchSize uint16) []request { // Finish the current request, add it to the list and construct a new one requests = append(requests, current) - current = request{ - address: f.address, - length: f.length, - fields: []field{f}, - } + current = newRequest(f, tags) } requests = append(requests, current)