Skip to content

Commit

Permalink
Configurable ES doc count (jaegertracing#2453)
Browse files Browse the repository at this point in the history
* Configurable limits

Signed-off-by: albertteoh <albert.teoh@logz.io>

* aggregationSize -> maxDocCount

Signed-off-by: albertteoh <albert.teoh@logz.io>

* Refactor create dependency reader

Signed-off-by: albertteoh <albert.teoh@logz.io>

* Stronger assertions on aggregation sizes

Signed-off-by: albertteoh <albert.teoh@logz.io>

* Add deprecation note

Signed-off-by: albertteoh <albert.teoh@logz.io>

* Remove stuttered full-stop

Signed-off-by: albertteoh <albert.teoh@logz.io>

* Better comments and readability

Signed-off-by: albertteoh <albert.teoh@logz.io>

* Comments

Signed-off-by: albertteoh <albert.teoh@logz.io>

* Revert spanstore import

Signed-off-by: albertteoh <albert.teoh@logz.io>

* One-liner create dep reader

Signed-off-by: albertteoh <albert.teoh@logz.io>

* MaxNumSpans -> MaxDocCount

Signed-off-by: albertteoh <albert.teoh@logz.io>

* OTEL: MaxNumSpans -> MaxDocCount

Signed-off-by: albertteoh <albert.teoh@logz.io>

* Fix integration test

Signed-off-by: albertteoh <albert.teoh@logz.io>

* TestMaxNumSpans -> TestMaxDocCount

Signed-off-by: albertteoh <albert.teoh@logz.io>

* Remove unused struct member in test

Signed-off-by: albertteoh <albert.teoh@logz.io>

* Address review comments

Signed-off-by: albertteoh <albert.teoh@logz.io>

Co-authored-by: albertteoh <alber.teoh@logz.io>
  • Loading branch information
albertteoh and albertteoh authored Sep 5, 2020
1 parent 4120220 commit 9dad32f
Show file tree
Hide file tree
Showing 18 changed files with 189 additions and 99 deletions.
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,20 @@ Changes by Version
### Backend Changes

#### Breaking Changes
* Configurable ES doc count ([#2453](https://github.com/jaegertracing/jaeger/pull/2453), [@albertteoh](https://github.com/albertteoh))

The `--es.max-num-spans` flag has been deprecated in favour of `--es.max-doc-count`.
`--es.max-num-spans` is marked for removal in v1.21.0 as indicated in the flag description.

If both `--es.max-num-spans` and `--es.max-doc-count` are set, the lesser of the two will be used.

The use of `--es.max-doc-count` (which defaults to 10,000) will limit the results from all Elasticsearch
queries by the configured value, limiting counts for Jaeger UI:

* Services
* Operations
* Dependencies (edges in a dependency graph)
* Span fetch size for a trace

#### New Features

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,16 @@ import (
)

const (
host = "0.0.0.0"
esPort = "9200"
esHostPort = host + ":" + esPort
esURL = "http://" + esHostPort
indexPrefix = "integration-test"
tagKeyDeDotChar = "@"
maxSpanAge = time.Hour * 72
numShards = 5
numReplicas = 0
host = "0.0.0.0"
esPort = "9200"
esHostPort = host + ":" + esPort
esURL = "http://" + esHostPort
indexPrefix = "integration-test"
tagKeyDeDotChar = "@"
maxSpanAge = time.Hour * 72
numShards = 5
numReplicas = 0
defaultMaxDocCount = 10_000
)

type IntegrationTest struct {
Expand Down Expand Up @@ -119,12 +120,12 @@ func (s *IntegrationTest) initSpanstore(allTagsAsFields bool) error {
IndexPrefix: indexPrefix,
TagDotReplacement: tagKeyDeDotChar,
MaxSpanAge: maxSpanAge,
MaxNumSpans: 10_000,
MaxDocCount: defaultMaxDocCount,
})
s.SpanReader = reader

depMapping := es.GetDependenciesMappings(numShards, numReplicas, esVersion)
depStore := esdependencyreader.NewDependencyStore(elasticsearchClient, s.logger, indexPrefix)
depStore := esdependencyreader.NewDependencyStore(elasticsearchClient, s.logger, indexPrefix, defaultMaxDocCount)
if err := depStore.CreateTemplates(depMapping); err != nil {
return nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (s *StorageFactory) CreateSpanReader() (spanstore.Reader, error) {
UseReadWriteAliases: cfg.GetUseReadWriteAliases(),
IndexPrefix: cfg.GetIndexPrefix(),
MaxSpanAge: cfg.GetMaxSpanAge(),
MaxNumSpans: cfg.GetMaxNumSpans(),
MaxDocCount: cfg.GetMaxDocCount(),
TagDotReplacement: cfg.GetTagDotReplacement(),
}), nil
}
Expand All @@ -100,7 +100,7 @@ func (s *StorageFactory) CreateDependencyReader() (dependencystore.Reader, error
if err != nil {
return nil, err
}
return esdependencyreader.NewDependencyStore(client, s.logger, cfg.GetIndexPrefix()), nil
return esdependencyreader.NewDependencyStore(client, s.logger, cfg.GetIndexPrefix(), cfg.GetMaxDocCount()), nil
}

// CreateArchiveSpanReader creates archive spanstore.Reader
Expand All @@ -115,7 +115,7 @@ func (s *StorageFactory) CreateArchiveSpanReader() (spanstore.Reader, error) {
UseReadWriteAliases: cfg.GetUseReadWriteAliases(),
IndexPrefix: cfg.GetIndexPrefix(),
MaxSpanAge: cfg.GetMaxSpanAge(),
MaxNumSpans: cfg.GetMaxNumSpans(),
MaxDocCount: cfg.GetMaxDocCount(),
TagDotReplacement: cfg.GetTagDotReplacement(),
}), nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@ const (

timestampField = "timestamp"

// default number of documents to fetch in a query
// see search.max_buckets and index.max_result_window
defaultDocCount = 10_000
indexDateFormat = "2006-01-02" // date format for index e.g. 2020-01-20
)

Expand All @@ -47,20 +44,22 @@ type DependencyStore struct {
client esclient.ElasticsearchClient
logger *zap.Logger
indexPrefix string
maxDocCount int
}

var _ dependencystore.Reader = (*DependencyStore)(nil)
var _ dependencystore.Writer = (*DependencyStore)(nil)

// NewDependencyStore creates dependency store.
func NewDependencyStore(client esclient.ElasticsearchClient, logger *zap.Logger, indexPrefix string) *DependencyStore {
func NewDependencyStore(client esclient.ElasticsearchClient, logger *zap.Logger, indexPrefix string, maxDocCount int) *DependencyStore {
if indexPrefix != "" {
indexPrefix += "-"
}
return &DependencyStore{
client: client,
logger: logger,
indexPrefix: indexPrefix + dependencyIndexBaseName + "-",
maxDocCount: maxDocCount,
}
}

Expand All @@ -84,10 +83,10 @@ func (r *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.D

// GetDependencies implements dependencystore.Reader
func (r *DependencyStore) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
searchBody := getSearchBody(endTs, lookback)
searchBody := getSearchBody(endTs, lookback, r.maxDocCount)

indices := dailyIndices(r.indexPrefix, endTs, lookback)
response, err := r.client.Search(ctx, searchBody, defaultDocCount, indices...)
response, err := r.client.Search(ctx, searchBody, r.maxDocCount, indices...)
if err != nil {
return nil, err
}
Expand All @@ -106,12 +105,12 @@ func (r *DependencyStore) GetDependencies(ctx context.Context, endTs time.Time,
return dbmodel.ToDomainDependencies(dependencies), nil
}

func getSearchBody(endTs time.Time, lookback time.Duration) esclient.SearchBody {
func getSearchBody(endTs time.Time, lookback time.Duration, maxDocCount int) esclient.SearchBody {
return esclient.SearchBody{
Query: &esclient.Query{
RangeQueries: map[string]esclient.RangeQuery{timestampField: {GTE: endTs.Add(-lookback), LTE: endTs}},
},
Size: defaultDocCount,
Size: maxDocCount,
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@ import (
"github.com/jaegertracing/jaeger/plugin/storage/es/dependencystore/dbmodel"
)

const defaultMaxDocCount = 10_000

func TestCreateTemplates(t *testing.T) {
client := &mockClient{}
store := NewDependencyStore(client, zap.NewNop(), "foo")
store := NewDependencyStore(client, zap.NewNop(), "foo", defaultMaxDocCount)
template := "template"
err := store.CreateTemplates(template)
require.NoError(t, err)
Expand All @@ -46,7 +48,7 @@ func TestCreateTemplates(t *testing.T) {

func TestWriteDependencies(t *testing.T) {
client := &mockClient{}
store := NewDependencyStore(client, zap.NewNop(), "foo")
store := NewDependencyStore(client, zap.NewNop(), "foo", defaultMaxDocCount)
dependencies := []model.DependencyLink{{Parent: "foo", Child: "bar", CallCount: 1}}
tsNow := time.Now()
err := store.WriteDependencies(tsNow, dependencies)
Expand Down Expand Up @@ -85,7 +87,7 @@ func TestGetDependencies(t *testing.T) {
},
},
}
store := NewDependencyStore(client, zap.NewNop(), "foo")
store := NewDependencyStore(client, zap.NewNop(), "foo", defaultMaxDocCount)
dependencies, err := store.GetDependencies(context.Background(), tsNow, time.Hour)
require.NoError(t, err)
assert.Equal(t, timeDependencies, dbmodel.TimeDependencies{
Expand All @@ -107,7 +109,7 @@ func TestGetDependencies_err_unmarshall(t *testing.T) {
},
},
}
store := NewDependencyStore(client, zap.NewNop(), "foo")
store := NewDependencyStore(client, zap.NewNop(), "foo", defaultMaxDocCount)
dependencies, err := store.GetDependencies(context.Background(), tsNow, time.Hour)
require.Contains(t, err.Error(), "invalid character")
assert.Nil(t, dependencies)
Expand All @@ -118,7 +120,7 @@ func TestGetDependencies_err_client(t *testing.T) {
client := &mockClient{
searchErr: searchErr,
}
store := NewDependencyStore(client, zap.NewNop(), "foo")
store := NewDependencyStore(client, zap.NewNop(), "foo", defaultMaxDocCount)
tsNow := time.Now()
dependencies, err := store.GetDependencies(context.Background(), tsNow, time.Hour)
require.Error(t, err)
Expand All @@ -141,7 +143,7 @@ const query = `{

func TestSearchBody(t *testing.T) {
date := time.Date(2020, 8, 30, 15, 0, 0, 0, time.UTC)
sb := getSearchBody(date, time.Hour)
sb := getSearchBody(date, time.Hour, defaultMaxDocCount)
jsonQuery, err := json.MarshalIndent(sb, "", " ")
require.NoError(t, err)
assert.Equal(t, query, string(jsonQuery))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,15 +187,15 @@ func findTraceIDsSearchBody(converter dbmodel.ToDomain, query *spanstore.TraceQu
}
}

func getServicesSearchBody() esclient.SearchBody {
aggs := fmt.Sprintf(getServicesAggregation, defaultDocCount)
func getServicesSearchBody(maxDocCount int) esclient.SearchBody {
aggs := fmt.Sprintf(getServicesAggregation, maxDocCount)
return esclient.SearchBody{
Aggregations: json.RawMessage(aggs),
}
}

func getOperationsSearchBody(serviceName string) esclient.SearchBody {
aggs := fmt.Sprintf(getOperationsAggregation, defaultDocCount)
func getOperationsSearchBody(serviceName string, maxDocCount int) esclient.SearchBody {
aggs := fmt.Sprintf(getOperationsAggregation, maxDocCount)
return esclient.SearchBody{
Aggregations: json.RawMessage(aggs),
Query: &esclient.Query{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
)

const (
defaultMaxDocCount = 10_000
servicesSearchBodyFixture = `{
"aggs": {
"serviceName": {
Expand Down Expand Up @@ -231,14 +232,14 @@ const (
)

func TestGetServicesSearchBody(t *testing.T) {
sb := getServicesSearchBody()
sb := getServicesSearchBody(defaultMaxDocCount)
jsonQuery, err := json.MarshalIndent(sb, "", " ")
require.NoError(t, err)
assert.Equal(t, servicesSearchBodyFixture, string(jsonQuery))
}

func TestGetOperationsSearchBody(t *testing.T) {
sb := getOperationsSearchBody("foo")
sb := getOperationsSearchBody("foo", defaultMaxDocCount)
jsonQuery, err := json.MarshalIndent(sb, "", " ")
require.NoError(t, err)
assert.Equal(t, operationsSearchBodyFixture, string(jsonQuery))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,6 @@ import (
const (
// by default UI fetches 20 results
defaultNumTraces = 20
// default number of documents to fetch in a query
// see search.max_buckets and index.max_result_window
defaultDocCount = 10_000

spanIndexBaseName = "jaeger-span"
serviceIndexBaseName = "jaeger-service"
Expand Down Expand Up @@ -62,8 +59,7 @@ type Reader struct {
serviceIndexName indexNameProvider
spanIndexName indexNameProvider
maxSpanAge time.Duration
// maximum number of spans to fetch per query in multi search
maxNumberOfSpans int
maxDocCount int
archive bool
}

Expand All @@ -75,7 +71,7 @@ type Config struct {
UseReadWriteAliases bool
IndexPrefix string
MaxSpanAge time.Duration
MaxNumSpans int
MaxDocCount int
TagDotReplacement string
}

Expand All @@ -86,7 +82,7 @@ func NewEsSpanReader(client esclient.ElasticsearchClient, logger *zap.Logger, co
logger: logger,
archive: config.Archive,
maxSpanAge: config.MaxSpanAge,
maxNumberOfSpans: config.MaxNumSpans,
maxDocCount: config.MaxDocCount,
converter: dbmodel.NewToDomain(config.TagDotReplacement),
spanIndexName: newIndexNameProvider(spanIndexBaseName, config.IndexPrefix, config.UseReadWriteAliases, config.Archive),
serviceIndexName: newIndexNameProvider(serviceIndexBaseName, config.IndexPrefix, config.UseReadWriteAliases, config.Archive),
Expand Down Expand Up @@ -162,7 +158,7 @@ func (r *Reader) findTraceIDs(ctx context.Context, query *spanstore.TraceQueryPa

// GetServices implements spanstore.Reader
func (r *Reader) GetServices(ctx context.Context) ([]string, error) {
searchBody := getServicesSearchBody()
searchBody := getServicesSearchBody(r.maxDocCount)
currentTime := time.Now()
indices := r.serviceIndexName.get(currentTime.Add(-r.maxSpanAge), currentTime)
response, err := r.client.Search(ctx, searchBody, 0, indices...)
Expand All @@ -182,7 +178,7 @@ func (r *Reader) GetServices(ctx context.Context) ([]string, error) {

// GetOperations implements spanstore.Reader
func (r *Reader) GetOperations(ctx context.Context, query spanstore.OperationQueryParameters) ([]spanstore.Operation, error) {
searchBody := getOperationsSearchBody(query.ServiceName)
searchBody := getOperationsSearchBody(query.ServiceName, r.maxDocCount)
currentTime := time.Now()
indices := r.serviceIndexName.get(currentTime.Add(-r.maxSpanAge), currentTime)
response, err := r.client.Search(ctx, searchBody, 0, indices...)
Expand Down Expand Up @@ -290,8 +286,8 @@ func (r *Reader) multiSearchRequests(indices []string, traceIDs []model.TraceID,
s := esclient.SearchBody{
Indices: indices,
Query: traceIDQuery(traceID),
Size: defaultDocCount,
TerminateAfter: r.maxNumberOfSpans,
Size: r.maxDocCount,
TerminateAfter: r.maxDocCount,
}
if !r.archive {
s.SearchAfter = []interface{}{nextTime}
Expand Down
16 changes: 8 additions & 8 deletions pkg/es/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type Configuration struct {
AllowTokenFromContext bool `mapstructure:"-"`
Sniffer bool `mapstructure:"sniffer"` // https://github.com/olivere/elastic/wiki/Sniffing
SnifferTLSEnabled bool `mapstructure:"sniffer_tls_enabled"`
MaxNumSpans int `mapstructure:"-"` // defines maximum number of spans to fetch from storage per query
MaxDocCount int `mapstructure:"-"` // Defines maximum number of results to fetch from storage per query
MaxSpanAge time.Duration `yaml:"max_span_age" mapstructure:"-"` // configures the maximum lookback on span reads
NumShards int64 `yaml:"shards" mapstructure:"num_shards"`
NumReplicas int64 `yaml:"replicas" mapstructure:"num_replicas"`
Expand Down Expand Up @@ -87,7 +87,7 @@ type ClientBuilder interface {
GetNumShards() int64
GetNumReplicas() int64
GetMaxSpanAge() time.Duration
GetMaxNumSpans() int
GetMaxDocCount() int
GetIndexPrefix() string
GetTagsFilePath() string
GetAllTagsAsFields() bool
Expand Down Expand Up @@ -197,9 +197,6 @@ func (c *Configuration) ApplyDefaults(source *Configuration) {
if c.MaxSpanAge == 0 {
c.MaxSpanAge = source.MaxSpanAge
}
if c.MaxNumSpans == 0 {
c.MaxNumSpans = source.MaxNumSpans
}
if c.NumShards == 0 {
c.NumShards = source.NumShards
}
Expand Down Expand Up @@ -233,6 +230,9 @@ func (c *Configuration) ApplyDefaults(source *Configuration) {
if c.Tags.File == "" {
c.Tags.File = source.Tags.File
}
if c.MaxDocCount == 0 {
c.MaxDocCount = source.MaxDocCount
}
}

// GetNumShards returns number of shards from Configuration
Expand All @@ -250,9 +250,9 @@ func (c *Configuration) GetMaxSpanAge() time.Duration {
return c.MaxSpanAge
}

// GetMaxNumSpans returns max spans allowed per query from Configuration
func (c *Configuration) GetMaxNumSpans() int {
return c.MaxNumSpans
// GetMaxDocCount returns the maximum number of documents that a query should return
func (c *Configuration) GetMaxDocCount() int {
return c.MaxDocCount
}

// GetIndexPrefix returns index prefix
Expand Down
Loading

0 comments on commit 9dad32f

Please sign in to comment.