Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,15 @@ allprojects {
resolutionStrategy.force "org.jetbrains.kotlin:kotlin-stdlib:1.9.10"
resolutionStrategy.force "org.jetbrains.kotlin:kotlin-stdlib-common:1.9.10"
resolutionStrategy.force "net.bytebuddy:byte-buddy:1.14.9"
resolutionStrategy.force "org.apache.httpcomponents.client5:httpclient5:5.3.1"
resolutionStrategy.force 'org.apache.httpcomponents.core5:httpcore5:5.2.5'
resolutionStrategy.force 'org.apache.httpcomponents.core5:httpcore5-h2:5.2.5'
resolutionStrategy.force 'com.fasterxml.jackson.core:jackson-annotations:2.17.2'
resolutionStrategy.force 'com.fasterxml.jackson:jackson-bom:2.17.2'
resolutionStrategy.force "org.apache.httpcomponents.client5:httpclient5:${versions.httpclient5}"
resolutionStrategy.force "org.apache.httpcomponents.core5:httpcore5:${versions.httpcore5}"
resolutionStrategy.force "org.apache.httpcomponents.core5:httpcore5-h2:${versions.httpcore5}"
resolutionStrategy.force "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}"
resolutionStrategy.force "com.fasterxml.jackson.core:jackson-core:${versions.jackson}"
resolutionStrategy.force "com.fasterxml.jackson.core:jackson-databind:${versions.jackson_databind}"
resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:${versions.jackson}"
resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-smile:${versions.jackson}"
resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:${versions.jackson}"
resolutionStrategy.force 'com.google.protobuf:protobuf-java:3.25.5'
resolutionStrategy.force 'org.locationtech.jts:jts-core:1.19.0'
resolutionStrategy.force 'com.google.errorprone:error_prone_annotations:2.28.0'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public enum Key {

/** Enable Calcite as execution engine */
CALCITE_ENGINE_ENABLED("plugins.calcite.enabled"),
CALCITE_FALLBACK_ALLOWED("plugins.calcite.fallback.allowed"),

/** Query Settings. */
FIELD_TYPE_TOLERANCE("plugins.query.field_type_tolerance"),
Expand Down
5 changes: 2 additions & 3 deletions core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -119,15 +119,14 @@ jacocoTestCoverageVerification {
'org.opensearch.sql.datasource.model.DataSource',
'org.opensearch.sql.datasource.model.DataSourceStatus',
'org.opensearch.sql.datasource.model.DataSourceType',
'org.opensearch.sql.executor.ExecutionEngine'
]
limit {
counter = 'LINE'
minimum = 0.5 // calcite dev only
minimum = 0.0 // calcite dev only
}
limit {
counter = 'BRANCH'
minimum = 0.5 // calcite dev only
minimum = 0.0 // calcite dev only
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,29 @@

package org.opensearch.sql.calcite;

import java.sql.Connection;
import java.util.function.BiFunction;
import lombok.Getter;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.RelBuilder;
import org.opensearch.sql.ast.expression.UnresolvedExpression;
import org.opensearch.sql.calcite.utils.CalciteToolsHelper;

public class CalcitePlanContext {

public FrameworkConfig config;
public final Connection connection;
public final RelBuilder relBuilder;
public final ExtendedRexBuilder rexBuilder;

@Getter private boolean isResolvingJoinCondition = false;

public CalcitePlanContext(FrameworkConfig config) {
private CalcitePlanContext(FrameworkConfig config, JavaTypeFactory typeFactory) {
this.config = config;
this.relBuilder = RelBuilder.create(config);
this.connection = CalciteToolsHelper.connect(config, typeFactory);
this.relBuilder = CalciteToolsHelper.create(config, typeFactory, connection);
this.rexBuilder = new ExtendedRexBuilder(relBuilder.getRexBuilder());
}

Expand All @@ -35,8 +40,11 @@ public RexNode resolveJoinCondition(
return result;
}

// for testing only
public static CalcitePlanContext create(FrameworkConfig config) {
return new CalcitePlanContext(config);
return new CalcitePlanContext(config, null);
}

public static CalcitePlanContext create(FrameworkConfig config, JavaTypeFactory typeFactory) {
return new CalcitePlanContext(config, typeFactory);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,14 @@ public RelNode visitFilter(Filter node, CalcitePlanContext context) {
@Override
public RelNode visitProject(Project node, CalcitePlanContext context) {
visitChildren(node, context);
List<RexNode> projectList =
node.getProjectList().stream()
.filter(expr -> !(expr instanceof AllFields))
.map(expr -> rexVisitor.analyze(expr, context))
.collect(Collectors.toList());
if (projectList.isEmpty()) {
List<RexNode> projectList;
if (node.getProjectList().stream().anyMatch(e -> e instanceof AllFields)) {
return context.relBuilder.peek();
} else {
projectList =
node.getProjectList().stream()
.map(expr -> rexVisitor.analyze(expr, context))
.collect(Collectors.toList());
}
if (node.isExcluded()) {
context.relBuilder.projectExcept(projectList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Schemas;
import org.apache.calcite.schema.TranslatableTable;
import org.opensearch.sql.calcite.utils.OpenSearchRelDataTypes;
import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory;

public abstract class OpenSearchTable extends AbstractQueryableTable
implements TranslatableTable, org.opensearch.sql.storage.Table {
Expand All @@ -27,7 +27,7 @@ protected OpenSearchTable(Type elementType) {

@Override
public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {
return OpenSearchRelDataTypes.convertSchema(this);
return OpenSearchTypeFactory.convertSchema(this);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should add calcite's license header after ours since we copy code from that repo.

There is similar example in

*/

/*
* This file contains code from the Apache Calcite project (original license below).
* It contains modifications, which are licensed as above:
*/

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.opensearch.sql.calcite.utils;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Properties;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.avatica.AvaticaConnection;
import org.apache.calcite.avatica.AvaticaFactory;
import org.apache.calcite.avatica.UnregisteredDriver;
import org.apache.calcite.config.CalciteConnectionProperty;
import org.apache.calcite.interpreter.Bindables;
import org.apache.calcite.jdbc.CalciteFactory;
import org.apache.calcite.jdbc.CalciteJdbc41Factory;
import org.apache.calcite.jdbc.CalcitePrepare;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.jdbc.Driver;
import org.apache.calcite.plan.Context;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptSchema;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.prepare.CalciteCatalogReader;
import org.apache.calcite.prepare.CalcitePrepareImpl;
import org.apache.calcite.rel.RelHomogeneousShuttle;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelShuttle;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.logical.LogicalTableScan;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.server.CalciteServerStatement;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.RelBuilder;
import org.apache.calcite.tools.RelRunner;
import org.apache.calcite.util.Util;
import org.opensearch.sql.calcite.CalcitePlanContext;

/**
* Calcite Tools Helper. This class is used to create customized: 1. Connection 2. JavaTypeFactory
* 3. RelBuilder 4. RelRunner TODO delete it in future if possible.
*/
public class CalciteToolsHelper {

/** Create a RelBuilder with testing */
public static RelBuilder create(FrameworkConfig config) {
return RelBuilder.create(config);
}

/** Create a RelBuilder with typeFactory */
public static RelBuilder create(
FrameworkConfig config, JavaTypeFactory typeFactory, Connection connection) {
return withPrepare(
config,
typeFactory,
connection,
(cluster, relOptSchema, rootSchema, statement) ->
new OpenSearchRelBuilder(config.getContext(), cluster, relOptSchema));
}

public static Connection connect(FrameworkConfig config, JavaTypeFactory typeFactory) {
final Properties info = new Properties();
if (config.getTypeSystem() != RelDataTypeSystem.DEFAULT) {
info.setProperty(
CalciteConnectionProperty.TYPE_SYSTEM.camelName(),
config.getTypeSystem().getClass().getName());
}
try {
return new OpenSearchDriver().connect("jdbc:calcite:", info, null, typeFactory);
} catch (SQLException e) {
throw new RuntimeException(e);
}
}

/**
* This method copied from {@link Frameworks#withPrepare(FrameworkConfig,
* Frameworks.BasePrepareAction)}. The purpose is the method {@link
* CalciteFactory#newConnection(UnregisteredDriver, AvaticaFactory, String, Properties)} create
* connection with null instance of JavaTypeFactory. So we add a parameter JavaTypeFactory.
*/
private static <R> R withPrepare(
FrameworkConfig config,
JavaTypeFactory typeFactory,
Connection connection,
Frameworks.BasePrepareAction<R> action) {
try {
final Properties info = new Properties();
if (config.getTypeSystem() != RelDataTypeSystem.DEFAULT) {
info.setProperty(
CalciteConnectionProperty.TYPE_SYSTEM.camelName(),
config.getTypeSystem().getClass().getName());
}
final CalciteServerStatement statement =
connection.createStatement().unwrap(CalciteServerStatement.class);
return new OpenSearchPrepareImpl().perform(statement, config, typeFactory, action);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

public static class OpenSearchDriver extends Driver {

public Connection connect(
String url, Properties info, CalciteSchema rootSchema, JavaTypeFactory typeFactory)
throws SQLException {
CalciteJdbc41Factory factory = new CalciteJdbc41Factory();
AvaticaConnection connection =
factory.newConnection((Driver) this, factory, url, info, rootSchema, typeFactory);
this.handler.onConnectionInit(connection);
return connection;
}
}

/** do nothing, just extend for a public construct for new */
public static class OpenSearchRelBuilder extends RelBuilder {
public OpenSearchRelBuilder(Context context, RelOptCluster cluster, RelOptSchema relOptSchema) {
super(context, cluster, relOptSchema);
}
}

public static class OpenSearchPrepareImpl extends CalcitePrepareImpl {
/**
* Similar to {@link CalcitePrepareImpl#perform(CalciteServerStatement, FrameworkConfig,
* Frameworks.BasePrepareAction)}, but with a custom typeFactory.
*/
public <R> R perform(
CalciteServerStatement statement,
FrameworkConfig config,
JavaTypeFactory typeFactory,
Frameworks.BasePrepareAction<R> action) {
final CalcitePrepare.Context prepareContext = statement.createPrepareContext();
SchemaPlus defaultSchema = config.getDefaultSchema();
final CalciteSchema schema =
defaultSchema != null
? CalciteSchema.from(defaultSchema)
: prepareContext.getRootSchema();
CalciteCatalogReader catalogReader =
new CalciteCatalogReader(
schema.root(), schema.path(null), typeFactory, prepareContext.config());
final RexBuilder rexBuilder = new RexBuilder(typeFactory);
final RelOptPlanner planner =
createPlanner(prepareContext, config.getContext(), config.getCostFactory());
final RelOptCluster cluster = createCluster(planner, rexBuilder);
return action.apply(cluster, catalogReader, prepareContext.getRootSchema().plus(), statement);
}
}

public static class OpenSearchRelRunners {
/**
* Runs a relational expression by existing connection. This class copied from {@link
* org.apache.calcite.tools.RelRunners#run(RelNode)}
*/
public static PreparedStatement run(CalcitePlanContext context, RelNode rel) {
final RelShuttle shuttle =
new RelHomogeneousShuttle() {
@Override
public RelNode visit(TableScan scan) {
final RelOptTable table = scan.getTable();
if (scan instanceof LogicalTableScan
&& Bindables.BindableTableScan.canHandle(table)) {
// Always replace the LogicalTableScan with BindableTableScan
// because it's implementation does not require a "schema" as context.
return Bindables.BindableTableScan.create(scan.getCluster(), table);
}
return super.visit(scan);
}
};
rel = rel.accept(shuttle);
// the line we changed here
try (Connection connection = context.connection) {
final RelRunner runner = connection.unwrap(RelRunner.class);
return runner.prepareStatement(rel);
} catch (SQLException e) {
throw Util.throwAsRuntime(e);
}
}
}
}
Loading
Loading