Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick #15859 to 7.6: Fix missing support for setting doc… #15915

Merged
merged 1 commit into from
Jan 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d

*Affecting all Beats*

- The document id fields has been renamed from @metadata.id to @metadata._id {pull}15859[15859]


*Auditbeat*

Expand Down Expand Up @@ -77,6 +79,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d

*Affecting all Beats*

- Add document_id setting to decode_json_fields processor. {pull}15859[15859]


*Auditbeat*

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ occur.

*`document_id`*:: Option configuration setting that specifies the JSON key to
set the document id. If configured, the field will be removed from the original
json document and stored in `@metadata.id`
json document and stored in `@metadata._id`

*`ignore_decoding_error`*:: An optional configuration setting that specifies if
JSON decoding errors should be logged or not. If set to true, errors will not
Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/log/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ func (h *Harvester) onMessage(

if id != "" {
meta = common.MapStr{
"id": id,
"_id": id,
}
}
} else if &text != nil {
Expand Down
4 changes: 2 additions & 2 deletions filebeat/tests/system/test_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,8 @@ def test_id_in_message(self):

assert len(output) == 3
for i in xrange(len(output)):
assert("@metadata.id" in output[i])
assert(output[i]["@metadata.id"] == str(i))
assert("@metadata._id" in output[i])
assert(output[i]["@metadata._id"] == str(i))
assert("json.id" not in output[i])

def test_with_generic_filtering(self):
Expand Down
2 changes: 1 addition & 1 deletion libbeat/beat/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (e *Event) SetID(id string) {
if e.Meta == nil {
e.Meta = common.MapStr{}
}
e.Meta["id"] = id
e.Meta["_id"] = id
}

func (e *Event) GetValue(key string) (interface{}, error) {
Expand Down
8 changes: 4 additions & 4 deletions libbeat/beat/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestEventPutGetTimestamp(t *testing.T) {

func TestEventMetadata(t *testing.T) {
const id = "123"
newMeta := func() common.MapStr { return common.MapStr{"id": id} }
newMeta := func() common.MapStr { return common.MapStr{"_id": id} }

t.Run("put", func(t *testing.T) {
evt := newEmptyEvent()
Expand All @@ -75,7 +75,7 @@ func TestEventMetadata(t *testing.T) {
t.Run("put sub-key", func(t *testing.T) {
evt := newEmptyEvent()

evt.PutValue("@metadata.id", id)
evt.PutValue("@metadata._id", id)

assert.Equal(t, newMeta(), evt.Meta)
assert.Empty(t, evt.Fields)
Expand All @@ -85,7 +85,7 @@ func TestEventMetadata(t *testing.T) {
evt := newEmptyEvent()
evt.Meta = newMeta()

v, err := evt.GetValue("@metadata.id")
v, err := evt.GetValue("@metadata._id")

assert.NoError(t, err)
assert.Equal(t, id, v)
Expand All @@ -105,7 +105,7 @@ func TestEventMetadata(t *testing.T) {
evt := newEmptyEvent()
evt.Meta = newMeta()

err := evt.Delete("@metadata.id")
err := evt.Delete("@metadata._id")

assert.NoError(t, err)
assert.Empty(t, evt.Meta)
Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ func createEventBulkMeta(

var id string
if m := event.Meta; m != nil {
if tmp := m["id"]; tmp != nil {
if tmp := m["_id"]; tmp != nil {
if s, ok := tmp.(string); ok {
id = s
} else {
Expand Down
31 changes: 30 additions & 1 deletion libbeat/processors/actions/decode_json_fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type decodeJSONFields struct {
overwriteKeys bool
addErrorKey bool
processArray bool
documentID string
target *string
}

Expand All @@ -50,6 +51,7 @@ type config struct {
AddErrorKey bool `config:"add_error_key"`
ProcessArray bool `config:"process_array"`
Target *string `config:"target"`
DocumentID string `config:"document_id"`
}

var (
Expand Down Expand Up @@ -81,7 +83,15 @@ func NewDecodeJSONFields(c *common.Config) (processors.Processor, error) {
return nil, fmt.Errorf("fail to unpack the decode_json_fields configuration: %s", err)
}

f := &decodeJSONFields{fields: config.Fields, maxDepth: config.MaxDepth, overwriteKeys: config.OverwriteKeys, addErrorKey: config.AddErrorKey, processArray: config.ProcessArray, target: config.Target}
f := &decodeJSONFields{
fields: config.Fields,
maxDepth: config.MaxDepth,
overwriteKeys: config.OverwriteKeys,
addErrorKey: config.AddErrorKey,
processArray: config.ProcessArray,
documentID: config.DocumentID,
target: config.Target,
}
return f, nil
}

Expand Down Expand Up @@ -115,6 +125,18 @@ func (f *decodeJSONFields) Run(event *beat.Event) (*beat.Event, error) {
target = *f.target
}

var id string
if key := f.documentID; key != "" {
if dict, ok := output.(map[string]interface{}); ok {
if tmp, err := common.MapStr(dict).GetValue(key); err == nil {
if v, ok := tmp.(string); ok {
id = v
common.MapStr(dict).Delete(key)
}
}
}
}

if target != "" {
_, err = event.PutValue(target, output)
} else {
Expand All @@ -131,6 +153,13 @@ func (f *decodeJSONFields) Run(event *beat.Event) (*beat.Event, error) {
errs = append(errs, err.Error())
continue
}

if id != "" {
if event.Meta == nil {
event.Meta = common.MapStr{}
}
event.Meta["_id"] = id
}
}

if len(errs) > 0 {
Expand Down
33 changes: 33 additions & 0 deletions libbeat/processors/actions/decode_json_fields_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
Expand Down Expand Up @@ -94,6 +95,38 @@ func TestInvalidJSONMultiple(t *testing.T) {
assert.Equal(t, expected.String(), actual.String())
}

func TestDocumentID(t *testing.T) {
logp.TestingSetup()

input := common.MapStr{
"msg": `{"log": "message", "myid": "myDocumentID"}`,
}

config := common.MustNewConfigFrom(map[string]interface{}{
"fields": []string{"msg"},
"document_id": "myid",
})

p, err := NewDecodeJSONFields(config)
if err != nil {
logp.Err("Error initializing decode_json_fields")
t.Fatal(err)
}

actual, err := p.Run(&beat.Event{Fields: input})
require.NoError(t, err)

wantFields := common.MapStr{
"msg": map[string]interface{}{"log": "message"},
}
wantMeta := common.MapStr{
"_id": "myDocumentID",
}

assert.Equal(t, wantFields, actual.Fields)
assert.Equal(t, wantMeta, actual.Meta)
}

func TestValidJSONDepthOne(t *testing.T) {
input := common.MapStr{
"msg": "{\"log\":\"{\\\"level\\\":\\\"info\\\"}\",\"stream\":\"stderr\",\"count\":3}",
Expand Down
3 changes: 3 additions & 0 deletions libbeat/processors/actions/docs/decode_json_fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,6 @@ default value is false.
`error` field is going to be part of event with error message. If it set to false, there
will not be any error in event's field. Even error occurs while decoding json keys. The
default value is false
`document_id`:: (Optional) JSON key to use as the document id. If configured,
the field will be removed from the original json document and stored in
`@metadata._id`
4 changes: 2 additions & 2 deletions libbeat/processors/add_id/add_id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestDefaultTargetField(t *testing.T) {
newEvent, err := p.Run(testEvent)
assert.NoError(t, err)

v, err := newEvent.GetValue("@metadata.id")
v, err := newEvent.GetValue("@metadata._id")
assert.NoError(t, err)
assert.NotEmpty(t, v)
}
Expand All @@ -59,7 +59,7 @@ func TestNonDefaultTargetField(t *testing.T) {
assert.NoError(t, err)
assert.NotEmpty(t, v)

v, err = newEvent.GetValue("@metadata.id")
v, err = newEvent.GetValue("@metadata._id")
assert.NoError(t, err)
assert.Empty(t, v)
}
2 changes: 1 addition & 1 deletion libbeat/processors/add_id/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type config struct {

func defaultConfig() config {
return config{
TargetField: "@metadata.id",
TargetField: "@metadata._id",
Type: "elasticsearch",
}
}
Expand Down
2 changes: 1 addition & 1 deletion libbeat/processors/add_id/docs/add_id.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ processors:

The following settings are supported:

`target_field`:: (Optional) Field where the generated ID will be stored. Default is `@metadata.id`.
`target_field`:: (Optional) Field where the generated ID will be stored. Default is `@metadata._id`.

`type`:: (Optional) Type of ID to generate. Currently only `elasticsearch` is supported and is the default.
The `elasticsearch` type generates IDs using the same algorithm that Elasticsearch uses for auto-generating
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/mb/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func TestEventConversionToBeatEvent(t *testing.T) {
e := mbEvent.BeatEvent(module, metricSet)
e = mbEvent.BeatEvent(module, metricSet)

assert.Equal(t, "foobar", e.Meta["id"])
assert.Equal(t, "foobar", e.Meta["_id"])
assert.Equal(t, timestamp, e.Timestamp)
assert.Equal(t, common.MapStr{
"type": "docker",
Expand Down