Skip to content

[7.3] Ensure Kibana client reconnects. (#2421) #2436

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion agentcfg/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func newCache(logger *logp.Logger, exp time.Duration) *cache {
}

func (c *cache) fetchAndAdd(q Query, fn func(Query) (*Doc, error)) (doc *Doc, err error) {
id := q.id()
id := q.ID()

// return from cache if possible
doc, found := c.fetch(id)
Expand Down
6 changes: 3 additions & 3 deletions agentcfg/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func newCacheSetup(service string, exp time.Duration, init bool) cacheSetup {
doc: &defaultDoc,
}
if init {
setup.c.add(setup.q.id(), setup.doc)
setup.c.add(setup.q.ID(), setup.doc)
}
return setup
}
Expand Down Expand Up @@ -75,7 +75,7 @@ func TestCache_fetchAndAdd(t *testing.T) {
} else {
assert.NoError(t, err)
//ensure value is cached afterwards
cachedDoc, found := setup.c.fetch(setup.q.id())
cachedDoc, found := setup.c.fetch(setup.q.ID())
assert.True(t, found)
assert.Equal(t, doc, cachedDoc)
}
Expand All @@ -89,7 +89,7 @@ func TestCache_fetchAndAdd(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, doc)
time.Sleep(exp)
nilDoc, found := setup.c.fetch(setup.q.id())
nilDoc, found := setup.c.fetch(setup.q.ID())
assert.False(t, found)
assert.Nil(t, nilDoc)
})
Expand Down
47 changes: 24 additions & 23 deletions agentcfg/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,41 +18,42 @@
package agentcfg

import (
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"time"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/kibana"
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/logp"

"github.com/elastic/apm-server/convert"
"github.com/elastic/apm-server/kibana"
)

// Error Messages used to signal fetching errors
const (
endpoint = "/api/apm/settings/agent-configuration/search"
ErrMsgSendToKibanaFailed = "sending request to kibana failed"
ErrMsgMultipleChoices = "multiple configurations found"
ErrMsgReadKibanaResponse = "unable to read Kibana response body"
)
const endpoint = "/api/apm/settings/agent-configuration/search"

// Fetcher holds static information and information shared between requests.
// It implements the Fetch method to retrieve agent configuration information.
type Fetcher struct {
kbClient *kibana.Client
docCache *cache
logger *logp.Logger
minVersion common.Version
docCache *cache
logger *logp.Logger
kbClient kibana.Client
}

// NewFetcher returns a Fetcher instance.
func NewFetcher(kbClient *kibana.Client, cacheExp time.Duration) *Fetcher {
func NewFetcher(kbClient kibana.Client, cacheExp time.Duration) *Fetcher {
logger := logp.NewLogger("agentcfg")
return &Fetcher{
kbClient: kbClient,
logger: logger,
docCache: newCache(logger, cacheExp),
minVersion: common.Version{Major: 7, Minor: 3},
kbClient: kbClient,
logger: logger,
docCache: newCache(logger, cacheExp),
}
}

Expand All @@ -77,15 +78,10 @@ func (f *Fetcher) request(r io.Reader, err error) ([]byte, error) {
if err != nil {
return nil, err
}
if f.kbClient == nil {
return nil, errors.New("no configured Kibana Client: provide apm-server.kibana.* settings")
}
if version := f.kbClient.GetVersion(); version.LessThan(&f.minVersion) {
return nil, fmt.Errorf("needs Kibana version %s or higher", f.minVersion.String())
}

resp, err := f.kbClient.Send(http.MethodPost, endpoint, nil, nil, r)
if err != nil {
return nil, err
return nil, errors.Wrap(err, ErrMsgSendToKibanaFailed)
}
defer resp.Body.Close()

Expand All @@ -94,8 +90,13 @@ func (f *Fetcher) request(r io.Reader, err error) ([]byte, error) {
}

result, err := ioutil.ReadAll(resp.Body)
if resp.StatusCode >= http.StatusMultipleChoices {
if resp.StatusCode == http.StatusMultipleChoices {
return nil, errors.Wrap(errors.New(string(result)), ErrMsgMultipleChoices)
} else if resp.StatusCode > http.StatusMultipleChoices {
return nil, errors.New(string(result))
}
return result, err
if err != nil {
return nil, errors.Wrap(err, ErrMsgReadKibanaResponse)
}
return result, nil
}
36 changes: 21 additions & 15 deletions agentcfg/fetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,30 @@ import (
"testing"
"time"

"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/libbeat/kibana"
"github.com/elastic/beats/libbeat/common"

"github.com/elastic/apm-server/kibana"
"github.com/elastic/apm-server/tests"
)

type m map[string]interface{}

var testExp = time.Nanosecond
var (
testExp = time.Nanosecond
mockVersion = *common.MustNewVersion("7.3.0")
)

func query(name string) Query {
return Query{Service: Service{Name: name}}
}

func TestFetchNoClient(t *testing.T) {
kb, kerr := kibana.NewKibanaClient(nil)
_, _, ferr := NewFetcher(kb, testExp).Fetch(query(t.Name()), kerr)
func TestFetchWithError(t *testing.T) {
kerr := errors.New("test error")
_, _, ferr := NewFetcher(&kibana.ConnectingClient{}, testExp).Fetch(query(t.Name()), kerr)
require.Error(t, ferr)
assert.Equal(t, kerr, ferr)
}
Expand All @@ -54,23 +59,23 @@ func TestFetchStringConversion(t *testing.T) {
"sampling_rate": 0.5,
},
},
})
},
mockVersion, true)
result, etag, err := NewFetcher(kb, testExp).Fetch(query(t.Name()), nil)
require.NoError(t, err)
assert.Equal(t, "1", etag, etag)
assert.Equal(t, map[string]string{"sampling_rate": "0.5"}, result)
}

func TestFetchVersionCheck(t *testing.T) {
kb := tests.MockKibana(http.StatusOK, m{})
kb.Connection.Version.Major = 6
func TestFetchError(t *testing.T) {
kb := tests.MockKibana(http.StatusMultipleChoices, m{"error": "an error"}, mockVersion, true)
_, _, err := NewFetcher(kb, testExp).Fetch(query(t.Name()), nil)
require.Error(t, err)
assert.Contains(t, err.Error(), "version")
assert.Contains(t, err.Error(), ErrMsgMultipleChoices)
}

func TestFetchError(t *testing.T) {
kb := tests.MockKibana(http.StatusExpectationFailed, m{"error": "an error"})
func TestExpectationFailed(t *testing.T) {
kb := tests.MockKibana(http.StatusExpectationFailed, m{"error": "an error"}, mockVersion, true)
_, _, err := NewFetcher(kb, testExp).Fetch(query(t.Name()), nil)
require.Error(t, err)
assert.Equal(t, "{\"error\":\"an error\"}", err.Error())
Expand All @@ -79,7 +84,7 @@ func TestFetchError(t *testing.T) {
func TestFetchWithCaching(t *testing.T) {
fetch := func(f *Fetcher, samplingRate float64) map[string]string {

client := func(samplingRate float64) *kibana.Client {
client := func(samplingRate float64) kibana.Client {
return tests.MockKibana(http.StatusOK,
m{
"_id": "1",
Expand All @@ -88,7 +93,8 @@ func TestFetchWithCaching(t *testing.T) {
"sampling_rate": samplingRate,
},
},
})
},
mockVersion, true)
}
f.kbClient = client(samplingRate)

Expand All @@ -109,7 +115,7 @@ func TestFetchWithCaching(t *testing.T) {
assert.Equal(t, map[string]string{"sampling_rate": "0.5"}, result)

// after key is expired, fetch from Kibana again
fetcher.docCache.gocache.Delete(query(t.Name()).id())
fetcher.docCache.gocache.Delete(query(t.Name()).ID())
result = fetch(fetcher, 0.7)
assert.Equal(t, map[string]string{"sampling_rate": "0.7"}, result)

Expand Down
3 changes: 2 additions & 1 deletion agentcfg/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ type Query struct {
Service Service `json:"service"`
}

func (q Query) id() string {
// ID returns the unique id for the query
func (q Query) ID() string {
var str strings.Builder
str.WriteString(q.Service.Name)
if q.Service.Environment != "" {
Expand Down
Loading