Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ public enum Key {
/** PPL Settings. */
PPL_ENABLED("plugins.ppl.enabled"),

/** Enable Calcite as execution engine */
CALCITE_ENGINE_ENABLED("plugins.calcite.enabled"),

/** Query Settings. */
FIELD_TYPE_TOLERANCE("plugins.query.field_type_tolerance"),

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,13 @@
import java.util.function.BiFunction;
import lombok.Getter;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.plan.Context;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptSchema;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.RelBuilder;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.opensearch.sql.ast.expression.UnresolvedExpression;

public class CalcitePlanContext {

public static class OSRelBuilder extends RelBuilder {

protected OSRelBuilder(
@Nullable Context context, RelOptCluster cluster, @Nullable RelOptSchema relOptSchema) {
super(context, cluster, relOptSchema);
}
}

public FrameworkConfig config;
public CalciteConnection connection;
public final RelBuilder relBuilder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@

package org.opensearch.sql.calcite.plan;

import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Enumerator;
import org.apache.calcite.linq4j.QueryProvider;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.impl.AbstractTableQueryable;

/** not in use now */
public class OpenSearchQueryable<T> extends AbstractTableQueryable<T> {

OpenSearchQueryable(
Expand All @@ -19,6 +21,12 @@ public class OpenSearchQueryable<T> extends AbstractTableQueryable<T> {

@Override
public Enumerator<T> enumerator() {
throw new UnsupportedOperationException("enumerator");
//noinspection unchecked
final Enumerable<T> enumerable = (Enumerable<T>) getTable().search();
return enumerable.enumerator();
}

private OpenSearchTable getTable() {
return (OpenSearchTable) table;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {
@Override
public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable) {
final RelOptCluster cluster = context.getCluster();
// return new LogicalTableScan(cluster, cluster.traitSetOf(Convention.NONE), ImmutableList.of(),
// relOptTable);
return new OpenSearchTableScan(cluster, relOptTable, this);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ public static RelDataType convertSchemaField(ExprType fieldType, boolean nullabl
} else {
if (fieldType.legacyTypeName().equalsIgnoreCase("binary")) {
return TYPE_FACTORY.createSqlType(SqlTypeName.BINARY, nullable);
} else if (fieldType.legacyTypeName().equalsIgnoreCase("timestamp")) {
return TYPE_FACTORY.createSqlType(SqlTypeName.TIMESTAMP, nullable);
} else if (fieldType.legacyTypeName().equalsIgnoreCase("geo_point")) {
return TYPE_FACTORY.createSqlType(SqlTypeName.GEOMETRY, nullable);
} else if (fieldType.legacyTypeName().equalsIgnoreCase("text")) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.executor;

import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.rel.type.RelDataTypeSystemImpl;
import org.apache.calcite.sql.type.SqlTypeName;

public class OpenSearchTypeSystem extends RelDataTypeSystemImpl {
public static final RelDataTypeSystem INSTANCE = new OpenSearchTypeSystem();

private OpenSearchTypeSystem() {}

public RelDataType deriveAvgAggType(RelDataTypeFactory typeFactory, RelDataType argumentType) {
switch (argumentType.getSqlTypeName()) {
case INTEGER:
case BIGINT:
return typeFactory.createSqlType(SqlTypeName.DOUBLE);
Comment on lines +21 to +23
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it a bug in Calcite? if the argumentType is INTEGER, deriveAvgAggType is INTEGER?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so, for example table1:

id
1
2
3
4

select avg(id) from table1 returns 2 instead of 2.5.

Not sure it is a bug or by designed. From semantic purpose, query avg on an id column is semantic wrong. The column which could be applied avg should be designed as decimal number. Will open a Calcite issue for checking.


default:
return super.deriveSumType(typeFactory, argumentType);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why default to deriveSumType instead of deriveAvgAggType?

}
}
}
75 changes: 49 additions & 26 deletions core/src/main/java/org/opensearch/sql/executor/QueryService.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package org.opensearch.sql.executor;

import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.List;
import lombok.AllArgsConstructor;
import lombok.RequiredArgsConstructor;
Expand All @@ -17,6 +19,7 @@
import org.apache.calcite.jdbc.Driver;
import org.apache.calcite.plan.RelTraitDef;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.metadata.DefaultRelMetadataProvider;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.tools.FrameworkConfig;
Expand All @@ -31,6 +34,7 @@
import org.opensearch.sql.calcite.CalciteRelNodeVisitor;
import org.opensearch.sql.calcite.OpenSearchSchema;
import org.opensearch.sql.common.response.ResponseListener;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.planner.PlanContext;
import org.opensearch.sql.planner.Planner;
Expand All @@ -53,6 +57,8 @@ public class QueryService {

private DataSourceService dataSourceService;

private Settings settings;

/**
* Execute the {@link UnresolvedPlan}, using {@link ResponseListener} to get response.<br>
* Todo. deprecated this interface after finalize {@link PlanContext}.
Expand All @@ -63,27 +69,47 @@ public class QueryService {
public void execute(
UnresolvedPlan plan, ResponseListener<ExecutionEngine.QueryResponse> listener) {
try {
try {
// Use simple calcite schema since we don't compute tables in advance of the query.
CalciteSchema rootSchema = CalciteSchema.createRootSchema(true, false);
CalciteJdbc41Factory factory = new CalciteJdbc41Factory();
CalciteConnection connection =
factory.newConnection(
new Driver(), factory, "", new java.util.Properties(), rootSchema, null);
final SchemaPlus defaultSchema =
connection
.getRootSchema()
.add(
OpenSearchSchema.OPEN_SEARCH_SCHEMA_NAME,
new OpenSearchSchema(dataSourceService));
// Set opensearch schema as the default schema in config, otherwise we need to explicitly
// add schema path 'OpenSearch' before the opensearch table name
final FrameworkConfig config = buildFrameworkConfig(defaultSchema);
final CalcitePlanContext context = new CalcitePlanContext(config, connection);
executePlanByCalcite(analyze(plan, context), context, listener);
} catch (Exception e) {
LOG.warn("Fallback to V2 query engine since got exception", e);
boolean calciteEnabled = false;
if (settings != null) {
calciteEnabled = settings.getSettingValue(Settings.Key.CALCITE_ENGINE_ENABLED);
}
if (!calciteEnabled || relNodeVisitor == null) {
executePlan(analyze(plan), PlanContext.emptyPlanContext(), listener);
} else {
try {
AccessController.doPrivileged(
(PrivilegedAction<Void>)
() -> {
// Use simple calcite schema since we don't compute tables in advance of the
// query.
CalciteSchema rootSchema = CalciteSchema.createRootSchema(true, false);
CalciteJdbc41Factory factory = new CalciteJdbc41Factory();
CalciteConnection connection =
factory.newConnection(
new Driver(),
factory,
"",
new java.util.Properties(),
rootSchema,
null);
final SchemaPlus defaultSchema =
connection
.getRootSchema()
.add(
OpenSearchSchema.OPEN_SEARCH_SCHEMA_NAME,
new OpenSearchSchema(dataSourceService));
// Set opensearch schema as the default schema in config, otherwise we need to
// explicitly
// add schema path 'OpenSearch' before the opensearch table name
final FrameworkConfig config = buildFrameworkConfig(defaultSchema);
final CalcitePlanContext context = new CalcitePlanContext(config, connection);
executePlanByCalcite(analyze(plan, context), context, listener);
return null;
});
} catch (Exception e) {
LOG.warn("Fallback to V2 query engine since got exception", e);
executePlan(analyze(plan), PlanContext.emptyPlanContext(), listener);
}
}
} catch (Exception e) {
listener.onFailure(e);
Expand Down Expand Up @@ -120,11 +146,7 @@ public void executePlanByCalcite(
RelNode plan,
CalcitePlanContext context,
ResponseListener<ExecutionEngine.QueryResponse> listener) {
try {
executionEngine.execute(optimize(plan), context, listener);
} catch (Exception e) {
listener.onFailure(e);
}
executionEngine.execute(optimize(plan), context, listener);
}

/**
Expand Down Expand Up @@ -157,7 +179,8 @@ private FrameworkConfig buildFrameworkConfig(SchemaPlus defaultSchema) {
.parserConfig(SqlParser.Config.DEFAULT) // TODO check
.defaultSchema(defaultSchema)
.traitDefs((List<RelTraitDef>) null)
.programs(Programs.heuristicJoinOrder(Programs.RULE_SET, true, 2))
.programs(Programs.calc(DefaultRelMetadataProvider.INSTANCE))
.typeSystem(OpenSearchTypeSystem.INSTANCE)
.build();
}

Expand Down
7 changes: 6 additions & 1 deletion integ-test/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ String baseVersion = "2.17.0"
String bwcVersion = baseVersion + ".0";
String baseName = "sqlBwcCluster"
String bwcFilePath = "src/test/resources/bwc/"
String calciteCodegen = "$projectDir/src/test/java/codegen/"

repositories {
mavenCentral()
Expand Down Expand Up @@ -427,7 +428,11 @@ integTest {
finalizedBy stopPrometheus
}

systemProperty 'java.security.manager', 'disallow'
// enable calcite codegen in IT
systemProperty 'calcite.debug', 'false'
systemProperty 'org.codehaus.janino.source_debugging.enable', 'false'
systemProperty 'org.codehaus.janino.source_debugging.dir', calciteCodegen

systemProperty 'tests.security.manager', 'false'
systemProperty('project.root', project.projectDir.absolutePath)

Expand Down
Loading
Loading