Skip to content

Commit

Permalink
Fix httpjson first_event (#26407) (#26409)
Browse files Browse the repository at this point in the history
(cherry picked from commit 0b729af)

Co-authored-by: Marc Guasch <marc-gr@users.noreply.github.com>
  • Loading branch information
mergify[bot] and marc-gr authored Jun 22, 2021
1 parent c3b060f commit b0e5e75
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Improve inode reuse handling by removing state for removed files more eagerly from the internal state table in the logs inputs. {pull}25756[25756]
- Fix default config template values for paths on oracle module: {pull}26276[26276]
- Fix bug in aws-s3 input where the end of gzipped log files might have been discarded. {pull}26260[26260]
- Fix bug in `httpjson` that prevented `first_event` getting updated. {pull}26407[26407]

*Filebeat*

Expand Down
55 changes: 51 additions & 4 deletions x-pack/filebeat/input/httpjson/internal/v2/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func TestInput(t *testing.T) {
t.Cleanup(server.Close)
},
baseConfig: map[string]interface{}{
"interval": 1,
"interval": time.Second,
"request.method": "GET",
"response.split": map[string]interface{}{
"target": "body.items",
Expand All @@ -230,7 +230,50 @@ func TestInput(t *testing.T) {
},
},
handler: paginationHandler(),
expected: []string{`{"foo":"bar"}`, `{"foo":"bar"}`},
expected: []string{`{"foo":"a"}`, `{"foo":"b"}`},
},
{
name: "Test first event",
setupServer: func(t *testing.T, h http.HandlerFunc, config map[string]interface{}) {
registerPaginationTransforms()
registerResponseTransforms()
t.Cleanup(func() { registeredTransforms = newRegistry() })
server := httptest.NewServer(h)
config["request.url"] = server.URL
t.Cleanup(server.Close)
},
baseConfig: map[string]interface{}{
"interval": 1,
"request.method": "GET",
"response.split": map[string]interface{}{
"target": "body.items",
"transforms": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"target": "body.first",
"value": "[[.cursor.first]]",
"default": "none",
},
},
},
},
"response.pagination": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"target": "url.params.page",
"value": "[[.last_response.body.nextPageToken]]",
"fail_on_template_error": true,
},
},
},
"cursor": map[string]interface{}{
"first": map[string]interface{}{
"value": "[[.first_event.foo]]",
},
},
},
handler: paginationHandler(),
expected: []string{`{"first":"none", "foo":"a"}`, `{"first":"a", "foo":"b"}`, `{"first":"a", "foo":"c"}`, `{"first":"c", "foo":"d"}`},
},
{
name: "Test pagination with array response",
Expand Down Expand Up @@ -492,14 +535,18 @@ func paginationHandler() http.HandlerFunc {
w.Header().Set("content-type", "application/json")
switch count {
case 0:
_, _ = w.Write([]byte(`{"@timestamp":"2002-10-02T15:00:00Z","nextPageToken":"bar","items":[{"foo":"bar"}]}`))
_, _ = w.Write([]byte(`{"@timestamp":"2002-10-02T15:00:00Z","nextPageToken":"bar","items":[{"foo":"a"}]}`))
case 1:
if r.URL.Query().Get("page") != "bar" {
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte(`{"error":"wrong page token value"}`))
return
}
_, _ = w.Write([]byte(`{"@timestamp":"2002-10-02T15:00:01Z","items":[{"foo":"bar"}]}`))
_, _ = w.Write([]byte(`{"@timestamp":"2002-10-02T15:00:01Z","items":[{"foo":"b"}]}`))
case 2:
_, _ = w.Write([]byte(`{"@timestamp":"2002-10-02T15:00:02Z","items":[{"foo":"c"}]}`))
case 3:
_, _ = w.Write([]byte(`{"@timestamp":"2002-10-02T15:00:03Z","items":[{"foo":"d"}]}`))
}
count += 1
}
Expand Down
2 changes: 2 additions & 0 deletions x-pack/filebeat/input/httpjson/internal/v2/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p
return err
}

trCtx.clearIntervalData()

var n int
for maybeMsg := range eventsCh {
if maybeMsg.failed() {
Expand Down
3 changes: 1 addition & 2 deletions x-pack/filebeat/input/httpjson/internal/v2/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,9 @@ func TestCtxAfterDoRequest(t *testing.T) {
trCtx.cursorMap(),
)

// this does not change
assert.EqualValues(
t,
&common.MapStr{"@timestamp": "2002-10-02T15:00:00Z", "foo": "bar"},
&common.MapStr{"@timestamp": "2002-10-02T15:00:01Z", "foo": "bar"},
trCtx.firstEventClone(),
)

Expand Down
8 changes: 8 additions & 0 deletions x-pack/filebeat/input/httpjson/internal/v2/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,14 @@ func (ctx *transformContext) updateLastResponse(r response) {
*ctx.lastResponse = r
}

func (ctx *transformContext) clearIntervalData() {
ctx.lock.Lock()
defer ctx.lock.Unlock()
ctx.lastEvent = &common.MapStr{}
ctx.firstEvent = &common.MapStr{}
ctx.lastResponse = &response{}
}

type transformable common.MapStr

func (tr transformable) access() common.MapStr {
Expand Down

0 comments on commit b0e5e75

Please sign in to comment.