Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 44 additions & 5 deletions pkg/quickwit/data_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,10 @@ func (e *elasticsearchDataQuery) processQuery(q *Query, ms *es.MultiSearchReques
processDocumentQuery(q, b, from, to, defaultTimeField)
} else {
// Otherwise, it is a time series query and we process it
processTimeSeriesQuery(q, b, from, to, defaultTimeField)
err := processTimeSeriesQuery(q, b, from, to, defaultTimeField)
if err != nil {
return err
}
}

return nil
Expand Down Expand Up @@ -164,12 +167,18 @@ func (bucketAgg BucketAgg) generateSettingsForDSL() map[string]interface{} {
return bucketAgg.Settings.MustMap()
}

func addDateHistogramAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg, timeFrom, timeTo int64, timeField string) es.AggBuilder {
func addDateHistogramAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg, timeFrom, timeTo int64, timeField string) (es.AggBuilder, error) {
// If no field is specified, use the time field
field := bucketAgg.Field
if field == "" {
field = timeField
}

// Validate that we have a valid field name to prevent downstream errors
if field == "" {
return aggBuilder, fmt.Errorf("date_histogram aggregation '%s' has no field specified and datasource timeField is empty", bucketAgg.ID)
}

aggBuilder.DateHistogram(bucketAgg.ID, field, func(a *es.DateHistogramAgg, b es.AggBuilder) {
a.FixedInterval = bucketAgg.Settings.Get("interval").MustString("auto")
a.MinDocCount = bucketAgg.Settings.Get("min_doc_count").MustInt(0)
Expand Down Expand Up @@ -204,7 +213,7 @@ func addDateHistogramAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg, timeFro
aggBuilder = b
})

return aggBuilder
return aggBuilder, nil
}

func addHistogramAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg) es.AggBuilder {
Expand Down Expand Up @@ -331,6 +340,30 @@ func isQueryWithError(query *Query) error {
if len(query.Metrics) == 0 || !(isLogsQuery(query) || isDocumentQuery(query)) {
return fmt.Errorf("invalid query, missing metrics and aggregations")
}
} else {
// Validate bucket aggregations have valid fields where required
for _, bucketAgg := range query.BucketAggs {
// Check which aggregation types require fields
switch bucketAgg.Type {
case dateHistType:
// For date_histogram, field can be empty (will use timeField as fallback)
// Validation will happen at query processing time
continue
case histogramType, termsType, geohashGridType, nestedType:
// These aggregation types require a field
if bucketAgg.Field == "" {
return fmt.Errorf("invalid query, bucket aggregation '%s' (type: %s) is missing required field", bucketAgg.ID, bucketAgg.Type)
}
case filtersType:
// Filters aggregations don't need a field
continue
default:
// For unknown aggregation types, be conservative and require field
if bucketAgg.Field == "" {
return fmt.Errorf("invalid query, bucket aggregation '%s' (type: %s) is missing required field", bucketAgg.ID, bucketAgg.Type)
}
}
}
}
return nil
}
Expand Down Expand Up @@ -380,7 +413,7 @@ func processDocumentQuery(q *Query, b *es.SearchRequestBuilder, from, to int64,
b.Size(stringToIntWithDefaultValue(metric.Settings.Get("size").MustString(), defaultSize))
}

func processTimeSeriesQuery(q *Query, b *es.SearchRequestBuilder, from, to int64, defaultTimeField string) {
func processTimeSeriesQuery(q *Query, b *es.SearchRequestBuilder, from, to int64, defaultTimeField string) error {
aggBuilder := b.Agg()
// Process buckets
// iterate backwards to create aggregations bottom-down
Expand All @@ -390,7 +423,11 @@ func processTimeSeriesQuery(q *Query, b *es.SearchRequestBuilder, from, to int64
)
switch bucketAgg.Type {
case dateHistType:
aggBuilder = addDateHistogramAgg(aggBuilder, bucketAgg, from, to, defaultTimeField)
var err error
aggBuilder, err = addDateHistogramAgg(aggBuilder, bucketAgg, from, to, defaultTimeField)
if err != nil {
return err
}
case histogramType:
aggBuilder = addHistogramAgg(aggBuilder, bucketAgg)
case filtersType:
Expand Down Expand Up @@ -471,6 +508,8 @@ func processTimeSeriesQuery(q *Query, b *es.SearchRequestBuilder, from, to int64
})
}
}

return nil
}

func stringToIntWithDefaultValue(valueStr string, defaultValue int) int {
Expand Down
6 changes: 4 additions & 2 deletions pkg/quickwit/timestamp_infos.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,15 @@ func FindTimestampFormat(timestampFieldName string, parentName *string, fieldMap
if nil != parentName {
fieldName = fmt.Sprintf("%s.%s", *parentName, fieldName)
}

if field.Type == "datetime" && fieldName == timestampFieldName && nil != field.OutputFormat {
return *field.OutputFormat, true
} else if field.Type == "object" && nil != field.FieldMappings {
return FindTimestampFormat(timestampFieldName, &field.Name, field.FieldMappings)
if result, found := FindTimestampFormat(timestampFieldName, &field.Name, field.FieldMappings); found {
return result, true
}
}
}

qwlog.Debug(fmt.Sprintf("FindTimestampFormat: no match found for %s", timestampFieldName))
return "", false
}
Loading
Loading