Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Time travel engine improvements #12542

Merged
merged 3 commits into from
Jun 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions core/trino-main/src/main/java/io/trino/metadata/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -695,11 +695,6 @@ default boolean isMaterializedView(Session session, QualifiedObjectName viewName
*/
RedirectionAwareTableHandle getRedirectionAwareTableHandle(Session session, QualifiedObjectName tableName, Optional<TableVersion> startVersion, Optional<TableVersion> endVersion);

/**
* Verifies that a version is valid for a given table
*/
boolean isValidTableVersion(Session session, QualifiedObjectName tableName, TableVersion version);

/**
* Returns true if the connector reports number of written bytes for an existing table. Otherwise, it returns false.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,21 +265,12 @@ public Optional<TableHandle> getTableHandle(Session session, QualifiedObjectName

ConnectorSession connectorSession = session.toConnectorSession(catalogName);

// GetTableHandle with the optional version handle field will throw an error if it is not implemented, so only try calling it when we have a version
if (startVersion.isPresent() || endVersion.isPresent()) {
ConnectorTableHandle versionedTableHandle = metadata.getTableHandle(
connectorSession,
table.asSchemaTableName(),
toConnectorVersion(startVersion),
toConnectorVersion(endVersion));
return Optional.ofNullable(versionedTableHandle)
.map(connectorTableHandle -> new TableHandle(
catalogName,
connectorTableHandle,
catalogMetadata.getTransactionHandleFor(catalogName)));
}

return Optional.ofNullable(metadata.getTableHandle(connectorSession, table.asSchemaTableName()))
ConnectorTableHandle tableHandle = metadata.getTableHandle(
connectorSession,
table.asSchemaTableName(),
toConnectorVersion(startVersion),
toConnectorVersion(endVersion));
return Optional.ofNullable(tableHandle)
.map(connectorTableHandle -> new TableHandle(
catalogName,
connectorTableHandle,
Expand Down Expand Up @@ -2381,22 +2372,6 @@ private synchronized void finish()
}
}

@Override
public boolean isValidTableVersion(Session session, QualifiedObjectName tableName, TableVersion version)
{
requireNonNull(version, "Version must not be null for table " + tableName);

Optional<CatalogMetadata> catalog = getOptionalCatalogMetadata(session, tableName.getCatalogName());
if (!catalog.isPresent()) {
return false;
}

CatalogMetadata catalogMetadata = catalog.get();
CatalogName connectorId = catalogMetadata.getConnectorId(session, tableName);
ConnectorMetadata metadata = catalogMetadata.getMetadataFor(session, connectorId);
return metadata.isSupportedVersionType(session.toConnectorSession(), tableName.asSchemaTableName(), version.getPointerType(), version.getObjectType());
}

@Override
public boolean supportsReportingWrittenBytes(Session session, QualifiedObjectName tableName, Map<String, Object> tableProperties)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4541,8 +4541,8 @@ private OutputColumn createOutputColumn(Field field)
private RedirectionAwareTableHandle getTableHandle(Table table, QualifiedObjectName name, Optional<Scope> scope)
{
if (table.getQueryPeriod().isPresent()) {
Optional<TableVersion> startVersion = extractTableVersion(table, name, table.getQueryPeriod().get().getStart(), scope);
Optional<TableVersion> endVersion = extractTableVersion(table, name, table.getQueryPeriod().get().getEnd(), scope);
Optional<TableVersion> startVersion = extractTableVersion(table, table.getQueryPeriod().get().getStart(), scope);
Optional<TableVersion> endVersion = extractTableVersion(table, table.getQueryPeriod().get().getEnd(), scope);
return metadata.getRedirectionAwareTableHandle(session, name, startVersion, endVersion);
}
return metadata.getRedirectionAwareTableHandle(session, name);
Expand All @@ -4551,7 +4551,7 @@ private RedirectionAwareTableHandle getTableHandle(Table table, QualifiedObjectN
/**
* Analyzes the version pointer in a query period and extracts an evaluated version value
*/
private Optional<TableVersion> extractTableVersion(Table table, QualifiedObjectName tableName, Optional<Expression> version, Optional<Scope> scope)
private Optional<TableVersion> extractTableVersion(Table table, Optional<Expression> version, Optional<Scope> scope)
{
Optional<TableVersion> tableVersion = Optional.empty();
if (version.isEmpty()) {
Expand All @@ -4568,11 +4568,11 @@ private Optional<TableVersion> extractTableVersion(Table table, QualifiedObjectN
}
Object evaluatedVersion = evaluateConstantExpression(version.get(), versionType, plannerContext, session, accessControl, ImmutableMap.of());
TableVersion extractedVersion = new TableVersion(pointerType, versionType, evaluatedVersion);
validateVersionPointer(tableName, table.getQueryPeriod().get(), extractedVersion);
validateVersionPointer(table.getQueryPeriod().get(), extractedVersion);
return Optional.of(extractedVersion);
}

private void validateVersionPointer(QualifiedObjectName tableName, QueryPeriod queryPeriod, TableVersion extractedVersion)
private void validateVersionPointer(QueryPeriod queryPeriod, TableVersion extractedVersion)
{
Type type = extractedVersion.getObjectType();
Object pointer = extractedVersion.getPointer();
Expand Down Expand Up @@ -4602,10 +4602,6 @@ private void validateVersionPointer(QualifiedObjectName tableName, QueryPeriod q
throw semanticException(INVALID_ARGUMENTS, queryPeriod, "Pointer value cannot be NULL");
}
}

if (!metadata.isValidTableVersion(session, tableName, extractedVersion)) {
throw semanticException(TYPE_MISMATCH, queryPeriod, format("Type %s not supported by this connector.", type.getDisplayName()));
}
}

private Instant getInstantWithRoundUp(LongTimestampWithTimeZone value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -859,12 +859,6 @@ public RedirectionAwareTableHandle getRedirectionAwareTableHandle(Session sessio
throw new UnsupportedOperationException();
}

@Override
public boolean isValidTableVersion(Session session, QualifiedObjectName tableName, TableVersion version)
{
throw new UnsupportedOperationException();
}

@Override
public boolean supportsReportingWrittenBytes(Session session, TableHandle tableHandle)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -842,12 +842,6 @@ public RedirectionAwareTableHandle getRedirectionAwareTableHandle(Session sessio
return delegate.getRedirectionAwareTableHandle(session, tableName, startVersion, endVersion);
}

@Override
public boolean isValidTableVersion(Session session, QualifiedObjectName tableName, TableVersion version)
{
return delegate.isValidTableVersion(session, tableName, version);
}

@Override
public boolean supportsReportingWrittenBytes(Session session, TableHandle tableHandle)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -822,6 +822,10 @@ public void testInvalidTable()
.hasErrorCode(SCHEMA_NOT_FOUND);
assertFails("SELECT * FROM foo")
.hasErrorCode(TABLE_NOT_FOUND);
assertFails("SELECT * FROM foo FOR TIMESTAMP AS OF TIMESTAMP '2021-03-01 00:00:01'")
.hasErrorCode(TABLE_NOT_FOUND);
assertFails("SELECT * FROM foo FOR VERSION AS OF 'version1'")
.hasErrorCode(TABLE_NOT_FOUND);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import io.trino.spi.statistics.ComputedStatistics;
import io.trino.spi.statistics.TableStatistics;
import io.trino.spi.statistics.TableStatisticsMetadata;
import io.trino.spi.type.Type;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -82,6 +81,33 @@ default ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTabl
return null;
}

/**
* Returns a table handle for the specified table name and version, or {@code null} if {@code tableName} relation does not exist
* or is not a table (e.g. is a view, or a materialized view).
*
* @throws TrinoException implementation can throw this exception when {@code tableName} refers to a table that
* cannot be queried.
* @see #getView(ConnectorSession, SchemaTableName)
* @see #getMaterializedView(ConnectorSession, SchemaTableName)
*/
@Nullable
default ConnectorTableHandle getTableHandle(
ConnectorSession session,
SchemaTableName tableName,
Optional<ConnectorTableVersion> startVersion,
Optional<ConnectorTableVersion> endVersion)
{
ConnectorTableHandle tableHandle = getTableHandle(session, tableName);
if (tableHandle == null) {
// Not found
return null;
}
if (startVersion.isEmpty() && endVersion.isEmpty()) {
return tableHandle;
}
throw new TrinoException(NOT_SUPPORTED, "This connector does not support versioned tables");
}

/**
* Returns a table handle for the specified table name, or null if the connector does not contain the table.
* The returned table handle can contain information in analyzeProperties.
Expand Down Expand Up @@ -1315,22 +1341,6 @@ default Optional<CatalogSchemaTableName> redirectTable(ConnectorSession session,
return Optional.empty();
}

/**
* Returns a table handle representing a versioned table. A versioned table differs by having an additional specifier for version.
*/
default ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName, Optional<ConnectorTableVersion> startVersion, Optional<ConnectorTableVersion> endVersion)
{
throw new TrinoException(NOT_SUPPORTED, "This connector does not support versioned tables");
}

/**
* Returns whether a specified version type is supported by the connector for a given travel type and table name
*/
default boolean isSupportedVersionType(ConnectorSession session, SchemaTableName tableName, PointerType pointerType, Type versioning)
{
throw new TrinoException(NOT_SUPPORTED, "This connector does not support versioned tables");
}

default boolean supportsReportingWrittenBytes(ConnectorSession session, SchemaTableName schemaTableName, Map<String, Object> tableProperties)
{
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public class TestSpiBackwardCompatibility
.put("383", "Method: public default void io.trino.spi.connector.ConnectorAccessControl.checkCanExecuteFunction(io.trino.spi.connector.ConnectorSecurityContext,io.trino.spi.connector.SchemaRoutineName)")
.put("384", "Constructor: public io.trino.spi.eventlistener.QueryInputMetadata(java.lang.String,java.lang.String,java.lang.String,java.util.List<java.lang.String>,java.util.Optional<java.lang.Object>,java.util.OptionalLong,java.util.OptionalLong)")
.put("386", "Method: public default java.util.stream.Stream<io.trino.spi.connector.TableColumnsMetadata> io.trino.spi.connector.ConnectorMetadata.streamTableColumns(io.trino.spi.connector.ConnectorSession,io.trino.spi.connector.SchemaTablePrefix)")
.put("386", "Method: public default boolean io.trino.spi.connector.ConnectorMetadata.isSupportedVersionType(io.trino.spi.connector.ConnectorSession,io.trino.spi.connector.SchemaTableName,io.trino.spi.connector.PointerType,io.trino.spi.type.Type)")
.put("386", "Method: public static io.trino.spi.ptf.TableArgumentSpecification$Builder io.trino.spi.ptf.TableArgumentSpecification.builder(java.lang.String)")
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import io.trino.spi.connector.JoinType;
import io.trino.spi.connector.LimitApplicationResult;
import io.trino.spi.connector.MaterializedViewFreshness;
import io.trino.spi.connector.PointerType;
import io.trino.spi.connector.ProjectionApplicationResult;
import io.trino.spi.connector.RetryMode;
import io.trino.spi.connector.SampleApplicationResult;
Expand All @@ -70,7 +69,6 @@
import io.trino.spi.statistics.ComputedStatistics;
import io.trino.spi.statistics.TableStatistics;
import io.trino.spi.statistics.TableStatisticsMetadata;
import io.trino.spi.type.Type;

import javax.inject.Inject;

Expand Down Expand Up @@ -1009,14 +1007,6 @@ public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTable
}
}

@Override
public boolean isSupportedVersionType(ConnectorSession session, SchemaTableName tableName, PointerType pointerType, Type versioning)
{
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
return delegate.isSupportedVersionType(session, tableName, pointerType, versioning);
}
}

@Override
public boolean supportsReportingWrittenBytes(ConnectorSession session, SchemaTableName schemaTableName, Map<String, Object> tableProperties)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@
import io.trino.spi.connector.DiscretePredicates;
import io.trino.spi.connector.MaterializedViewFreshness;
import io.trino.spi.connector.MaterializedViewNotFoundException;
import io.trino.spi.connector.PointerType;
import io.trino.spi.connector.ProjectionApplicationResult;
import io.trino.spi.connector.RetryMode;
import io.trino.spi.connector.SchemaTableName;
Expand All @@ -80,7 +79,6 @@
import io.trino.spi.statistics.ComputedStatistics;
import io.trino.spi.statistics.TableStatistics;
import io.trino.spi.type.LongTimestampWithTimeZone;
import io.trino.spi.type.TimestampType;
import io.trino.spi.type.TimestampWithTimeZoneType;
import io.trino.spi.type.TypeManager;
import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -273,7 +271,7 @@ public Optional<TrinoPrincipal> getSchemaOwner(ConnectorSession session, Catalog
@Override
public IcebergTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName)
{
return getTableHandle(session, tableName, Optional.empty(), Optional.empty());
throw new UnsupportedOperationException("This method is not supported because getTableHandle with versions is implemented instead");
}

@Override
Expand Down Expand Up @@ -330,18 +328,6 @@ public IcebergTableHandle getTableHandle(
Optional.empty());
}

@Override
public boolean isSupportedVersionType(ConnectorSession session, SchemaTableName tableName, PointerType pointerType, io.trino.spi.type.Type versioning)
{
switch (pointerType) {
case TEMPORAL:
return versioning instanceof TimestampWithTimeZoneType || versioning instanceof TimestampType;
case TARGET_ID:
return versioning == BIGINT;
}
return false;
}

private long getSnapshotIdFromVersion(Table table, ConnectorTableVersion version)
{
io.trino.spi.type.Type versionType = version.getVersionType();
Expand Down Expand Up @@ -2094,7 +2080,7 @@ else if (strings.size() != 2) {
String schema = strings.get(0);
String name = strings.get(1);
SchemaTableName schemaTableName = new SchemaTableName(schema, name);
IcebergTableHandle tableHandle = getTableHandle(session, schemaTableName);
IcebergTableHandle tableHandle = getTableHandle(session, schemaTableName, Optional.empty(), Optional.empty());

if (tableHandle == null) {
throw new MaterializedViewNotFoundException(materializedViewName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,17 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
}
}

@Override
protected void verifyVersionedQueryFailurePermissible(Exception e)
{
assertThat(e)
.hasMessageMatching("Version pointer type is not supported: .*|" +
"Unsupported type for temporal table version: .*|" +
"Unsupported type for table version: .*|" +
"No version history table tpch.nation at or before .*|" +
"Iceberg snapshot ID does not exists: .*");
}

@Override
protected void verifyConcurrentUpdateFailurePermissible(Exception e)
{
Expand Down
Loading