EQL: Introduce filter pipe (#61805)
Allow filtering through a pipe, across events and sequences.
Filter pipes are pushed down to base queries.
For now filtering after limit (head/tail) is forbidden as the
semantics are still up for debate.
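
For illustration, queries along these lines are now accepted (a sketch using
field names borrowed from the test fixtures in this change, not taken verbatim
from them):

    process where true
    | filter serial_event_id <= 10
    | filter serial_event_id > 6

    sequence by unique_pid
      [process where true]
      [file where true]
    | filter serial_event_id <= 10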

Fix #59763

(cherry picked from commit 80569a388b76cecb5f55037fe989c8b6f140761b)
costin committed Sep 2, 2020
1 parent a0e4331 commit e6dc805
Showing 22 changed files with 379 additions and 208 deletions.
10 changes: 0 additions & 10 deletions x-pack/plugin/eql/qa/common/src/main/resources/test_queries.toml
@@ -248,16 +248,6 @@ case_insensitive = true
query = 'process where process_name >= "SYSTE" and process_name <= "systex"'
expected_event_ids = [1, 2]


[[queries]]
name = "processWithStringEqualityCaseInsensitive1"
case_insensitive = true
query = '''
process where process_name == "VMACTHLP.exe" and unique_pid == 12
| filter true
'''
expected_event_ids = [12]

[[queries]]
name = "processNameIN"
query = '''
@@ -46,35 +46,6 @@ name = "equalsNullHead"
expected_event_ids = [1, 2, 3, 4, 5]
query = 'process where bad_field == null | head 5'


[[queries]]
name = "lteAndGtWithFilter"
tags = ["comparisons", "pipes"]
query = '''
process where serial_event_id <= 8 and serial_event_id > 7
| filter serial_event_id == 8
'''
expected_event_ids = [8]

[[queries]]
name = "filtersLteAndGt"
query = '''
process where true
| filter serial_event_id <= 10
| filter serial_event_id > 6
'''
expected_event_ids = [7, 8, 9, 10]

[[queries]]
name = "filterLteAndGtWithHead"
query = '''
process where true
| filter serial_event_id <= 10
| filter serial_event_id > 6
| head 2
'''
expected_event_ids = [7, 8]

[[queries]]
name = "headWithFiltersAndTail"
query = '''
@@ -129,7 +100,6 @@ case_insensitive = true
query = 'process where process_name >= "SYSTE" and process_name <= "systex"'
expected_event_ids = [1, 2]


[[queries]]
name = "processWithStringEqualityCaseInsensitive1"
case_insensitive = true
@@ -0,0 +1,74 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.eql.analysis;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.xpack.eql.plan.logical.KeyedFilter;
import org.elasticsearch.xpack.eql.plan.logical.LimitWithOffset;
import org.elasticsearch.xpack.eql.session.EqlConfiguration;
import org.elasticsearch.xpack.ql.expression.Literal;
import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.ql.plan.logical.Project;
import org.elasticsearch.xpack.ql.tree.NodeUtils;
import org.elasticsearch.xpack.ql.tree.Source;
import org.elasticsearch.xpack.ql.type.DataTypes;
import org.elasticsearch.xpack.ql.util.Holder;

import static java.util.Collections.emptyList;
import static org.elasticsearch.xpack.ql.tree.Source.synthetic;

/**
 * Post processor of the user query, run once the query has been analyzed and verified.
 * Its purpose is to add implicit blocks to the query, based on the user request, that help
 * with query execution but do not change its semantics.
 *
 * This could have been done in the optimizer; however, due to its wrapping nature (which is clunky
 * to express with rules) and since the optimizer is not parameterized, making it a separate step
 * (similar to the pre-analyzer) is more natural.
 */
public class PostAnalyzer {

    private static final Logger log = LogManager.getLogger(PostAnalyzer.class);

    public LogicalPlan postAnalyze(LogicalPlan plan, EqlConfiguration configuration) {
        LogicalPlan initial = plan;
        if (plan.analyzed()) {
            // implicit limit

            // implicit sequence fetch size

            // implicit project + fetch size (if needed)

            Holder<Boolean> hasJoin = new Holder<>(Boolean.FALSE);

            Source projectCtx = synthetic("<implicit-project>");
            // first per KeyedFilter
            plan = plan.transformUp(k -> {
                hasJoin.set(Boolean.TRUE);
                Project p = new Project(projectCtx, k.child(), k.extractionAttributes());

                // TODO: this could be incorporated into the query generation
                LogicalPlan fetchSize = new LimitWithOffset(synthetic("<fetch-size>"),
                        new Literal(synthetic("<fetch-value>"), configuration.fetchSize(), DataTypes.INTEGER),
                        p);

                return new KeyedFilter(k.source(), fetchSize, k.keys(), k.timestamp(), k.tiebreaker());
            }, KeyedFilter.class);

            // in case of event queries, filter everything
            if (hasJoin.get() == false) {
                plan = new Project(projectCtx, plan, emptyList());
            }
        }

        if (log.isTraceEnabled()) {
            log.trace("Applied post-analysis\n{}", NodeUtils.diffString(initial, plan));
        }
        return plan;
    }

}
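
A minimal sketch of how this step is meant to be invoked (hypothetical variable
names; analyzedPlan and cfg stand for the analyzed plan and the session
configuration produced elsewhere in the session):

    PostAnalyzer postAnalyzer = new PostAnalyzer();
    // For sequences/joins this wraps each KeyedFilter child in an implicit Project
    // plus a fetch-size LimitWithOffset; for plain event queries it adds an empty
    // implicit Project.
    LogicalPlan prepared = postAnalyzer.postAnalyze(analyzedPlan, cfg);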
@@ -8,6 +8,7 @@

import org.elasticsearch.xpack.eql.plan.logical.Head;
import org.elasticsearch.xpack.eql.plan.logical.Join;
import org.elasticsearch.xpack.eql.plan.logical.LimitWithOffset;
import org.elasticsearch.xpack.eql.plan.logical.Sequence;
import org.elasticsearch.xpack.eql.plan.logical.Tail;
import org.elasticsearch.xpack.eql.stats.FeatureMetric;
@@ -17,10 +18,8 @@
import org.elasticsearch.xpack.ql.expression.Attribute;
import org.elasticsearch.xpack.ql.expression.UnresolvedAttribute;
import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.ql.plan.logical.Project;
import org.elasticsearch.xpack.ql.tree.Node;
import org.elasticsearch.xpack.ql.type.DataTypes;
import org.elasticsearch.xpack.ql.util.Holder;
import org.elasticsearch.xpack.ql.util.StringUtils;

import java.util.ArrayList;
@@ -34,7 +33,6 @@
import static java.util.stream.Collectors.toMap;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.EVENT;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.JOIN;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.JOIN_UNTIL;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.JOIN_KEYS_FIVE_OR_MORE;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.JOIN_KEYS_FOUR;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.JOIN_KEYS_ONE;
@@ -44,15 +42,16 @@
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.JOIN_QUERIES_FOUR;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.JOIN_QUERIES_THREE;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.JOIN_QUERIES_TWO;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.JOIN_UNTIL;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.PIPE_HEAD;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.PIPE_TAIL;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.SEQUENCE;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.SEQUENCE_UNTIL;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.SEQUENCE_MAXSPAN;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.SEQUENCE_QUERIES_FIVE_OR_MORE;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.SEQUENCE_QUERIES_FOUR;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.SEQUENCE_QUERIES_THREE;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.SEQUENCE_QUERIES_TWO;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.SEQUENCE_UNTIL;
import static org.elasticsearch.xpack.ql.common.Failure.fail;

/**
@@ -140,15 +139,33 @@ Collection<Failure> verify(LogicalPlan plan) {
            failures.addAll(localFailures);
        });

        // Concrete verifications

        // if there are no (major) unresolved failures, do more in-depth analysis

        if (failures.isEmpty()) {

            plan.forEachDown(p -> {
                Set<Failure> localFailures = new LinkedHashSet<>();

                checkNoPipesAfterLimit(p, localFailures);

                failures.addAll(localFailures);

                // mark the plan as analyzed
                // if everything checks out
                if (failures.isEmpty()) {
                    p.setAnalyzed();
                }
            });
        }

        // gather metrics
        if (failures.isEmpty()) {
            BitSet b = new BitSet(FeatureMetric.values().length);
            Holder<Boolean> isLikelyAnEventQuery = new Holder<>(false);

            plan.forEachDown(p -> {
                if (p instanceof Project) {
                    isLikelyAnEventQuery.set(true);
                } else if (p instanceof Head) {
                if (p instanceof Head) {
                    b.set(PIPE_HEAD.ordinal());
                } else if (p instanceof Tail) {
                    b.set(PIPE_TAIL.ordinal());
@@ -212,7 +229,7 @@ Collection<Failure> verify(LogicalPlan plan) {
                }
            });

            if (isLikelyAnEventQuery.get() && b.get(SEQUENCE.ordinal()) == false && b.get(JOIN.ordinal()) == false) {
            if (b.get(SEQUENCE.ordinal()) == false && b.get(JOIN.ordinal()) == false) {
                b.set(EVENT.ordinal());
            }

@@ -223,4 +240,12 @@ Collection<Failure> verify(LogicalPlan plan) {

        return failures;
    }

    private void checkNoPipesAfterLimit(LogicalPlan p, Set<Failure> localFailures) {
        if ((p instanceof LimitWithOffset) == false) {
            // a limit (head/tail) anywhere below a non-limit node means a pipe follows it
            if (p.anyMatch(LimitWithOffset.class::isInstance)) {
                localFailures.add(fail(p, "Pipe [{}] not allowed (yet) after head/tail", p.sourceText()));
            }
        }
    }
}
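
For example (a hypothetical query sketched from the rule above and the test
fixtures, not taken from this change), a query such as:

    process where true | head 10 | filter serial_event_id > 5

fails verification, since the filter pipe follows a head, while applying the
filter before the limit (| filter serial_event_id > 5 | head 10) remains valid.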
@@ -9,6 +9,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.xpack.eql.analysis.PostAnalyzer;
import org.elasticsearch.xpack.eql.analysis.PreAnalyzer;
import org.elasticsearch.xpack.eql.analysis.Verifier;
import org.elasticsearch.xpack.eql.expression.function.EqlFunctionRegistry;
@@ -33,6 +34,7 @@ public class PlanExecutor {
    private final FunctionRegistry functionRegistry;

    private final PreAnalyzer preAnalyzer;
    private final PostAnalyzer postAnalyzer;
    private final Verifier verifier;
    private final Optimizer optimizer;
    private final Planner planner;
@@ -50,13 +52,14 @@ public PlanExecutor(Client client, IndexResolver indexResolver, NamedWriteableRe
        this.metrics = new Metrics();

        this.preAnalyzer = new PreAnalyzer();
        this.postAnalyzer = new PostAnalyzer();
        this.verifier = new Verifier(metrics);
        this.optimizer = new Optimizer();
        this.planner = new Planner();
    }

    private EqlSession newSession(EqlConfiguration cfg) {
        return new EqlSession(client, cfg, indexResolver, preAnalyzer, functionRegistry, verifier, optimizer, planner, this);
        return new EqlSession(client, cfg, indexResolver, preAnalyzer, postAnalyzer, functionRegistry, verifier, optimizer, planner, this);
    }

    public void eql(EqlConfiguration cfg, String eql, ParserParams parserParams, ActionListener<Results> listener) {
@@ -19,6 +19,7 @@
import org.elasticsearch.xpack.ql.expression.Order;
import org.elasticsearch.xpack.ql.expression.Order.NullsPosition;
import org.elasticsearch.xpack.ql.expression.Order.OrderDirection;
import org.elasticsearch.xpack.ql.expression.predicate.logical.And;
import org.elasticsearch.xpack.ql.expression.predicate.logical.Not;
import org.elasticsearch.xpack.ql.expression.predicate.nulls.IsNotNull;
import org.elasticsearch.xpack.ql.expression.predicate.nulls.IsNull;
@@ -38,6 +39,7 @@
import org.elasticsearch.xpack.ql.optimizer.OptimizerRules.SetAsOptimized;
import org.elasticsearch.xpack.ql.optimizer.OptimizerRules.TransformDirection;
import org.elasticsearch.xpack.ql.plan.logical.Filter;
import org.elasticsearch.xpack.ql.plan.logical.LeafPlan;
import org.elasticsearch.xpack.ql.plan.logical.Limit;
import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.ql.plan.logical.OrderBy;
@@ -51,6 +53,7 @@
import java.util.List;

import static java.util.Collections.singletonList;
import static java.util.stream.Collectors.toList;

public class Optimizer extends RuleExecutor<LogicalPlan> {

@@ -60,7 +63,8 @@ public LogicalPlan optimize(LogicalPlan verified) {

    @Override
    protected Iterable<RuleExecutor<LogicalPlan>.Batch> batches() {
        Batch substitutions = new Batch("Operator Replacement", Limiter.ONCE, new ReplaceSurrogateFunction());
        Batch substitutions = new Batch("Substitution", Limiter.ONCE,
            new ReplaceSurrogateFunction());

        Batch operators = new Batch("Operator Optimization",
            new ConstantFolding(),
@@ -76,7 +80,8 @@ protected Iterable<RuleExecutor<LogicalPlan>.Batch> batches() {
            // prune/elimination
            new PruneFilters(),
            new PruneLiteralsInOrderBy(),
            new CombineLimits());
            new CombineLimits(),
            new PushDownFilterPipe());

        Batch ordering = new Batch("Implicit Order",
            new SortByLimit(),
@@ -92,8 +97,7 @@ protected Iterable<RuleExecutor<LogicalPlan>.Batch> batches() {

        return Arrays.asList(substitutions, operators, ordering, local, label);
    }

    private static class ReplaceWildcards extends OptimizerRule<Filter> {

        private static boolean isWildcard(Expression expr) {
@@ -237,6 +241,42 @@ protected LogicalPlan rule(LimitWithOffset limit) {
        }
    }

    /**
     * Push down filter pipes.
     */
    static class PushDownFilterPipe extends OptimizerRule<Filter> {

        @Override
        protected LogicalPlan rule(Filter filter) {
            LogicalPlan child = filter.child();

            // can't push it down further
            if (child instanceof LeafPlan) {
                return filter;
            }
            // combine filters if possible
            if (child instanceof Filter) {
                Filter f = (Filter) child;
                return new Filter(f.source(), f.child(), new And(filter.source(), f.condition(), filter.condition()));
            }

            // treat Join separately to avoid pushing the filter on until
            if (child instanceof Join) {
                Join j = (Join) child;
                return j.with(j.queries().stream()
                    .map(q -> {
                        Filter f = new Filter(filter.source(), q.child(), filter.condition());
                        return new KeyedFilter(q.source(), f, q.keys(), q.timestamp(), q.tiebreaker());
                    })
                    .collect(toList()), j.until(), j.direction());
            }
            // otherwise keep pushing it down
            return child.replaceChildren(child.children().stream()
                .map(c -> new Filter(filter.source(), c, filter.condition()))
                .collect(toList()));
        }
    }
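
To illustrate the combine-then-push-down behavior on the (removed)
filtersLteAndGt test above, a query such as:

    process where true
    | filter serial_event_id <= 10
    | filter serial_event_id > 6

first folds into a single conjunctive filter (serial_event_id <= 10 and
serial_event_id > 6), which is then pushed past the pipe into the base event
query, so the filtering happens in the search itself rather than in the pipe.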

/**
* Align the implicit order with the limit (head means ASC or tail means DESC).
*/