Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplitSource;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.ConnectorTableVersion;
import io.trino.spi.connector.JoinStatistics;
import io.trino.spi.connector.JoinType;
import io.trino.spi.connector.RelationColumnsMetadata;
Expand Down Expand Up @@ -371,8 +372,11 @@ public boolean supportsMerge()
}

@Override
public Optional<JdbcTableHandle> getTableHandle(ConnectorSession session, SchemaTableName schemaTableName)
public Optional<JdbcTableHandle> getTableHandle(ConnectorSession session, SchemaTableName schemaTableName, Optional<ConnectorTableVersion> endVersion)
{
if (endVersion.isPresent()) {
return delegate.getTableHandle(session, schemaTableName, endVersion);
}
TableHandlesByNameCacheKey key = new TableHandlesByNameCacheKey(getIdentityKey(session), schemaTableName);
Optional<JdbcTableHandle> cachedTableHandle = tableHandlesByNameCache.getIfPresent(key);
//noinspection OptionalAssignedToNull
Expand All @@ -382,7 +386,7 @@ public Optional<JdbcTableHandle> getTableHandle(ConnectorSession session, Schema
}
tableHandlesByNameCache.invalidate(key);
}
return get(tableHandlesByNameCache, key, () -> delegate.getTableHandle(session, schemaTableName));
return get(tableHandlesByNameCache, key, () -> delegate.getTableHandle(session, schemaTableName, endVersion));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,11 @@ public List<String> listSchemaNames(ConnectorSession session)
@Override
public JdbcTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName, Optional<ConnectorTableVersion> startVersion, Optional<ConnectorTableVersion> endVersion)
{
if (startVersion.isPresent() || endVersion.isPresent()) {
if (startVersion.isPresent()) {
throw new TrinoException(NOT_SUPPORTED, "This connector does not support versioned tables");
}

return jdbcClient.getTableHandle(session, tableName)
return jdbcClient.getTableHandle(session, tableName, endVersion)
.orElse(null);
}

Expand Down Expand Up @@ -1094,7 +1094,7 @@ public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSess
.orElseGet(() -> listTables(session, prefix.getSchema()));
for (SchemaTableName tableName : tables) {
try {
jdbcClient.getTableHandle(session, tableName)
jdbcClient.getTableHandle(session, tableName, Optional.empty())
.ifPresent(tableHandle -> columns.put(tableName, getColumnMetadata(session, tableHandle)));
}
catch (TableNotFoundException | AccessDeniedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplitSource;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.ConnectorTableVersion;
import io.trino.spi.connector.JoinStatistics;
import io.trino.spi.connector.JoinType;
import io.trino.spi.connector.RelationColumnsMetadata;
Expand Down Expand Up @@ -88,9 +89,9 @@ public List<SchemaTableName> getTableNames(ConnectorSession session, Optional<St
}

@Override
public Optional<JdbcTableHandle> getTableHandle(ConnectorSession session, SchemaTableName schemaTableName)
public Optional<JdbcTableHandle> getTableHandle(ConnectorSession session, SchemaTableName schemaTableName, Optional<ConnectorTableVersion> endVersion)
{
return delegate().getTableHandle(session, schemaTableName);
return delegate().getTableHandle(session, schemaTableName, endVersion);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplitSource;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.ConnectorTableVersion;
import io.trino.spi.connector.JoinStatistics;
import io.trino.spi.connector.JoinType;
import io.trino.spi.connector.RelationColumnsMetadata;
Expand Down Expand Up @@ -63,7 +64,7 @@ default boolean schemaExists(ConnectorSession session, String schema)

List<SchemaTableName> getTableNames(ConnectorSession session, Optional<String> schema);

Optional<JdbcTableHandle> getTableHandle(ConnectorSession session, SchemaTableName schemaTableName);
Optional<JdbcTableHandle> getTableHandle(ConnectorSession session, SchemaTableName schemaTableName, Optional<ConnectorTableVersion> endVersion);

JdbcTableHandle getTableHandle(ConnectorSession session, PreparedQuery preparedQuery);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,19 @@ public class JdbcNamedRelationHandle
private final SchemaTableName schemaTableName;
private final RemoteTableName remoteTableName;
private final Optional<String> comment;
private final Optional<String> readVersion;

@JsonCreator
public JdbcNamedRelationHandle(
@JsonProperty("schemaTableName") SchemaTableName schemaTableName,
@JsonProperty("remoteTableName") RemoteTableName remoteTableName,
@JsonProperty("comment") Optional<String> comment)
@JsonProperty("comment") Optional<String> comment,
@JsonProperty("readVersion") Optional<String> readVersion)
{
this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null");
this.remoteTableName = requireNonNull(remoteTableName, "remoteTableName is null");
this.comment = requireNonNull(comment, "comment is null");
this.readVersion = requireNonNull(readVersion, "readVersion is null");
}

@JsonProperty
Expand All @@ -59,6 +62,12 @@ public Optional<String> getComment()
return comment;
}

@JsonProperty
public Optional<String> getReadVersion()
{
return readVersion;
}

@Override
public boolean equals(Object o)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,14 @@ public final class JdbcTableHandle
private final List<JdbcAssignmentItem> updateAssignments;

public JdbcTableHandle(SchemaTableName schemaTableName, RemoteTableName remoteTableName, Optional<String> comment)
{
this(schemaTableName, remoteTableName, comment, Optional.empty());
}

public JdbcTableHandle(SchemaTableName schemaTableName, RemoteTableName remoteTableName, Optional<String> comment, Optional<String> readVersion)
{
this(
new JdbcNamedRelationHandle(schemaTableName, remoteTableName, comment),
new JdbcNamedRelationHandle(schemaTableName, remoteTableName, comment, readVersion),
TupleDomain.all(),
ImmutableList.of(),
Optional.empty(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplitSource;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.ConnectorTableVersion;
import io.trino.spi.connector.JoinStatistics;
import io.trino.spi.connector.JoinType;
import io.trino.spi.connector.RelationColumnsMetadata;
Expand Down Expand Up @@ -85,9 +86,9 @@ public List<SchemaTableName> getTableNames(ConnectorSession session, Optional<St
}

@Override
public Optional<JdbcTableHandle> getTableHandle(ConnectorSession session, SchemaTableName schemaTableName)
public Optional<JdbcTableHandle> getTableHandle(ConnectorSession session, SchemaTableName schemaTableName, Optional<ConnectorTableVersion> endVersion)
{
return retry(policy, () -> delegate.getTableHandle(session, schemaTableName));
return retry(policy, () -> delegate.getTableHandle(session, schemaTableName, endVersion));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplitSource;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.ConnectorTableVersion;
import io.trino.spi.connector.JoinStatistics;
import io.trino.spi.connector.JoinType;
import io.trino.spi.connector.RelationColumnsMetadata;
Expand Down Expand Up @@ -110,9 +111,9 @@ public List<SchemaTableName> getTableNames(ConnectorSession session, Optional<St
}

@Override
public Optional<JdbcTableHandle> getTableHandle(ConnectorSession session, SchemaTableName schemaTableName)
public Optional<JdbcTableHandle> getTableHandle(ConnectorSession session, SchemaTableName schemaTableName, Optional<ConnectorTableVersion> endVersion)
{
return stats.getGetTableHandle().wrap(() -> delegate().getTableHandle(session, schemaTableName));
return stats.getGetTableHandle().wrap(() -> delegate().getTableHandle(session, schemaTableName, endVersion));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public abstract class BaseJdbcConnectorTest
{
private static final Logger log = Logger.get(BaseJdbcConnectorTest.class);

private final ExecutorService executor = newCachedThreadPool(daemonThreadsNamed(getClass().getName()));
protected final ExecutorService executor = newCachedThreadPool(daemonThreadsNamed(getClass().getName()));

protected abstract SqlExecutor onRemoteDatabase();

Expand Down Expand Up @@ -1716,7 +1716,7 @@ protected void stopTracingDatabaseEvent(RemoteLogTracingEvent event)
throw new UnsupportedOperationException();
}

private QueryId getQueryId(String query)
protected QueryId getQueryId(String query)
throws Exception
{
for (int i = 0; i < 100; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.trino.spi.connector.ColumnPosition;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.ConnectorTableVersion;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.TableNotFoundException;
import io.trino.spi.predicate.TupleDomain;
Expand Down Expand Up @@ -183,11 +184,11 @@ public void testTableHandleCached()
SchemaTableName phantomTable = new SchemaTableName(schema, "phantom_table");

createTable(phantomTable);
Optional<JdbcTableHandle> cachedTable = cachingJdbcClient.getTableHandle(SESSION, phantomTable);
Optional<JdbcTableHandle> cachedTable = cachingJdbcClient.getTableHandle(SESSION, phantomTable, Optional.empty());
dropTable(phantomTable);

assertThat(jdbcClient.getTableHandle(SESSION, phantomTable)).isEmpty();
assertThat(cachingJdbcClient.getTableHandle(SESSION, phantomTable)).isEqualTo(cachedTable);
assertThat(jdbcClient.getTableHandle(SESSION, phantomTable, Optional.empty())).isEmpty();
assertThat(cachingJdbcClient.getTableHandle(SESSION, phantomTable, Optional.empty())).isEqualTo(cachedTable);
}

@Test
Expand Down Expand Up @@ -348,10 +349,10 @@ private void assertTableHandlesByNameCacheIsInvalidated(CachingJdbcClient cachin
SchemaTableName tableName = table.asPlainTable().getSchemaTableName();

assertTableHandleByNameCache(cachingJdbcClient).misses(1).loads(1).afterRunning(() -> {
assertThat(cachingJdbcClient.getTableHandle(SESSION, tableName).orElseThrow()).isEqualTo(table);
assertThat(cachingJdbcClient.getTableHandle(SESSION, tableName, Optional.empty()).orElseThrow()).isEqualTo(table);
});
assertTableHandleByNameCache(cachingJdbcClient).hits(1).afterRunning(() -> {
assertThat(cachingJdbcClient.getTableHandle(SESSION, tableName).orElseThrow()).isEqualTo(table);
assertThat(cachingJdbcClient.getTableHandle(SESSION, tableName, Optional.empty()).orElseThrow()).isEqualTo(table);
});
}

Expand All @@ -363,10 +364,10 @@ public void testEmptyTableHandleIsCachedWhenCacheMissingIsTrue()
.build();
SchemaTableName phantomTable = new SchemaTableName(schema, "phantom_table");

assertThat(cachingJdbcClient.getTableHandle(SESSION, phantomTable)).isEmpty();
assertThat(cachingJdbcClient.getTableHandle(SESSION, phantomTable, Optional.empty())).isEmpty();

createTable(phantomTable);
assertThat(cachingJdbcClient.getTableHandle(SESSION, phantomTable)).isEmpty();
assertThat(cachingJdbcClient.getTableHandle(SESSION, phantomTable, Optional.empty())).isEmpty();
dropTable(phantomTable);
}

Expand All @@ -378,17 +379,17 @@ public void testEmptyTableHandleNotCachedWhenCacheMissingIsFalse()
.build();
SchemaTableName phantomTable = new SchemaTableName(schema, "phantom_table");

assertThat(cachingJdbcClient.getTableHandle(SESSION, phantomTable)).isEmpty();
assertThat(cachingJdbcClient.getTableHandle(SESSION, phantomTable, Optional.empty())).isEmpty();

createTable(phantomTable);
assertThat(cachingJdbcClient.getTableHandle(SESSION, phantomTable)).isPresent();
assertThat(cachingJdbcClient.getTableHandle(SESSION, phantomTable, Optional.empty())).isPresent();
dropTable(phantomTable);
}

private JdbcTableHandle createTable(SchemaTableName phantomTable)
{
jdbcClient.createTable(SESSION, new ConnectorTableMetadata(phantomTable, emptyList()));
return jdbcClient.getTableHandle(SESSION, phantomTable).orElseThrow();
return jdbcClient.getTableHandle(SESSION, phantomTable, Optional.empty()).orElseThrow();
}

private void createProcedure(String procedureName)
Expand Down Expand Up @@ -421,7 +422,7 @@ private void dropTable(JdbcTableHandle tableHandle)

private void dropTable(SchemaTableName phantomTable)
{
JdbcTableHandle tableHandle = jdbcClient.getTableHandle(SESSION, phantomTable).orElseThrow();
JdbcTableHandle tableHandle = jdbcClient.getTableHandle(SESSION, phantomTable, Optional.empty()).orElseThrow();
jdbcClient.dropTable(SESSION, tableHandle);
}

Expand Down Expand Up @@ -885,7 +886,7 @@ protected JdbcClient delegate()
}

@Override
public Optional<JdbcTableHandle> getTableHandle(ConnectorSession session, SchemaTableName schemaTableName)
public Optional<JdbcTableHandle> getTableHandle(ConnectorSession session, SchemaTableName schemaTableName, Optional<ConnectorTableVersion> endVersion)
{
if (first.compareAndSet(true, false)) {
// first
Expand All @@ -898,7 +899,7 @@ public Optional<JdbcTableHandle> getTableHandle(ConnectorSession session, Schema
}
throw new RuntimeException("first attempt is poised to fail");
}
return super.getTableHandle(session, schemaTableName);
return super.getTableHandle(session, schemaTableName, endVersion);
}
})
// ttl is 0, cache is disabled
Expand All @@ -913,7 +914,7 @@ public Optional<JdbcTableHandle> getTableHandle(ConnectorSession session, Schema
for (int i = 0; i < 2; i++) {
futures.add(executor.submit(() -> {
barrier.await(10, SECONDS);
return cachingJdbcClient.getTableHandle(session, tableName).orElseThrow();
return cachingJdbcClient.getTableHandle(session, tableName, Optional.empty()).orElseThrow();
}));
}

Expand Down Expand Up @@ -965,10 +966,10 @@ public void testSpecificSchemaAndTableCaches()
.doesNotContain(secondName);
});
assertTableHandleByNameCache(cachingJdbcClient).misses(1).loads(1).afterRunning(() -> {
assertThat(cachingJdbcClient.getTableHandle(session, firstName)).isNotEmpty();
assertThat(cachingJdbcClient.getTableHandle(session, firstName, Optional.empty())).isNotEmpty();
});
assertTableHandleByNameCache(cachingJdbcClient).misses(1).loads(1).afterRunning(() -> {
assertThat(cachingJdbcClient.getTableHandle(session, secondName)).isEmpty();
assertThat(cachingJdbcClient.getTableHandle(session, secondName, Optional.empty())).isEmpty();
});

jdbcClient.createSchema(SESSION, secondSchema);
Expand All @@ -986,10 +987,10 @@ public void testSpecificSchemaAndTableCaches()
.doesNotContain(secondName);
});
assertTableHandleByNameCache(cachingJdbcClient).hits(1).afterRunning(() -> {
assertThat(cachingJdbcClient.getTableHandle(session, firstName)).isNotEmpty();
assertThat(cachingJdbcClient.getTableHandle(session, firstName, Optional.empty())).isNotEmpty();
});
assertTableHandleByNameCache(cachingJdbcClient).hits(1).misses(1).loads(1).afterRunning(() -> {
assertThat(cachingJdbcClient.getTableHandle(session, secondName)).isNotEmpty();
assertThat(cachingJdbcClient.getTableHandle(session, secondName, Optional.empty())).isNotEmpty();
});

// reloads table names, retains schema names and table handles
Expand All @@ -1004,10 +1005,10 @@ public void testSpecificSchemaAndTableCaches()
.contains(firstName, secondName);
});
assertTableHandleByNameCache(cachingJdbcClient).hits(1).afterRunning(() -> {
assertThat(cachingJdbcClient.getTableHandle(session, firstName)).isNotEmpty();
assertThat(cachingJdbcClient.getTableHandle(session, firstName, Optional.empty())).isNotEmpty();
});
assertTableHandleByNameCache(cachingJdbcClient).hits(1).afterRunning(() -> {
assertThat(cachingJdbcClient.getTableHandle(session, secondName)).isNotEmpty();
assertThat(cachingJdbcClient.getTableHandle(session, secondName, Optional.empty())).isNotEmpty();
});

// reloads tables names and schema names, but retains table handles
Expand All @@ -1021,10 +1022,10 @@ public void testSpecificSchemaAndTableCaches()
.contains(firstName, secondName);
});
assertTableHandleByNameCache(cachingJdbcClient).hits(1).afterRunning(() -> {
assertThat(cachingJdbcClient.getTableHandle(session, firstName)).isNotEmpty();
assertThat(cachingJdbcClient.getTableHandle(session, firstName, Optional.empty())).isNotEmpty();
});
assertTableHandleByNameCache(cachingJdbcClient).hits(1).afterRunning(() -> {
assertThat(cachingJdbcClient.getTableHandle(session, secondName)).isNotEmpty();
assertThat(cachingJdbcClient.getTableHandle(session, secondName, Optional.empty())).isNotEmpty();
});

jdbcClient.dropTable(SESSION, first);
Expand All @@ -1049,7 +1050,7 @@ public void testCacheOnlyStatistics()
JdbcTableHandle firstHandle = assertTableHandleByNameCache(cachingJdbcClient)
.misses(2)
.loads(1)
.calling(() -> cachingJdbcClient.getTableHandle(SESSION, phantomTable)).orElseThrow();
.calling(() -> cachingJdbcClient.getTableHandle(SESSION, phantomTable, Optional.empty())).orElseThrow();

assertStatisticsCacheStats(cachingJdbcClient)
.misses(1)
Expand All @@ -1060,7 +1061,7 @@ public void testCacheOnlyStatistics()
JdbcTableHandle secondHandle = assertTableHandleByNameCache(cachingJdbcClient)
.misses(2)
.loads(1)
.calling(() -> cachingJdbcClient.getTableHandle(SESSION, phantomTable)).orElseThrow();
.calling(() -> cachingJdbcClient.getTableHandle(SESSION, phantomTable, Optional.empty())).orElseThrow();

// Stats come from the cache
assertStatisticsCacheStats(cachingJdbcClient)
Expand All @@ -1077,7 +1078,7 @@ private JdbcTableHandle getAnyTable(String schema)
.filter(schemaTableName -> !"public".equals(schemaTableName.getTableName()))
.findAny()
.orElseThrow();
return jdbcClient.getTableHandle(SESSION, tableName).orElseThrow();
return jdbcClient.getTableHandle(SESSION, tableName, Optional.empty()).orElseThrow();
}

private JdbcColumnHandle addColumn(JdbcTableHandle tableHandle)
Expand Down
Loading
Loading