Skip to content

Commit

Permalink
Implement engine support for CREATE OR REPLACE TABLE
Browse files Browse the repository at this point in the history
  • Loading branch information
mdesmet authored and Praveen2112 committed Oct 25, 2023
1 parent 386dc92 commit e73e1e9
Show file tree
Hide file tree
Showing 37 changed files with 478 additions and 122 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.trino.spi.connector.CatalogHandle;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.SaveMode;
import io.trino.spi.security.AccessDeniedException;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeNotFoundException;
Expand Down Expand Up @@ -83,7 +84,6 @@
import static io.trino.sql.tree.LikeClause.PropertiesOption.EXCLUDING;
import static io.trino.sql.tree.LikeClause.PropertiesOption.INCLUDING;
import static io.trino.sql.tree.SaveMode.FAIL;
import static io.trino.sql.tree.SaveMode.IGNORE;
import static io.trino.sql.tree.SaveMode.REPLACE;
import static io.trino.type.UnknownType.UNKNOWN;
import static java.util.Locale.ENGLISH;
Expand Down Expand Up @@ -130,10 +130,6 @@ public ListenableFuture<Void> execute(
ListenableFuture<Void> internalExecute(CreateTable statement, Session session, List<Expression> parameters, Consumer<Output> outputConsumer)
{
checkArgument(!statement.getElements().isEmpty(), "no columns for table");
// TODO: Remove when engine is supporting table replacement
if (statement.getSaveMode() == REPLACE) {
throw semanticException(NOT_SUPPORTED, statement, "Replace table is not supported");
}

Map<NodeRef<Parameter>, Expression> parameterLookup = bindParameters(statement, parameters);
QualifiedObjectName tableName = createQualifiedObjectName(session, statement, statement.getName());
Expand All @@ -147,7 +143,7 @@ ListenableFuture<Void> internalExecute(CreateTable statement, Session session, L
}
throw e;
}
if (tableHandle.isPresent()) {
if (tableHandle.isPresent() && statement.getSaveMode() != REPLACE) {
if (statement.getSaveMode() == FAIL) {
throw semanticException(TABLE_ALREADY_EXISTS, statement, "Table '%s' already exists", tableName);
}
Expand Down Expand Up @@ -298,7 +294,7 @@ else if (element instanceof LikeClause likeClause) {
Map<String, Object> finalProperties = combineProperties(specifiedPropertyKeys, properties, inheritedProperties);
ConnectorTableMetadata tableMetadata = new ConnectorTableMetadata(tableName.asSchemaTableName(), ImmutableList.copyOf(columns.values()), finalProperties, statement.getComment());
try {
plannerContext.getMetadata().createTable(session, catalogName, tableMetadata, statement.getSaveMode() == IGNORE);
plannerContext.getMetadata().createTable(session, catalogName, tableMetadata, toConnectorSaveMode(statement.getSaveMode()));
}
catch (TrinoException e) {
// connectors are not required to handle the ignoreExisting flag
Expand Down Expand Up @@ -334,4 +330,13 @@ private static Map<String, Object> combineProperties(Set<String> specifiedProper
}
return finalProperties;
}

private static SaveMode toConnectorSaveMode(io.trino.sql.tree.SaveMode saveMode)
{
return switch (saveMode) {
case FAIL -> SaveMode.FAIL;
case IGNORE -> SaveMode.IGNORE;
case REPLACE -> SaveMode.REPLACE;
};
}
}
7 changes: 4 additions & 3 deletions core/trino-main/src/main/java/io/trino/metadata/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.trino.spi.connector.RowChangeParadigm;
import io.trino.spi.connector.SampleApplicationResult;
import io.trino.spi.connector.SampleType;
import io.trino.spi.connector.SaveMode;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SortItem;
import io.trino.spi.connector.SystemTable;
Expand Down Expand Up @@ -213,9 +214,9 @@ Optional<TableExecuteHandle> getTableHandleForExecute(
/**
* Creates a table using the specified table metadata.
*
* @throws TrinoException with {@code ALREADY_EXISTS} if the table already exists and {@param ignoreExisting} is not set
* @throws TrinoException with {@code ALREADY_EXISTS} if the table already exists and {@param saveMode} is set to FAIL.
*/
void createTable(Session session, String catalogName, ConnectorTableMetadata tableMetadata, boolean ignoreExisting);
void createTable(Session session, String catalogName, ConnectorTableMetadata tableMetadata, SaveMode saveMode);

/**
* Rename the specified table.
Expand Down Expand Up @@ -314,7 +315,7 @@ Optional<TableExecuteHandle> getTableHandleForExecute(
/**
* Begin the atomic creation of a table with data.
*/
OutputTableHandle beginCreateTable(Session session, String catalogName, ConnectorTableMetadata tableMetadata, Optional<TableLayout> layout);
OutputTableHandle beginCreateTable(Session session, String catalogName, ConnectorTableMetadata tableMetadata, Optional<TableLayout> layout, boolean replace);

/**
* Finish a table creation with data after the data is written.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import io.trino.spi.connector.RowChangeParadigm;
import io.trino.spi.connector.SampleApplicationResult;
import io.trino.spi.connector.SampleType;
import io.trino.spi.connector.SaveMode;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SchemaTablePrefix;
import io.trino.spi.connector.SortItem;
Expand Down Expand Up @@ -769,12 +770,12 @@ public void setSchemaAuthorization(Session session, CatalogSchemaName source, Tr
}

@Override
public void createTable(Session session, String catalogName, ConnectorTableMetadata tableMetadata, boolean ignoreExisting)
public void createTable(Session session, String catalogName, ConnectorTableMetadata tableMetadata, SaveMode saveMode)
{
CatalogMetadata catalogMetadata = getCatalogMetadataForWrite(session, catalogName);
CatalogHandle catalogHandle = catalogMetadata.getCatalogHandle();
ConnectorMetadata metadata = catalogMetadata.getMetadata(session);
metadata.createTable(session.toConnectorSession(catalogHandle), tableMetadata, ignoreExisting);
metadata.createTable(session.toConnectorSession(catalogHandle), tableMetadata, saveMode);
if (catalogMetadata.getSecurityManagement() == SYSTEM) {
systemSecurityMetadata.tableCreated(session, new CatalogSchemaTableName(catalogName, tableMetadata.getTable()));
}
Expand Down Expand Up @@ -1043,15 +1044,15 @@ public void cleanupQuery(Session session)
}

@Override
public OutputTableHandle beginCreateTable(Session session, String catalogName, ConnectorTableMetadata tableMetadata, Optional<TableLayout> layout)
public OutputTableHandle beginCreateTable(Session session, String catalogName, ConnectorTableMetadata tableMetadata, Optional<TableLayout> layout, boolean replace)
{
CatalogMetadata catalogMetadata = getCatalogMetadataForWrite(session, catalogName);
CatalogHandle catalogHandle = catalogMetadata.getCatalogHandle();
ConnectorMetadata metadata = catalogMetadata.getMetadata(session);

ConnectorTransactionHandle transactionHandle = catalogMetadata.getTransactionHandleFor(catalogHandle);
ConnectorSession connectorSession = session.toConnectorSession(catalogHandle);
ConnectorOutputTableHandle handle = metadata.beginCreateTable(connectorSession, tableMetadata, layout.map(TableLayout::getLayout), getRetryPolicy(session).getRetryMode());
ConnectorOutputTableHandle handle = metadata.beginCreateTable(connectorSession, tableMetadata, layout.map(TableLayout::getLayout), getRetryPolicy(session).getRetryMode(), replace);
return new OutputTableHandle(catalogHandle, tableMetadata.getTable(), transactionHandle, handle);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1328,19 +1328,22 @@ public static final class Create
private final Optional<TableLayout> layout;
private final boolean createTableAsSelectWithData;
private final boolean createTableAsSelectNoOp;
private final boolean replace;

public Create(
Optional<QualifiedObjectName> destination,
Optional<ConnectorTableMetadata> metadata,
Optional<TableLayout> layout,
boolean createTableAsSelectWithData,
boolean createTableAsSelectNoOp)
boolean createTableAsSelectNoOp,
boolean replace)
{
this.destination = requireNonNull(destination, "destination is null");
this.metadata = requireNonNull(metadata, "metadata is null");
this.layout = requireNonNull(layout, "layout is null");
this.createTableAsSelectWithData = createTableAsSelectWithData;
this.createTableAsSelectNoOp = createTableAsSelectNoOp;
this.replace = replace;
}

public Optional<QualifiedObjectName> getDestination()
Expand All @@ -1367,6 +1370,11 @@ public boolean isCreateTableAsSelectNoOp()
{
return createTableAsSelectNoOp;
}

public boolean isReplace()
{
return replace;
}
}

@Immutable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -884,23 +884,19 @@ protected Scope visitAnalyze(Analyze node, Optional<Scope> scope)
@Override
protected Scope visitCreateTableAsSelect(CreateTableAsSelect node, Optional<Scope> scope)
{
// TODO: Remove when engine is supporting table replacement
if (node.getSaveMode() == REPLACE) {
throw semanticException(NOT_SUPPORTED, node, "Replace table is not supported");
}

// turn this into a query that has a new table writer node on top.
QualifiedObjectName targetTable = createQualifiedObjectName(session, node, node.getName());

Optional<TableHandle> targetTableHandle = metadata.getTableHandle(session, targetTable);
if (targetTableHandle.isPresent()) {
if (targetTableHandle.isPresent() && node.getSaveMode() != REPLACE) {
if (node.getSaveMode() == IGNORE) {
analysis.setCreate(new Analysis.Create(
Optional.of(targetTable),
Optional.empty(),
Optional.empty(),
node.isWithData(),
true));
true,
false));
analysis.setUpdateType("CREATE TABLE");
analysis.setUpdateTarget(targetTableHandle.get().getCatalogHandle().getVersion(), targetTable, Optional.empty(), Optional.of(ImmutableList.of()));
return createAndAssignScope(node, scope, Field.newUnqualified("rows", BIGINT));
Expand Down Expand Up @@ -989,7 +985,8 @@ protected Scope visitCreateTableAsSelect(CreateTableAsSelect node, Optional<Scop
Optional.of(tableMetadata),
newTableLayout,
node.isWithData(),
false));
false,
node.getSaveMode() == REPLACE));

analysis.setUpdateType("CREATE TABLE");
analysis.setUpdateTarget(
Expand Down Expand Up @@ -1126,11 +1123,6 @@ protected Scope visitSetSchemaAuthorization(SetSchemaAuthorization node, Optiona
@Override
protected Scope visitCreateTable(CreateTable node, Optional<Scope> scope)
{
// TODO: Remove when engine is supporting table replacement
if (node.getSaveMode() == REPLACE) {
throw semanticException(NOT_SUPPORTED, node, "Replace table is not supported");
}

validateProperties(node.getProperties(), scope);
return createAndAssignScope(node, scope);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ private RelationPlan createTableCreationPlan(Analysis analysis, Query query)
analysis,
plan.getRoot(),
visibleFields(plan),
new CreateReference(catalogName, tableMetadata, newTableLayout),
new CreateReference(catalogName, tableMetadata, newTableLayout, create.isReplace()),
columnNames,
newTableLayout,
statisticsMetadata);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,11 +242,12 @@ private WriterTarget createWriterTarget(WriterTarget target)
// TODO: we shouldn't need to store the schemaTableName in the handles, but there isn't a good way to pass this around with the current architecture
if (target instanceof CreateReference create) {
return new CreateTarget(
metadata.beginCreateTable(session, create.getCatalog(), create.getTableMetadata(), create.getLayout()),
metadata.beginCreateTable(session, create.getCatalog(), create.getTableMetadata(), create.getLayout(), create.isReplace()),
create.getTableMetadata().getTable(),
target.supportsMultipleWritersPerPartition(metadata, session),
target.getMaxWriterTasks(metadata, session),
target.getWriterScalingOptions(metadata, session));
target.getWriterScalingOptions(metadata, session),
create.isReplace());
}
if (target instanceof InsertReference insert) {
return new InsertTarget(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,12 +208,14 @@ public static class CreateReference
private final String catalog;
private final ConnectorTableMetadata tableMetadata;
private final Optional<TableLayout> layout;
private final boolean replace;

public CreateReference(String catalog, ConnectorTableMetadata tableMetadata, Optional<TableLayout> layout)
public CreateReference(String catalog, ConnectorTableMetadata tableMetadata, Optional<TableLayout> layout, boolean replace)
{
this.catalog = requireNonNull(catalog, "catalog is null");
this.tableMetadata = requireNonNull(tableMetadata, "tableMetadata is null");
this.layout = requireNonNull(layout, "layout is null");
this.replace = replace;
}

public String getCatalog()
Expand Down Expand Up @@ -253,6 +255,11 @@ public ConnectorTableMetadata getTableMetadata()
return tableMetadata;
}

public boolean isReplace()
{
return replace;
}

@Override
public String toString()
{
Expand All @@ -268,20 +275,23 @@ public static class CreateTarget
private final boolean multipleWritersPerPartitionSupported;
private final OptionalInt maxWriterTasks;
private final WriterScalingOptions writerScalingOptions;
private final boolean replace;

@JsonCreator
public CreateTarget(
@JsonProperty("handle") OutputTableHandle handle,
@JsonProperty("schemaTableName") SchemaTableName schemaTableName,
@JsonProperty("multipleWritersPerPartitionSupported") boolean multipleWritersPerPartitionSupported,
@JsonProperty("maxWriterTasks") OptionalInt maxWriterTasks,
@JsonProperty("writerScalingOptions") WriterScalingOptions writerScalingOptions)
@JsonProperty("writerScalingOptions") WriterScalingOptions writerScalingOptions,
@JsonProperty("replace") boolean replace)
{
this.handle = requireNonNull(handle, "handle is null");
this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null");
this.multipleWritersPerPartitionSupported = multipleWritersPerPartitionSupported;
this.maxWriterTasks = requireNonNull(maxWriterTasks, "maxWriterTasks is null");
this.writerScalingOptions = requireNonNull(writerScalingOptions, "writerScalingOptions is null");
this.replace = replace;
}

@JsonProperty
Expand All @@ -308,6 +318,12 @@ public WriterScalingOptions getWriterScalingOptions()
return writerScalingOptions;
}

@JsonProperty
public boolean isReplace()
{
return replace;
}

@Override
public String toString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import io.trino.spi.connector.RowChangeParadigm;
import io.trino.spi.connector.SampleApplicationResult;
import io.trino.spi.connector.SampleType;
import io.trino.spi.connector.SaveMode;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SchemaTablePrefix;
import io.trino.spi.connector.SortItem;
Expand Down Expand Up @@ -375,6 +376,15 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
}
}

@Override
public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, SaveMode saveMode)
{
Span span = startSpan("createTable", tableMetadata.getTable());
try (var ignored = scopedSpan(span)) {
delegate.createTable(session, tableMetadata, saveMode);
}
}

@Override
public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle)
{
Expand Down Expand Up @@ -609,6 +619,15 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
}
}

@Override
public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional<ConnectorTableLayout> layout, RetryMode retryMode, boolean replace)
{
Span span = startSpan("beginCreateTable", tableMetadata.getTable());
try (var ignored = scopedSpan(span)) {
return delegate.beginCreateTable(session, tableMetadata, layout, retryMode, replace);
}
}

@Override
public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession session, ConnectorOutputTableHandle tableHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import io.trino.spi.connector.RowChangeParadigm;
import io.trino.spi.connector.SampleApplicationResult;
import io.trino.spi.connector.SampleType;
import io.trino.spi.connector.SaveMode;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SortItem;
import io.trino.spi.connector.SystemTable;
Expand Down Expand Up @@ -388,11 +389,11 @@ public void setSchemaAuthorization(Session session, CatalogSchemaName source, Tr
}

@Override
public void createTable(Session session, String catalogName, ConnectorTableMetadata tableMetadata, boolean ignoreExisting)
public void createTable(Session session, String catalogName, ConnectorTableMetadata tableMetadata, SaveMode saveMode)
{
Span span = startSpan("createTable", catalogName, tableMetadata);
try (var ignored = scopedSpan(span)) {
delegate.createTable(session, catalogName, tableMetadata, ignoreExisting);
delegate.createTable(session, catalogName, tableMetadata, saveMode);
}
}

Expand Down Expand Up @@ -568,11 +569,11 @@ public Optional<Type> getSupportedType(Session session, CatalogHandle catalogHan
}

@Override
public OutputTableHandle beginCreateTable(Session session, String catalogName, ConnectorTableMetadata tableMetadata, Optional<TableLayout> layout)
public OutputTableHandle beginCreateTable(Session session, String catalogName, ConnectorTableMetadata tableMetadata, Optional<TableLayout> layout, boolean replace)
{
Span span = startSpan("beginCreateTable", catalogName, tableMetadata);
try (var ignored = scopedSpan(span)) {
return delegate.beginCreateTable(session, catalogName, tableMetadata, layout);
return delegate.beginCreateTable(session, catalogName, tableMetadata, layout, replace);
}
}

Expand Down
Loading

0 comments on commit e73e1e9

Please sign in to comment.