-
Couldn't load subscription status.
- Fork 176
Fix execution errors caused by plan gap #3350
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
19732b3
378d165
4698600
efc8cd8
2de4f5c
6573ab7
71f853e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -42,8 +42,9 @@ public void onMatch(RelOptRuleCall call) { | |
| } | ||
|
|
||
| protected void apply(RelOptRuleCall call, Filter filter, CalciteOpenSearchIndexScan scan) { | ||
| if (scan.pushDownFilter(filter)) { | ||
| call.transformTo(scan); | ||
| CalciteOpenSearchIndexScan newScan = scan.pushDownFilter(filter); | ||
| if (newScan != null) { | ||
|
Comment on lines
+45
to
+46
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry, it's typo. It happens the current logic works as well. |
||
| call.transformTo(newScan); | ||
| } | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,6 +7,7 @@ | |
|
|
||
| import static java.util.Objects.requireNonNull; | ||
|
|
||
| import java.util.ArrayDeque; | ||
| import java.util.List; | ||
| import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor; | ||
| import org.apache.calcite.adapter.enumerable.PhysType; | ||
|
|
@@ -32,20 +33,26 @@ | |
| import org.checkerframework.checker.nullness.qual.Nullable; | ||
| import org.opensearch.index.query.QueryBuilder; | ||
| import org.opensearch.sql.calcite.plan.OpenSearchTableScan; | ||
| import org.opensearch.sql.common.setting.Settings; | ||
| import org.opensearch.sql.opensearch.planner.physical.OpenSearchIndexRules; | ||
| import org.opensearch.sql.opensearch.request.OpenSearchRequestBuilder; | ||
| import org.opensearch.sql.opensearch.request.PredicateAnalyzer; | ||
| import org.opensearch.sql.opensearch.request.PredicateAnalyzer.ExpressionNotAnalyzableException; | ||
| import org.opensearch.sql.opensearch.request.PredicateAnalyzer.PredicateAnalyzerException; | ||
| import org.opensearch.sql.opensearch.storage.OpenSearchIndex; | ||
|
|
||
| /** Relational expression representing a scan of an OpenSearchIndex type. */ | ||
| public class CalciteOpenSearchIndexScan extends OpenSearchTableScan { | ||
| private static final Logger LOG = LogManager.getLogger(CalciteOpenSearchIndexScan.class); | ||
|
|
||
| private final OpenSearchIndex osIndex; | ||
| private final OpenSearchRequestBuilder requestBuilder; | ||
| // The schema of this scan operator, it's initialized with the row type of the table, but may be | ||
| // changed by push down operations. | ||
| private final RelDataType schema; | ||
| // This context maintains all the push down actions, which will be applied to the requestBuilder | ||
| // when it begins to scan data from OpenSearch. | ||
| // Because OpenSearchRequestBuilder doesn't support deep copy while we want to keep the | ||
| // requestBuilder independent among different plans produced in the optimization process, | ||
| // so we cannot apply these actions right away. | ||
| private final PushDownContext pushDownContext; | ||
|
|
||
| /** | ||
| * Creates an CalciteOpenSearchIndexScan. | ||
|
|
@@ -56,24 +63,31 @@ public class CalciteOpenSearchIndexScan extends OpenSearchTableScan { | |
| */ | ||
| public CalciteOpenSearchIndexScan( | ||
| RelOptCluster cluster, RelOptTable table, OpenSearchIndex index) { | ||
| this(cluster, table, index, index.createRequestBuilder(), table.getRowType()); | ||
| this(cluster, table, index, table.getRowType(), new PushDownContext()); | ||
| } | ||
|
|
||
| public CalciteOpenSearchIndexScan( | ||
| private CalciteOpenSearchIndexScan( | ||
| RelOptCluster cluster, | ||
| RelOptTable table, | ||
| OpenSearchIndex index, | ||
| OpenSearchRequestBuilder requestBuilder, | ||
| RelDataType schema) { | ||
| RelDataType schema, | ||
| PushDownContext pushDownContext) { | ||
| super(cluster, table); | ||
| this.osIndex = requireNonNull(index, "OpenSearch index"); | ||
| this.requestBuilder = requestBuilder; | ||
| this.schema = schema; | ||
| this.pushDownContext = pushDownContext; | ||
| } | ||
|
|
||
| public CalciteOpenSearchIndexScan copy() { | ||
| return new CalciteOpenSearchIndexScan( | ||
| getCluster(), table, osIndex, this.schema, pushDownContext.clone()); | ||
| } | ||
|
|
||
| public CalciteOpenSearchIndexScan copyWithNewSchema(RelDataType schema) { | ||
| // TODO: need to do deep-copy on requestBuilder in case non-idempotent push down. | ||
| return new CalciteOpenSearchIndexScan(getCluster(), table, osIndex, requestBuilder, schema); | ||
| // Do shallow copy for requestBuilder, thus requestBuilder among different plans produced in the | ||
| // optimization process won't affect each other. | ||
| return new CalciteOpenSearchIndexScan( | ||
| getCluster(), table, osIndex, schema, pushDownContext.clone()); | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -85,8 +99,10 @@ public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { | |
| @Override | ||
| public void register(RelOptPlanner planner) { | ||
| super.register(planner); | ||
| for (RelOptRule rule : OpenSearchIndexRules.OPEN_SEARCH_INDEX_SCAN_RULES) { | ||
| planner.addRule(rule); | ||
| if (osIndex.getSettings().getSettingValue(Settings.Key.CALCITE_PUSHDOWN_ENABLED)) { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is it for test purpose, right? I do not think end-user has any reasons to disable optimization. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
If we do want to ensure collation of the final result, the PPL should be
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @penghuo Adding this configuration is by my suggestion. This is an advanced configuration aimed at developers. Similar to how databases typically have switches for optimization rules. Moreover, this configuration is currently very useful for our development - in some scenarios, enabling push-down might cause issues, and having this switch helps us determine whether problems are caused by push-down or other factors. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The optimal solution would be to provide a method that allows free selection of optimization rules. However, considering there aren't many custom optimization rules in the short term, adding a push-down config seems to be the most cost-effective approach. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. got it, make sense, if Settings.Key.CALCITE_PUSHDOWN_ENABLED default value is enabled, should be fine. let's consider remove it when we are confident. |
||
| for (RelOptRule rule : OpenSearchIndexRules.OPEN_SEARCH_INDEX_SCAN_RULES) { | ||
| planner.addRule(rule); | ||
| } | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -97,15 +113,23 @@ public RelDataType deriveRowType() { | |
|
|
||
| @Override | ||
| public Result implement(EnumerableRelImplementor implementor, Prefer pref) { | ||
| // Avoid optimizing the java row type since the scan will always return an array. | ||
| /* In Calcite enumerable operators, row of single column will be optimized to a scalar value. | ||
| * See {@link PhysTypeImpl}. | ||
| * Since we need to combine this operator with their original ones, | ||
| * let's follow this convention to apply the optimization here and ensure `scan` method | ||
| * returns the correct data format for single column rows. | ||
| * See {@link OpenSearchIndexEnumerator} | ||
| */ | ||
| PhysType physType = | ||
| PhysTypeImpl.of(implementor.getTypeFactory(), getRowType(), pref.preferArray(), false); | ||
| PhysTypeImpl.of(implementor.getTypeFactory(), getRowType(), pref.preferArray()); | ||
|
|
||
| Expression scanOperator = implementor.stash(this, CalciteOpenSearchIndexScan.class); | ||
| return implementor.result(physType, Blocks.toBlock(Expressions.call(scanOperator, "scan"))); | ||
| } | ||
|
|
||
| public Enumerable<@Nullable Object> scan() { | ||
| OpenSearchRequestBuilder requestBuilder = osIndex.createRequestBuilder(); | ||
| pushDownContext.forEach(action -> action.apply(requestBuilder)); | ||
| return new AbstractEnumerable<>() { | ||
| @Override | ||
| public Enumerator<Object> enumerator() { | ||
|
|
@@ -118,17 +142,18 @@ public Enumerator<Object> enumerator() { | |
| }; | ||
| } | ||
|
|
||
| public boolean pushDownFilter(Filter filter) { | ||
| public CalciteOpenSearchIndexScan pushDownFilter(Filter filter) { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why change the return type? seems the return value is never in use besides the null-checker in |
||
| try { | ||
| CalciteOpenSearchIndexScan newScan = this.copyWithNewSchema(filter.getRowType()); | ||
| List<String> schema = this.getRowType().getFieldNames(); | ||
| QueryBuilder filterBuilder = PredicateAnalyzer.analyze(filter.getCondition(), schema); | ||
| requestBuilder.pushDownFilter(filterBuilder); | ||
| newScan.pushDownContext.add(requestBuilder -> requestBuilder.pushDownFilter(filterBuilder)); | ||
| // TODO: handle the case where condition contains a score function | ||
| return true; | ||
| } catch (ExpressionNotAnalyzableException | PredicateAnalyzerException e) { | ||
| return newScan; | ||
| } catch (Exception e) { | ||
| LOG.warn("Cannot analyze the filter condition {}", filter.getCondition(), e); | ||
| } | ||
| return false; | ||
| return null; | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -143,7 +168,19 @@ public CalciteOpenSearchIndexScan pushDownProject(List<Integer> selectedColumns) | |
| } | ||
| RelDataType newSchema = builder.build(); | ||
| CalciteOpenSearchIndexScan newScan = this.copyWithNewSchema(newSchema); | ||
| newScan.requestBuilder.pushDownProjectStream(newSchema.getFieldNames().stream()); | ||
| newScan.pushDownContext.add( | ||
| requestBuilder -> requestBuilder.pushDownProjectStream(newSchema.getFieldNames().stream())); | ||
| return newScan; | ||
| } | ||
|
|
||
| static class PushDownContext extends ArrayDeque<PushDownAction> { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you add some comments about this context. such as purpose and usage |
||
| @Override | ||
| public PushDownContext clone() { | ||
| return (PushDownContext) super.clone(); | ||
| } | ||
| } | ||
|
|
||
| private interface PushDownAction { | ||
| void apply(OpenSearchRequestBuilder requestBuilder); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,7 +11,6 @@ | |
| import lombok.EqualsAndHashCode; | ||
| import lombok.ToString; | ||
| import org.apache.calcite.linq4j.Enumerator; | ||
| import org.opensearch.sql.data.model.ExprNullValue; | ||
| import org.opensearch.sql.data.model.ExprValue; | ||
| import org.opensearch.sql.opensearch.client.OpenSearchClient; | ||
| import org.opensearch.sql.opensearch.request.OpenSearchRequest; | ||
|
|
@@ -62,11 +61,13 @@ private void fetchNextBatch() { | |
|
|
||
| @Override | ||
| public Object current() { | ||
| Object[] p = | ||
| fields.stream() | ||
| .map(k -> current.tupleValue().getOrDefault(k, ExprNullValue.of()).valueForCalcite()) | ||
| .toArray(); | ||
| return p; | ||
| /* In Calcite enumerable operators, row of single column will be optimized to a scalar value. | ||
| * See {@link PhysTypeImpl} | ||
| */ | ||
| if (fields.size() == 1) { | ||
| return current.tupleValue().get(fields.getFirst()).valueForCalcite(); | ||
| } | ||
| return fields.stream().map(k -> current.tupleValue().get(k).valueForCalcite()).toArray(); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The number of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yeah There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've pushed the workaround code to dev branch mistakenly. could you merge with the latest branch code to resolve conflicts @qianheng-aws |
||
| } | ||
|
|
||
| @Override | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any reason for adding this NotNull annotation? Would be better to add this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nothing special, it's auto-generated by IDEA. We can remove it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could u elberate more on this? i do not think PPL command should preserved order.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@qianheng-aws @LantaoJin
I seem, some PPL command should preserved order, for instance,
fields,take, but others not,stats.In ANSI SQL,
select a from tbl order by b, order is perserved becuase of logically processing order of select statement is acutallfrom -> select -> order.