-
Notifications
You must be signed in to change notification settings - Fork 25.3k
Move pipeline agg validation to coordinating node #53669
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,7 @@ | |
*/ | ||
package org.elasticsearch.search.aggregations; | ||
|
||
import org.elasticsearch.action.ActionRequestValidationException; | ||
import org.elasticsearch.common.ParsingException; | ||
import org.elasticsearch.common.Strings; | ||
import org.elasticsearch.common.io.stream.StreamInput; | ||
|
@@ -283,8 +284,6 @@ public boolean mustVisitAllDocs() { | |
return false; | ||
} | ||
|
||
|
||
|
||
public Builder addAggregator(AggregationBuilder factory) { | ||
if (!names.add(factory.name)) { | ||
throw new IllegalArgumentException("Two sibling aggregations cannot have the same name: [" + factory.name + "]"); | ||
|
@@ -298,23 +297,59 @@ public Builder addPipelineAggregator(PipelineAggregationBuilder pipelineAggregat | |
return this; | ||
} | ||
|
||
/** | ||
* Validate the root of the aggregation tree. | ||
*/ | ||
public ActionRequestValidationException validate(ActionRequestValidationException e) { | ||
PipelineAggregationBuilder.ValidationContext context = | ||
PipelineAggregationBuilder.ValidationContext.forTreeRoot(aggregationBuilders, pipelineAggregatorBuilders, e); | ||
validatePipelines(context); | ||
return validateChildren(context.getValidationException()); | ||
} | ||
|
||
/** | ||
* Validate a the pipeline aggregations in this factory. | ||
*/ | ||
private void validatePipelines(PipelineAggregationBuilder.ValidationContext context) { | ||
List<PipelineAggregationBuilder> orderedPipelineAggregators; | ||
try { | ||
orderedPipelineAggregators = resolvePipelineAggregatorOrder(pipelineAggregatorBuilders, aggregationBuilders); | ||
} catch (IllegalArgumentException iae) { | ||
context.addValidationError(iae.getMessage()); | ||
return; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we allow the validations to keep running down the tree, so we can tell the user all the problems at once? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was tempted but I think the tree is pretty borked at this point and you'll end up with duplicate error messages all about the same thing. And I figured we were just returning a single error message right now so it probably isn't worse than it was before and we could do it later if we wanted it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Makes sense to me 👍 |
||
} | ||
for (PipelineAggregationBuilder builder : orderedPipelineAggregators) { | ||
builder.validate(context); | ||
} | ||
} | ||
|
||
/** | ||
* Validate a the children of this factory. | ||
*/ | ||
private ActionRequestValidationException validateChildren(ActionRequestValidationException e) { | ||
for (AggregationBuilder agg : aggregationBuilders) { | ||
PipelineAggregationBuilder.ValidationContext context = | ||
PipelineAggregationBuilder.ValidationContext.forInsideTree(agg, e); | ||
agg.factoriesBuilder.validatePipelines(context); | ||
e = agg.factoriesBuilder.validateChildren(context.getValidationException()); | ||
} | ||
return e; | ||
} | ||
|
||
public AggregatorFactories build(QueryShardContext queryShardContext, AggregatorFactory parent) throws IOException { | ||
if (aggregationBuilders.isEmpty() && pipelineAggregatorBuilders.isEmpty()) { | ||
return EMPTY; | ||
} | ||
List<PipelineAggregationBuilder> orderedpipelineAggregators = null; | ||
orderedpipelineAggregators = resolvePipelineAggregatorOrder(this.pipelineAggregatorBuilders, this.aggregationBuilders); | ||
for (PipelineAggregationBuilder builder : orderedpipelineAggregators) { | ||
builder.validate(parent, aggregationBuilders, pipelineAggregatorBuilders); | ||
} | ||
List<PipelineAggregationBuilder> orderedPipelineAggregators = | ||
resolvePipelineAggregatorOrder(pipelineAggregatorBuilders, aggregationBuilders); | ||
AggregatorFactory[] aggFactories = new AggregatorFactory[aggregationBuilders.size()]; | ||
|
||
int i = 0; | ||
for (AggregationBuilder agg : aggregationBuilders) { | ||
aggFactories[i] = agg.build(queryShardContext, parent); | ||
++i; | ||
} | ||
return new AggregatorFactories(aggFactories, orderedpipelineAggregators); | ||
return new AggregatorFactories(aggFactories, orderedPipelineAggregators); | ||
} | ||
|
||
private List<PipelineAggregationBuilder> resolvePipelineAggregatorOrder( | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,14 +18,20 @@ | |
*/ | ||
package org.elasticsearch.search.aggregations; | ||
|
||
import org.elasticsearch.action.ActionRequestValidationException; | ||
import org.elasticsearch.action.ValidateActions; | ||
import org.elasticsearch.common.Strings; | ||
import org.elasticsearch.common.io.stream.NamedWriteable; | ||
import org.elasticsearch.common.xcontent.ToXContentFragment; | ||
import org.elasticsearch.search.aggregations.AggregatorFactories.Builder; | ||
import org.elasticsearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder; | ||
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder; | ||
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder; | ||
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; | ||
|
||
import java.util.Collection; | ||
import java.util.Map; | ||
import java.util.Objects; | ||
|
||
/** | ||
* A factory that knows how to create an {@link PipelineAggregator} of a | ||
|
@@ -64,11 +70,145 @@ public final String[] getBucketsPaths() { | |
} | ||
|
||
/** | ||
* Internal: Validates the state of this factory (makes sure the factory is properly | ||
* configured) | ||
* Makes sure this builder is properly configured. | ||
*/ | ||
protected abstract void validate(AggregatorFactory parent, Collection<AggregationBuilder> aggregationBuilders, | ||
Collection<PipelineAggregationBuilder> pipelineAggregatorBuilders); | ||
protected abstract void validate(ValidationContext context); | ||
public abstract static class ValidationContext { | ||
/** | ||
* Build the context for the root of the aggregation tree. | ||
*/ | ||
public static ValidationContext forTreeRoot(Collection<AggregationBuilder> siblingAggregations, | ||
Collection<PipelineAggregationBuilder> siblingPipelineAggregations, | ||
ActionRequestValidationException validationFailuresSoFar) { | ||
return new ForTreeRoot(siblingAggregations, siblingPipelineAggregations, validationFailuresSoFar); | ||
} | ||
|
||
/** | ||
* Build the context for a node inside the aggregation tree. | ||
*/ | ||
public static ValidationContext forInsideTree(AggregationBuilder parent, | ||
ActionRequestValidationException validationFailuresSoFar) { | ||
return new ForInsideTree(parent, validationFailuresSoFar); | ||
} | ||
|
||
|
||
private ActionRequestValidationException e; | ||
|
||
private ValidationContext(ActionRequestValidationException validationFailuresSoFar) { | ||
this.e = validationFailuresSoFar; | ||
} | ||
|
||
private static class ForTreeRoot extends ValidationContext { | ||
private final Collection<AggregationBuilder> siblingAggregations; | ||
private final Collection<PipelineAggregationBuilder> siblingPipelineAggregations; | ||
|
||
ForTreeRoot(Collection<AggregationBuilder> siblingAggregations, | ||
Collection<PipelineAggregationBuilder> siblingPipelineAggregations, | ||
ActionRequestValidationException validationFailuresSoFar) { | ||
super(validationFailuresSoFar); | ||
this.siblingAggregations = Objects.requireNonNull(siblingAggregations); | ||
this.siblingPipelineAggregations = Objects.requireNonNull(siblingPipelineAggregations); | ||
} | ||
|
||
@Override | ||
public Collection<AggregationBuilder> getSiblingAggregations() { | ||
return siblingAggregations; | ||
} | ||
|
||
@Override | ||
public Collection<PipelineAggregationBuilder> getSiblingPipelineAggregations() { | ||
return siblingPipelineAggregations; | ||
} | ||
|
||
@Override | ||
public void validateParentAggSequentiallyOrdered(String type, String name) { | ||
addValidationError(type + " aggregation [" + name | ||
+ "] must have a histogram, date_histogram or auto_date_histogram as parent but doesn't have a parent"); | ||
} | ||
} | ||
|
||
private static class ForInsideTree extends ValidationContext { | ||
private final AggregationBuilder parent; | ||
|
||
ForInsideTree(AggregationBuilder parent, ActionRequestValidationException validationFailuresSoFar) { | ||
super(validationFailuresSoFar); | ||
this.parent = Objects.requireNonNull(parent); | ||
} | ||
|
||
@Override | ||
public Collection<AggregationBuilder> getSiblingAggregations() { | ||
return parent.getSubAggregations(); | ||
} | ||
|
||
@Override | ||
public Collection<PipelineAggregationBuilder> getSiblingPipelineAggregations() { | ||
return parent.getPipelineAggregations(); | ||
} | ||
|
||
@Override | ||
public void validateParentAggSequentiallyOrdered(String type, String name) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ahh yes, this thing. Would be nice someday if we could get rid of these instanceofs with some kind of Battle for another day :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. +++++++++++++ |
||
if (parent instanceof HistogramAggregationBuilder) { | ||
HistogramAggregationBuilder histoParent = (HistogramAggregationBuilder) parent; | ||
if (histoParent.minDocCount() != 0) { | ||
addValidationError( | ||
"parent histogram of " + type + " aggregation [" + name + "] must have min_doc_count of 0"); | ||
} | ||
} else if (parent instanceof DateHistogramAggregationBuilder) { | ||
DateHistogramAggregationBuilder histoParent = (DateHistogramAggregationBuilder) parent; | ||
if (histoParent.minDocCount() != 0) { | ||
addValidationError( | ||
"parent histogram of " + type + " aggregation [" + name + "] must have min_doc_count of 0"); | ||
} | ||
} else if (parent instanceof AutoDateHistogramAggregationBuilder) { | ||
// Nothing to check | ||
} else { | ||
addValidationError( | ||
type + " aggregation [" + name + "] must have a histogram, date_histogram or auto_date_histogram as parent"); | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* Aggregations that are siblings to the aggregation being validated. | ||
*/ | ||
public abstract Collection<AggregationBuilder> getSiblingAggregations(); | ||
|
||
/** | ||
* Pipeline aggregations that are siblings to the aggregation being validated. | ||
*/ | ||
public abstract Collection<PipelineAggregationBuilder> getSiblingPipelineAggregations(); | ||
|
||
/** | ||
* Add a validation error to this context. All validation errors | ||
* are accumulated in a list and, if there are any, the request | ||
* is not executed and the entire list is returned as the error | ||
* response. | ||
*/ | ||
public void addValidationError(String error) { | ||
e = ValidateActions.addValidationError(error, e); | ||
} | ||
|
||
/** | ||
* Add a validation error about the {@code buckets_path}. | ||
*/ | ||
public void addBucketPathValidationError(String error) { | ||
addValidationError(PipelineAggregator.Parser.BUCKETS_PATH.getPreferredName() + ' ' + error); | ||
} | ||
|
||
/** | ||
* Validates that the parent is sequentially ordered. | ||
*/ | ||
public abstract void validateParentAggSequentiallyOrdered(String type, String name); | ||
|
||
/** | ||
* The validation exception, if there is one. It'll be {@code null} | ||
* if the context wasn't provided with any exception on creation | ||
* and none were added. | ||
*/ | ||
public ActionRequestValidationException getValidationException() { | ||
return e; | ||
} | ||
} | ||
|
||
/** | ||
* Creates the pipeline aggregator | ||
|
Uh oh!
There was an error while loading. Please reload this page.