Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ static RelBuilder.AggCall translate(
// case STDDEV:
// return context.relBuilder.aggregateCall(SqlStdOperatorTable.STDDEV,
// field);
case VARSAMP:
return context.relBuilder.aggregateCall(SqlStdOperatorTable.VAR_SAMP, field);
case VARPOP:
return context.relBuilder.aggregateCall(SqlStdOperatorTable.VAR_POP, field);
case STDDEV_POP:
return context.relBuilder.aggregateCall(SqlStdOperatorTable.STDDEV_POP, field);
case STDDEV_SAMP:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,21 @@ public void testApproxCountDistinct() {
"source=%s | stats distinct_count_approx(state) by gender", TEST_INDEX_BANK));
}

@Test
public void testVarSampVarPop() {
JSONObject actual =
executeQuery(
String.format(
"source=%s | stats var_samp(balance) as vs, var_pop(balance) as vp by gender",
TEST_INDEX_BANK));
verifySchema(
actual, schema("gender", "string"), schema("vs", "double"), schema("vp", "double"));
verifyDataRows(
actual,
rows("F", 58127404, 38751602.666666664),
rows("M", 261699024.91666666, 196274268.6875));
}

@Test
public void testStddevSampStddevPop() {
JSONObject actual =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,17 @@
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.convert.ConverterRule;
import org.opensearch.sql.opensearch.storage.scan.CalciteLogicalTableScan;
import org.opensearch.sql.opensearch.storage.scan.CalciteOpenSearchIndexScan;
import org.opensearch.sql.opensearch.storage.scan.CalciteEnumerableIndexScan;
import org.opensearch.sql.opensearch.storage.scan.CalciteLogicalIndexScan;

/** Rule to convert a {@link CalciteLogicalTableScan} to a {@link CalciteOpenSearchIndexScan}. */
/** Rule to convert a {@link CalciteLogicalIndexScan} to a {@link CalciteEnumerableIndexScan}. */
public class EnumerableIndexScanRule extends ConverterRule {
/** Default configuration. */
public static final Config DEFAULT_CONFIG =
Config.INSTANCE
.as(Config.class)
.withConversion(
CalciteLogicalTableScan.class,
CalciteLogicalIndexScan.class,
s -> s.getOsIndex() != null,
Convention.NONE,
EnumerableConvention.INSTANCE,
Expand All @@ -34,13 +34,19 @@ protected EnumerableIndexScanRule(Config config) {

@Override
public boolean matches(RelOptRuleCall call) {
CalciteLogicalTableScan scan = call.rel(0);
CalciteLogicalIndexScan scan = call.rel(0);
return scan.getVariablesSet().isEmpty();
}

@Override
public RelNode convert(RelNode rel) {
final CalciteLogicalTableScan scan = (CalciteLogicalTableScan) rel;
return new CalciteOpenSearchIndexScan(scan.getCluster(), scan.getTable(), scan.getOsIndex());
final CalciteLogicalIndexScan scan = (CalciteLogicalIndexScan) rel;
return new CalciteEnumerableIndexScan(
scan.getCluster(),
scan.getHints(),
scan.getTable(),
scan.getOsIndex(),
scan.getSchema(),
scan.getPushDownContext());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.sql.opensearch.planner.physical;

import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelRule;
import org.apache.calcite.rel.logical.LogicalAggregate;
import org.immutables.value.Value;
import org.opensearch.sql.opensearch.storage.scan.CalciteLogicalIndexScan;

/** Planner rule that push a {@link LogicalAggregate} down to {@link CalciteLogicalIndexScan} */
@Value.Enclosing
public class OpenSearchAggregateIndexScanRule
extends RelRule<OpenSearchAggregateIndexScanRule.Config> {

/** Creates a OpenSearchAggregateIndexScanRule. */
protected OpenSearchAggregateIndexScanRule(Config config) {
super(config);
}

@Override
public void onMatch(RelOptRuleCall call) {
if (call.rels.length == 2) {
// the ordinary variant
final LogicalAggregate aggregate = call.rel(0);
final CalciteLogicalIndexScan scan = call.rel(1);
apply(call, aggregate, scan);
} else {
throw new AssertionError(
String.format(
"The length of rels should be %s but got %s",
this.operands.size(), call.rels.length));
}
}

protected void apply(
RelOptRuleCall call, LogicalAggregate aggregate, CalciteLogicalIndexScan scan) {
CalciteLogicalIndexScan newScan = scan.pushDownAggregate(aggregate);
if (newScan != null) {
call.transformTo(newScan);
}
}

/** Rule configuration. */
@Value.Immutable
public interface Config extends RelRule.Config {
/** Config that matches Aggregate on OpenSearchProjectIndexScanRule. */
Config DEFAULT =
ImmutableOpenSearchAggregateIndexScanRule.Config.builder()
.build()
.withOperandSupplier(
b0 ->
b0.operand(LogicalAggregate.class)
.oneInput(
b1 ->
b1.operand(CalciteLogicalIndexScan.class)
.predicate(OpenSearchIndexScanRule::test)
.noInputs()));

@Override
default OpenSearchAggregateIndexScanRule toRule() {
return new OpenSearchAggregateIndexScanRule(this);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,13 @@
package org.opensearch.sql.opensearch.planner.physical;

import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelRule;
import org.apache.calcite.rel.core.Filter;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.immutables.value.Value;
import org.opensearch.sql.opensearch.storage.OpenSearchIndex;
import org.opensearch.sql.opensearch.storage.scan.CalciteOpenSearchIndexScan;
import org.opensearch.sql.opensearch.storage.scan.CalciteLogicalIndexScan;

/** Planner rule that push a {@link Filter} down to {@link CalciteOpenSearchIndexScan} */
/** Planner rule that push a {@link LogicalFilter} down to {@link CalciteLogicalIndexScan} */
@Value.Enclosing
public class OpenSearchFilterIndexScanRule extends RelRule<OpenSearchFilterIndexScanRule.Config> {

Expand All @@ -21,17 +20,12 @@ protected OpenSearchFilterIndexScanRule(Config config) {
super(config);
}

protected static boolean test(CalciteOpenSearchIndexScan scan) {
final RelOptTable table = scan.getTable();
return table.unwrap(OpenSearchIndex.class) != null;
}

@Override
public void onMatch(RelOptRuleCall call) {
if (call.rels.length == 2) {
// the ordinary variant
final Filter filter = call.rel(0);
final CalciteOpenSearchIndexScan scan = call.rel(1);
final LogicalFilter filter = call.rel(0);
final CalciteLogicalIndexScan scan = call.rel(1);
apply(call, filter, scan);
} else {
throw new AssertionError(
Expand All @@ -41,8 +35,8 @@ public void onMatch(RelOptRuleCall call) {
}
}

protected void apply(RelOptRuleCall call, Filter filter, CalciteOpenSearchIndexScan scan) {
CalciteOpenSearchIndexScan newScan = scan.pushDownFilter(filter);
protected void apply(RelOptRuleCall call, Filter filter, CalciteLogicalIndexScan scan) {
CalciteLogicalIndexScan newScan = scan.pushDownFilter(filter);
if (newScan != null) {
call.transformTo(newScan);
}
Expand All @@ -51,17 +45,17 @@ protected void apply(RelOptRuleCall call, Filter filter, CalciteOpenSearchIndexS
/** Rule configuration. */
@Value.Immutable
public interface Config extends RelRule.Config {
/** Config that matches Filter on CalciteOpenSearchIndexScan. */
/** Config that matches Filter on CalciteLogicalIndexScan. */
Config DEFAULT =
ImmutableOpenSearchFilterIndexScanRule.Config.builder()
.build()
.withOperandSupplier(
b0 ->
b0.operand(Filter.class)
b0.operand(LogicalFilter.class)
.oneInput(
b1 ->
b1.operand(CalciteOpenSearchIndexScan.class)
.predicate(OpenSearchFilterIndexScanRule::test)
b1.operand(CalciteLogicalIndexScan.class)
.predicate(OpenSearchIndexScanRule::test)
.noInputs()));

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ public class OpenSearchIndexRules {
OpenSearchProjectIndexScanRule.Config.DEFAULT.toRule();
private static final OpenSearchFilterIndexScanRule FILTER_INDEX_SCAN =
OpenSearchFilterIndexScanRule.Config.DEFAULT.toRule();
private static final OpenSearchAggregateIndexScanRule AGGREGATE_INDEX_SCAN =
OpenSearchAggregateIndexScanRule.Config.DEFAULT.toRule();

public static final List<RelOptRule> OPEN_SEARCH_INDEX_SCAN_RULES =
ImmutableList.of(PROJECT_INDEX_SCAN, FILTER_INDEX_SCAN);
ImmutableList.of(PROJECT_INDEX_SCAN, FILTER_INDEX_SCAN, AGGREGATE_INDEX_SCAN);

// prevent instantiation
private OpenSearchIndexRules() {}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.opensearch.sql.opensearch.planner.physical;

import org.apache.calcite.plan.RelOptTable;
import org.opensearch.sql.opensearch.storage.OpenSearchIndex;
import org.opensearch.sql.opensearch.storage.scan.CalciteLogicalIndexScan;

public interface OpenSearchIndexScanRule {

// CalciteOpenSearchIndexScan doesn't allow push-down anymore (except Sort under some strict
// condition) after Aggregate push-down.
static boolean test(CalciteLogicalIndexScan scan) {
if (scan.getPushDownContext().isAggregatePushed()) return false;
final RelOptTable table = scan.getTable();
return table.unwrap(OpenSearchIndex.class) != null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@

import java.util.ArrayList;
import java.util.List;
import org.apache.calcite.adapter.enumerable.EnumerableProject;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelRule;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexUtil;
Expand All @@ -20,9 +20,9 @@
import org.apache.calcite.util.mapping.Mappings;
import org.immutables.value.Value;
import org.opensearch.sql.opensearch.storage.OpenSearchIndex;
import org.opensearch.sql.opensearch.storage.scan.CalciteOpenSearchIndexScan;
import org.opensearch.sql.opensearch.storage.scan.CalciteLogicalIndexScan;

/** Planner rule that push a {@link Project} down to {@link CalciteOpenSearchIndexScan} */
/** Planner rule that push a {@link EnumerableProject} down to {@link CalciteLogicalIndexScan} */
@Value.Enclosing
public class OpenSearchProjectIndexScanRule extends RelRule<OpenSearchProjectIndexScanRule.Config> {

Expand All @@ -31,17 +31,12 @@ protected OpenSearchProjectIndexScanRule(Config config) {
super(config);
}

protected static boolean test(CalciteOpenSearchIndexScan scan) {
final RelOptTable table = scan.getTable();
return table.unwrap(OpenSearchIndex.class) != null;
}

@Override
public void onMatch(RelOptRuleCall call) {
if (call.rels.length == 2) {
// the ordinary variant
final Project project = call.rel(0);
final CalciteOpenSearchIndexScan scan = call.rel(1);
final EnumerableProject project = call.rel(0);
final CalciteLogicalIndexScan scan = call.rel(1);
apply(call, project, scan);
} else {
throw new AssertionError(
Expand All @@ -51,10 +46,13 @@ public void onMatch(RelOptRuleCall call) {
}
}

protected void apply(RelOptRuleCall call, Project project, CalciteOpenSearchIndexScan scan) {
protected void apply(
RelOptRuleCall call, EnumerableProject project, CalciteLogicalIndexScan scan) {
final RelOptTable table = scan.getTable();
requireNonNull(table.unwrap(OpenSearchIndex.class));

// TODO: support script pushdown for project instead of only reference
// https://github.com/opensearch-project/sql/issues/3387
final List<Integer> selectedColumns = new ArrayList<>();
final RexVisitorImpl<Void> visitor =
new RexVisitorImpl<Void>(true) {
Expand All @@ -70,7 +68,7 @@ public Void visitInputRef(RexInputRef inputRef) {
// Only do push down when an actual projection happens
if (!selectedColumns.isEmpty() && selectedColumns.size() != scan.getRowType().getFieldCount()) {
Mapping mapping = Mappings.target(selectedColumns, scan.getRowType().getFieldCount());
CalciteOpenSearchIndexScan newScan = scan.pushDownProject(selectedColumns);
CalciteLogicalIndexScan newScan = scan.pushDownProject(selectedColumns);
final List<RexNode> newProjectRexNodes = RexUtil.apply(mapping, project.getProjects());

if (RexUtil.isIdentity(newProjectRexNodes, newScan.getRowType())) {
Expand All @@ -90,11 +88,11 @@ public interface Config extends RelRule.Config {
.build()
.withOperandSupplier(
b0 ->
b0.operand(Project.class)
b0.operand(EnumerableProject.class)
.oneInput(
b1 ->
b1.operand(CalciteOpenSearchIndexScan.class)
.predicate(OpenSearchProjectIndexScanRule::test)
b1.operand(CalciteLogicalIndexScan.class)
.predicate(OpenSearchIndexScanRule::test)
.noInputs()));

@Override
Expand Down
Loading
Loading