Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Libbeat][Metricbeat]Add IgnoreAllErrors to schema.Conv object #12089

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,5 @@ The list below covers the major changes between 7.0.0-rc2 and master only.
- Update urllib3 version to 1.24.2 {pull}11930[11930]
- Add libbeat/common/cleanup package. {pull}12134[12134]
- Only Load minimal template if no fields are provided. {pull}12103[12103]
- Add new option `IgnoreAllErrors` to `libbeat.common.schema` for skipping fields that failed while converting. {pull}12089[12089]
- Deprecate setup cmds for `template` and `ilm-policy`. Add new setup cmd for `index-management`. {pull}12132[12132]
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Avoid generating hints-based configuration with empty hosts when no exposed port is suitable for the hosts hint. {issue}8264[8264] {pull}12086[12086]
- Fixed a socket leak in the postgresql module under Windows when SSL is disabled on the server. {pull}11393[11393]
- Change some field type from scaled_float to long in aws module. {pull}11982[11982]
- Fixed RabbitMQ `queue` metricset gathering when `consumer_utilisation` is set empty at the metrics source {pull}12089[12089]

*Packetbeat*

Expand Down
21 changes: 17 additions & 4 deletions libbeat/common/schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package schema
import (
"github.com/joeshaw/multierror"

"github.com/elastic/beats/libbeat/logp"

"github.com/elastic/beats/libbeat/common"
)

Expand All @@ -39,10 +41,11 @@ type Mapper interface {

// A Conv object represents a conversion mechanism from the data map to the event map.
type Conv struct {
Func Converter // Convertor function
Key string // The key in the data map
Optional bool // Whether to ignore errors if the key is not found
Required bool // Whether to provoke errors if the key is not found
Func Converter // Convertor function
Key string // The key in the data map
Optional bool // Whether to ignore errors if the key is not found
Required bool // Whether to provoke errors if the key is not found
IgnoreAllErrors bool // Ignore any value conversion error
}

// Converter function type
Expand All @@ -57,6 +60,10 @@ func (conv Conv) Map(key string, event common.MapStr, data map[string]interface{
err.Optional = conv.Optional
err.Required = conv.Required
}
if conv.IgnoreAllErrors {
logp.Debug("schema", "ignoring error for key %q: %s", key, err)
return nil
}
return multierror.Errors{err}
}
event[key] = value
Expand Down Expand Up @@ -142,6 +149,12 @@ func Required(c Conv) Conv {
return c
}

// IgnoreAllErrors set the enable all errors flag
func IgnoreAllErrors(c Conv) Conv {
c.IgnoreAllErrors = true
return c
}

// setOptions adds the optional flags to the Conv object
func SetOptions(c Conv, opts []SchemaOption) Conv {
for _, opt := range opts {
Expand Down
88 changes: 88 additions & 0 deletions libbeat/common/schema/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package schema
import (
"testing"

"github.com/pkg/errors"
"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/common"
Expand Down Expand Up @@ -83,3 +84,90 @@ func TestOptions(t *testing.T) {
assert.Equal(t, conv.Key, "test")
assert.Equal(t, conv.Optional, true)
}

func TestSchemaCases(t *testing.T) {

var errFunc = func(key string, data map[string]interface{}) (interface{}, error) {
return nil, errors.New("test error")
}
var noopFunc = func(key string, data map[string]interface{}) (interface{}, error) { return data[key], nil }

var testCases = []struct {
name string
schema Schema
source map[string]interface{}

expectedErrorMessage string
expectedOutput common.MapStr
}{
{
name: "standard schema conversion case",
schema: Schema{
"outField": Conv{
Key: "inField",
Func: noopFunc,
IgnoreAllErrors: true,
},
},
source: map[string]interface{}{
"inField": "10",
},

expectedOutput: common.MapStr{
"outField": "10",
},
},
{
name: "error at conversion case",
schema: Schema{
"outField": Conv{
Key: "inField",
Func: errFunc,
Optional: true,
},
},
source: map[string]interface{}{
"doesntMatter": "",
},

expectedErrorMessage: "test error",
expectedOutput: common.MapStr{},
},
{
name: "ignore error at conversion case",
schema: Schema{
"outField": Conv{
Key: "inField",
Func: errFunc,
Optional: true,
IgnoreAllErrors: true,
},
},
source: map[string]interface{}{
"doesntMatter": "",
},

expectedOutput: common.MapStr{},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {

event, errs := tc.schema.Apply(tc.source)

if errs != nil {
errorMessage := errs.Error()
if tc.expectedErrorMessage == "" {
t.Errorf("unexpected error ocurred: %s", errorMessage)
}
assert.Contains(t, errorMessage, tc.expectedErrorMessage)
} else if tc.expectedErrorMessage != "" {
t.Errorf("exepected error message %q was not returned", tc.expectedErrorMessage)
}

assert.Equal(t, tc.expectedOutput, event)

})
}
}
2 changes: 1 addition & 1 deletion metricbeat/module/rabbitmq/queue/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ var (
"consumers": s.Object{
"count": c.Int("consumers"),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What value does consumers have when consumer_utilisation is null? It could be good to add a test case for this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is what we read from RabbitMQ

...
    "consumer_utilisation": null,
    "consumers": 0,
...

This is metricbeat's output for this case (notice empty utilisation element

  "rabbitmq": {
    "node": {
      "name": "rabbit@Pablos-MacBook-Pro"
    },
    "queue": {
      "state": "running",
      "name": "hello",
      "memory": {
        "bytes": 13384
      },
      "consumers": {
        "count": 0,
        "utilisation": {}
      },
      "auto_delete": false,
      "arguments": {},
      "messages": {
        "total": {
          "details": {
            "rate": 0
          },
          "count": 0
        },
        "ready": {
          "details": {
            "rate": 0
          },
          "count": 0
        },
        "unacknowledged": {
          "count": 0,
          "details": {
            "rate": 0
          }
        },
        "persistent": {
          "count": 0
        }
      }

it would be nice to have a test for this case, but I'm not sure how to add it given that current test structure only allows one mocked response per metrics endpoint URL
https://github.com/elastic/beats/blob/master/metricbeat/module/rabbitmq/mtest/server.go#L56

can you shed some light on it?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, leave it as is and we will add a test after #10667

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tested temporarily using null for consumer utilisation. Tests succeed

diff --git a/metricbeat/module/rabbitmq/_meta/testdata/queue_sample_response.json b/metricbeat/module/rabbitmq/_meta/testdata/queue_sample_response.json
index 7c3088e6d..fd7b60b93 100644
--- a/metricbeat/module/rabbitmq/_meta/testdata/queue_sample_response.json
+++ b/metricbeat/module/rabbitmq/_meta/testdata/queue_sample_response.json
@@ -76,7 +76,7 @@
             "rate": 0.5
         },
         "idle_since": "2017-07-28 23:45:52",
-        "consumer_utilisation": 0.7,
+        "consumer_utilisation": null,
         "policy": null,
         "exclusive_consumer_tag": null,
         "consumers": 3,
diff --git a/metricbeat/module/rabbitmq/queue/queue_test.go b/metricbeat/module/rabbitmq/queue/queue_test.go
index 2f3320abb..a033a0350 100644
--- a/metricbeat/module/rabbitmq/queue/queue_test.go
+++ b/metricbeat/module/rabbitmq/queue/queue_test.go
@@ -54,8 +54,8 @@ func TestFetchEventContents(t *testing.T) {
        consumers := event["consumers"].(common.MapStr)
        utilisation := consumers["utilisation"].(common.MapStr)
        assert.EqualValues(t, 3, consumers["count"])
-       assert.EqualValues(t, 0.7, utilisation["pct"])
-
+       //(assert.EqualValues(t, 0.7, utilisation["pct"])
+       assert.Nil(t, utilisation["pct"])
        memory := event["memory"].(common.MapStr)
        assert.EqualValues(t, 232720, memory["bytes"])

"utilisation": s.Object{
"pct": c.Int("consumer_utilisation", s.Optional),
"pct": c.Int("consumer_utilisation", s.IgnoreAllErrors),
},
},
"messages": s.Object{
Expand Down