Skip to content

Commit

Permalink
[Libbeat] Respect the parameters option defined in the ES output. (#…
Browse files Browse the repository at this point in the history
…18326) (#18348)

(cherry picked from commit 0575e15)

Co-authored-by: Pier-Hugues Pellerin <phpellerin@gmail.com>
  • Loading branch information
ruflin and ph authored May 11, 2020
1 parent d6dd4a7 commit c8e8875
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ https://github.com/elastic/beats/compare/v7.5.0...v7.5.1[View commits]
- Fix `proxy_url` option in Elasticsearch output. {pull}14950[14950]
- Fix bug with potential concurrent reads and writes from event.Meta map by Kafka output. {issue}14542[14542] {pull}14568[14568]
- Fix license detection, when a beats successfully connect to Elasticsearch the detected license will be show in the log at info level. {pull}15834[15834]
- Fix the `parameters` option configured in the Elasticsearch output so the values are added to the query string on bulk request. {issues}18325[18325]

*Filebeat*

Expand Down
28 changes: 26 additions & 2 deletions libbeat/esleg/eslegclient/bulkapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,16 @@ func (conn *Connection) Bulk(
return 0, nil, nil
}

mergedParams := mergeParams(conn.ConnectionSettings.Parameters, params)

enc := conn.Encoder
enc.Reset()
if err := bulkEncode(conn.log, enc, body); err != nil {
apm.CaptureError(ctx, err).Send()
return 0, nil, err
}

requ, err := newBulkRequest(conn.URL, index, docType, params, enc)
requ, err := newBulkRequest(conn.URL, index, docType, mergedParams, enc)
if err != nil {
apm.CaptureError(ctx, err).Send()
return 0, nil, err
Expand All @@ -103,6 +105,8 @@ func (conn *Connection) SendMonitoringBulk(
return nil, nil
}

mergedParams := mergeParams(conn.ConnectionSettings.Parameters, params)

enc := conn.Encoder
enc.Reset()
if err := bulkEncode(conn.log, enc, body); err != nil {
Expand All @@ -115,7 +119,7 @@ func (conn *Connection) SendMonitoringBulk(
}
}

requ, err := newMonitoringBulkRequest(conn.GetVersion(), conn.URL, params, enc)
requ, err := newMonitoringBulkRequest(conn.GetVersion(), conn.URL, mergedParams, enc)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -229,3 +233,23 @@ func bulkEncode(log *logp.Logger, out BulkWriter, body []interface{}) error {
}
return nil
}

func mergeParams(m1, m2 map[string]string) map[string]string {
if len(m1) == 0 {
return m2
}
if len(m2) == 0 {
return m1
}
merged := make(map[string]string, len(m1)+len(m2))

for k, v := range m1 {
merged[k] = v
}

for k, v := range m2 {
merged[k] = v
}

return merged
}
57 changes: 57 additions & 0 deletions libbeat/esleg/eslegclient/bulkapi_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@ package eslegclient

import (
"context"
"errors"
"fmt"
"net/http"
"os"
"strings"
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/libbeat/logp"
)

Expand Down Expand Up @@ -146,3 +149,57 @@ func TestOneHost503Resp_Bulk(t *testing.T) {
t.Errorf("Should return <503 Service Unavailable> instead of %v", err)
}
}

func TestEnforceParameters(t *testing.T) {
client, _ := NewConnection(ConnectionSettings{
Parameters: map[string]string{"hello": "world"},
URL: "http://localhost",
Timeout: 0,
})
client.Encoder = NewJSONEncoder(nil, false)

client.HTTP = &reqInspector{
assert: func(req *http.Request) (*http.Response, error) {
assert.Equal(t, "world", req.URL.Query().Get("hello"))
// short circuit others logic.
return nil, errors.New("shortcut")
},
}

index := "what"

ops := []map[string]interface{}{
{
"index": map[string]interface{}{
"_index": index,
"_type": "type1",
"_id": "1",
},
},
{
"field1": "value1",
},
}

body := make([]interface{}, 0, 10)
for _, op := range ops {
body = append(body, op)
}

params := map[string]string{
"refresh": "true",
}

client.Bulk(context.Background(), index, "type1", params, body)
}

type reqInspector struct {
assert func(req *http.Request) (*http.Response, error)
}

func (r *reqInspector) Do(req *http.Request) (*http.Response, error) {
return r.assert(req)
}

func (r *reqInspector) CloseIdleConnections() {
}

0 comments on commit c8e8875

Please sign in to comment.