Skip to content

Commit

Permalink
[Filebeat][httpjson] Adding possibility to use first_event in cursor …
Browse files Browse the repository at this point in the history
…context (#23437) (#23491)

* adding possibility to use first_event context in httpjson

* fixing typo

* Add transform context state test

* updating changelog

Co-authored-by: Marc Guasch <marc.guasch@elastic.co>
(cherry picked from commit d4e85e0)

Co-authored-by: Marius Iversen <marius.iversen@elastic.co>
  • Loading branch information
marc-gr and P1llus authored Jan 25, 2021
1 parent f157b3e commit b0fc8ab
Show file tree
Hide file tree
Showing 8 changed files with 166 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add Google Workspace module and mark Gsuite module as deprecated {pull}22950[22950]
- Mark m365 defender, defender atp, okta and google workspace modules as GA {pull}23113[23113]
- Add parsing of tcp flags to AWS vpcflow fileset {issue}228020[22820] {pull}23157[23157]
- Added support for first_event context in filebeat httpjson input {pull}23437[23437]
- Added `alternative_host` option to google pubsub input {pull}23215[23215]
- Added `encode_as` and `decode_as` options to httpjson along with pluggable encoders/decoders {pull}23478[23478]

Expand Down
7 changes: 4 additions & 3 deletions x-pack/filebeat/docs/inputs/input-httpjson.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ The state has the following elements:
- `last_response.header`: A map containing the headers from the last successful response.
- `last_response.body`: A map containing the parsed JSON body from the last successful response. This is the response as it comes from the remote server.
- `last_response.page`: A number indicating the page number of the last response.
- `first_event`: A map representing the first event sent to the output (result from applying transforms to `last_response.body`).
- `last_event`: A map representing the last event sent to the output (result from applying transforms to `last_response.body`).
- `url.value`: The full URL with params and fragments.
- `url.params`: A map containing the URL params.
Expand Down Expand Up @@ -517,7 +518,7 @@ A set of transforms can be defined. This list will be applied after `response.tr

Available transforms for response: [`append`, `delete`, `set`].

Can read state from: [`.last_response.*`, `.last_event.*`, `.cursor.*`, `.header.*`, `.url.*`].
Can read state from: [`.last_response.*`, `.first_event.*`, `.last_event.*`, `.cursor.*`, `.header.*`, `.url.*`].

Can write state to: [`body.*`].

Expand Down Expand Up @@ -551,7 +552,7 @@ List of transforms that will be applied to the response to every new page reques

Available transforms for pagination: [`append`, `delete`, `set`].

Can read state from: [`.last_response.*`, `.last_event.*`, `.cursor.*`].
Can read state from: [`.last_response.*`, `.first_event.*`, `.last_event.*`, `.cursor.*`].

Can write state to: [`body.*`, `header.*`, `url.*`].

Expand Down Expand Up @@ -809,7 +810,7 @@ This will output:

Cursor is a list of key value objects where arbitrary values are defined. The values are interpreted as <<value-templates,value templates>> and a default template can be set. Cursor state is kept between input restarts and updated once all the events for a request are published.

Can read state from: [`.last_response.*`, `.last_event.*`].
Can read state from: [`.last_response.*`, `.first_event.*`, `.last_event.*`].

NOTE: Default templates do not have access to any state, only to functions.

Expand Down
6 changes: 4 additions & 2 deletions x-pack/filebeat/input/httpjson/internal/v2/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,11 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p
r.log.Errorf("error publishing event: %v", err)
continue
}

if len(*trCtx.firstEventClone()) == 0 {
trCtx.updateFirstEvent(maybeMsg.msg)
}
trCtx.updateLastEvent(maybeMsg.msg)
n += 1
n++
}

trCtx.updateCursor()
Expand Down
136 changes: 136 additions & 0 deletions x-pack/filebeat/input/httpjson/internal/v2/request_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package v2

import (
"context"
"fmt"
"net/http/httptest"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
beattest "github.com/elastic/beats/v7/libbeat/publisher/testing"
)

func TestCtxAfterDoRequest(t *testing.T) {
registerRequestTransforms()
t.Cleanup(func() { registeredTransforms = newRegistry() })

// mock timeNow func to return a fixed value
timeNow = func() time.Time {
t, _ := time.Parse(time.RFC3339, "2002-10-02T15:00:00Z")
return t
}
t.Cleanup(func() { timeNow = time.Now })

// test with dateCursorHandler to have different payloads each request
testServer := httptest.NewServer(dateCursorHandler())
t.Cleanup(testServer.Close)

cfg := common.MustNewConfigFrom(map[string]interface{}{
"interval": 1,
"request.method": "GET",
"request.url": testServer.URL,
"request.transforms": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"target": "url.params.$filter",
"value": "alertCreationTime ge [[.cursor.timestamp]]",
"default": `alertCreationTime ge [[formatDate (now (parseDuration "-10m")) "2006-01-02T15:04:05Z"]]`,
},
},
},
"cursor": map[string]interface{}{
"timestamp": map[string]interface{}{
"value": `[[index .last_response.body "@timestamp"]]`,
},
},
})

config := defaultConfig()
assert.NoError(t, cfg.Unpack(&config))

log := logp.NewLogger("")
ctx := context.Background()
client, err := newHTTPClient(ctx, config, nil, log)
assert.NoError(t, err)

requestFactory := newRequestFactory(config.Request, nil, log)
pagination := newPagination(config, client, log)
responseProcessor := newResponseProcessor(config.Response, pagination, log)

requester := newRequester(client, requestFactory, responseProcessor, log)

trCtx := emptyTransformContext()
trCtx.cursor = newCursor(config.Cursor, log)

// first request
assert.NoError(t, requester.doRequest(ctx, trCtx, statelessPublisher{&beattest.FakeClient{}}))

assert.EqualValues(
t,
common.MapStr{"timestamp": "2002-10-02T15:00:00Z"},
trCtx.cursorMap(),
)
assert.EqualValues(
t,
&common.MapStr{"@timestamp": "2002-10-02T15:00:00Z", "foo": "bar"},
trCtx.firstEventClone(),
)
assert.EqualValues(
t,
&common.MapStr{"@timestamp": "2002-10-02T15:00:00Z", "foo": "bar"},
trCtx.lastEventClone(),
)
lastResp := trCtx.lastResponseClone()
// ignore since has dynamic date and content length values
// and is not relevant
lastResp.header = nil
assert.EqualValues(t,
&response{
page: 1,
url: newURL(fmt.Sprintf("%s?%s", testServer.URL, "%24filter=alertCreationTime+ge+2002-10-02T14%3A50%3A00Z")),
body: common.MapStr{"@timestamp": "2002-10-02T15:00:00Z", "foo": "bar"},
},
lastResp,
)

// second request
assert.NoError(t, requester.doRequest(ctx, trCtx, statelessPublisher{&beattest.FakeClient{}}))

assert.EqualValues(
t,
common.MapStr{"timestamp": "2002-10-02T15:00:01Z"},
trCtx.cursorMap(),
)

// this does not change
assert.EqualValues(
t,
&common.MapStr{"@timestamp": "2002-10-02T15:00:00Z", "foo": "bar"},
trCtx.firstEventClone(),
)

assert.EqualValues(
t,
&common.MapStr{"@timestamp": "2002-10-02T15:00:01Z", "foo": "bar"},
trCtx.lastEventClone(),
)

lastResp = trCtx.lastResponseClone()
lastResp.header = nil
assert.EqualValues(t,
&response{
page: 1,
url: newURL(fmt.Sprintf("%s?%s", testServer.URL, "%24filter=alertCreationTime+ge+2002-10-02T15%3A00%3A00Z")),
body: common.MapStr{"@timestamp": "2002-10-02T15:00:01Z", "foo": "bar"},
},
lastResp,
)
}
16 changes: 16 additions & 0 deletions x-pack/filebeat/input/httpjson/internal/v2/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type transforms []transform
type transformContext struct {
lock sync.RWMutex
cursor *cursor
firstEvent *common.MapStr
lastEvent *common.MapStr
lastResponse *response
}
Expand All @@ -33,6 +34,7 @@ func emptyTransformContext() *transformContext {
return &transformContext{
cursor: &cursor{},
lastEvent: &common.MapStr{},
firstEvent: &common.MapStr{},
lastResponse: &response{},
}
}
Expand All @@ -50,6 +52,13 @@ func (ctx *transformContext) lastEventClone() *common.MapStr {
return &clone
}

func (ctx *transformContext) firstEventClone() *common.MapStr {
ctx.lock.RLock()
defer ctx.lock.RUnlock()
clone := ctx.firstEvent.Clone()
return &clone
}

func (ctx *transformContext) lastResponseClone() *response {
ctx.lock.RLock()
defer ctx.lock.RUnlock()
Expand All @@ -63,6 +72,7 @@ func (ctx *transformContext) updateCursor() {
// we do not want to pass the cursor data to itself
newCtx := emptyTransformContext()
newCtx.lastEvent = ctx.lastEvent
newCtx.firstEvent = ctx.firstEvent
newCtx.lastResponse = ctx.lastResponse

ctx.cursor.update(newCtx)
Expand All @@ -74,6 +84,12 @@ func (ctx *transformContext) updateLastEvent(e common.MapStr) {
*ctx.lastEvent = e
}

func (ctx *transformContext) updateFirstEvent(e common.MapStr) {
ctx.lock.Lock()
defer ctx.lock.Unlock()
*ctx.firstEvent = e
}

func (ctx *transformContext) updateLastResponse(r response) {
ctx.lock.Lock()
defer ctx.lock.Unlock()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ func TestEmptyTransformContext(t *testing.T) {
ctx := emptyTransformContext()
assert.Equal(t, &cursor{}, ctx.cursor)
assert.Equal(t, &common.MapStr{}, ctx.lastEvent)
assert.Equal(t, &common.MapStr{}, ctx.firstEvent)
assert.Equal(t, &response{}, ctx.lastResponse)
}

Expand Down
1 change: 1 addition & 0 deletions x-pack/filebeat/input/httpjson/internal/v2/value_tpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func (t *valueTpl) Execute(trCtx *transformContext, tr transformable, defaultVal
buf := new(bytes.Buffer)
data := tr.Clone()
data.Put("cursor", trCtx.cursorMap())
data.Put("first_event", trCtx.firstEventClone())
data.Put("last_event", trCtx.lastEventClone())
data.Put("last_response", trCtx.lastResponseClone().templateValues())

Expand Down
4 changes: 3 additions & 1 deletion x-pack/filebeat/input/httpjson/internal/v2/value_tpl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func TestValueTpl(t *testing.T) {
name: "can render values from ctx",
value: "[[.last_response.body.param]]",
paramCtx: &transformContext{
firstEvent: &common.MapStr{},
lastEvent: &common.MapStr{},
lastResponse: newTestResponse(common.MapStr{"param": 25}, nil, ""),
},
Expand Down Expand Up @@ -159,7 +160,8 @@ func TestValueTpl(t *testing.T) {
name: "func getRFC5988Link",
value: `[[ getRFC5988Link "previous" .last_response.header.Link ]]`,
paramCtx: &transformContext{
lastEvent: &common.MapStr{},
firstEvent: &common.MapStr{},
lastEvent: &common.MapStr{},
lastResponse: newTestResponse(
nil,
http.Header{"Link": []string{
Expand Down

0 comments on commit b0fc8ab

Please sign in to comment.