Skip to content

Commit

Permalink
Changed Range Query to use startTimeMillis date field instead of startTime field (#2980)
Browse files Browse the repository at this point in the history

* Fix exiting collector abruptly after Zipkin server close during explicit shutdown

Signed-off-by: Sreevani871 <sreevani@freshdesk.com>

* Changed to use startTimeMillis in ES timeRangeTerm query when es.use-aliases:true

Signed-off-by: Sreevani871 <sreevani.karasala@freshworks.com>

* tests fail fixes

Signed-off-by: Sreevani871 <sreevani.karasala@freshworks.com>

* go-lint errors fix

Signed-off-by: Sreevani871 <sreevani.karasala@freshworks.com>

* simplified code changes

Signed-off-by: Sreevani871 <sreevani.karasala@freshworks.com>

Co-authored-by: Sreevani871 <sreevani@freshdesk.com>
Co-authored-by: Albert <26584478+albertteoh@users.noreply.github.com>
  • Loading branch information
3 people authored May 11, 2021
1 parent b9e7dfb commit 073a341
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 9 deletions.
19 changes: 15 additions & 4 deletions plugin/storage/es/spanstore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ const (
traceIDField = "traceID"
durationField = "duration"
startTimeField = "startTime"
startTimeMillisField = "startTimeMillis"
serviceNameField = "process.serviceName"
operationNameField = "operationName"
objectTagsField = "tag"
Expand Down Expand Up @@ -102,6 +103,7 @@ type SpanReader struct {
timeRangeIndices timeRangeIndexFn
sourceFn sourceFn
maxDocCount int
useReadWriteAliases bool
}

// SpanReaderParams holds constructor params for NewSpanReader
Expand Down Expand Up @@ -133,6 +135,7 @@ func NewSpanReader(p SpanReaderParams) *SpanReader {
timeRangeIndices: getTimeRangeIndexFn(p.Archive, p.UseReadWriteAliases, p.RemoteReadClusters),
sourceFn: getSourceFn(p.Archive, p.MaxDocCount),
maxDocCount: p.MaxDocCount,
useReadWriteAliases: p.UseReadWriteAliases,
}
}

Expand Down Expand Up @@ -352,7 +355,6 @@ func (s *SpanReader) multiRead(ctx context.Context, traceIDs []model.TraceID, st
// i.e starts in one and ends in another.
indices := s.timeRangeIndices(s.spanIndexPrefix, s.indexDateLayout, startTime.Add(-time.Hour), endTime.Add(time.Hour))
nextTime := model.TimeAsEpochMicroseconds(startTime.Add(-time.Hour))

searchAfterTime := make(map[model.TraceID]uint64)
totalDocumentsFetched := make(map[model.TraceID]int)
tracesMap := make(map[model.TraceID]*model.Trace)
Expand All @@ -362,13 +364,19 @@ func (s *SpanReader) multiRead(ctx context.Context, traceIDs []model.TraceID, st
}
searchRequests := make([]*elastic.SearchRequest, len(traceIDs))
for i, traceID := range traceIDs {
query := buildTraceByIDQuery(traceID)
traceQuery := buildTraceByIDQuery(traceID)
query := elastic.NewBoolQuery().
Must(traceQuery)
if s.useReadWriteAliases {
startTimeRangeQuery := s.buildStartTimeQuery(startTime.Add(-time.Hour*24), endTime.Add(time.Hour*24))
query = query.Must(startTimeRangeQuery)
}

if val, ok := searchAfterTime[traceID]; ok {
nextTime = val
}

s := s.sourceFn(query, nextTime)

searchRequests[i] = elastic.NewSearchRequest().
IgnoreUnavailable(true).
Source(s)
Expand Down Expand Up @@ -617,7 +625,10 @@ func (s *SpanReader) buildDurationQuery(durationMin time.Duration, durationMax t
// buildStartTimeQuery returns an Elasticsearch range query matching spans
// whose start time lies within [startTimeMin, startTimeMax], inclusive.
//
// The query targets startTimeMillisField instead of startTimeField:
// startTimeMillis is a date field in the ES mapping, and range queries on a
// date field allow Elasticsearch to skip shards whose time range cannot
// match, which a plain numeric startTime field does not allow.
// https://discuss.elastic.co/t/timeline-query-on-timestamped-indices/129328/2
func (s *SpanReader) buildStartTimeQuery(startTimeMin time.Time, startTimeMax time.Time) elastic.Query {
	minStartTimeMicros := model.TimeAsEpochMicroseconds(startTimeMin)
	maxStartTimeMicros := model.TimeAsEpochMicroseconds(startTimeMax)
	// Convert epoch microseconds to milliseconds to match the precision of
	// the startTimeMillis date field.
	return elastic.NewRangeQuery(startTimeMillisField).Gte(minStartTimeMicros / 1000).Lte(maxStartTimeMicros / 1000)
}

func (s *SpanReader) buildServiceNameQuery(serviceName string) elastic.Query {
Expand Down
12 changes: 7 additions & 5 deletions plugin/storage/es/spanstore/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,15 +243,17 @@ func TestSpanReader_multiRead_followUp_query(t *testing.T) {
spanBytesID2, err := json.Marshal(spanID2)
require.NoError(t, err)

id1Query := elastic.NewBoolQuery().Should(
traceID1Query := elastic.NewBoolQuery().Should(
elastic.NewTermQuery(traceIDField, model.TraceID{High: 0, Low: 1}.String()).Boost(2),
elastic.NewTermQuery(traceIDField, fmt.Sprintf("%x", 1)))
id1Query := elastic.NewBoolQuery().Must(traceID1Query)
id1Search := elastic.NewSearchRequest().
IgnoreUnavailable(true).
Source(r.reader.sourceFn(id1Query, model.TimeAsEpochMicroseconds(date.Add(-time.Hour))))
id2Query := elastic.NewBoolQuery().Should(
traceID2Query := elastic.NewBoolQuery().Should(
elastic.NewTermQuery(traceIDField, model.TraceID{High: 0, Low: 2}.String()).Boost(2),
elastic.NewTermQuery(traceIDField, fmt.Sprintf("%x", 2)))
id2Query := elastic.NewBoolQuery().Must(traceID2Query)
id2Search := elastic.NewSearchRequest().
IgnoreUnavailable(true).
Source(r.reader.sourceFn(id2Query, model.TimeAsEpochMicroseconds(date.Add(-time.Hour))))
Expand Down Expand Up @@ -1020,7 +1022,7 @@ func TestSpanReader_buildDurationQuery(t *testing.T) {
func TestSpanReader_buildStartTimeQuery(t *testing.T) {
expectedStr :=
`{ "range":
{ "startTime": { "include_lower": true,
{ "startTimeMillis": { "include_lower": true,
"include_upper": true,
"from": 1000000,
"to": 2000000 }
Expand All @@ -1036,8 +1038,8 @@ func TestSpanReader_buildStartTimeQuery(t *testing.T) {
expected := make(map[string]interface{})
json.Unmarshal([]byte(expectedStr), &expected)
// We need to do this because we cannot process a json into uint64.
expected["range"].(map[string]interface{})["startTime"].(map[string]interface{})["from"] = model.TimeAsEpochMicroseconds(startTimeMin)
expected["range"].(map[string]interface{})["startTime"].(map[string]interface{})["to"] = model.TimeAsEpochMicroseconds(startTimeMax)
expected["range"].(map[string]interface{})["startTimeMillis"].(map[string]interface{})["from"] = model.TimeAsEpochMicroseconds(startTimeMin) / 1000
expected["range"].(map[string]interface{})["startTimeMillis"].(map[string]interface{})["to"] = model.TimeAsEpochMicroseconds(startTimeMax) / 1000

assert.EqualValues(t, expected, actual)
})
Expand Down

0 comments on commit 073a341

Please sign in to comment.