Skip to content

[ACM] Create Etag from settings struct. #2441

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Jul 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions agentcfg/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import (
)

var (
defaultDoc = Doc{Source: Source{Settings: Settings{"a": "default"}}}
externalDoc = Doc{Source: Source{Settings: Settings{"a": "b"}}}
defaultDoc = Doc{Settings: Settings{"a": "default"}}
externalDoc = Doc{Settings: Settings{"a": "b"}}
)

type cacheSetup struct {
Expand Down Expand Up @@ -62,7 +62,7 @@ func TestCache_fetchAndAdd(t *testing.T) {
"DocFromCache": {fn: testFn, init: true, doc: &defaultDoc},
"DocFromFunctionFails": {fn: testFnErr, fail: true},
"DocFromFunction": {fn: testFn, doc: &externalDoc},
"EmptyDocFromFunction": {fn: testFnSettingsNil, doc: &Doc{Source: Source{Settings: Settings{}}}},
"EmptyDocFromFunction": {fn: testFnSettingsNil, doc: &Doc{Settings: Settings{}}},
"NilDocFromFunction": {fn: testFnNil},
} {
t.Run(name, func(t *testing.T) {
Expand Down Expand Up @@ -134,7 +134,7 @@ func testFnNil(_ Query) (*Doc, error) {
}

func testFnSettingsNil(_ Query) (*Doc, error) {
return &Doc{Source: Source{Settings: Settings{}}}, nil
return &Doc{Settings: Settings{}}, nil
}

func testFn(_ Query) (*Doc, error) {
Expand Down
9 changes: 5 additions & 4 deletions agentcfg/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,18 +60,19 @@ func NewFetcher(kbClient kibana.Client, cacheExp time.Duration) *Fetcher {
// Fetch retrieves agent configuration, fetched from Kibana or a local temporary cache.
func (f *Fetcher) Fetch(q Query, err error) (map[string]string, string, error) {
req := func(query Query) (*Doc, error) {
var doc Doc
resultBytes, err := f.request(convert.ToReader(query), err)
err = convert.FromBytes(resultBytes, &doc, err)
return &doc, err
if err != nil {
return nil, err
}
return NewDoc(resultBytes)
}

doc, err := f.docCache.fetchAndAdd(q, req)
if err != nil {
return nil, "", err
}

return doc.Source.Settings, doc.ID, nil
return doc.Settings, doc.ID, nil
}

func (f *Fetcher) request(r io.Reader, err error) ([]byte, error) {
Expand Down
151 changes: 83 additions & 68 deletions agentcfg/fetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package agentcfg

import (
"encoding/json"
"net/http"
"testing"
"time"
Expand All @@ -39,84 +40,98 @@ var (
mockVersion = *common.MustNewVersion("7.3.0")
)

func query(name string) Query {
return Query{Service: Service{Name: name}}
}
func TestFetcher_Fetch(t *testing.T) {
t.Run("ErrorInput", func(t *testing.T) {
kerr := errors.New("test error")
_, _, ferr := NewFetcher(&kibana.ConnectingClient{}, testExp).Fetch(query(t.Name()), kerr)
require.Error(t, ferr)
assert.Equal(t, kerr, ferr)
})

t.Run("FetchError", func(t *testing.T) {
kb := tests.MockKibana(http.StatusMultipleChoices, m{"error": "an error"}, mockVersion, true)
_, _, err := NewFetcher(kb, testExp).Fetch(query(t.Name()), nil)
require.Error(t, err)
assert.Contains(t, err.Error(), ErrMsgMultipleChoices)
})

t.Run("ExpectationFailed", func(t *testing.T) {
kb := tests.MockKibana(http.StatusExpectationFailed, m{"error": "an error"}, mockVersion, true)
_, _, err := NewFetcher(kb, testExp).Fetch(query(t.Name()), nil)
require.Error(t, err)
assert.Equal(t, "{\"error\":\"an error\"}", err.Error())
})

t.Run("NotFound", func(t *testing.T) {
kb := tests.MockKibana(http.StatusNotFound, m{}, mockVersion, true)
doc, err := NewDoc([]byte{})
require.NoError(t, err)

func TestFetchWithError(t *testing.T) {
kerr := errors.New("test error")
_, _, ferr := NewFetcher(&kibana.ConnectingClient{}, testExp).Fetch(query(t.Name()), kerr)
require.Error(t, ferr)
assert.Equal(t, kerr, ferr)
}
result, etag, err := NewFetcher(kb, testExp).Fetch(query(t.Name()), nil)
require.NoError(t, err)
assert.Equal(t, doc.ID, etag)
assert.Equal(t, doc.Settings, Settings(result))
})

func TestFetchStringConversion(t *testing.T) {
kb := tests.MockKibana(http.StatusOK,
m{
"_id": "1",
"_source": m{
"settings": m{
"sampling_rate": 0.5,
},
},
},
mockVersion, true)
result, etag, err := NewFetcher(kb, testExp).Fetch(query(t.Name()), nil)
require.NoError(t, err)
assert.Equal(t, "1", etag, etag)
assert.Equal(t, map[string]string{"sampling_rate": "0.5"}, result)
}
t.Run("Success", func(t *testing.T) {
kb := tests.MockKibana(http.StatusOK, mockDoc(0.5), mockVersion, true)
b, err := json.Marshal(mockDoc(0.5))
require.NoError(t, err)
doc, err := NewDoc(b)
require.NoError(t, err)

func TestFetchError(t *testing.T) {
kb := tests.MockKibana(http.StatusMultipleChoices, m{"error": "an error"}, mockVersion, true)
_, _, err := NewFetcher(kb, testExp).Fetch(query(t.Name()), nil)
require.Error(t, err)
assert.Contains(t, err.Error(), ErrMsgMultipleChoices)
}
result, etag, err := NewFetcher(kb, testExp).Fetch(query(t.Name()), nil)
require.NoError(t, err)
assert.Equal(t, doc.ID, etag)
assert.Equal(t, doc.Settings, Settings(result))
})

func TestExpectationFailed(t *testing.T) {
kb := tests.MockKibana(http.StatusExpectationFailed, m{"error": "an error"}, mockVersion, true)
_, _, err := NewFetcher(kb, testExp).Fetch(query(t.Name()), nil)
require.Error(t, err)
assert.Equal(t, "{\"error\":\"an error\"}", err.Error())
}
t.Run("FetchFromCache", func(t *testing.T) {

func TestFetchWithCaching(t *testing.T) {
fetch := func(f *Fetcher, samplingRate float64) map[string]string {

client := func(samplingRate float64) kibana.Client {
return tests.MockKibana(http.StatusOK,
m{
"_id": "1",
"_source": m{
"settings": m{
"sampling_rate": samplingRate,
},
},
},
mockVersion, true)
fetch := func(f *Fetcher, kibanaSamplingRate, expectedSamplingRate float64) {

client := func(samplingRate float64) kibana.Client {
return tests.MockKibana(http.StatusOK, mockDoc(samplingRate), mockVersion, true)
}
f.kbClient = client(kibanaSamplingRate)

b, err := json.Marshal(mockDoc(expectedSamplingRate))
require.NoError(t, err)
doc, err := NewDoc(b)
require.NoError(t, err)

result, etag, err := f.Fetch(query(t.Name()), nil)
require.NoError(t, err)
assert.Equal(t, doc.ID, etag)
assert.Equal(t, doc.Settings, Settings(result))
}
f.kbClient = client(samplingRate)

result, id, err := f.Fetch(query(t.Name()), nil)
require.NoError(t, err)
require.Equal(t, "1", id)
return result
}
fetcher := NewFetcher(nil, time.Minute)

fetcher := NewFetcher(nil, time.Minute)
// nothing cached yet
fetch(fetcher, 0.5, 0.5)

// nothing cached yet
result := fetch(fetcher, 0.5)
assert.Equal(t, map[string]string{"sampling_rate": "0.5"}, result)
// next fetch runs against cache
fetch(fetcher, 0.8, 0.5)

// next fetch runs against cache
result = fetch(fetcher, 0.8)
assert.Equal(t, map[string]string{"sampling_rate": "0.5"}, result)
// after key is expired, fetch from Kibana again
fetcher.docCache.gocache.Delete(query(t.Name()).ID())
fetch(fetcher, 0.7, 0.7)

// after key is expired, fetch from Kibana again
fetcher.docCache.gocache.Delete(query(t.Name()).ID())
result = fetch(fetcher, 0.7)
assert.Equal(t, map[string]string{"sampling_rate": "0.7"}, result)
})
}

func query(name string) Query {
return Query{Service: Service{Name: name}}
}

func mockDoc(sampleRate float64) m {
return m{
"_id": "1",
"_source": m{
"settings": m{
"sampling_rate": sampleRate,
},
},
}
}
105 changes: 63 additions & 42 deletions agentcfg/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,67 +18,88 @@
package agentcfg

import (
"crypto/md5"
"encoding/json"
"fmt"
"hash"
"sort"
"strings"
)

const (
// ServiceName keyword
ServiceName = "service.name"
// ServiceEnv keyword
ServiceEnv = "service.environment"
)

// Doc represents an elasticsearch document
type Doc struct {
ID string `json:"_id"`
Source Source `json:"_source"`
}

// Source represents the elasticsearch _source field of a document
type Source struct {
Settings Settings `json:"settings"`
Settings Settings
ID string
}

// Settings hold agent configuration
type Settings map[string]string

// UnmarshalJSON overrides default method to convert any JSON type to string
func (s *Settings) UnmarshalJSON(b []byte) error {
in := make(map[string]interface{})
out := make(map[string]string)
err := json.Unmarshal(b, &in)
for k, v := range in {
out[k] = fmt.Sprintf("%v", v)
// NewDoc unmarshals given byte slice into a Doc instance
func NewDoc(inp []byte) (*Doc, error) {
settings, err := unmarshal(inp)
if err != nil {
return nil, err
}

h := md5.New()
var out = map[string]string{}
if err := parse(settings, out, "", h); err != nil {
return nil, err
}
*s = out
return err
}

// NewQuery creates a Query struct
func NewQuery(name, env string) Query {
return Query{Service{name, env}}
return &Doc{ID: fmt.Sprintf("%x", h.Sum(nil)), Settings: out}, nil
}

// Query represents an URL body or query params for agent configuration
type Query struct {
Service Service `json:"service"`
func unmarshal(inp []byte) (map[string]interface{}, error) {
if len(inp) == 0 {
return nil, nil
}
type tmpDoc struct {
Source struct {
Settings map[string]interface{} `json:"settings"`
} `json:"_source"`
}
var tmp tmpDoc
if err := json.Unmarshal(inp, &tmp); err != nil {
return nil, err
}
return tmp.Source.Settings, nil
}

// ID returns the unique id for the query
func (q Query) ID() string {
var str strings.Builder
str.WriteString(q.Service.Name)
if q.Service.Environment != "" {
str.WriteString("_")
str.WriteString(q.Service.Environment)
func parse(inp map[string]interface{}, out map[string]string, rootKey string, h hash.Hash) error {
var keys []string
for k := range inp {
keys = append(keys, k)
}
return str.String()
sort.Strings(keys)
var localkey string
for _, k := range keys {
localkey = dotKey(rootKey, k)

switch val := inp[k].(type) {
case map[string]interface{}:
if err := parse(val, out, localkey, h); err != nil {
return err
}
case []interface{}:
var strArr = make([]string, len(val))
for idx, entry := range val {
strArr[idx] = fmt.Sprintf("%+v", entry)
}
out[localkey] = strings.Join(strArr, ",")
h.Write([]byte(fmt.Sprintf("%s_%v", localkey, out[localkey])))
default:
out[localkey] = fmt.Sprintf("%+v", val)
h.Write([]byte(fmt.Sprintf("%s_%v", localkey, val)))
}
}
return nil
}

// Service holds supported attributes for querying configuration
type Service struct {
Name string `json:"name"`
Environment string `json:"environment,omitempty"`
func dotKey(k1, k2 string) string {
if k1 == "" {
return k2
}
return fmt.Sprintf("%s.%s", k1, k2)
}
Loading