Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split ES Types into separate indices #263

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
refactor reader to have split schema
  • Loading branch information
mh-park committed Jul 11, 2017
commit 545c3c3946517e5451ce190f57c1778607504155
19 changes: 10 additions & 9 deletions plugin/storage/es/spanstore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ import (

const (
serviceName = "serviceName"
indexPrefix = "jaeger-"
spanIndexPrefix = "jaeger-span-"
serviceIndexPrefix = "jaeger-service-"
operationsAggregation = "distinct_operations"
servicesAggregation = "distinct_services"
traceIDAggregation = "traceIDs"
Expand Down Expand Up @@ -110,7 +111,7 @@ func (s *SpanReader) readTrace(traceID string, traceQuery *spanstore.TraceQueryP

traceQuery.StartTimeMax = traceQuery.StartTimeMax.Add(time.Hour)
traceQuery.StartTimeMin = traceQuery.StartTimeMin.Add(-time.Hour)
indices := s.findIndices(traceQuery)
indices := s.findIndices(spanIndexPrefix, traceQuery)
esSpansRaw, err := s.executeQuery(query, indices...)
if err != nil {
return nil, errors.Wrap(err, "Query execution failed")
Expand Down Expand Up @@ -165,7 +166,7 @@ func (s *SpanReader) unmarshallJSONSpan(esSpanRaw *elastic.SearchHit) (*jModel.S
}

// Returns the array of indices that we need to query, based on query params
func (s *SpanReader) findIndices(traceQuery *spanstore.TraceQueryParameters) []string {
func (s *SpanReader) findIndices(typeOfIndex string, traceQuery *spanstore.TraceQueryParameters) []string {
today := time.Now()
threeDaysAgo := today.AddDate(0, 0, -3) // TODO: make this configurable

Expand All @@ -177,7 +178,7 @@ func (s *SpanReader) findIndices(traceQuery *spanstore.TraceQueryParameters) []s
var indices []string
current := traceQuery.StartTimeMax
for current.After(traceQuery.StartTimeMin) && current.After(threeDaysAgo) {
index := IndexWithDate(current)
index := indexWithDate(typeOfIndex, current)
exists, _ := s.client.IndexExists(index).Do(s.ctx) // Don't care about error, if it's an error, exists will be false anyway
if exists {
indices = append(indices, index)
Expand All @@ -188,15 +189,15 @@ func (s *SpanReader) findIndices(traceQuery *spanstore.TraceQueryParameters) []s
}

// IndexWithDate returns the index name formatted to date.
func IndexWithDate(date time.Time) string {
return indexPrefix + date.Format("2006-01-02")
func indexWithDate(typeOfIndex string, date time.Time) string {
return typeOfIndex + date.Format("2006-01-02")
}

// GetServices returns all services traced by Jaeger, ordered by frequency
func (s *SpanReader) GetServices() ([]string, error) {
serviceAggregation := s.getServicesAggregation()

jaegerIndices := s.findIndices(&spanstore.TraceQueryParameters{})
jaegerIndices := s.findIndices(serviceIndexPrefix, &spanstore.TraceQueryParameters{})

searchService := s.client.Search(jaegerIndices...).
Type(serviceType).
Expand Down Expand Up @@ -226,7 +227,7 @@ func (s *SpanReader) getServicesAggregation() elastic.Query {
func (s *SpanReader) GetOperations(service string) ([]string, error) {
serviceQuery := elastic.NewTermQuery(serviceName, service)
serviceFilter := s.getOperationsAggregation()
jaegerIndices := s.findIndices(&spanstore.TraceQueryParameters{})
jaegerIndices := s.findIndices(serviceIndexPrefix, &spanstore.TraceQueryParameters{})

searchService := s.client.Search(jaegerIndices...).
Type(serviceType).
Expand Down Expand Up @@ -359,7 +360,7 @@ func (s *SpanReader) findTraceIDs(traceQuery *spanstore.TraceQueryParameters) ([
aggregation := s.buildTraceIDAggregation(traceQuery.NumTraces)
boolQuery := s.buildFindTraceIDsQuery(traceQuery)

jaegerIndices := s.findIndices(traceQuery)
jaegerIndices := s.findIndices(spanIndexPrefix, traceQuery)

searchService := s.client.Search(jaegerIndices...).
Type(spanType).
Expand Down
40 changes: 24 additions & 16 deletions plugin/storage/es/spanstore/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,16 +280,16 @@ func TestSpanReader_findIndicesEmptyQuery(t *testing.T) {
withSpanReader(func(r *spanReaderTest) {
mockExistsService(r)

actual := r.reader.findIndices(&spanstore.TraceQueryParameters{})
actual := r.reader.findIndices(spanIndexPrefix, &spanstore.TraceQueryParameters{})

today := time.Now()
yesterday := today.AddDate(0, 0, -1)
twoDaysAgo := today.AddDate(0, 0, -2)

expected := []string{
IndexWithDate(today),
IndexWithDate(yesterday),
IndexWithDate(twoDaysAgo),
indexWithDate(spanIndexPrefix, today),
indexWithDate(spanIndexPrefix, yesterday),
indexWithDate(spanIndexPrefix, twoDaysAgo),
}

assert.EqualValues(t, expected, actual)
Expand All @@ -301,10 +301,13 @@ func TestSpanReader_findIndicesNoIndices(t *testing.T) {
withSpanReader(func(r *spanReaderTest) {
mockExistsService(r)

actual := r.reader.findIndices(&spanstore.TraceQueryParameters{
StartTimeMin: time.Date(1995, time.April, 21, 4, 21, 19, 95, time.UTC),
StartTimeMax: time.Date(2017, time.April, 21, 4, 21, 19, 95, time.UTC),
})
actual := r.reader.findIndices(
spanIndexPrefix,
&spanstore.TraceQueryParameters{
StartTimeMin: time.Date(1995, time.April, 21, 4, 21, 19, 95, time.UTC),
StartTimeMax: time.Date(2017, time.April, 21, 4, 21, 19, 95, time.UTC),
},
)

var expected []string

Expand All @@ -318,17 +321,20 @@ func TestSpanReader_findIndicesOnlyRecent(t *testing.T) {

today := time.Now()

actual := r.reader.findIndices(&spanstore.TraceQueryParameters{
StartTimeMin: today.AddDate(0, 0, -7),
StartTimeMax: today.AddDate(0, 0, -1),
})
actual := r.reader.findIndices(
spanIndexPrefix,
&spanstore.TraceQueryParameters{
StartTimeMin: today.AddDate(0, 0, -7),
StartTimeMax: today.AddDate(0, 0, -1),
},
)

yesterday := today.AddDate(0, 0, -1)
twoDaysAgo := today.AddDate(0, 0, -2)

expected := []string{
IndexWithDate(yesterday),
IndexWithDate(twoDaysAgo),
indexWithDate(spanIndexPrefix, yesterday),
indexWithDate(spanIndexPrefix, twoDaysAgo),
}

assert.EqualValues(t, expected, actual)
Expand All @@ -337,8 +343,10 @@ func TestSpanReader_findIndicesOnlyRecent(t *testing.T) {

func TestSpanReader_indexWithDate(t *testing.T) {
withSpanReader(func(r *spanReaderTest) {
actual := IndexWithDate(time.Date(1995, time.April, 21, 4, 21, 19, 95, time.UTC))
assert.Equal(t, "jaeger-1995-04-21", actual)
actual := indexWithDate(spanIndexPrefix, time.Date(1995, time.April, 21, 4, 21, 19, 95, time.UTC))
assert.Equal(t, "jaeger-span-1995-04-21", actual)
actual = indexWithDate(serviceIndexPrefix, time.Date(1995, time.April, 21, 4, 21, 19, 95, time.UTC))
assert.Equal(t, "jaeger-service-1995-04-21", actual)
})
}

Expand Down