Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,36 @@

import java.util.function.BiFunction;
import lombok.Getter;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.plan.Context;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptSchema;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.RelBuilder;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.opensearch.sql.ast.expression.UnresolvedExpression;

public class CalcitePlanContext {

public static class OSRelBuilder extends RelBuilder {

protected OSRelBuilder(
@Nullable Context context, RelOptCluster cluster, @Nullable RelOptSchema relOptSchema) {
super(context, cluster, relOptSchema);
}
}

public FrameworkConfig config;
public CalciteConnection connection;
public final RelBuilder relBuilder;
public final ExtendedRexBuilder rexBuilder;

@Getter private boolean isResolvingJoinCondition = false;

public CalcitePlanContext(FrameworkConfig config) {
public CalcitePlanContext(FrameworkConfig config, CalciteConnection connection) {
this.config = config;
this.connection = connection;
this.relBuilder = RelBuilder.create(config);
this.rexBuilder = new ExtendedRexBuilder(relBuilder.getRexBuilder());
}
Expand All @@ -35,7 +50,8 @@ public RexNode resolveJoinCondition(
return result;
}

// for testing only
public static CalcitePlanContext create(FrameworkConfig config) {
return new CalcitePlanContext(config);
return new CalcitePlanContext(config, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.tools.RelBuilder;
import org.apache.calcite.tools.RelBuilder.AggCall;
import org.opensearch.sql.ast.AbstractNodeVisitor;
Expand Down Expand Up @@ -63,10 +62,6 @@ public RelNode analyze(UnresolvedPlan unresolved, CalcitePlanContext context) {
@Override
public RelNode visitRelation(Relation node, CalcitePlanContext context) {
for (QualifiedName qualifiedName : node.getQualifiedNames()) {
SchemaPlus schema = context.config.getDefaultSchema();
if (schema != null && schema.getName().equals(OpenSearchSchema.OPEN_SEARCH_SCHEMA_NAME)) {
schema.unwrap(OpenSearchSchema.class).registerTable(qualifiedName);
}
context.relBuilder.scan(qualifiedName.getParts());
}
if (node.getQualifiedNames().size() > 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,16 @@ public class OpenSearchSchema extends AbstractSchema {

private final DataSourceService dataSourceService;

private final Map<String, Table> tableMap = new HashMap<>();
private final Map<String, Table> tableMap =
new HashMap<>() {
@Override
public Table get(Object key) {
if (!super.containsKey(key)) {
registerTable(new QualifiedName((String) key));
}
return super.get(key);
}
};

public void registerTable(QualifiedName qualifiedName) {
DataSourceSchemaIdentifierNameResolver nameResolver =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.sql.calcite.plan;

import java.lang.reflect.Type;
import org.apache.calcite.adapter.java.AbstractQueryableTable;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.QueryProvider;
import org.apache.calcite.linq4j.Queryable;
Expand All @@ -15,16 +16,17 @@
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.schema.QueryableTable;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Schemas;
import org.apache.calcite.schema.TranslatableTable;
import org.apache.calcite.schema.impl.AbstractTable;
import org.opensearch.sql.calcite.utils.OpenSearchRelDataTypes;
import org.opensearch.sql.data.model.ExprValue;

public abstract class OpenSearchTable extends AbstractTable
implements TranslatableTable, QueryableTable, org.opensearch.sql.storage.Table {
public abstract class OpenSearchTable extends AbstractQueryableTable
implements TranslatableTable, org.opensearch.sql.storage.Table {

protected OpenSearchTable(Type elementType) {
super(elementType);
}

@Override
public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {
Expand All @@ -34,6 +36,8 @@ public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {
@Override
public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable) {
final RelOptCluster cluster = context.getCluster();
// return new LogicalTableScan(cluster, cluster.traitSetOf(Convention.NONE), ImmutableList.of(),
// relOptTable);
return new OpenSearchTableScan(cluster, relOptTable, this);
}

Expand All @@ -53,5 +57,5 @@ public Expression getExpression(SchemaPlus schema, String tableName, Class clazz
return Schemas.tableExpression(schema, getElementType(), tableName, clazz);
}

public abstract Enumerable<ExprValue> search();
public abstract Enumerable<Object> search();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's reason to replace ExprValue?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will throw exception and says that ExprValue is loaded in app classloader while Object is loaded in bootstrap classloader.

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
import java.util.List;
import lombok.AllArgsConstructor;
import lombok.RequiredArgsConstructor;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.jdbc.CalciteJdbc41Factory;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.jdbc.Driver;
import org.apache.calcite.plan.RelTraitDef;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.schema.SchemaPlus;
Expand Down Expand Up @@ -60,8 +64,22 @@ public void execute(
UnresolvedPlan plan, ResponseListener<ExecutionEngine.QueryResponse> listener) {
try {
try {
final FrameworkConfig config = buildFrameworkConfig();
final CalcitePlanContext context = new CalcitePlanContext(config);
// Use simple calcite schema since we don't compute tables in advance of the query.
CalciteSchema rootSchema = CalciteSchema.createRootSchema(true, false);
CalciteJdbc41Factory factory = new CalciteJdbc41Factory();
CalciteConnection connection =
factory.newConnection(
new Driver(), factory, "", new java.util.Properties(), rootSchema, null);
final SchemaPlus defaultSchema =
connection
.getRootSchema()
.add(
OpenSearchSchema.OPEN_SEARCH_SCHEMA_NAME,
new OpenSearchSchema(dataSourceService));
// Set opensearch schema as the default schema in config, otherwise we need to explicitly
// add schema path 'OpenSearch' before the opensearch table name
final FrameworkConfig config = buildFrameworkConfig(defaultSchema);
final CalcitePlanContext context = new CalcitePlanContext(config, connection);
executePlanByCalcite(analyze(plan, context), context, listener);
} catch (Exception e) {
LOG.warn("Fallback to V2 query engine since got exception", e);
Expand Down Expand Up @@ -134,14 +152,10 @@ public RelNode analyze(UnresolvedPlan plan, CalcitePlanContext context) {
return relNodeVisitor.analyze(plan, context);
}

private FrameworkConfig buildFrameworkConfig() {
final SchemaPlus rootSchema = Frameworks.createRootSchema(true);
final SchemaPlus opensearchSchema =
rootSchema.add(
OpenSearchSchema.OPEN_SEARCH_SCHEMA_NAME, new OpenSearchSchema(dataSourceService));
private FrameworkConfig buildFrameworkConfig(SchemaPlus defaultSchema) {
return Frameworks.newConfigBuilder()
.parserConfig(SqlParser.Config.DEFAULT) // TODO check
.defaultSchema(opensearchSchema)
.defaultSchema(defaultSchema)
.traitDefs((List<RelTraitDef>) null)
.programs(Programs.heuristicJoinOrder(Programs.RULE_SET, true, 2))
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,29 @@

package org.opensearch.sql.opensearch.executor;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.tools.RelRunners;
import org.apache.calcite.tools.RelRunner;
import org.opensearch.sql.calcite.CalcitePlanContext;
import org.opensearch.sql.common.response.ResponseListener;
import org.opensearch.sql.data.model.ExprStringValue;
import org.opensearch.sql.data.model.ExprTupleValue;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.data.type.ExprCoreType;
import org.opensearch.sql.executor.ExecutionContext;
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.executor.ExecutionEngine.Schema.Column;
import org.opensearch.sql.executor.Explain;
import org.opensearch.sql.executor.pagination.PlanSerializer;
import org.opensearch.sql.opensearch.client.OpenSearchClient;
Expand Down Expand Up @@ -102,30 +108,48 @@ public ExplainResponseNode visitTableScan(
@Override
public void execute(
RelNode rel, CalcitePlanContext context, ResponseListener<QueryResponse> listener) {
try (PreparedStatement statement = RelRunners.run(rel)) {
Connection connection = context.connection;
try {
RelRunner runner = connection.unwrap(RelRunner.class);
PreparedStatement statement = runner.prepareStatement(rel);
ResultSet result = statement.executeQuery();
printResultSet(result);
printResultSet(result, listener);
} catch (SQLException e) {
throw new RuntimeException(e);
}
}

// for testing only
private void printResultSet(ResultSet resultSet) throws SQLException {
private void printResultSet(ResultSet resultSet, ResponseListener<QueryResponse> listener)
throws SQLException {
// Get the ResultSet metadata to know about columns
ResultSetMetaData metaData = resultSet.getMetaData();
int columnCount = metaData.getColumnCount();

List<ExprValue> values = new ArrayList<>();
// Iterate through the ResultSet
while (resultSet.next()) {
Map<String, ExprValue> row = new LinkedHashMap<String, ExprValue>();
// Loop through each column
for (int i = 1; i <= columnCount; i++) {
String columnName = metaData.getColumnName(i);
String value = resultSet.getString(i);
System.out.println(columnName + ": " + value);

row.put(columnName, new ExprStringValue(value));
}
values.add(ExprTupleValue.fromExprValueMap(row));
System.out.println("-------------------"); // Separator between rows
}

List<Column> columns = new ArrayList<>(metaData.getColumnCount());
for (int i = 1; i <= columnCount; ++i) {
// TODO: mapping RelDataType to ExprType or deprecate ExprType
columns.add(new Column(metaData.getColumnName(i), null, ExprCoreType.STRING));
}
Schema schema = new Schema(columns);
QueryResponse response = new QueryResponse(schema, values, null);
listener.onResponse(response);
}

private RelDataType makeStruct(RelDataTypeFactory typeFactory, RelDataType type) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@
import java.util.Map;
import java.util.function.Function;
import lombok.RequiredArgsConstructor;
import org.apache.calcite.DataContext;
import org.apache.calcite.linq4j.AbstractEnumerable;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Enumerator;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.sql.calcite.plan.OpenSearchTable;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.data.type.ExprCoreType;
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.opensearch.client.OpenSearchClient;
Expand Down Expand Up @@ -79,6 +80,7 @@ public class OpenSearchIndex extends OpenSearchTable {

/** Constructor. */
public OpenSearchIndex(OpenSearchClient client, Settings settings, String indexName) {
super(null);
this.client = client;
this.settings = settings;
this.indexName = new OpenSearchRequest.IndexName(indexName);
Expand Down Expand Up @@ -190,6 +192,17 @@ public boolean isFieldTypeTolerance() {
return settings.getSettingValue(Settings.Key.FIELD_TYPE_TOLERANCE);
}

// @Override
public Enumerable<Object[]> scan(DataContext root) {
return new AbstractEnumerable<@Nullable Object[]>() {
@Override
public Enumerator<@Nullable Object[]> enumerator() {
return null;
// return search().toMap(v -> new Object[] {v});
}
};
}

@VisibleForTesting
@RequiredArgsConstructor
public static class OpenSearchDefaultImplementor extends DefaultImplementor<OpenSearchIndexScan> {
Expand Down Expand Up @@ -217,10 +230,10 @@ public PhysicalPlan visitML(LogicalML node, OpenSearchIndexScan context) {
}

@Override
public Enumerable<ExprValue> search() {
return new AbstractEnumerable<ExprValue>() {
public Enumerable<Object> search() {
return new AbstractEnumerable<Object>() {
@Override
public Enumerator<ExprValue> enumerator() {
public Enumerator<Object> enumerator() {
final int querySizeLimit = settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT);

final TimeValue cursorKeepAlive =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import org.opensearch.sql.opensearch.request.OpenSearchRequest;
import org.opensearch.sql.opensearch.response.OpenSearchResponse;

public class OpenSearchIndexEnumerator implements Enumerator<ExprValue> {
public class OpenSearchIndexEnumerator implements Enumerator<Object> {

/** OpenSearch client. */
private final OpenSearchClient client;
Expand Down Expand Up @@ -50,9 +50,9 @@ private void fetchNextBatch() {
}

@Override
public ExprValue current() {
public Object current() {
queryCount++;
return iterator.next();
return iterator.next().tupleValue().values().stream().map(ExprValue::value).toArray();
}

@Override
Expand Down
Loading