Use executor service for iceberg scan planning system tables
tbaeg authored and raunaqmorarka committed Dec 13, 2024
1 parent da79aed commit 5917549
Showing 14 changed files with 66 additions and 27 deletions.
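In short: IcebergMetadataFactory now receives an ExecutorService bound with @ForIcebergScanPlanning and threads it through IcebergMetadata into every scan-planning system table, where it is handed to Iceberg's Scan.planWith so that manifest reads during scan planning can run on a dedicated pool instead of a single caller thread. A minimal sketch of the resulting pattern, assuming a hypothetical fixed pool (the connector's real pool is injected, not created inline):

import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static org.apache.iceberg.MetadataTableType.FILES;
import static org.apache.iceberg.MetadataTableUtils.createMetadataTableInstance;

class ScanPlanningSketch
{
    // Hypothetical pool; the connector injects its own via @ForIcebergScanPlanning.
    private static final ExecutorService PLANNING_POOL = Executors.newFixedThreadPool(8);

    static TableScan planFilesScan(Table icebergTable, long snapshotId)
    {
        return createMetadataTableInstance(icebergTable, FILES)
                .newScan()
                .useSnapshot(snapshotId)
                .includeColumnStats()
                // planWith only records the executor; the parallel manifest
                // reads happen later, when the scan is actually planned.
                .planWith(PLANNING_POOL);
    }
}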
AllManifestsTable.java

@@ -27,6 +27,7 @@
 import org.apache.iceberg.Table;
 
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 
 import static io.trino.spi.type.BigintType.BIGINT;
 import static io.trino.spi.type.BooleanType.BOOLEAN;

@@ -38,7 +39,7 @@
 public class AllManifestsTable
         extends BaseSystemTable
 {
-    public AllManifestsTable(SchemaTableName tableName, Table icebergTable)
+    public AllManifestsTable(SchemaTableName tableName, Table icebergTable, ExecutorService executor)
     {
         super(requireNonNull(icebergTable, "icebergTable is null"),
                 new ConnectorTableMetadata(requireNonNull(tableName, "tableName is null"), ImmutableList.<ColumnMetadata>builder()

@@ -55,7 +56,8 @@ public AllManifestsTable(SchemaTableName tableName, Table icebergTable)
                         RowType.field("lower_bound", VARCHAR),
                         RowType.field("upper_bound", VARCHAR)))))
                 .build()),
-                ALL_MANIFESTS);
+                ALL_MANIFESTS,
+                executor);
     }
 
     @Override
BaseSystemTable.java

@@ -36,6 +36,7 @@
 import java.io.UncheckedIOException;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
 
 import static com.google.common.collect.ImmutableMap.toImmutableMap;
 import static com.google.common.collect.Maps.immutableEntry;

@@ -49,12 +50,14 @@ public abstract class BaseSystemTable
     private final Table icebergTable;
     private final ConnectorTableMetadata tableMetadata;
     private final MetadataTableType metadataTableType;
+    private final ExecutorService executor;
 
-    BaseSystemTable(Table icebergTable, ConnectorTableMetadata tableMetadata, MetadataTableType metadataTableType)
+    BaseSystemTable(Table icebergTable, ConnectorTableMetadata tableMetadata, MetadataTableType metadataTableType, ExecutorService executor)
     {
         this.icebergTable = requireNonNull(icebergTable, "icebergTable is null");
         this.tableMetadata = requireNonNull(tableMetadata, "tableMetadata is null");
         this.metadataTableType = requireNonNull(metadataTableType, "metadataTableType is null");
+        this.executor = requireNonNull(executor, "executor is null");
     }
 
     @Override

@@ -79,7 +82,7 @@ private List<Page> buildPages(ConnectorTableMetadata tableMetadata, ConnectorSes
     {
         PageListBuilder pagesBuilder = PageListBuilder.forTable(tableMetadata);
 
-        TableScan tableScan = createMetadataTableInstance(icebergTable, metadataTableType).newScan();
+        TableScan tableScan = createMetadataTableInstance(icebergTable, metadataTableType).newScan().planWith(executor);
         TimeZoneKey timeZoneKey = session.getTimeZoneKey();
 
         Map<String, Integer> columnNameToPosition = mapWithIndex(tableScan.schema().columns().stream(),
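Worth noting: planWith only stashes the executor on the scan. The fan-out materializes when the scan is planned, for example as buildPages iterates file tasks. A sketch of that consumption step, assuming the usual Iceberg planFiles() iteration (class and method names here are illustrative, not the connector's):

import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.io.CloseableIterable;

import java.io.IOException;
import java.io.UncheckedIOException;

class PlanFilesSketch
{
    static void consume(TableScan tableScan)
    {
        // Iterating planFiles() is what reads the manifests; with planWith(executor)
        // set, those reads are distributed across the executor's threads.
        try (CloseableIterable<FileScanTask> tasks = tableScan.planFiles()) {
            for (FileScanTask task : tasks) {
                // turn task metadata into output rows/pages here
            }
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}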
FilesTable.java

@@ -65,6 +65,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.ExecutorService;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.collect.ImmutableList.toImmutableList;

@@ -117,8 +118,9 @@ public class FilesTable
     private final Optional<IcebergPartitionColumn> partitionColumnType;
     private final Map<Integer, Type.PrimitiveType> idToPrimitiveTypeMapping;
     private final List<Types.NestedField> primitiveFields;
+    private final ExecutorService executor;
 
-    public FilesTable(SchemaTableName tableName, TypeManager typeManager, Table icebergTable, Optional<Long> snapshotId)
+    public FilesTable(SchemaTableName tableName, TypeManager typeManager, Table icebergTable, Optional<Long> snapshotId, ExecutorService executor)
     {
         this.icebergTable = requireNonNull(icebergTable, "icebergTable is null");
         this.typeManager = requireNonNull(typeManager, "typeManager is null");

@@ -152,6 +154,7 @@ public FilesTable(SchemaTableName tableName, TypeManager typeManager, Table iceb
 
         tableMetadata = new ConnectorTableMetadata(requireNonNull(tableName, "tableName is null"), columns.build());
         this.snapshotId = requireNonNull(snapshotId, "snapshotId is null");
+        this.executor = requireNonNull(executor, "executor is null");
     }
 
     @Override

@@ -180,7 +183,8 @@ public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, Connect
         TableScan tableScan = createMetadataTableInstance(icebergTable, FILES)
                 .newScan()
                 .useSnapshot(snapshotId.get())
-                .includeColumnStats();
+                .includeColumnStats()
+                .planWith(executor);
 
         Map<String, Integer> columnNameToPosition = mapWithIndex(tableScan.schema().columns().stream(),
                 (column, position) -> immutableEntry(column.name(), Long.valueOf(position).intValue()))
IcebergMetadata.java

@@ -200,6 +200,7 @@
 import java.util.OptionalLong;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;

@@ -405,6 +406,7 @@ public class IcebergMetadata
     private final Optional<HiveMetastoreFactory> metastoreFactory;
     private final boolean addFilesProcedureEnabled;
     private final Predicate<String> allowedExtraProperties;
+    private final ExecutorService executor;
 
     private final Map<IcebergTableHandle, AtomicReference<TableStatistics>> tableStatisticsCache = new ConcurrentHashMap<>();
 
@@ -420,7 +422,8 @@ public IcebergMetadata(
             TableStatisticsWriter tableStatisticsWriter,
             Optional<HiveMetastoreFactory> metastoreFactory,
             boolean addFilesProcedureEnabled,
-            Predicate<String> allowedExtraProperties)
+            Predicate<String> allowedExtraProperties,
+            ExecutorService executor)
     {
         this.typeManager = requireNonNull(typeManager, "typeManager is null");
         this.trinoCatalogHandle = requireNonNull(trinoCatalogHandle, "trinoCatalogHandle is null");

@@ -431,6 +434,7 @@
         this.metastoreFactory = requireNonNull(metastoreFactory, "metastoreFactory is null");
         this.addFilesProcedureEnabled = addFilesProcedureEnabled;
         this.allowedExtraProperties = requireNonNull(allowedExtraProperties, "allowedExtraProperties is null");
+        this.executor = requireNonNull(executor, "executor is null");
     }
 
     @Override

@@ -690,14 +694,14 @@ private Optional<SystemTable> getRawSystemTable(ConnectorSession session, Schema
         return switch (tableType) {
             case DATA, MATERIALIZED_VIEW_STORAGE -> throw new VerifyException("Unexpected table type: " + tableType); // Handled above.
             case HISTORY -> Optional.of(new HistoryTable(tableName, table));
-            case METADATA_LOG_ENTRIES -> Optional.of(new MetadataLogEntriesTable(tableName, table));
-            case SNAPSHOTS -> Optional.of(new SnapshotsTable(tableName, typeManager, table));
-            case PARTITIONS -> Optional.of(new PartitionsTable(tableName, typeManager, table, getCurrentSnapshotId(table)));
-            case ALL_MANIFESTS -> Optional.of(new AllManifestsTable(tableName, table));
+            case METADATA_LOG_ENTRIES -> Optional.of(new MetadataLogEntriesTable(tableName, table, executor));
+            case SNAPSHOTS -> Optional.of(new SnapshotsTable(tableName, typeManager, table, executor));
+            case PARTITIONS -> Optional.of(new PartitionsTable(tableName, typeManager, table, getCurrentSnapshotId(table), executor));
+            case ALL_MANIFESTS -> Optional.of(new AllManifestsTable(tableName, table, executor));
             case MANIFESTS -> Optional.of(new ManifestsTable(tableName, table, getCurrentSnapshotId(table)));
-            case FILES -> Optional.of(new FilesTable(tableName, typeManager, table, getCurrentSnapshotId(table)));
+            case FILES -> Optional.of(new FilesTable(tableName, typeManager, table, getCurrentSnapshotId(table), executor));
             case PROPERTIES -> Optional.of(new PropertiesTable(tableName, table));
-            case REFS -> Optional.of(new RefsTable(tableName, table));
+            case REFS -> Optional.of(new RefsTable(tableName, table, executor));
         };
     }
IcebergMetadataFactory.java

@@ -25,6 +25,7 @@
 import io.trino.spi.type.TypeManager;
 
 import java.util.Optional;
+import java.util.concurrent.ExecutorService;
 import java.util.function.Predicate;
 
 import static java.util.Objects.requireNonNull;

@@ -40,6 +41,7 @@ public class IcebergMetadataFactory
     private final Optional<HiveMetastoreFactory> metastoreFactory;
     private final boolean addFilesProcedureEnabled;
     private final Predicate<String> allowedExtraProperties;
+    private final ExecutorService executor;
 
     @Inject
     public IcebergMetadataFactory(

@@ -50,6 +52,7 @@ public IcebergMetadataFactory(
             IcebergFileSystemFactory fileSystemFactory,
             TableStatisticsWriter tableStatisticsWriter,
             @RawHiveMetastoreFactory Optional<HiveMetastoreFactory> metastoreFactory,
+            @ForIcebergScanPlanning ExecutorService executor,
             IcebergConfig config)
     {
         this.typeManager = requireNonNull(typeManager, "typeManager is null");

@@ -59,6 +62,7 @@
         this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
         this.tableStatisticsWriter = requireNonNull(tableStatisticsWriter, "tableStatisticsWriter is null");
         this.metastoreFactory = requireNonNull(metastoreFactory, "metastoreFactory is null");
+        this.executor = requireNonNull(executor, "executor is null");
         this.addFilesProcedureEnabled = config.isAddFilesProcedureEnabled();
         if (config.getAllowedExtraProperties().equals(ImmutableList.of("*"))) {
             this.allowedExtraProperties = _ -> true;

@@ -79,6 +83,7 @@ public IcebergMetadata create(ConnectorIdentity identity)
                 tableStatisticsWriter,
                 metastoreFactory,
                 addFilesProcedureEnabled,
-                allowedExtraProperties);
+                allowedExtraProperties,
+                executor);
     }
 }
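@ForIcebergScanPlanning is a binding annotation, so the executor injected above is defined in the connector's Guice wiring. A hypothetical binding of that shape, with the module name, pool size, and annotation stand-in all assumed rather than taken from the connector:

import com.google.inject.Binder;
import com.google.inject.BindingAnnotation;
import com.google.inject.Module;

import java.lang.annotation.Retention;
import java.lang.annotation.Target;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

// Hypothetical stand-in for the connector's annotation of the same name.
@Retention(RUNTIME)
@Target({FIELD, PARAMETER, METHOD})
@BindingAnnotation
@interface ForIcebergScanPlanning {}

public class ScanPlanningExecutorModule
        implements Module
{
    @Override
    public void configure(Binder binder)
    {
        // Hypothetical: dedicate a small fixed pool to Iceberg scan planning.
        binder.bind(ExecutorService.class)
                .annotatedWith(ForIcebergScanPlanning.class)
                .toInstance(Executors.newFixedThreadPool(8));
    }
}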
MetadataLogEntriesTable.java

@@ -21,6 +21,8 @@
 import io.trino.spi.type.TimeZoneKey;
 import org.apache.iceberg.Table;
 
+import java.util.concurrent.ExecutorService;
+
 import static io.trino.spi.type.BigintType.BIGINT;
 import static io.trino.spi.type.IntegerType.INTEGER;
 import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS;

@@ -38,12 +40,13 @@ public class MetadataLogEntriesTable
     private static final String LATEST_SCHEMA_ID_COLUMN_NAME = "latest_schema_id";
     private static final String LATEST_SEQUENCE_NUMBER_COLUMN_NAME = "latest_sequence_number";
 
-    public MetadataLogEntriesTable(SchemaTableName tableName, Table icebergTable)
+    public MetadataLogEntriesTable(SchemaTableName tableName, Table icebergTable, ExecutorService executor)
     {
         super(
                 requireNonNull(icebergTable, "icebergTable is null"),
                 createConnectorTableMetadata(requireNonNull(tableName, "tableName is null")),
-                METADATA_LOG_ENTRIES);
+                METADATA_LOG_ENTRIES,
+                executor);
     }
 
     private static ConnectorTableMetadata createConnectorTableMetadata(SchemaTableName tableName)
PartitionsTable.java

@@ -46,6 +46,7 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 import java.util.stream.Stream;
 
 import static com.google.common.collect.ImmutableList.toImmutableList;

@@ -75,8 +76,9 @@ public class PartitionsTable
     private final List<RowType> columnMetricTypes;
     private final List<io.trino.spi.type.Type> resultTypes;
    private final ConnectorTableMetadata connectorTableMetadata;
+    private final ExecutorService executor;
 
-    public PartitionsTable(SchemaTableName tableName, TypeManager typeManager, Table icebergTable, Optional<Long> snapshotId)
+    public PartitionsTable(SchemaTableName tableName, TypeManager typeManager, Table icebergTable, Optional<Long> snapshotId, ExecutorService executor)
     {
         this.typeManager = requireNonNull(typeManager, "typeManager is null");
         this.icebergTable = requireNonNull(icebergTable, "icebergTable is null");

@@ -120,6 +122,7 @@ public PartitionsTable(SchemaTableName tableName, TypeManager typeManager, Table
                 .map(ColumnMetadata::getType)
                 .collect(toImmutableList());
         this.connectorTableMetadata = new ConnectorTableMetadata(tableName, columnMetadata);
+        this.executor = requireNonNull(executor, "executor is null");
     }
 
     @Override

@@ -202,7 +205,8 @@ public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, Connect
         }
         TableScan tableScan = icebergTable.newScan()
                 .useSnapshot(snapshotId.get())
-                .includeColumnStats();
+                .includeColumnStats()
+                .planWith(executor);
         // TODO make the cursor lazy
         return buildRecordCursor(getStatisticsByPartition(tableScan));
     }
RefsTable.java

@@ -22,6 +22,7 @@
 import org.apache.iceberg.Table;
 
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 
 import static io.trino.spi.type.BigintType.BIGINT;
 import static io.trino.spi.type.IntegerType.INTEGER;

@@ -41,12 +42,13 @@ public class RefsTable
             .add(new ColumnMetadata("max_snapshot_age_in_ms", BIGINT))
             .build();
 
-    public RefsTable(SchemaTableName tableName, Table icebergTable)
+    public RefsTable(SchemaTableName tableName, Table icebergTable, ExecutorService executor)
     {
         super(
                 requireNonNull(icebergTable, "icebergTable is null"),
                 new ConnectorTableMetadata(requireNonNull(tableName, "tableName is null"), COLUMNS),
-                REFS);
+                REFS,
+                executor);
     }
 
     @Override
SnapshotsTable.java

@@ -24,6 +24,7 @@
 import org.apache.iceberg.Table;
 
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
 
 import static io.trino.spi.type.BigintType.BIGINT;
 import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS;

@@ -42,14 +43,15 @@ public class SnapshotsTable
     private static final String MANIFEST_LIST_COLUMN_NAME = "manifest_list";
     private static final String SUMMARY_COLUMN_NAME = "summary";
 
-    public SnapshotsTable(SchemaTableName tableName, TypeManager typeManager, Table icebergTable)
+    public SnapshotsTable(SchemaTableName tableName, TypeManager typeManager, Table icebergTable, ExecutorService executor)
     {
         super(
                 requireNonNull(icebergTable, "icebergTable is null"),
                 createConnectorTableMetadata(
                         requireNonNull(tableName, "tableName is null"),
                         requireNonNull(typeManager, "typeManager is null")),
-                SNAPSHOTS);
+                SNAPSHOTS,
+                executor);
     }
 
     private static ConnectorTableMetadata createConnectorTableMetadata(SchemaTableName tableName, TypeManager typeManager)
Catalog test: testNonLowercaseNamespace

@@ -48,6 +48,7 @@
 import java.util.Optional;
 import java.util.UUID;
 
+import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
 import static io.airlift.json.JsonCodec.jsonCodec;
 import static io.trino.metastore.TableInfo.ExtendedRelationType.TABLE;
 import static io.trino.metastore.TableInfo.ExtendedRelationType.TRINO_VIEW;

@@ -120,7 +121,8 @@ public void testNonLowercaseNamespace()
                 new TableStatisticsWriter(new NodeVersion("test-version")),
                 Optional.empty(),
                 false,
-                _ -> false);
+                _ -> false,
+                newDirectExecutorService());
         assertThat(icebergMetadata.schemaExists(SESSION, namespace)).as("icebergMetadata.schemaExists(namespace)")
                 .isFalse();
         assertThat(icebergMetadata.schemaExists(SESSION, schema)).as("icebergMetadata.schemaExists(schema)")
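Each catalog test satisfies the new constructor parameter with Guava's newDirectExecutorService(), which runs every submitted task synchronously on the calling thread: deterministic and with no pool to size or shut down, at the cost of no parallelism. A standalone illustration of that behavior:

import java.util.concurrent.ExecutorService;

import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;

public class DirectExecutorDemo
{
    public static void main(String[] args)
    {
        ExecutorService direct = newDirectExecutorService();
        // Runs inline on the main thread, before submit() returns.
        direct.submit(() -> System.out.println("ran on " + Thread.currentThread().getName()));
        direct.shutdown();
    }
}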
Catalog test (Glue): testNonLowercaseGlueDatabase

@@ -51,6 +51,7 @@
 import java.util.Optional;
 
 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
+import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
 import static io.airlift.json.JsonCodec.jsonCodec;
 import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_FACTORY;
 import static io.trino.plugin.iceberg.IcebergFileFormat.PARQUET;

@@ -139,7 +140,8 @@ public void testNonLowercaseGlueDatabase()
                 new TableStatisticsWriter(new NodeVersion("test-version")),
                 Optional.empty(),
                 false,
-                _ -> false);
+                _ -> false,
+                newDirectExecutorService());
         assertThat(icebergMetadata.schemaExists(SESSION, databaseName)).as("icebergMetadata.schemaExists(databaseName)")
                 .isFalse();
         assertThat(icebergMetadata.schemaExists(SESSION, trinoSchemaName)).as("icebergMetadata.schemaExists(trinoSchemaName)")
Catalog test: testNonLowercaseNamespace

@@ -46,6 +46,7 @@
 import java.util.Map;
 import java.util.Optional;
 
+import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
 import static io.airlift.json.JsonCodec.jsonCodec;
 import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT;
 import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_STATS;

@@ -189,7 +190,8 @@ public void testNonLowercaseNamespace()
                 new TableStatisticsWriter(new NodeVersion("test-version")),
                 Optional.empty(),
                 false,
-                _ -> false);
+                _ -> false,
+                newDirectExecutorService());
         assertThat(icebergMetadata.schemaExists(SESSION, namespace)).as("icebergMetadata.schemaExists(namespace)")
                 .isTrue();
         assertThat(icebergMetadata.schemaExists(SESSION, schema)).as("icebergMetadata.schemaExists(schema)")
Catalog test (REST catalog): testNonLowercaseNamespace

@@ -45,6 +45,7 @@
 import java.util.Map;
 import java.util.Optional;
 
+import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
 import static io.airlift.json.JsonCodec.jsonCodec;
 import static io.trino.metastore.TableInfo.ExtendedRelationType.OTHER_VIEW;
 import static io.trino.plugin.iceberg.catalog.rest.IcebergRestCatalogConfig.SessionType.NONE;

@@ -127,7 +128,8 @@ public void testNonLowercaseNamespace()
                 new TableStatisticsWriter(new NodeVersion("test-version")),
                 Optional.empty(),
                 false,
-                _ -> false);
+                _ -> false,
+                newDirectExecutorService());
         assertThat(icebergMetadata.schemaExists(SESSION, namespace)).as("icebergMetadata.schemaExists(namespace)")
                 .isTrue();
         assertThat(icebergMetadata.schemaExists(SESSION, schema)).as("icebergMetadata.schemaExists(schema)")
Catalog test (Snowflake catalog): testNonLowercaseNamespace

@@ -58,6 +58,7 @@
 import java.util.Map;
 import java.util.Optional;
 
+import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
 import static io.airlift.json.JsonCodec.jsonCodec;
 import static io.trino.plugin.iceberg.catalog.snowflake.TestIcebergSnowflakeCatalogConnectorSmokeTest.S3_ACCESS_KEY;
 import static io.trino.plugin.iceberg.catalog.snowflake.TestIcebergSnowflakeCatalogConnectorSmokeTest.S3_REGION;

@@ -223,7 +224,8 @@ public void testNonLowercaseNamespace()
                 new TableStatisticsWriter(new NodeVersion("test-version")),
                 Optional.empty(),
                 false,
-                _ -> false);
+                _ -> false,
+                newDirectExecutorService());
         assertThat(icebergMetadata.schemaExists(SESSION, namespace)).as("icebergMetadata.schemaExists(namespace)")
                 .isTrue();
         assertThat(icebergMetadata.schemaExists(SESSION, schema)).as("icebergMetadata.schemaExists(schema)")
