Skip to content

Commit fb29d0c

Browse files
simittgraphaelli
authored andcommitted
Ensure Kibana client reconnects. (#2421) (#2436)
Implement Kibana client wrapper that takes care of obtaining a valid Kibana client, in case client cannot be obtained on startup. Align returned response body to whether or not the user sends a secret token. Log full error messages. fixes #2371
1 parent f32f936 commit fb29d0c

18 files changed

+641
-233
lines changed

agentcfg/cache.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func newCache(logger *logp.Logger, exp time.Duration) *cache {
4747
}
4848

4949
func (c *cache) fetchAndAdd(q Query, fn func(Query) (*Doc, error)) (doc *Doc, err error) {
50-
id := q.id()
50+
id := q.ID()
5151

5252
// return from cache if possible
5353
doc, found := c.fetch(id)

agentcfg/cache_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func newCacheSetup(service string, exp time.Duration, init bool) cacheSetup {
4545
doc: &defaultDoc,
4646
}
4747
if init {
48-
setup.c.add(setup.q.id(), setup.doc)
48+
setup.c.add(setup.q.ID(), setup.doc)
4949
}
5050
return setup
5151
}
@@ -75,7 +75,7 @@ func TestCache_fetchAndAdd(t *testing.T) {
7575
} else {
7676
assert.NoError(t, err)
7777
//ensure value is cached afterwards
78-
cachedDoc, found := setup.c.fetch(setup.q.id())
78+
cachedDoc, found := setup.c.fetch(setup.q.ID())
7979
assert.True(t, found)
8080
assert.Equal(t, doc, cachedDoc)
8181
}
@@ -89,7 +89,7 @@ func TestCache_fetchAndAdd(t *testing.T) {
8989
require.NoError(t, err)
9090
require.NotNil(t, doc)
9191
time.Sleep(exp)
92-
nilDoc, found := setup.c.fetch(setup.q.id())
92+
nilDoc, found := setup.c.fetch(setup.q.ID())
9393
assert.False(t, found)
9494
assert.Nil(t, nilDoc)
9595
})

agentcfg/fetch.go

Lines changed: 24 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,41 +18,42 @@
1818
package agentcfg
1919

2020
import (
21-
"errors"
22-
"fmt"
2321
"io"
2422
"io/ioutil"
2523
"net/http"
2624
"time"
2725

28-
"github.com/elastic/beats/libbeat/common"
29-
"github.com/elastic/beats/libbeat/kibana"
26+
"github.com/pkg/errors"
27+
3028
"github.com/elastic/beats/libbeat/logp"
3129

3230
"github.com/elastic/apm-server/convert"
31+
"github.com/elastic/apm-server/kibana"
3332
)
3433

34+
// Error Messages used to signal fetching errors
3535
const (
36-
endpoint = "/api/apm/settings/agent-configuration/search"
36+
ErrMsgSendToKibanaFailed = "sending request to kibana failed"
37+
ErrMsgMultipleChoices = "multiple configurations found"
38+
ErrMsgReadKibanaResponse = "unable to read Kibana response body"
3739
)
40+
const endpoint = "/api/apm/settings/agent-configuration/search"
3841

3942
// Fetcher holds static information and information shared between requests.
4043
// It implements the Fetch method to retrieve agent configuration information.
4144
type Fetcher struct {
42-
kbClient *kibana.Client
43-
docCache *cache
44-
logger *logp.Logger
45-
minVersion common.Version
45+
docCache *cache
46+
logger *logp.Logger
47+
kbClient kibana.Client
4648
}
4749

4850
// NewFetcher returns a Fetcher instance.
49-
func NewFetcher(kbClient *kibana.Client, cacheExp time.Duration) *Fetcher {
51+
func NewFetcher(kbClient kibana.Client, cacheExp time.Duration) *Fetcher {
5052
logger := logp.NewLogger("agentcfg")
5153
return &Fetcher{
52-
kbClient: kbClient,
53-
logger: logger,
54-
docCache: newCache(logger, cacheExp),
55-
minVersion: common.Version{Major: 7, Minor: 3},
54+
kbClient: kbClient,
55+
logger: logger,
56+
docCache: newCache(logger, cacheExp),
5657
}
5758
}
5859

@@ -77,15 +78,10 @@ func (f *Fetcher) request(r io.Reader, err error) ([]byte, error) {
7778
if err != nil {
7879
return nil, err
7980
}
80-
if f.kbClient == nil {
81-
return nil, errors.New("no configured Kibana Client: provide apm-server.kibana.* settings")
82-
}
83-
if version := f.kbClient.GetVersion(); version.LessThan(&f.minVersion) {
84-
return nil, fmt.Errorf("needs Kibana version %s or higher", f.minVersion.String())
85-
}
81+
8682
resp, err := f.kbClient.Send(http.MethodPost, endpoint, nil, nil, r)
8783
if err != nil {
88-
return nil, err
84+
return nil, errors.Wrap(err, ErrMsgSendToKibanaFailed)
8985
}
9086
defer resp.Body.Close()
9187

@@ -94,8 +90,13 @@ func (f *Fetcher) request(r io.Reader, err error) ([]byte, error) {
9490
}
9591

9692
result, err := ioutil.ReadAll(resp.Body)
97-
if resp.StatusCode >= http.StatusMultipleChoices {
93+
if resp.StatusCode == http.StatusMultipleChoices {
94+
return nil, errors.Wrap(errors.New(string(result)), ErrMsgMultipleChoices)
95+
} else if resp.StatusCode > http.StatusMultipleChoices {
9896
return nil, errors.New(string(result))
9997
}
100-
return result, err
98+
if err != nil {
99+
return nil, errors.Wrap(err, ErrMsgReadKibanaResponse)
100+
}
101+
return result, nil
101102
}

agentcfg/fetch_test.go

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -22,25 +22,30 @@ import (
2222
"testing"
2323
"time"
2424

25+
"github.com/pkg/errors"
2526
"github.com/stretchr/testify/assert"
2627
"github.com/stretchr/testify/require"
2728

28-
"github.com/elastic/beats/libbeat/kibana"
29+
"github.com/elastic/beats/libbeat/common"
2930

31+
"github.com/elastic/apm-server/kibana"
3032
"github.com/elastic/apm-server/tests"
3133
)
3234

3335
type m map[string]interface{}
3436

35-
var testExp = time.Nanosecond
37+
var (
38+
testExp = time.Nanosecond
39+
mockVersion = *common.MustNewVersion("7.3.0")
40+
)
3641

3742
func query(name string) Query {
3843
return Query{Service: Service{Name: name}}
3944
}
4045

41-
func TestFetchNoClient(t *testing.T) {
42-
kb, kerr := kibana.NewKibanaClient(nil)
43-
_, _, ferr := NewFetcher(kb, testExp).Fetch(query(t.Name()), kerr)
46+
func TestFetchWithError(t *testing.T) {
47+
kerr := errors.New("test error")
48+
_, _, ferr := NewFetcher(&kibana.ConnectingClient{}, testExp).Fetch(query(t.Name()), kerr)
4449
require.Error(t, ferr)
4550
assert.Equal(t, kerr, ferr)
4651
}
@@ -54,23 +59,23 @@ func TestFetchStringConversion(t *testing.T) {
5459
"sampling_rate": 0.5,
5560
},
5661
},
57-
})
62+
},
63+
mockVersion, true)
5864
result, etag, err := NewFetcher(kb, testExp).Fetch(query(t.Name()), nil)
5965
require.NoError(t, err)
6066
assert.Equal(t, "1", etag, etag)
6167
assert.Equal(t, map[string]string{"sampling_rate": "0.5"}, result)
6268
}
6369

64-
func TestFetchVersionCheck(t *testing.T) {
65-
kb := tests.MockKibana(http.StatusOK, m{})
66-
kb.Connection.Version.Major = 6
70+
func TestFetchError(t *testing.T) {
71+
kb := tests.MockKibana(http.StatusMultipleChoices, m{"error": "an error"}, mockVersion, true)
6772
_, _, err := NewFetcher(kb, testExp).Fetch(query(t.Name()), nil)
6873
require.Error(t, err)
69-
assert.Contains(t, err.Error(), "version")
74+
assert.Contains(t, err.Error(), ErrMsgMultipleChoices)
7075
}
7176

72-
func TestFetchError(t *testing.T) {
73-
kb := tests.MockKibana(http.StatusExpectationFailed, m{"error": "an error"})
77+
func TestExpectationFailed(t *testing.T) {
78+
kb := tests.MockKibana(http.StatusExpectationFailed, m{"error": "an error"}, mockVersion, true)
7479
_, _, err := NewFetcher(kb, testExp).Fetch(query(t.Name()), nil)
7580
require.Error(t, err)
7681
assert.Equal(t, "{\"error\":\"an error\"}", err.Error())
@@ -79,7 +84,7 @@ func TestFetchError(t *testing.T) {
7984
func TestFetchWithCaching(t *testing.T) {
8085
fetch := func(f *Fetcher, samplingRate float64) map[string]string {
8186

82-
client := func(samplingRate float64) *kibana.Client {
87+
client := func(samplingRate float64) kibana.Client {
8388
return tests.MockKibana(http.StatusOK,
8489
m{
8590
"_id": "1",
@@ -88,7 +93,8 @@ func TestFetchWithCaching(t *testing.T) {
8893
"sampling_rate": samplingRate,
8994
},
9095
},
91-
})
96+
},
97+
mockVersion, true)
9298
}
9399
f.kbClient = client(samplingRate)
94100

@@ -109,7 +115,7 @@ func TestFetchWithCaching(t *testing.T) {
109115
assert.Equal(t, map[string]string{"sampling_rate": "0.5"}, result)
110116

111117
// after key is expired, fetch from Kibana again
112-
fetcher.docCache.gocache.Delete(query(t.Name()).id())
118+
fetcher.docCache.gocache.Delete(query(t.Name()).ID())
113119
result = fetch(fetcher, 0.7)
114120
assert.Equal(t, map[string]string{"sampling_rate": "0.7"}, result)
115121

agentcfg/model.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,8 @@ type Query struct {
6666
Service Service `json:"service"`
6767
}
6868

69-
func (q Query) id() string {
69+
// ID returns the unique id for the query
70+
func (q Query) ID() string {
7071
var str strings.Builder
7172
str.WriteString(q.Service.Name)
7273
if q.Service.Environment != "" {

0 commit comments

Comments
 (0)