Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,13 @@ public RelOptTable getTableForMember(List<String> names) {
throw new SqlException(Common.INTERNAL_ERR, "Expected name of exactly two parts, but was " + names);
}

SchemaPlus schema = root.getSubSchema(names.get(0));
SchemaPlus schema = root.subSchemas().get(names.get(0));

if (schema == null) {
throw new SqlException(Common.INTERNAL_ERR, "Schema with name \"" + names.get(0) + "\" not found");
}

Table table = schema.getTable(names.get(1));
Table table = schema.tables().get(names.get(1));

if (table == null) {
throw new SqlException(Common.INTERNAL_ERR, "Table with name " + names + " not found");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ private SchemaPlus getDefaultSchema(int catalogVersion, String schemaName) {
assert rootSchema != null : "Root schema does not exist";

SchemaPlus schemaPlus = rootSchema.root();
SchemaPlus defaultSchema = schemaPlus.getSubSchema(schemaName);
SchemaPlus defaultSchema = schemaPlus.subSchemas().get(schemaName);
// If default schema does not exist or misconfigured, we should use the root schema as default one
// because there is no other schema for the validator to use.
if (defaultSchema == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,14 @@

package org.apache.ignite.internal.sql.engine.schema;

import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractSchema;
import org.apache.ignite.internal.util.CollectionUtils;
import org.jetbrains.annotations.Nullable;
import org.apache.calcite.schema.lookup.Lookup;
import org.apache.calcite.util.NameMap;
import org.jetbrains.annotations.TestOnly;

/**
* Schema implementation for sql engine.
Expand All @@ -37,16 +35,28 @@ public class IgniteSchema extends AbstractSchema {

private final int catalogVersion;

private final Map<String, IgniteDataSource> tableByName;

private final Int2ObjectMap<IgniteDataSource> tableById;
private final Lookup<Table> tableLookup;

/** Constructor. */
@TestOnly
public IgniteSchema(String name, int catalogVersion, Collection<? extends IgniteDataSource> tables) {
this(
name,
catalogVersion,
Lookup.of(NameMap.immutableCopyOf(tables.stream().collect(Collectors.toMap(IgniteDataSource::name, Table.class::cast))))
);
}

/** Constructor. */
public IgniteSchema(String name, int catalogVersion, Lookup<Table> tableLookup) {
this.name = name;
this.catalogVersion = catalogVersion;
this.tableByName = tables.stream().collect(Collectors.toMap(IgniteDataSource::name, Function.identity()));
this.tableById = tables.stream().collect(CollectionUtils.toIntMapCollector(IgniteDataSource::id, Function.identity()));
this.tableLookup = tableLookup;
}

@Override
public Lookup<Table> tables() {
return tableLookup;
}

/** Schema name. */
Expand All @@ -59,20 +69,8 @@ public int catalogVersion() {
return catalogVersion;
}

/** {@inheritDoc} */
@Override
protected Map<String, Table> getTableMap() {
return Collections.unmodifiableMap(tableByName);
}

/** Returns table by given id. */
@Nullable IgniteTable tableByIdOpt(int tableId) {
IgniteDataSource dataSource = tableById.get(tableId);

if (!(dataSource instanceof IgniteTable)) {
return null;
}

return (IgniteTable) dataSource;
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
Expand All @@ -39,6 +40,9 @@
import org.apache.calcite.rel.hint.RelHint;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.lookup.CompatibilityLookup;
import org.apache.calcite.schema.lookup.Lookup;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.ignite.internal.catalog.Catalog;
Expand Down Expand Up @@ -148,42 +152,43 @@ public CompletableFuture<Void> schemaReadyFuture(int catalogVersion) {
@Override
public IgniteTable table(int catalogVersion, int tableId) {
return fullDataTableCache.get(cacheKey(catalogVersion, tableId), key -> {
// Load actual table information from the catalog.
Catalog catalog = catalogManager.catalog(catalogVersion);

if (catalog == null) {
throw new IgniteInternalException(Common.INTERNAL_ERR, "Catalog of given version not found: " + catalogVersion);
}

CatalogTableDescriptor tableDescriptor = catalog.table(tableId);

if (tableDescriptor == null) {
throw new IgniteInternalException(Common.INTERNAL_ERR, "Table with given id not found: " + tableId);
}

IgniteSchemas rootSchema = schemaCache.get(catalogVersion);

// Retrieve table from the schema (if it exists).
if (rootSchema != null) {
SchemaPlus schemaPlus = rootSchema.root();

for (String name : schemaPlus.getSubSchemaNames()) {
SchemaPlus subSchema = schemaPlus.getSubSchema(name);
CatalogSchemaDescriptor schemaDescriptor = catalog.schema(tableDescriptor.schemaId());

assert subSchema != null : name;
assert schemaDescriptor != null;

IgniteSchema schema = subSchema.unwrap(IgniteSchema.class);

assert schema != null : "unknown schema " + subSchema;

// Schema contains a wrapper for IgniteTable that includes actual information for a table (indexes, etc).
ActualIgniteTable table = (ActualIgniteTable) schema.tableByIdOpt(tableId);

if (table != null) {
return table;
}
}
}
SchemaPlus subSchema = schemaPlus.subSchemas().get(schemaDescriptor.name());

// Load actual table information from the catalog.
assert subSchema != null : schemaDescriptor.name();

Catalog catalog = catalogManager.catalog(catalogVersion);
IgniteSchema schema = subSchema.unwrap(IgniteSchema.class);

if (catalog == null) {
throw new IgniteInternalException(Common.INTERNAL_ERR, "Catalog of given version not found: " + catalogVersion);
}
assert schema != null : "unknown schema " + subSchema;

CatalogTableDescriptor tableDescriptor = catalog.table(tableId);
// Schema contains a wrapper for IgniteTable that includes actual information for a table (indexes, etc).
ActualIgniteTable table = (ActualIgniteTable) schema.tables().get(tableDescriptor.name());

if (tableDescriptor == null) {
throw new IgniteInternalException(Common.INTERNAL_ERR, "Table with given id not found: " + tableId);
if (table != null) {
return table;
}
}

CacheKey tableKey = tableCacheKey(tableDescriptor.id(), tableDescriptor.updateTimestamp());
Expand Down Expand Up @@ -227,11 +232,12 @@ private IgniteSchema createSqlSchema(Catalog catalog, CatalogSchemaDescriptor sc
int catalogVersion = catalog.version();
String schemaName = schemaDescriptor.name();

int numTables = schemaDescriptor.tables().length;
List<IgniteDataSource> schemaDataSources = new ArrayList<>(numTables);
Lookup<Table> tableLookup = new CompatibilityLookup<>(
schemaDescriptor::table,
() -> Arrays.stream(schemaDescriptor.tables()).map(CatalogTableDescriptor::name).collect(Collectors.toSet())
).map((tableDescriptor, name) -> {

// Assemble sql-engine.TableDescriptors as they are required by indexes.
for (CatalogTableDescriptor tableDescriptor : schemaDescriptor.tables()) {
// Assemble sql-engine.TableDescriptors as they are required by indexes.
CacheKey tableKey = tableCacheKey(tableDescriptor.id(), tableDescriptor.updateTimestamp());

// Load cached table by (id, version)
Expand All @@ -246,26 +252,25 @@ private IgniteSchema createSqlSchema(Catalog catalog, CatalogSchemaDescriptor sc
tableDescriptor.primaryKeyIndexId()
);

// Store a wrapper for the table that includes actual information for a table (indexes, etc),
// because the cached table entry (id, version) may not include up-to-date information on indexes.
schemaDataSources.add(new ActualIgniteTable(igniteTable, tableIndexes));
}
return new ActualIgniteTable(igniteTable, tableIndexes);
});

for (CatalogSystemViewDescriptor systemViewDescriptor : schemaDescriptor.systemViews()) {
Lookup<Table> systemViewLookup = new CompatibilityLookup<>(
schemaDescriptor::systemView,
() -> Arrays.stream(schemaDescriptor.systemViews()).map(CatalogSystemViewDescriptor::name).collect(Collectors.toSet())
).map((systemViewDescriptor, name) -> {
int viewId = systemViewDescriptor.id();
String viewName = systemViewDescriptor.name();
TableDescriptor descriptor = createTableDescriptorForSystemView(systemViewDescriptor);

IgniteSystemView schemaTable = new IgniteSystemViewImpl(
return new IgniteSystemViewImpl(
viewName,
viewId,
descriptor
);
});

schemaDataSources.add(schemaTable);
}

return new IgniteSchema(schemaName, catalogVersion, schemaDataSources);
return new IgniteSchema(schemaName, catalogVersion, Lookup.concat(tableLookup, systemViewLookup));
}

private static IgniteIndex createSchemaIndex(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ private IgniteRel parseQuery(IgniteSchema schema, String sqlStmt) {
try {
PlanningContext ctx = PlanningContext.builder()
.frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
.defaultSchema(createRootSchema(List.of(schema)).getSubSchema(schema.getName()))
.defaultSchema(createRootSchema(List.of(schema)).subSchemas().get(schema.getName()))
.build())
.defaultSchemaName(schema.getName())
.query(sqlStmt)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.lookup.LikePattern;
import org.apache.calcite.schema.lookup.Lookup;
import org.apache.calcite.tools.Frameworks;
import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
import org.apache.ignite.internal.sql.engine.schema.IgniteSchemas;
Expand Down Expand Up @@ -60,9 +63,10 @@ public PredefinedSchemaManager(Collection<IgniteSchema> schemas) {
for (IgniteSchema schema : schemas) {
schemaPlus.add(schema.getName(), schema);

Lookup<Table> tables = schema.tables();
tableById.putAll(
schema.getTableNames().stream()
.map(schema::getTable)
tables.getNames(LikePattern.any()).stream()
.map(tables::get)
.map(IgniteTable.class::cast)
.collect(toIntMapCollector(IgniteTable::id, identity()))
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,10 +277,10 @@ protected PlanningContext plannerCtx(
}

SchemaPlus rootSchema = createRootSchema(schemas);
SchemaPlus defaultSchema = rootSchema.getSubSchema(DEFAULT_SCHEMA);
SchemaPlus defaultSchema = rootSchema.subSchemas().get(DEFAULT_SCHEMA);

if (defaultSchema == null && !schemas.isEmpty()) {
defaultSchema = rootSchema.getSubSchema(schemas.iterator().next().getName());
defaultSchema = rootSchema.subSchemas().get(schemas.iterator().next().getName());
}

assertNotNull(defaultSchema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -553,12 +553,12 @@ private static class TestCase {
}

List<Integer> colocationKeys() {
IgniteTable table = (IgniteTable) schema.getTable("T");
IgniteTable table = (IgniteTable) schema.tables().get("T");
return table.distribution().getKeys();
}

List<String> columnNames() {
IgniteTable table = (IgniteTable) schema.getTable("T");
IgniteTable table = (IgniteTable) schema.tables().get("T");
List<String> names = new ArrayList<>();
TableDescriptor tableDescriptor = table.descriptor();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public void testLongPlanningTimeout() {
PlanningContext ctx = PlanningContext.builder()
.plannerTimeout(plannerTimeout)
.frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
.defaultSchema(createRootSchema(List.of(schema)).getSubSchema(schema.getName()))
.defaultSchema(createRootSchema(List.of(schema)).subSchemas().get(schema.getName()))
.build())
.defaultSchemaName(schema.getName())
.query(sql)
Expand Down
Loading