Merged
@@ -31,6 +31,7 @@ public enum Key {
/** Enable Calcite as execution engine */
CALCITE_ENGINE_ENABLED("plugins.calcite.enabled"),
CALCITE_FALLBACK_ALLOWED("plugins.calcite.fallback.allowed"),
CALCITE_PUSHDOWN_ENABLED("plugins.calcite.pushdown.enabled"),

/** Query Settings. */
FIELD_TYPE_TOLERANCE("plugins.query.field_type_tolerance"),
@@ -15,7 +15,11 @@
import lombok.RequiredArgsConstructor;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.plan.RelTraitDef;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rel.logical.LogicalSort;
import org.apache.calcite.rel.metadata.DefaultRelMetadataProvider;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.parser.SqlParser;
@@ -129,7 +133,31 @@ public void executePlanByCalcite(
RelNode plan,
CalcitePlanContext context,
ResponseListener<ExecutionEngine.QueryResponse> listener) {
executionEngine.execute(optimize(plan), context, listener);
executionEngine.execute(convertToCalcitePlan(optimize(plan)), context, listener);
}

/**
* Convert OpenSearch Plan to Calcite Plan. Although both plans consist of Calcite RelNodes, there
* are some differences in the topological structures or semantics between them.
*
* @param osPlan Logical Plan derived from OpenSearch PPL
*/
private static RelNode convertToCalcitePlan(RelNode osPlan) {
RelNode calcitePlan = osPlan;

/* Calcite only ensures the collation of the final result produced by the root sort operator.
 * Since we expect the collation to be preserved through the pipes of a PPL query, we need to
 * explicitly add a sort operator on top of the original plan
 * to ensure the correct collation of the final result.
 * See the logic in {@link CalcitePrepareImpl}.
 * For the redundant sort, we rely on the Calcite optimizer to eliminate it.
 */
RelCollation collation = osPlan.getTraitSet().getCollation();
if (!(osPlan instanceof Sort) && collation != RelCollations.EMPTY) {
calcitePlan = LogicalSort.create(osPlan, collation, null, null);
Member: Any reason for adding this NotNull annotation? Would it be better to add this?

if (calcitePlan == null) return osPlan;

Collaborator (Author): Nothing special, it's auto-generated by IDEA. We can remove it.

Collaborator: Regarding "While we expect that the collation can be preserved through the pipes over PPL": could you elaborate more on this? I do not think PPL commands should preserve order.

Collaborator: @qianheng-aws @LantaoJin It seems some PPL commands should preserve order, for instance fields and take, but others, such as stats, should not. In ANSI SQL, with select a from tbl order by b, order is preserved because the logical processing order of a select statement is actually from -> select -> order.
}

return calcitePlan;
}
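
For intuition, here is a minimal sketch of the effect of this conversion on the PPL query source=t | sort a | fields a. The plan shapes in the comments are illustrative rather than actual EXPLAIN output, and the helper name is hypothetical; it mirrors the method above, naming the two null arguments of LogicalSort.create (offset and fetch), which are unused because only the ordering needs to be enforced.

// Before: the root is a project; the ordering on `a` survives only in the
// trait set, which Calcite's prepare path does not guarantee to honor:
//   LogicalProject(a=[$0])
//     LogicalSort(sort0=[$0], dir0=[ASC])
//       CalciteOpenSearchIndexScan(table=[[t]])
//
// After: an explicit top-level sort pins the output collation; the optimizer
// can later eliminate the now-redundant inner sort:
//   LogicalSort(sort0=[$0], dir0=[ASC])
//     LogicalProject(a=[$0])
//       LogicalSort(sort0=[$0], dir0=[ASC])
//         CalciteOpenSearchIndexScan(table=[[t]])
static RelNode pinCollation(RelNode plan) {
  RelCollation collation = plan.getTraitSet().getCollation();
  if (!(plan instanceof Sort) && collation != RelCollations.EMPTY) {
    return LogicalSort.create(plan, collation, /* offset */ null, /* fetch */ null);
  }
  return plan;
}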

@@ -104,6 +104,7 @@ private Settings defaultSettings() {
.put(Key.FIELD_TYPE_TOLERANCE, true)
.put(Key.CALCITE_ENGINE_ENABLED, true)
.put(Key.CALCITE_FALLBACK_ALLOWED, false)
.put(Key.CALCITE_PUSHDOWN_ENABLED, false)
.build();

@Override
@@ -8,11 +8,8 @@
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BANK;

import java.io.IOException;
import org.junit.Ignore;
import org.junit.jupiter.api.Test;

/** testSortXXAndXX could fail. TODO Remove this @Ignore when the issue fixed. */
@Ignore
public class CalcitePPLSortIT extends CalcitePPLIntegTestCase {

@Override
@@ -42,8 +42,9 @@ public void onMatch(RelOptRuleCall call) {
}

protected void apply(RelOptRuleCall call, Filter filter, CalciteOpenSearchIndexScan scan) {
if (scan.pushDownFilter(filter)) {
call.transformTo(scan);
CalciteOpenSearchIndexScan newScan = scan.pushDownFilter(filter);
if (newScan != null) {
Comment on lines +45 to +46
Member: newScan won't be used after this check; the name newScan confused me. Could you change it to

if (scan.pushDownFilter(filter) != null) {

Collaborator (Author): Sorry, it's a typo. It happens that the current logic works as well.

call.transformTo(scan);

->

call.transformTo(newScan);
call.transformTo(newScan);
}
}

@@ -99,6 +99,13 @@ public class OpenSearchSettings extends Settings {
Setting.Property.NodeScope,
Setting.Property.Dynamic);

public static final Setting<?> CALCITE_PUSHDOWN_ENABLED_SETTING =
Setting.boolSetting(
Key.CALCITE_PUSHDOWN_ENABLED.getKeyValue(),
false,
Setting.Property.NodeScope,
Setting.Property.Dynamic);

public static final Setting<?> QUERY_MEMORY_LIMIT_SETTING =
new Setting<>(
Key.QUERY_MEMORY_LIMIT.getKeyValue(),
@@ -302,6 +309,12 @@ public OpenSearchSettings(ClusterSettings clusterSettings) {
Key.CALCITE_FALLBACK_ALLOWED,
CALCITE_FALLBACK_ALLOWED_SETTING,
new Updater(Key.CALCITE_FALLBACK_ALLOWED));
register(
settingBuilder,
clusterSettings,
Key.CALCITE_PUSHDOWN_ENABLED,
CALCITE_PUSHDOWN_ENABLED_SETTING,
new Updater(Key.CALCITE_PUSHDOWN_ENABLED));
register(
settingBuilder,
clusterSettings,
@@ -478,6 +491,7 @@ public static List<Setting<?>> pluginSettings() {
.add(PPL_ENABLED_SETTING)
.add(CALCITE_ENGINE_ENABLED_SETTING)
.add(CALCITE_FALLBACK_ALLOWED_SETTING)
.add(CALCITE_PUSHDOWN_ENABLED_SETTING)
.add(QUERY_MEMORY_LIMIT_SETTING)
.add(QUERY_SIZE_LIMIT_SETTING)
.add(METRICS_ROLLING_WINDOW_SETTING)
@@ -68,7 +68,7 @@ public class OpenSearchIndex extends OpenSearchTable {
/** OpenSearch client connection. */
@Getter private final OpenSearchClient client;

private final Settings settings;
@Getter private final Settings settings;

/** {@link OpenSearchRequest.IndexName}. */
private final OpenSearchRequest.IndexName indexName;
@@ -7,6 +7,7 @@

import static java.util.Objects.requireNonNull;

import java.util.ArrayDeque;
import java.util.List;
import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor;
import org.apache.calcite.adapter.enumerable.PhysType;
@@ -32,20 +33,26 @@
import org.checkerframework.checker.nullness.qual.Nullable;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.sql.calcite.plan.OpenSearchTableScan;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.opensearch.planner.physical.OpenSearchIndexRules;
import org.opensearch.sql.opensearch.request.OpenSearchRequestBuilder;
import org.opensearch.sql.opensearch.request.PredicateAnalyzer;
import org.opensearch.sql.opensearch.request.PredicateAnalyzer.ExpressionNotAnalyzableException;
import org.opensearch.sql.opensearch.request.PredicateAnalyzer.PredicateAnalyzerException;
import org.opensearch.sql.opensearch.storage.OpenSearchIndex;

/** Relational expression representing a scan of an OpenSearchIndex type. */
public class CalciteOpenSearchIndexScan extends OpenSearchTableScan {
private static final Logger LOG = LogManager.getLogger(CalciteOpenSearchIndexScan.class);

private final OpenSearchIndex osIndex;
private final OpenSearchRequestBuilder requestBuilder;
// The schema of this scan operator. It is initialized with the row type of the table, but may
// be changed by push-down operations.
private final RelDataType schema;
// This context maintains all the push-down actions, which will be applied to the requestBuilder
// when it begins to scan data from OpenSearch.
// Because OpenSearchRequestBuilder doesn't support deep copy, while we want to keep the
// requestBuilder independent among the different plans produced in the optimization process,
// we cannot apply these actions right away.
private final PushDownContext pushDownContext;

/**
* Creates a CalciteOpenSearchIndexScan.
@@ -56,24 +63,31 @@ public class CalciteOpenSearchIndexScan extends OpenSearchTableScan {
*/
public CalciteOpenSearchIndexScan(
RelOptCluster cluster, RelOptTable table, OpenSearchIndex index) {
this(cluster, table, index, index.createRequestBuilder(), table.getRowType());
this(cluster, table, index, table.getRowType(), new PushDownContext());
}

public CalciteOpenSearchIndexScan(
private CalciteOpenSearchIndexScan(
RelOptCluster cluster,
RelOptTable table,
OpenSearchIndex index,
OpenSearchRequestBuilder requestBuilder,
RelDataType schema) {
RelDataType schema,
PushDownContext pushDownContext) {
super(cluster, table);
this.osIndex = requireNonNull(index, "OpenSearch index");
this.requestBuilder = requestBuilder;
this.schema = schema;
this.pushDownContext = pushDownContext;
}

public CalciteOpenSearchIndexScan copy() {
return new CalciteOpenSearchIndexScan(
getCluster(), table, osIndex, this.schema, pushDownContext.clone());
}

public CalciteOpenSearchIndexScan copyWithNewSchema(RelDataType schema) {
// TODO: need to do deep-copy on requestBuilder in case non-idempotent push down.
return new CalciteOpenSearchIndexScan(getCluster(), table, osIndex, requestBuilder, schema);
// Do a shallow copy for the requestBuilder, so that the requestBuilders of the different plans
// produced in the optimization process won't affect each other.
return new CalciteOpenSearchIndexScan(
getCluster(), table, osIndex, schema, pushDownContext.clone());
}

@Override
@@ -85,8 +99,10 @@ public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
@Override
public void register(RelOptPlanner planner) {
super.register(planner);
for (RelOptRule rule : OpenSearchIndexRules.OPEN_SEARCH_INDEX_SCAN_RULES) {
planner.addRule(rule);
if (osIndex.getSettings().getSettingValue(Settings.Key.CALCITE_PUSHDOWN_ENABLED)) {
Collaborator: It is for test purposes, right? I do not think an end-user has any reason to disable optimization.

Collaborator (Author):

1. For example, the PPL source=table | sort a | field a will have the plan scan -> sort -> project, and we expect the final result to have collation on column a.
   However, Calcite won't ensure that. If we translate the above PPL or plan into SQL, it would be select a from (select * from table order by a). Calcite doesn't ensure collation inside a subquery and has several optimizations that remove the sort operator.

   If we do want to ensure the collation of the final result, the PPL should be source=table | field a | sort a, which can be translated to the SQL select a from table order by a. That's why we need to add a sort operator to ensure collation and rely on the optimizer to eliminate the redundant sort, for the sake of simplicity.

2. Yeah, I think so. It's per @LantaoJin's suggestion.

Collaborator (Author): Also, it allows us to disable the push-down feature quickly, without a rollback, if there are unexpected issues.

Member: @penghuo Adding this configuration was my suggestion. This is an advanced configuration aimed at developers, similar to how databases typically have switches for optimization rules. Moreover, this configuration is currently very useful for our development: in some scenarios, enabling push-down might cause issues, and having this switch helps us determine whether problems are caused by push-down or by other factors.

Member: The optimal solution would be to provide a method that allows free selection of optimization rules. However, considering there aren't many custom optimization rules in the short term, adding a push-down config seems to be the most cost-effective approach.

Collaborator: Got it, makes sense. If the default value of Settings.Key.CALCITE_PUSHDOWN_ENABLED is enabled, it should be fine. Let's consider removing it when we are confident.

for (RelOptRule rule : OpenSearchIndexRules.OPEN_SEARCH_INDEX_SCAN_RULES) {
planner.addRule(rule);
}
}
}
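
Because the new setting is registered with Setting.Property.Dynamic, the gate can be flipped at runtime without a restart. A hedged example, assuming the SQL plugin's standard settings endpoint also covers the new key:

PUT /_plugins/_query/settings
{
  "transient": {
    "plugins.calcite.pushdown.enabled": false
  }
}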

@@ -97,15 +113,23 @@ public RelDataType deriveRowType() {

@Override
public Result implement(EnumerableRelImplementor implementor, Prefer pref) {
// Avoid optimizing the java row type since the scan will always return an array.
/* In Calcite enumerable operators, a row with a single column will be optimized to a scalar
 * value. See {@link PhysTypeImpl}.
 * Since we need to combine this operator with the original Calcite ones,
 * let's follow this convention to apply the optimization here and ensure the `scan` method
 * returns the correct data format for single-column rows.
 * See {@link OpenSearchIndexEnumerator}.
 */
PhysType physType =
PhysTypeImpl.of(implementor.getTypeFactory(), getRowType(), pref.preferArray(), false);
PhysTypeImpl.of(implementor.getTypeFactory(), getRowType(), pref.preferArray());

Expression scanOperator = implementor.stash(this, CalciteOpenSearchIndexScan.class);
return implementor.result(physType, Blocks.toBlock(Expressions.call(scanOperator, "scan")));
}

public Enumerable<@Nullable Object> scan() {
OpenSearchRequestBuilder requestBuilder = osIndex.createRequestBuilder();
pushDownContext.forEach(action -> action.apply(requestBuilder));
return new AbstractEnumerable<>() {
@Override
public Enumerator<Object> enumerator() {
Expand All @@ -118,17 +142,18 @@ public Enumerator<Object> enumerator() {
};
}

public boolean pushDownFilter(Filter filter) {
public CalciteOpenSearchIndexScan pushDownFilter(Filter filter) {
Member: Why change the return type? It seems the return value is never used, besides the null check in OpenSearchFilterIndexScanRule.

try {
CalciteOpenSearchIndexScan newScan = this.copyWithNewSchema(filter.getRowType());
List<String> schema = this.getRowType().getFieldNames();
QueryBuilder filterBuilder = PredicateAnalyzer.analyze(filter.getCondition(), schema);
requestBuilder.pushDownFilter(filterBuilder);
newScan.pushDownContext.add(requestBuilder -> requestBuilder.pushDownFilter(filterBuilder));
// TODO: handle the case where condition contains a score function
return true;
} catch (ExpressionNotAnalyzableException | PredicateAnalyzerException e) {
return newScan;
} catch (Exception e) {
LOG.warn("Cannot analyze the filter condition {}", filter.getCondition(), e);
}
return false;
return null;
}
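
To make the deferred step concrete, here is a minimal self-contained sketch of the translate-then-record flow, assuming a simple comparison condition. The field names, the literal, and the class name are illustrative; only PredicateAnalyzer.analyze(condition, fieldNames) is taken from the code above.

import java.math.BigDecimal;
import java.util.List;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
import org.apache.calcite.sql.type.SqlTypeName;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.sql.opensearch.request.PredicateAnalyzer;

final class FilterPushDownSketch {
  // Build the condition `age > 30` ($0 resolves to "age") and translate it
  // once at planning time; the resulting QueryBuilder is only applied to a
  // fresh request builder when the scan actually executes.
  static QueryBuilder translate() throws Exception {
    SqlTypeFactoryImpl typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
    RexBuilder rex = new RexBuilder(typeFactory);
    RelDataType intType = typeFactory.createSqlType(SqlTypeName.INTEGER);
    RexNode condition =
        rex.makeCall(
            SqlStdOperatorTable.GREATER_THAN,
            rex.makeInputRef(intType, 0),
            rex.makeExactLiteral(BigDecimal.valueOf(30), intType));
    return PredicateAnalyzer.analyze(condition, List.of("age", "name"));
  }
}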

@@ -143,7 +168,19 @@ public CalciteOpenSearchIndexScan pushDownProject(List<Integer> selectedColumns) {
}
RelDataType newSchema = builder.build();
CalciteOpenSearchIndexScan newScan = this.copyWithNewSchema(newSchema);
newScan.requestBuilder.pushDownProjectStream(newSchema.getFieldNames().stream());
newScan.pushDownContext.add(
requestBuilder -> requestBuilder.pushDownProjectStream(newSchema.getFieldNames().stream()));
return newScan;
}

static class PushDownContext extends ArrayDeque<PushDownAction> {
Member: Could you add some comments about this context, such as its purpose and usage?

@Override
public PushDownContext clone() {
return (PushDownContext) super.clone();
}
}

private interface PushDownAction {
void apply(OpenSearchRequestBuilder requestBuilder);
}
}
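
The design choice here, recording actions instead of mutating a shared builder, boils down to the following self-contained sketch. The stand-in types are hypothetical; the real code uses OpenSearchRequestBuilder and PushDownAction.

import java.util.ArrayDeque;
import java.util.function.Consumer;

// Stand-in for OpenSearchRequestBuilder, which does not support deep copy.
class FakeRequestBuilder {
  void pushDownFilter(String query) { /* accumulate the filter */ }
}

final class DeferredPushDownDemo {
  // Planning time: each candidate plan clones its own queue of actions, so
  // alternatives explored by the optimizer never share builder state.
  private final ArrayDeque<Consumer<FakeRequestBuilder>> actions = new ArrayDeque<>();

  void recordFilter(String query) {
    actions.add(builder -> builder.pushDownFilter(query));
  }

  DeferredPushDownDemo copy() {
    DeferredPushDownDemo clone = new DeferredPushDownDemo();
    clone.actions.addAll(this.actions); // shallow copy of the recorded actions
    return clone;
  }

  // Execution time: replay every recorded action against a fresh builder.
  FakeRequestBuilder replay() {
    FakeRequestBuilder builder = new FakeRequestBuilder();
    actions.forEach(action -> action.accept(builder));
    return builder;
  }
}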
@@ -11,7 +11,6 @@
import lombok.EqualsAndHashCode;
import lombok.ToString;
import org.apache.calcite.linq4j.Enumerator;
import org.opensearch.sql.data.model.ExprNullValue;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.opensearch.client.OpenSearchClient;
import org.opensearch.sql.opensearch.request.OpenSearchRequest;
@@ -62,11 +61,13 @@ private void fetchNextBatch() {

@Override
public Object current() {
Object[] p =
fields.stream()
.map(k -> current.tupleValue().getOrDefault(k, ExprNullValue.of()).valueForCalcite())
.toArray();
return p;
/* In Calcite enumerable operators, a row with a single column will be optimized to a scalar
 * value. See {@link PhysTypeImpl}.
 */
if (fields.size() == 1) {
return current.tupleValue().get(fields.getFirst()).valueForCalcite();
}
return fields.stream().map(k -> current.tupleValue().get(k).valueForCalcite()).toArray();
Member: The number of fields here has been reduced to the actual number of outputs, right? So get(k) won't return null anymore.

Collaborator (Author): Yeah.

Member: I've pushed the workaround code to the dev branch mistakenly. Could you merge with the latest branch code to resolve conflicts, @qianheng-aws?
}
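
A minimal sketch of the row-shape convention assumed here, with a plain Map standing in for the ExprValue tuple; the class name is illustrative.

import java.util.List;
import java.util.Map;

final class RowShapeDemo {
  // Calcite's generated enumerable code expects a bare scalar per row when the
  // row type has exactly one field, and an Object[] otherwise.
  static Object toCalciteRow(Map<String, Object> doc, List<String> fields) {
    if (fields.size() == 1) {
      return doc.get(fields.get(0)); // scalar, not a one-element array
    }
    return fields.stream().map(doc::get).toArray();
  }
}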

@Override