diff --git a/CHANGELOG.md b/CHANGELOG.md index d85e1a64dc6..e7ffde30491 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,20 @@ Changes by Version ### Backend Changes #### Breaking Changes +* Configurable ES doc count ([#2453](https://github.com/jaegertracing/jaeger/pull/2453), [@albertteoh](https://github.com/albertteoh)) + + The `--es.max-num-spans` flag has been deprecated in favour of `--es.max-doc-count`. + `--es.max-num-spans` is marked for removal in v1.21.0 as indicated in the flag description. + + If both `--es.max-num-spans` and `--es.max-doc-count` are set, the lesser of the two will be used. + + The use of `--es.max-doc-count` (which defaults to 10,000) will limit the results from all Elasticsearch + queries by the configured value, limiting counts for Jaeger UI: + + * Services + * Operations + * Dependencies (edges in a dependency graph) + * Span fetch size for a trace #### New Features diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/integration_test.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/integration_test.go index 5819806152b..18274db2222 100644 --- a/cmd/opentelemetry/app/exporter/elasticsearchexporter/integration_test.go +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/integration_test.go @@ -40,15 +40,16 @@ import ( ) const ( - host = "0.0.0.0" - esPort = "9200" - esHostPort = host + ":" + esPort - esURL = "http://" + esHostPort - indexPrefix = "integration-test" - tagKeyDeDotChar = "@" - maxSpanAge = time.Hour * 72 - numShards = 5 - numReplicas = 0 + host = "0.0.0.0" + esPort = "9200" + esHostPort = host + ":" + esPort + esURL = "http://" + esHostPort + indexPrefix = "integration-test" + tagKeyDeDotChar = "@" + maxSpanAge = time.Hour * 72 + numShards = 5 + numReplicas = 0 + defaultMaxDocCount = 10_000 ) type IntegrationTest struct { @@ -119,12 +120,12 @@ func (s *IntegrationTest) initSpanstore(allTagsAsFields bool) error { IndexPrefix: indexPrefix, TagDotReplacement: tagKeyDeDotChar, MaxSpanAge: maxSpanAge, - MaxNumSpans: 10_000, + MaxDocCount: defaultMaxDocCount, }) s.SpanReader = reader depMapping := es.GetDependenciesMappings(numShards, numReplicas, esVersion) - depStore := esdependencyreader.NewDependencyStore(elasticsearchClient, s.logger, indexPrefix) + depStore := esdependencyreader.NewDependencyStore(elasticsearchClient, s.logger, indexPrefix, defaultMaxDocCount) if err := depStore.CreateTemplates(depMapping); err != nil { return nil } diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/storagefactory.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/storagefactory.go index b821c81270d..c631314e70f 100644 --- a/cmd/opentelemetry/app/exporter/elasticsearchexporter/storagefactory.go +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/storagefactory.go @@ -88,7 +88,7 @@ func (s *StorageFactory) CreateSpanReader() (spanstore.Reader, error) { UseReadWriteAliases: cfg.GetUseReadWriteAliases(), IndexPrefix: cfg.GetIndexPrefix(), MaxSpanAge: cfg.GetMaxSpanAge(), - MaxNumSpans: cfg.GetMaxNumSpans(), + MaxDocCount: cfg.GetMaxDocCount(), TagDotReplacement: cfg.GetTagDotReplacement(), }), nil } @@ -100,7 +100,7 @@ func (s *StorageFactory) CreateDependencyReader() (dependencystore.Reader, error if err != nil { return nil, err } - return esdependencyreader.NewDependencyStore(client, s.logger, cfg.GetIndexPrefix()), nil + return esdependencyreader.NewDependencyStore(client, s.logger, cfg.GetIndexPrefix(), cfg.GetMaxDocCount()), nil } // CreateArchiveSpanReader creates archive spanstore.Reader @@ -115,7 +115,7 @@ func (s *StorageFactory) CreateArchiveSpanReader() (spanstore.Reader, error) { UseReadWriteAliases: cfg.GetUseReadWriteAliases(), IndexPrefix: cfg.GetIndexPrefix(), MaxSpanAge: cfg.GetMaxSpanAge(), - MaxNumSpans: cfg.GetMaxNumSpans(), + MaxDocCount: cfg.GetMaxDocCount(), TagDotReplacement: cfg.GetTagDotReplacement(), }), nil } diff --git a/cmd/opentelemetry/app/internal/reader/es/esdependencyreader/dependency_store.go b/cmd/opentelemetry/app/internal/reader/es/esdependencyreader/dependency_store.go index 40ff973953e..174a54b2780 100644 --- a/cmd/opentelemetry/app/internal/reader/es/esdependencyreader/dependency_store.go +++ b/cmd/opentelemetry/app/internal/reader/es/esdependencyreader/dependency_store.go @@ -36,9 +36,6 @@ const ( timestampField = "timestamp" - // default number of documents to fetch in a query - // see search.max_buckets and index.max_result_window - defaultDocCount = 10_000 indexDateFormat = "2006-01-02" // date format for index e.g. 2020-01-20 ) @@ -47,13 +44,14 @@ type DependencyStore struct { client esclient.ElasticsearchClient logger *zap.Logger indexPrefix string + maxDocCount int } var _ dependencystore.Reader = (*DependencyStore)(nil) var _ dependencystore.Writer = (*DependencyStore)(nil) // NewDependencyStore creates dependency store. -func NewDependencyStore(client esclient.ElasticsearchClient, logger *zap.Logger, indexPrefix string) *DependencyStore { +func NewDependencyStore(client esclient.ElasticsearchClient, logger *zap.Logger, indexPrefix string, maxDocCount int) *DependencyStore { if indexPrefix != "" { indexPrefix += "-" } @@ -61,6 +59,7 @@ func NewDependencyStore(client esclient.ElasticsearchClient, logger *zap.Logger, client: client, logger: logger, indexPrefix: indexPrefix + dependencyIndexBaseName + "-", + maxDocCount: maxDocCount, } } @@ -84,10 +83,10 @@ func (r *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.D // GetDependencies implements dependencystore.Reader func (r *DependencyStore) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) { - searchBody := getSearchBody(endTs, lookback) + searchBody := getSearchBody(endTs, lookback, r.maxDocCount) indices := dailyIndices(r.indexPrefix, endTs, lookback) - response, err := r.client.Search(ctx, searchBody, defaultDocCount, indices...) + response, err := r.client.Search(ctx, searchBody, r.maxDocCount, indices...) if err != nil { return nil, err } @@ -106,12 +105,12 @@ func (r *DependencyStore) GetDependencies(ctx context.Context, endTs time.Time, return dbmodel.ToDomainDependencies(dependencies), nil } -func getSearchBody(endTs time.Time, lookback time.Duration) esclient.SearchBody { +func getSearchBody(endTs time.Time, lookback time.Duration, maxDocCount int) esclient.SearchBody { return esclient.SearchBody{ Query: &esclient.Query{ RangeQueries: map[string]esclient.RangeQuery{timestampField: {GTE: endTs.Add(-lookback), LTE: endTs}}, }, - Size: defaultDocCount, + Size: maxDocCount, } } diff --git a/cmd/opentelemetry/app/internal/reader/es/esdependencyreader/dependency_store_test.go b/cmd/opentelemetry/app/internal/reader/es/esdependencyreader/dependency_store_test.go index c7d05a342ed..ac19e02b5c0 100644 --- a/cmd/opentelemetry/app/internal/reader/es/esdependencyreader/dependency_store_test.go +++ b/cmd/opentelemetry/app/internal/reader/es/esdependencyreader/dependency_store_test.go @@ -33,9 +33,11 @@ import ( "github.com/jaegertracing/jaeger/plugin/storage/es/dependencystore/dbmodel" ) +const defaultMaxDocCount = 10_000 + func TestCreateTemplates(t *testing.T) { client := &mockClient{} - store := NewDependencyStore(client, zap.NewNop(), "foo") + store := NewDependencyStore(client, zap.NewNop(), "foo", defaultMaxDocCount) template := "template" err := store.CreateTemplates(template) require.NoError(t, err) @@ -46,7 +48,7 @@ func TestCreateTemplates(t *testing.T) { func TestWriteDependencies(t *testing.T) { client := &mockClient{} - store := NewDependencyStore(client, zap.NewNop(), "foo") + store := NewDependencyStore(client, zap.NewNop(), "foo", defaultMaxDocCount) dependencies := []model.DependencyLink{{Parent: "foo", Child: "bar", CallCount: 1}} tsNow := time.Now() err := store.WriteDependencies(tsNow, dependencies) @@ -85,7 +87,7 @@ func TestGetDependencies(t *testing.T) { }, }, } - store := NewDependencyStore(client, zap.NewNop(), "foo") + store := NewDependencyStore(client, zap.NewNop(), "foo", defaultMaxDocCount) dependencies, err := store.GetDependencies(context.Background(), tsNow, time.Hour) require.NoError(t, err) assert.Equal(t, timeDependencies, dbmodel.TimeDependencies{ @@ -107,7 +109,7 @@ func TestGetDependencies_err_unmarshall(t *testing.T) { }, }, } - store := NewDependencyStore(client, zap.NewNop(), "foo") + store := NewDependencyStore(client, zap.NewNop(), "foo", defaultMaxDocCount) dependencies, err := store.GetDependencies(context.Background(), tsNow, time.Hour) require.Contains(t, err.Error(), "invalid character") assert.Nil(t, dependencies) @@ -118,7 +120,7 @@ func TestGetDependencies_err_client(t *testing.T) { client := &mockClient{ searchErr: searchErr, } - store := NewDependencyStore(client, zap.NewNop(), "foo") + store := NewDependencyStore(client, zap.NewNop(), "foo", defaultMaxDocCount) tsNow := time.Now() dependencies, err := store.GetDependencies(context.Background(), tsNow, time.Hour) require.Error(t, err) @@ -141,7 +143,7 @@ const query = `{ func TestSearchBody(t *testing.T) { date := time.Date(2020, 8, 30, 15, 0, 0, 0, time.UTC) - sb := getSearchBody(date, time.Hour) + sb := getSearchBody(date, time.Hour, defaultMaxDocCount) jsonQuery, err := json.MarshalIndent(sb, "", " ") require.NoError(t, err) assert.Equal(t, query, string(jsonQuery)) diff --git a/cmd/opentelemetry/app/internal/reader/es/esspanreader/query.go b/cmd/opentelemetry/app/internal/reader/es/esspanreader/query.go index fd94be96683..f015320beb9 100644 --- a/cmd/opentelemetry/app/internal/reader/es/esspanreader/query.go +++ b/cmd/opentelemetry/app/internal/reader/es/esspanreader/query.go @@ -187,15 +187,15 @@ func findTraceIDsSearchBody(converter dbmodel.ToDomain, query *spanstore.TraceQu } } -func getServicesSearchBody() esclient.SearchBody { - aggs := fmt.Sprintf(getServicesAggregation, defaultDocCount) +func getServicesSearchBody(maxDocCount int) esclient.SearchBody { + aggs := fmt.Sprintf(getServicesAggregation, maxDocCount) return esclient.SearchBody{ Aggregations: json.RawMessage(aggs), } } -func getOperationsSearchBody(serviceName string) esclient.SearchBody { - aggs := fmt.Sprintf(getOperationsAggregation, defaultDocCount) +func getOperationsSearchBody(serviceName string, maxDocCount int) esclient.SearchBody { + aggs := fmt.Sprintf(getOperationsAggregation, maxDocCount) return esclient.SearchBody{ Aggregations: json.RawMessage(aggs), Query: &esclient.Query{ diff --git a/cmd/opentelemetry/app/internal/reader/es/esspanreader/query_test.go b/cmd/opentelemetry/app/internal/reader/es/esspanreader/query_test.go index b127113af2b..33ea5b45994 100644 --- a/cmd/opentelemetry/app/internal/reader/es/esspanreader/query_test.go +++ b/cmd/opentelemetry/app/internal/reader/es/esspanreader/query_test.go @@ -28,6 +28,7 @@ import ( ) const ( + defaultMaxDocCount = 10_000 servicesSearchBodyFixture = `{ "aggs": { "serviceName": { @@ -231,14 +232,14 @@ const ( ) func TestGetServicesSearchBody(t *testing.T) { - sb := getServicesSearchBody() + sb := getServicesSearchBody(defaultMaxDocCount) jsonQuery, err := json.MarshalIndent(sb, "", " ") require.NoError(t, err) assert.Equal(t, servicesSearchBodyFixture, string(jsonQuery)) } func TestGetOperationsSearchBody(t *testing.T) { - sb := getOperationsSearchBody("foo") + sb := getOperationsSearchBody("foo", defaultMaxDocCount) jsonQuery, err := json.MarshalIndent(sb, "", " ") require.NoError(t, err) assert.Equal(t, operationsSearchBodyFixture, string(jsonQuery)) diff --git a/cmd/opentelemetry/app/internal/reader/es/esspanreader/span_reader.go b/cmd/opentelemetry/app/internal/reader/es/esspanreader/span_reader.go index d10aa60fb3b..42587ab26a8 100644 --- a/cmd/opentelemetry/app/internal/reader/es/esspanreader/span_reader.go +++ b/cmd/opentelemetry/app/internal/reader/es/esspanreader/span_reader.go @@ -32,9 +32,6 @@ import ( const ( // by default UI fetches 20 results defaultNumTraces = 20 - // default number of documents to fetch in a query - // see search.max_buckets and index.max_result_window - defaultDocCount = 10_000 spanIndexBaseName = "jaeger-span" serviceIndexBaseName = "jaeger-service" @@ -62,8 +59,7 @@ type Reader struct { serviceIndexName indexNameProvider spanIndexName indexNameProvider maxSpanAge time.Duration - // maximum number of spans to fetch per query in multi search - maxNumberOfSpans int + maxDocCount int archive bool } @@ -75,7 +71,7 @@ type Config struct { UseReadWriteAliases bool IndexPrefix string MaxSpanAge time.Duration - MaxNumSpans int + MaxDocCount int TagDotReplacement string } @@ -86,7 +82,7 @@ func NewEsSpanReader(client esclient.ElasticsearchClient, logger *zap.Logger, co logger: logger, archive: config.Archive, maxSpanAge: config.MaxSpanAge, - maxNumberOfSpans: config.MaxNumSpans, + maxDocCount: config.MaxDocCount, converter: dbmodel.NewToDomain(config.TagDotReplacement), spanIndexName: newIndexNameProvider(spanIndexBaseName, config.IndexPrefix, config.UseReadWriteAliases, config.Archive), serviceIndexName: newIndexNameProvider(serviceIndexBaseName, config.IndexPrefix, config.UseReadWriteAliases, config.Archive), @@ -162,7 +158,7 @@ func (r *Reader) findTraceIDs(ctx context.Context, query *spanstore.TraceQueryPa // GetServices implements spanstore.Reader func (r *Reader) GetServices(ctx context.Context) ([]string, error) { - searchBody := getServicesSearchBody() + searchBody := getServicesSearchBody(r.maxDocCount) currentTime := time.Now() indices := r.serviceIndexName.get(currentTime.Add(-r.maxSpanAge), currentTime) response, err := r.client.Search(ctx, searchBody, 0, indices...) @@ -182,7 +178,7 @@ func (r *Reader) GetServices(ctx context.Context) ([]string, error) { // GetOperations implements spanstore.Reader func (r *Reader) GetOperations(ctx context.Context, query spanstore.OperationQueryParameters) ([]spanstore.Operation, error) { - searchBody := getOperationsSearchBody(query.ServiceName) + searchBody := getOperationsSearchBody(query.ServiceName, r.maxDocCount) currentTime := time.Now() indices := r.serviceIndexName.get(currentTime.Add(-r.maxSpanAge), currentTime) response, err := r.client.Search(ctx, searchBody, 0, indices...) @@ -290,8 +286,8 @@ func (r *Reader) multiSearchRequests(indices []string, traceIDs []model.TraceID, s := esclient.SearchBody{ Indices: indices, Query: traceIDQuery(traceID), - Size: defaultDocCount, - TerminateAfter: r.maxNumberOfSpans, + Size: r.maxDocCount, + TerminateAfter: r.maxDocCount, } if !r.archive { s.SearchAfter = []interface{}{nextTime} diff --git a/pkg/es/config/config.go b/pkg/es/config/config.go index 0a0562fffd4..861f7033356 100644 --- a/pkg/es/config/config.go +++ b/pkg/es/config/config.go @@ -49,7 +49,7 @@ type Configuration struct { AllowTokenFromContext bool `mapstructure:"-"` Sniffer bool `mapstructure:"sniffer"` // https://github.com/olivere/elastic/wiki/Sniffing SnifferTLSEnabled bool `mapstructure:"sniffer_tls_enabled"` - MaxNumSpans int `mapstructure:"-"` // defines maximum number of spans to fetch from storage per query + MaxDocCount int `mapstructure:"-"` // Defines maximum number of results to fetch from storage per query MaxSpanAge time.Duration `yaml:"max_span_age" mapstructure:"-"` // configures the maximum lookback on span reads NumShards int64 `yaml:"shards" mapstructure:"num_shards"` NumReplicas int64 `yaml:"replicas" mapstructure:"num_replicas"` @@ -87,7 +87,7 @@ type ClientBuilder interface { GetNumShards() int64 GetNumReplicas() int64 GetMaxSpanAge() time.Duration - GetMaxNumSpans() int + GetMaxDocCount() int GetIndexPrefix() string GetTagsFilePath() string GetAllTagsAsFields() bool @@ -197,9 +197,6 @@ func (c *Configuration) ApplyDefaults(source *Configuration) { if c.MaxSpanAge == 0 { c.MaxSpanAge = source.MaxSpanAge } - if c.MaxNumSpans == 0 { - c.MaxNumSpans = source.MaxNumSpans - } if c.NumShards == 0 { c.NumShards = source.NumShards } @@ -233,6 +230,9 @@ func (c *Configuration) ApplyDefaults(source *Configuration) { if c.Tags.File == "" { c.Tags.File = source.Tags.File } + if c.MaxDocCount == 0 { + c.MaxDocCount = source.MaxDocCount + } } // GetNumShards returns number of shards from Configuration @@ -250,9 +250,9 @@ func (c *Configuration) GetMaxSpanAge() time.Duration { return c.MaxSpanAge } -// GetMaxNumSpans returns max spans allowed per query from Configuration -func (c *Configuration) GetMaxNumSpans() int { - return c.MaxNumSpans +// GetMaxDocCount returns the maximum number of documents that a query should return +func (c *Configuration) GetMaxDocCount() int { + return c.MaxDocCount } // GetIndexPrefix returns index prefix diff --git a/plugin/storage/es/dependencystore/storage.go b/plugin/storage/es/dependencystore/storage.go index fc983bbf109..46bfd6b5d30 100644 --- a/plugin/storage/es/dependencystore/storage.go +++ b/plugin/storage/es/dependencystore/storage.go @@ -40,10 +40,11 @@ type DependencyStore struct { client es.Client logger *zap.Logger indexPrefix string + maxDocCount int } // NewDependencyStore returns a DependencyStore -func NewDependencyStore(client es.Client, logger *zap.Logger, indexPrefix string) *DependencyStore { +func NewDependencyStore(client es.Client, logger *zap.Logger, indexPrefix string, maxDocCount int) *DependencyStore { var prefix string if indexPrefix != "" { prefix = indexPrefix + "-" @@ -52,6 +53,7 @@ func NewDependencyStore(client es.Client, logger *zap.Logger, indexPrefix string client: client, logger: logger, indexPrefix: prefix + dependencyIndex, + maxDocCount: maxDocCount, } } @@ -82,7 +84,7 @@ func (s *DependencyStore) writeDependencies(indexName string, ts time.Time, depe func (s *DependencyStore) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) { indices := getIndices(s.indexPrefix, endTs, lookback) searchResult, err := s.client.Search(indices...). - Size(10000). // the default elasticsearch allowed limit + Size(s.maxDocCount). Query(buildTSQuery(endTs, lookback)). IgnoreUnavailable(true). Do(ctx) diff --git a/plugin/storage/es/dependencystore/storage_test.go b/plugin/storage/es/dependencystore/storage_test.go index 2b6f460fad4..a4558f80be0 100644 --- a/plugin/storage/es/dependencystore/storage_test.go +++ b/plugin/storage/es/dependencystore/storage_test.go @@ -34,6 +34,8 @@ import ( "github.com/jaegertracing/jaeger/storage/dependencystore" ) +const defaultMaxDocCount = 10_000 + type depStorageTest struct { client *mocks.Client logger *zap.Logger @@ -41,14 +43,14 @@ type depStorageTest struct { storage *DependencyStore } -func withDepStorage(indexPrefix string, fn func(r *depStorageTest)) { +func withDepStorage(indexPrefix string, maxDocCount int, fn func(r *depStorageTest)) { client := &mocks.Client{} logger, logBuffer := testutils.NewLogger() r := &depStorageTest{ client: client, logger: logger, logBuffer: logBuffer, - storage: NewDependencyStore(client, logger, indexPrefix), + storage: NewDependencyStore(client, logger, indexPrefix, maxDocCount), } fn(r) } @@ -67,7 +69,7 @@ func TestNewSpanReaderIndexPrefix(t *testing.T) { } for _, testCase := range testCases { client := &mocks.Client{} - r := NewDependencyStore(client, zap.NewNop(), testCase.prefix) + r := NewDependencyStore(client, zap.NewNop(), testCase.prefix, defaultMaxDocCount) assert.Equal(t, testCase.expected+dependencyIndex, r.indexPrefix) } } @@ -88,7 +90,7 @@ func TestWriteDependencies(t *testing.T) { }, } for _, testCase := range testCases { - withDepStorage("", func(r *depStorageTest) { + withDepStorage("", defaultMaxDocCount, func(r *depStorageTest) { fixedTime := time.Date(1995, time.April, 21, 4, 21, 19, 95, time.UTC) indexName := indexWithDate("", fixedTime) writeService := &mocks.IndexService{} @@ -130,6 +132,7 @@ func TestGetDependencies(t *testing.T) { expectedError string expectedOutput []model.DependencyLink indexPrefix string + maxDocCount int indices []interface{} }{ { @@ -141,7 +144,8 @@ func TestGetDependencies(t *testing.T) { CallCount: 12, }, }, - indices: []interface{}{"jaeger-dependencies-1995-04-21", "jaeger-dependencies-1995-04-20"}, + indices: []interface{}{"jaeger-dependencies-1995-04-21", "jaeger-dependencies-1995-04-20"}, + maxDocCount: 1000, // can be anything, assertion will check this value is used in search query. }, { searchResult: createSearchResult(badDependencies), @@ -161,13 +165,15 @@ func TestGetDependencies(t *testing.T) { }, } for _, testCase := range testCases { - withDepStorage(testCase.indexPrefix, func(r *depStorageTest) { + withDepStorage(testCase.indexPrefix, testCase.maxDocCount, func(r *depStorageTest) { fixedTime := time.Date(1995, time.April, 21, 4, 21, 19, 95, time.UTC) searchService := &mocks.SearchService{} r.client.On("Search", testCase.indices...).Return(searchService) - searchService.On("Size", mock.Anything).Return(searchService) + searchService.On("Size", mock.MatchedBy(func(size int) bool { + return size == testCase.maxDocCount + })).Return(searchService) searchService.On("Query", mock.Anything).Return(searchService) searchService.On("IgnoreUnavailable", mock.AnythingOfType("bool")).Return(searchService) searchService.On("Do", mock.Anything).Return(testCase.searchResult, testCase.searchError) diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index 53522305386..b41724db7b0 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -111,7 +111,7 @@ func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) { // CreateDependencyReader implements storage.Factory func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) { - reader := esDepStore.NewDependencyStore(f.primaryClient, f.logger, f.primaryConfig.GetIndexPrefix()) + reader := esDepStore.NewDependencyStore(f.primaryClient, f.logger, f.primaryConfig.GetIndexPrefix(), f.primaryConfig.GetMaxDocCount()) return reader, nil } @@ -142,7 +142,7 @@ func createSpanReader( Client: client, Logger: logger, MetricsFactory: mFactory, - MaxNumSpans: cfg.GetMaxNumSpans(), + MaxDocCount: cfg.GetMaxDocCount(), MaxSpanAge: cfg.GetMaxSpanAge(), IndexPrefix: cfg.GetIndexPrefix(), TagDotReplacement: cfg.GetTagDotReplacement(), diff --git a/plugin/storage/es/options.go b/plugin/storage/es/options.go index 7b751a93341..06da56a3d8f 100644 --- a/plugin/storage/es/options.go +++ b/plugin/storage/es/options.go @@ -17,6 +17,7 @@ package es import ( "flag" + "math" "strings" "time" @@ -35,7 +36,7 @@ const ( suffixTokenPath = ".token-file" suffixServerURLs = ".server-urls" suffixMaxSpanAge = ".max-span-age" - suffixMaxNumSpans = ".max-num-spans" + suffixMaxNumSpans = ".max-num-spans" // deprecated suffixNumShards = ".num-shards" suffixNumReplicas = ".num-replicas" suffixBulkSize = ".bulk.size" @@ -53,8 +54,12 @@ const ( suffixCreateIndexTemplate = ".create-index-templates" suffixEnabled = ".enabled" suffixVersion = ".version" + suffixMaxDocCount = ".max-doc-count" - defaultServerURL = "http://127.0.0.1:9200" + // default number of documents to return from a query (elasticsearch allowed limit) + // see search.max_buckets and index.max_result_window + defaultMaxDocCount = 10_000 + defaultServerURL = "http://127.0.0.1:9200" ) // TODO this should be moved next to config.Configuration struct (maybe ./flags package) @@ -83,7 +88,6 @@ func NewOptions(primaryNamespace string, otherNamespaces ...string) *Options { Password: "", Sniffer: false, MaxSpanAge: 72 * time.Hour, - MaxNumSpans: 10000, NumShards: 5, NumReplicas: 1, BulkSize: 5 * 1000 * 1000, @@ -97,6 +101,7 @@ func NewOptions(primaryNamespace string, otherNamespaces ...string) *Options { CreateIndexTemplates: true, Version: 0, Servers: []string{defaultServerURL}, + MaxDocCount: defaultMaxDocCount, }, namespace: primaryNamespace, }, @@ -173,8 +178,10 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) { "The maximum lookback for spans in Elasticsearch") flagSet.Int( nsConfig.namespace+suffixMaxNumSpans, - nsConfig.MaxNumSpans, - "The maximum number of spans to fetch at a time per query in Elasticsearch") + nsConfig.MaxDocCount, + "(deprecated, will be removed in release v1.21.0. Please use es.max-doc-count). "+ + "The maximum number of spans to fetch at a time per query in Elasticsearch. "+ + "The lesser of es.max-num-spans and es.max-doc-count will be used if both are set.") flagSet.Int64( nsConfig.namespace+suffixNumShards, nsConfig.NumShards, @@ -237,6 +244,10 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) { nsConfig.namespace+suffixSnifferTLSEnabled, nsConfig.SnifferTLSEnabled, "Option to enable TLS when sniffing an Elasticsearch Cluster ; client uses sniffing process to find all nodes automatically, disabled by default") + flagSet.Int( + nsConfig.namespace+suffixMaxDocCount, + nsConfig.MaxDocCount, + "The maximum document count to return from an Elasticsearch query. This will also apply to aggregations.") if nsConfig.namespace == archiveNamespace { flagSet.Bool( nsConfig.namespace+suffixEnabled, @@ -262,7 +273,6 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) { cfg.SnifferTLSEnabled = v.GetBool(cfg.namespace + suffixSnifferTLSEnabled) cfg.Servers = strings.Split(stripWhiteSpace(v.GetString(cfg.namespace+suffixServerURLs)), ",") cfg.MaxSpanAge = v.GetDuration(cfg.namespace + suffixMaxSpanAge) - cfg.MaxNumSpans = v.GetInt(cfg.namespace + suffixMaxNumSpans) cfg.NumShards = v.GetInt64(cfg.namespace + suffixNumShards) cfg.NumReplicas = v.GetInt64(cfg.namespace + suffixNumReplicas) cfg.BulkSize = v.GetInt(cfg.namespace + suffixBulkSize) @@ -279,6 +289,14 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) { cfg.Enabled = v.GetBool(cfg.namespace + suffixEnabled) cfg.CreateIndexTemplates = v.GetBool(cfg.namespace + suffixCreateIndexTemplate) cfg.Version = uint(v.GetInt(cfg.namespace + suffixVersion)) + + cfg.MaxDocCount = v.GetInt(cfg.namespace + suffixMaxDocCount) + + if v.IsSet(cfg.namespace + suffixMaxNumSpans) { + maxNumSpans := v.GetInt(cfg.namespace + suffixMaxNumSpans) + cfg.MaxDocCount = int(math.Min(float64(maxNumSpans), float64(cfg.MaxDocCount))) + } + // TODO: Need to figure out a better way for do this. cfg.AllowTokenFromContext = v.GetBool(spanstore.StoragePropagationKey) cfg.TLS = cfg.getTLSFlagsConfig().InitFromViper(v) diff --git a/plugin/storage/es/options_test.go b/plugin/storage/es/options_test.go index 0e2daf5ecde..65ac35e405a 100644 --- a/plugin/storage/es/options_test.go +++ b/plugin/storage/es/options_test.go @@ -95,3 +95,29 @@ func TestOptionsWithFlags(t *testing.T) { assert.Equal(t, "./file.txt", aux.Tags.File) assert.Equal(t, "test,tags", aux.Tags.Include) } + +func TestMaxDocCount(t *testing.T) { + testCases := []struct { + name string + flags []string + wantMaxDocCount int + }{ + {"neither defined", []string{}, 10_000}, + {"max-num-spans only", []string{"--es.max-num-spans=1000"}, 1000}, + {"max-doc-count only", []string{"--es.max-doc-count=1000"}, 1000}, + {"max-num-spans == max-doc-count", []string{"--es.max-num-spans=1000", "--es.max-doc-count=1000"}, 1000}, + {"max-num-spans < max-doc-count", []string{"--es.max-num-spans=999", "--es.max-doc-count=1000"}, 999}, + {"max-num-spans > max-doc-count", []string{"--es.max-num-spans=1000", "--es.max-doc-count=999"}, 999}, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + opts := NewOptions("es", "es.aux") + v, command := config.Viperize(opts.AddFlags) + command.ParseFlags(tc.flags) + opts.InitFromViper(v) + + primary := opts.GetPrimary() + assert.Equal(t, tc.wantMaxDocCount, primary.MaxDocCount) + }) + } +} diff --git a/plugin/storage/es/spanstore/reader.go b/plugin/storage/es/spanstore/reader.go index 038bbe5130b..5c7176d514e 100644 --- a/plugin/storage/es/spanstore/reader.go +++ b/plugin/storage/es/spanstore/reader.go @@ -58,7 +58,6 @@ const ( tagKeyField = "key" tagValueField = "value" - defaultDocCount = 10000 // the default elasticsearch allowed limit defaultNumTraces = 100 ) @@ -101,6 +100,7 @@ type SpanReader struct { spanConverter dbmodel.ToDomain timeRangeIndices timeRangeIndexFn sourceFn sourceFn + maxDocCount int } // SpanReaderParams holds constructor params for NewSpanReader @@ -108,7 +108,7 @@ type SpanReaderParams struct { Client es.Client Logger *zap.Logger MaxSpanAge time.Duration - MaxNumSpans int + MaxDocCount int MetricsFactory metrics.Factory IndexPrefix string TagDotReplacement string @@ -127,7 +127,8 @@ func NewSpanReader(p SpanReaderParams) *SpanReader { serviceIndexPrefix: indexNames(p.IndexPrefix, serviceIndex), spanConverter: dbmodel.NewToDomain(p.TagDotReplacement), timeRangeIndices: getTimeRangeIndexFn(p.Archive, p.UseReadWriteAliases), - sourceFn: getSourceFn(p.Archive, p.MaxNumSpans), + sourceFn: getSourceFn(p.Archive, p.MaxDocCount), + maxDocCount: p.MaxDocCount, } } @@ -155,12 +156,12 @@ func getTimeRangeIndexFn(archive, useReadWriteAliases bool) timeRangeIndexFn { return timeRangeIndices } -func getSourceFn(archive bool, maxNumSpans int) sourceFn { +func getSourceFn(archive bool, maxDocCount int) sourceFn { return func(query elastic.Query, nextTime uint64) *elastic.SearchSource { s := elastic.NewSearchSource(). Query(query). - Size(defaultDocCount). - TerminateAfter(maxNumSpans) + Size(maxDocCount). + TerminateAfter(maxDocCount) if !archive { s.Sort("startTime", true). SearchAfter(nextTime) @@ -241,7 +242,7 @@ func (s *SpanReader) GetServices(ctx context.Context) ([]string, error) { defer span.Finish() currentTime := time.Now() jaegerIndices := s.timeRangeIndices(s.serviceIndexPrefix, currentTime.Add(-s.maxSpanAge), currentTime) - return s.serviceOperationStorage.getServices(ctx, jaegerIndices) + return s.serviceOperationStorage.getServices(ctx, jaegerIndices, s.maxDocCount) } // GetOperations returns all operations for a specific service traced by Jaeger @@ -253,7 +254,7 @@ func (s *SpanReader) GetOperations( defer span.Finish() currentTime := time.Now() jaegerIndices := s.timeRangeIndices(s.serviceIndexPrefix, currentTime.Add(-s.maxSpanAge), currentTime) - operations, err := s.serviceOperationStorage.getOperations(ctx, jaegerIndices, query.ServiceName) + operations, err := s.serviceOperationStorage.getOperations(ctx, jaegerIndices, query.ServiceName, s.maxDocCount) if err != nil { return nil, err } diff --git a/plugin/storage/es/spanstore/reader_test.go b/plugin/storage/es/spanstore/reader_test.go index 1901f3c7378..997579d19e4 100644 --- a/plugin/storage/es/spanstore/reader_test.go +++ b/plugin/storage/es/spanstore/reader_test.go @@ -40,6 +40,8 @@ import ( "github.com/jaegertracing/jaeger/storage/spanstore" ) +const defaultMaxDocCount = 10_000 + var exampleESSpan = []byte( `{ "traceID": "1", @@ -101,6 +103,7 @@ func withSpanReader(fn func(r *spanReaderTest)) { MaxSpanAge: 0, IndexPrefix: "", TagDotReplacement: "@", + MaxDocCount: defaultMaxDocCount, }), } fn(r) @@ -804,7 +807,18 @@ func TestSpanReader_FindTracesSpanCollectionFailure(t *testing.T) { } func TestFindTraceIDs(t *testing.T) { - testGet(traceIDAggregation, t) + testCases := []struct { + aggregrationID string + }{ + {traceIDAggregation}, + {servicesAggregation}, + {operationsAggregation}, + } + for _, testCase := range testCases { + t.Run(testCase.aggregrationID, func(t *testing.T) { + testGet(testCase.aggregrationID, t) + }) + } } func TestTraceIDsStringsToModelsConversion(t *testing.T) { @@ -835,15 +849,23 @@ func mockArchiveMultiSearchService(r *spanReaderTest, indexName string) *mock.Ca return multiSearchService.On("Do", mock.AnythingOfType("*context.valueCtx")) } +// matchTermsAggregation uses reflection to match the size attribute of the TermsAggregation; neither +// attributes nor getters are exported by TermsAggregation. +func matchTermsAggregation(termsAgg *elastic.TermsAggregation) bool { + val := reflect.ValueOf(termsAgg).Elem() + sizeVal := val.FieldByName("size").Elem().Int() + return sizeVal == defaultMaxDocCount +} + func mockSearchService(r *spanReaderTest) *mock.Call { searchService := &mocks.SearchService{} searchService.On("Query", mock.Anything).Return(searchService) searchService.On("IgnoreUnavailable", mock.AnythingOfType("bool")).Return(searchService) - searchService.On("Size", mock.MatchedBy(func(i int) bool { - return i == 0 || i == defaultDocCount + searchService.On("Size", mock.MatchedBy(func(size int) bool { + return size == 0 // Aggregations apply size (bucket) limits in their own query objects, and do not apply at the parent query level. })).Return(searchService) - searchService.On("Aggregation", stringMatcher(servicesAggregation), mock.AnythingOfType("*elastic.TermsAggregation")).Return(searchService) - searchService.On("Aggregation", stringMatcher(operationsAggregation), mock.AnythingOfType("*elastic.TermsAggregation")).Return(searchService) + searchService.On("Aggregation", stringMatcher(servicesAggregation), mock.MatchedBy(matchTermsAggregation)).Return(searchService) + searchService.On("Aggregation", stringMatcher(operationsAggregation), mock.MatchedBy(matchTermsAggregation)).Return(searchService) searchService.On("Aggregation", stringMatcher(traceIDAggregation), mock.AnythingOfType("*elastic.TermsAggregation")).Return(searchService) r.client.On("Search", mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("string")).Return(searchService) return searchService.On("Do", mock.MatchedBy(func(ctx context.Context) bool { diff --git a/plugin/storage/es/spanstore/service_operation.go b/plugin/storage/es/spanstore/service_operation.go index 44f2acb8a24..71ad0c2ee39 100644 --- a/plugin/storage/es/spanstore/service_operation.go +++ b/plugin/storage/es/spanstore/service_operation.go @@ -77,8 +77,8 @@ func (s *ServiceOperationStorage) Write(indexName string, jsonSpan *dbmodel.Span } } -func (s *ServiceOperationStorage) getServices(context context.Context, indices []string) ([]string, error) { - serviceAggregation := getServicesAggregation() +func (s *ServiceOperationStorage) getServices(context context.Context, indices []string, maxDocCount int) ([]string, error) { + serviceAggregation := getServicesAggregation(maxDocCount) searchService := s.client.Search(indices...). Size(0). // set to 0 because we don't want actual documents. @@ -100,15 +100,15 @@ func (s *ServiceOperationStorage) getServices(context context.Context, indices [ return bucketToStringArray(serviceNamesBucket) } -func getServicesAggregation() elastic.Query { +func getServicesAggregation(maxDocCount int) elastic.Query { return elastic.NewTermsAggregation(). Field(serviceName). - Size(defaultDocCount) // Must set to some large number. ES deprecated size omission for aggregating all. https://github.com/elastic/elasticsearch/issues/18838 + Size(maxDocCount) // ES deprecated size omission for aggregating all. https://github.com/elastic/elasticsearch/issues/18838 } -func (s *ServiceOperationStorage) getOperations(context context.Context, indices []string, service string) ([]string, error) { +func (s *ServiceOperationStorage) getOperations(context context.Context, indices []string, service string, maxDocCount int) ([]string, error) { serviceQuery := elastic.NewTermQuery(serviceName, service) - serviceFilter := getOperationsAggregation() + serviceFilter := getOperationsAggregation(maxDocCount) searchService := s.client.Search(indices...). Size(0). @@ -131,10 +131,10 @@ func (s *ServiceOperationStorage) getOperations(context context.Context, indices return bucketToStringArray(operationNamesBucket) } -func getOperationsAggregation() elastic.Query { +func getOperationsAggregation(maxDocCount int) elastic.Query { return elastic.NewTermsAggregation(). Field(operationNameField). - Size(defaultDocCount) // Must set to some large number. ES deprecated size omission for aggregating all. https://github.com/elastic/elasticsearch/issues/18838 + Size(maxDocCount) // ES deprecated size omission for aggregating all. https://github.com/elastic/elasticsearch/issues/18838 } func hashCode(s dbmodel.Service) string { diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index 6bd0de3f3f5..301f68a3a3c 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -39,13 +39,14 @@ import ( ) const ( - host = "0.0.0.0" - queryPort = "9200" - queryHostPort = host + ":" + queryPort - queryURL = "http://" + queryHostPort - indexPrefix = "integration-test" - tagKeyDeDotChar = "@" - maxSpanAge = time.Hour * 72 + host = "0.0.0.0" + queryPort = "9200" + queryHostPort = host + ":" + queryPort + queryURL = "http://" + queryHostPort + indexPrefix = "integration-test" + tagKeyDeDotChar = "@" + maxSpanAge = time.Hour * 72 + defaultMaxDocCount = 10_000 ) type ESStorageIntegration struct { @@ -129,8 +130,9 @@ func (s *ESStorageIntegration) initSpanstore(allTagsAsFields, archive bool) erro MaxSpanAge: maxSpanAge, TagDotReplacement: tagKeyDeDotChar, Archive: archive, + MaxDocCount: defaultMaxDocCount, }) - dependencyStore := dependencystore.NewDependencyStore(client, s.logger, indexPrefix) + dependencyStore := dependencystore.NewDependencyStore(client, s.logger, indexPrefix, defaultMaxDocCount) depMapping := es.GetDependenciesMappings(5, 1, client.GetVersion()) err = dependencyStore.CreateTemplates(depMapping) if err != nil {