Skip to content

Commit

Permalink
Fix _id field in s3 and googlepubsub inputs (#17026)
Browse files Browse the repository at this point in the history
In #15859 the Elasticsearch output was changed to read from the @metadata._id field when it had been using @metadata.id.
The s3 and googlepubsub inputs had both been setting @metadata.id, but were not updated with that change.

This updates the s3 and googlepubsub inputs to use `beat.Event#SetID()` rather than creating the metadata object themselves.
  • Loading branch information
andrewkroh authored Mar 18, 2020
1 parent efdab6f commit 304eca4
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 37 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix merging of fileset inputs to replace paths and append processors. {pull}16450{16450}
- Add queue_url definition in manifest file for aws module. {pull}16640{16640}
- Fix issue where autodiscover hints default configuration was not being copied. {pull}16987[16987]
- Fix Elasticsearch `_id` field set by S3 and Google Pub/Sub inputs. {pull}17026[17026]

*Heartbeat*

Expand Down
27 changes: 13 additions & 14 deletions x-pack/filebeat/input/googlepubsub/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,25 +193,24 @@ func makeTopicID(project, topic string) string {
func makeEvent(topicID string, msg *pubsub.Message) beat.Event {
id := topicID + "-" + msg.ID

fields := common.MapStr{
"event": common.MapStr{
"id": id,
"created": time.Now().UTC(),
event := beat.Event{
Timestamp: msg.PublishTime.UTC(),
Fields: common.MapStr{
"event": common.MapStr{
"id": id,
"created": time.Now().UTC(),
},
"message": string(msg.Data),
},
"message": string(msg.Data),
Private: msg,
}
event.SetID(id)

if len(msg.Attributes) > 0 {
fields.Put("labels", msg.Attributes)
event.PutValue("labels", msg.Attributes)
}

return beat.Event{
Timestamp: msg.PublishTime.UTC(),
Meta: common.MapStr{
"id": id,
},
Fields: fields,
Private: msg,
}
return event
}

func (in *pubsubInput) getOrCreateSubscription(ctx context.Context, client *pubsub.Client) (*pubsub.Subscription, error) {
Expand Down
47 changes: 24 additions & 23 deletions x-pack/filebeat/input/s3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,33 +587,34 @@ func (p *s3Input) deleteMessage(queueURL string, messagesReceiptHandle string, s
}

func createEvent(log string, offset int, info s3Info, objectHash string, s3Ctx *s3Context) beat.Event {
f := common.MapStr{
"message": log,
"log": common.MapStr{
"offset": int64(offset),
"file.path": constructObjectURL(info),
},
"aws": common.MapStr{
"s3": common.MapStr{
"bucket": common.MapStr{
"name": info.name,
"arn": info.arn},
"object.key": info.key,
s3Ctx.Inc()

event := beat.Event{
Timestamp: time.Now().UTC(),
Fields: common.MapStr{
"message": log,
"log": common.MapStr{
"offset": int64(offset),
"file.path": constructObjectURL(info),
},
"aws": common.MapStr{
"s3": common.MapStr{
"bucket": common.MapStr{
"name": info.name,
"arn": info.arn},
"object.key": info.key,
},
},
"cloud": common.MapStr{
"provider": "aws",
"region": info.region,
},
},
"cloud": common.MapStr{
"provider": "aws",
"region": info.region,
},
Private: s3Ctx,
}
event.SetID(objectHash + "-" + fmt.Sprintf("%012d", offset))

s3Ctx.Inc()
return beat.Event{
Timestamp: time.Now(),
Fields: f,
Meta: common.MapStr{"id": objectHash + "-" + fmt.Sprintf("%012d", offset)},
Private: s3Ctx,
}
return event
}

func constructObjectURL(info s3Info) string {
Expand Down

0 comments on commit 304eca4

Please sign in to comment.