Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .github/trigger_files/beam_PostCommit_SQL.json
Original file line number Diff line number Diff line change
@@ -1 +1,4 @@
{}
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 1
}
8 changes: 8 additions & 0 deletions sdks/java/extensions/sql/src/main/codegen/config.fmpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@ data: {
"org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlCreate"
"org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlDrop"
"org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.type.SqlTypeName"
"org.apache.beam.sdk.extensions.sql.impl.parser.SqlCreateCatalog"
"org.apache.beam.sdk.extensions.sql.impl.parser.SqlCreateExternalTable"
"org.apache.beam.sdk.extensions.sql.impl.parser.SqlCreateFunction"
"org.apache.beam.sdk.extensions.sql.impl.parser.SqlDropCatalog"
"org.apache.beam.sdk.extensions.sql.impl.parser.SqlDdlNodes"
"org.apache.beam.sdk.extensions.sql.impl.parser.SqlSetCatalog"
"org.apache.beam.sdk.extensions.sql.impl.parser.SqlSetOptionBeam"
"org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils"
"org.apache.beam.sdk.schemas.Schema"
Expand All @@ -41,6 +44,7 @@ data: {
"JAR"
"LOCATION"
"TBLPROPERTIES"
"PROPERTIES"
]

# List of keywords from "keywords" section that are not reserved.
Expand Down Expand Up @@ -364,6 +368,7 @@ data: {
"JAR"
"LOCATION"
"TBLPROPERTIES"
"PROPERTIES"
]

# List of non-reserved keywords to add;
Expand All @@ -385,6 +390,7 @@ data: {
# Return type of method implementation should be 'SqlNode'.
# Example: SqlShowDatabases(), SqlShowTables().
statementParserMethods: [
"SqlSetCatalog(Span.of(), null)"
"SqlSetOptionBeam(Span.of(), null)"
]

Expand Down Expand Up @@ -416,6 +422,7 @@ data: {
# List of methods for parsing extensions to "CREATE [OR REPLACE]" calls.
# Each must accept arguments "(SqlParserPos pos, boolean replace)".
createStatementParserMethods: [
"SqlCreateCatalog"
"SqlCreateExternalTable"
"SqlCreateFunction"
"SqlCreateTableNotSupportedMessage"
Expand All @@ -425,6 +432,7 @@ data: {
# Each must accept arguments "(SqlParserPos pos)".
dropStatementParserMethods: [
"SqlDropTable"
"SqlDropCatalog"
]

# Binary operators tokens
Expand Down
119 changes: 119 additions & 0 deletions sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,125 @@ Schema.Field Field() :
}
}

/**
 * Parses a comma-separated list of catalog properties:
 * 'key' = 'value' ( ',' 'key' = 'value' )*
 *
 * <p>Each element of the returned {@code SqlNodeList} is itself a
 * two-element {@code SqlNodeList} (key, value) produced by {@code Property()}.
 * Duplicate keys are not rejected here; de-duplication, if any, is left to
 * the consumer of the list.
 */
SqlNodeList PropertyList() :
{
SqlNodeList list = new SqlNodeList(getPos());
SqlNode property;
}
{
// At least one property is required; the surrounding optional
// PROPERTIES '(' ... ')' clause handles the "no properties" case.
property = Property() { list.add(property); }
(
<COMMA> property = Property() { list.add(property); }
)*
{
return list;
}
}


/**
 * Parses a single property of the form 'key' = 'value'.
 *
 * <p>Both key and value must be string literals (quoted); bare identifiers
 * are not accepted. The pair is returned packed as a two-element
 * {@code SqlNodeList}: index 0 is the key, index 1 is the value.
 */
SqlNode Property() :
{
SqlNode key;
SqlNode value;
}
{
key = StringLiteral()
<EQ>
value = StringLiteral()
{
// Pack the (key, value) pair into a SqlNodeList so PropertyList()
// can return a uniform list of pairs.
SqlNodeList pair = new SqlNodeList(getPos());
pair.add(key);
pair.add(value);
return pair;
}
}

/**
 * CREATE CATALOG ( IF NOT EXISTS )? catalog_name
 * TYPE type_name
 * ( PROPERTIES '(' key = value ( ',' key = value )* ')' )?
 *
 * <p>Both catalog_name and type_name may be given either as a quoted string
 * literal or as a simple (unqualified) identifier. The leading CREATE token
 * is consumed by the framework before this method is invoked (this is
 * registered under createStatementParserMethods), so parsing starts at
 * CATALOG.
 */
SqlCreate SqlCreateCatalog(Span s, boolean replace) :
{
final boolean ifNotExists;
final SqlNode catalogName;
final SqlNode type;
// Stays null when the optional PROPERTIES clause is absent.
// NOTE(review): presumably SqlCreateCatalog tolerates a null property
// list — confirm against its constructor.
SqlNodeList properties = null;
}
{

<CATALOG> {
s.add(this);
}

ifNotExists = IfNotExistsOpt()
(
catalogName = StringLiteral()
|
catalogName = SimpleIdentifier()
)
<TYPE>
(
type = StringLiteral()
|
type = SimpleIdentifier()
)
[ <PROPERTIES> <LPAREN> properties = PropertyList() <RPAREN> ]

{
return new SqlCreateCatalog(
s.end(this),
replace,
ifNotExists,
catalogName,
type,
properties);
}
}

/**
 * SET CATALOG catalog_name
 *
 * <p>Switches the active catalog. The catalog name may be given either as a
 * quoted string literal or as a simple identifier. Registered under
 * statementParserMethods as {@code SqlSetCatalog(Span.of(), null)}, so
 * {@code scope} is null for top-level statements.
 */
SqlCall SqlSetCatalog(Span s, String scope) :
{
final SqlNode catalogName;
}
{
<SET> {
s.add(this);
}
<CATALOG>
(
catalogName = StringLiteral()
|
catalogName = SimpleIdentifier()
)
{
return new SqlSetCatalog(
s.end(this),
scope,
catalogName);
}
}


/**
 * DROP CATALOG ( IF EXISTS )? catalog_name
 *
 * <p>The leading DROP token is consumed by the framework before this method
 * is invoked (this is registered under dropStatementParserMethods), so
 * parsing starts at CATALOG. The catalog name may be a quoted string literal
 * or a simple identifier.
 *
 * <p>NOTE(review): {@code replace} is required by the generated parser's
 * calling convention but has no meaning for DROP and is intentionally
 * unused here.
 */
SqlDrop SqlDropCatalog(Span s, boolean replace) :
{
final boolean ifExists;
final SqlNode catalogName;
}
{
<CATALOG> ifExists = IfExistsOpt()
(
catalogName = StringLiteral()
|
catalogName = SimpleIdentifier()
)
{
return new SqlDropCatalog(s.end(this), ifExists, catalogName);
}
}

/**
* Note: This example is probably out of sync with the code.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
package org.apache.beam.sdk.extensions.sql;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.ParseException;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamEnumerableConverter;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.extensions.sql.meta.catalog.CatalogManager;
import org.apache.beam.sdk.extensions.sql.meta.store.MetaStore;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
Expand All @@ -30,19 +32,25 @@
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
@Internal
public class BeamSqlCli {
private BeamSqlEnv env;
/** The store which persists all the table meta data. */
private MetaStore metaStore;

public BeamSqlCli metaStore(MetaStore metaStore) {
return metaStore(metaStore, false, PipelineOptionsFactory.create());
public BeamSqlCli catalogManager(CatalogManager catalogManager) {
return build(BeamSqlEnv.builder(catalogManager), false, PipelineOptionsFactory.create());
}

public BeamSqlCli metaStore(
MetaStore metaStore, boolean autoLoadUdfUdaf, PipelineOptions pipelineOptions) {
public BeamSqlCli metaStore(MetaStore metaStore) {
this.metaStore = metaStore;
BeamSqlEnv.BeamSqlEnvBuilder builder = BeamSqlEnv.builder(metaStore);
return build(BeamSqlEnv.builder(metaStore), false, PipelineOptionsFactory.create());
}

public BeamSqlCli build(
BeamSqlEnv.BeamSqlEnvBuilder builder,
boolean autoLoadUdfUdaf,
PipelineOptions pipelineOptions) {
if (autoLoadUdfUdaf) {
builder.autoLoadUserDefinedFunctions();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.extensions.sql.impl.schema.BeamPCollectionTable;
import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
import org.apache.beam.sdk.extensions.sql.meta.catalog.InMemoryCatalogManager;
import org.apache.beam.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
Expand Down Expand Up @@ -136,9 +136,9 @@ public abstract class SqlTransform extends PTransform<PInput, PCollection<Row>>
public PCollection<Row> expand(PInput input) {
TableProvider inputTableProvider =
new ReadOnlyTableProvider(PCOLLECTION_NAME, toTableMap(input));
InMemoryMetaStore metaTableProvider = new InMemoryMetaStore();
metaTableProvider.registerProvider(inputTableProvider);
BeamSqlEnvBuilder sqlEnvBuilder = BeamSqlEnv.builder(metaTableProvider);
InMemoryCatalogManager catalogManager = new InMemoryCatalogManager();
catalogManager.registerTableProvider(PCOLLECTION_NAME, inputTableProvider);
BeamSqlEnvBuilder sqlEnvBuilder = BeamSqlEnv.builder(catalogManager);

// TODO: validate duplicate functions.
registerFunctions(sqlEnvBuilder);
Expand All @@ -147,7 +147,7 @@ public PCollection<Row> expand(PInput input) {
// the same names are reused.
if (autoLoading()) {
sqlEnvBuilder.autoLoadUserDefinedFunctions();
ServiceLoader.load(TableProvider.class).forEach(metaTableProvider::registerProvider);
ServiceLoader.load(TableProvider.class).forEach(catalogManager::registerTableProvider);
}

tableProviderMap().forEach(sqlEnvBuilder::addSchema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@
*/
package org.apache.beam.sdk.extensions.sql.impl;

import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.beam.sdk.extensions.sql.meta.Table;
import org.apache.beam.sdk.extensions.sql.meta.catalog.Catalog;
import org.apache.beam.sdk.extensions.sql.meta.catalog.CatalogManager;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.linq4j.tree.Expression;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rel.type.RelProtoDataType;
Expand All @@ -37,7 +41,8 @@
@SuppressWarnings({"keyfor", "nullness"}) // TODO(https://github.com/apache/beam/issues/20497)
public class BeamCalciteSchema implements Schema {
private JdbcConnection connection;
private TableProvider tableProvider;
private @Nullable TableProvider tableProvider;
private @Nullable CatalogManager catalogManager;
private Map<String, BeamCalciteSchema> subSchemas;

BeamCalciteSchema(JdbcConnection jdbcConnection, TableProvider tableProvider) {
Expand All @@ -46,8 +51,22 @@ public class BeamCalciteSchema implements Schema {
this.subSchemas = new HashMap<>();
}

/**
* Creates a {@link BeamCalciteSchema} representing a {@link CatalogManager}. This will typically
* be the root node of a pipeline.
*/
BeamCalciteSchema(JdbcConnection jdbcConnection, CatalogManager catalogManager) {
this.connection = jdbcConnection;
this.catalogManager = catalogManager;
this.subSchemas = new HashMap<>();
}

public TableProvider getTableProvider() {
return tableProvider;
return resolveMetastore();
}

public @Nullable CatalogManager getCatalogManager() {
return catalogManager;
}

public Map<String, String> getPipelineOptions() {
Expand Down Expand Up @@ -87,7 +106,7 @@ public Expression getExpression(SchemaPlus parentSchema, String name) {

@Override
public Set<String> getTableNames() {
return tableProvider.getTables().keySet();
return resolveMetastore().getTables().keySet();
}

@Override
Expand All @@ -103,12 +122,12 @@ public Set<String> getTypeNames() {
@Override
public org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.schema.Table getTable(
String name) {
Table table = tableProvider.getTable(name);
Table table = resolveMetastore().getTable(name);
if (table == null) {
return null;
}
return new BeamCalciteTable(
tableProvider.buildBeamSqlTable(table),
resolveMetastore().buildBeamSqlTable(table),
getPipelineOptions(),
connection.getPipelineOptions());
}
Expand All @@ -125,17 +144,36 @@ public Collection<Function> getFunctions(String name) {

@Override
public Set<String> getSubSchemaNames() {
return tableProvider.getSubProviders();
return resolveMetastore().getSubProviders();
}

/**
* If this is the root schema (in other words, a {@link CatalogManager}), the sub schema will be a
* {@link Catalog}'s metastore.
*
* <p>Otherwise, the sub-schema is derived from the {@link TableProvider} implementation.
*/
@Override
public Schema getSubSchema(String name) {
if (!subSchemas.containsKey(name)) {
TableProvider subProvider = tableProvider.getSubProvider(name);
BeamCalciteSchema subSchema =
subProvider == null ? null : new BeamCalciteSchema(connection, subProvider);
BeamCalciteSchema subSchema;
if (tableProvider != null) {
@Nullable TableProvider subProvider = tableProvider.getSubProvider(name);
subSchema = subProvider != null ? new BeamCalciteSchema(connection, subProvider) : null;
} else {
@Nullable Catalog catalog = checkStateNotNull(catalogManager).getCatalog(name);
subSchema = catalog != null ? new BeamCalciteSchema(connection, catalog.metaStore()) : null;
}
subSchemas.put(name, subSchema);
}

return subSchemas.get(name);
}

public TableProvider resolveMetastore() {
if (tableProvider != null) {
return tableProvider;
}
return checkStateNotNull(catalogManager).currentCatalog().metaStore();
}
}
Loading
Loading