From 5917549981f9e700d287f50d88dff919629877a7 Mon Sep 17 00:00:00 2001 From: Tony Baeg Date: Thu, 12 Dec 2024 08:28:53 -0500 Subject: [PATCH] Use executor service for iceberg scan planning system tables --- .../plugin/iceberg/AllManifestsTable.java | 6 ++++-- .../trino/plugin/iceberg/BaseSystemTable.java | 7 +++++-- .../io/trino/plugin/iceberg/FilesTable.java | 8 ++++++-- .../trino/plugin/iceberg/IcebergMetadata.java | 18 +++++++++++------- .../plugin/iceberg/IcebergMetadataFactory.java | 7 ++++++- .../iceberg/MetadataLogEntriesTable.java | 7 +++++-- .../trino/plugin/iceberg/PartitionsTable.java | 8 ++++++-- .../io/trino/plugin/iceberg/RefsTable.java | 6 ++++-- .../trino/plugin/iceberg/SnapshotsTable.java | 6 ++++-- .../iceberg/catalog/BaseTrinoCatalogTest.java | 4 +++- .../catalog/glue/TestTrinoGlueCatalog.java | 4 +++- .../catalog/nessie/TestTrinoNessieCatalog.java | 4 +++- .../catalog/rest/TestTrinoRestCatalog.java | 4 +++- .../snowflake/TestTrinoSnowflakeCatalog.java | 4 +++- 14 files changed, 66 insertions(+), 27 deletions(-) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/AllManifestsTable.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/AllManifestsTable.java index 1f2cc27e4270..59c6381f0ee9 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/AllManifestsTable.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/AllManifestsTable.java @@ -27,6 +27,7 @@ import org.apache.iceberg.Table; import java.util.List; +import java.util.concurrent.ExecutorService; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.BooleanType.BOOLEAN; @@ -38,7 +39,7 @@ public class AllManifestsTable extends BaseSystemTable { - public AllManifestsTable(SchemaTableName tableName, Table icebergTable) + public AllManifestsTable(SchemaTableName tableName, Table icebergTable, ExecutorService executor) { super(requireNonNull(icebergTable, "icebergTable is null"), new ConnectorTableMetadata(requireNonNull(tableName, "tableName is null"), ImmutableList.builder() @@ -55,7 +56,8 @@ public AllManifestsTable(SchemaTableName tableName, Table icebergTable) RowType.field("lower_bound", VARCHAR), RowType.field("upper_bound", VARCHAR))))) .build()), - ALL_MANIFESTS); + ALL_MANIFESTS, + executor); } @Override diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/BaseSystemTable.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/BaseSystemTable.java index 07c5517548a4..adc5e5523ed4 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/BaseSystemTable.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/BaseSystemTable.java @@ -36,6 +36,7 @@ import java.io.UncheckedIOException; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutorService; import static com.google.common.collect.ImmutableMap.toImmutableMap; import static com.google.common.collect.Maps.immutableEntry; @@ -49,12 +50,14 @@ public abstract class BaseSystemTable private final Table icebergTable; private final ConnectorTableMetadata tableMetadata; private final MetadataTableType metadataTableType; + private final ExecutorService executor; - BaseSystemTable(Table icebergTable, ConnectorTableMetadata tableMetadata, MetadataTableType metadataTableType) + BaseSystemTable(Table icebergTable, ConnectorTableMetadata tableMetadata, MetadataTableType metadataTableType, ExecutorService executor) { this.icebergTable = requireNonNull(icebergTable, "icebergTable is null"); this.tableMetadata = requireNonNull(tableMetadata, "tableMetadata is null"); this.metadataTableType = requireNonNull(metadataTableType, "metadataTableType is null"); + this.executor = requireNonNull(executor, "executor is null"); } @Override @@ -79,7 +82,7 @@ private List buildPages(ConnectorTableMetadata tableMetadata, ConnectorSes { PageListBuilder pagesBuilder = PageListBuilder.forTable(tableMetadata); - TableScan tableScan = createMetadataTableInstance(icebergTable, metadataTableType).newScan(); + TableScan tableScan = createMetadataTableInstance(icebergTable, metadataTableType).newScan().planWith(executor); TimeZoneKey timeZoneKey = session.getTimeZoneKey(); Map columnNameToPosition = mapWithIndex(tableScan.schema().columns().stream(), diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/FilesTable.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/FilesTable.java index 8db10ff71d30..715941c12f1f 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/FilesTable.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/FilesTable.java @@ -65,6 +65,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ExecutorService; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.ImmutableList.toImmutableList; @@ -117,8 +118,9 @@ public class FilesTable private final Optional partitionColumnType; private final Map idToPrimitiveTypeMapping; private final List primitiveFields; + private final ExecutorService executor; - public FilesTable(SchemaTableName tableName, TypeManager typeManager, Table icebergTable, Optional snapshotId) + public FilesTable(SchemaTableName tableName, TypeManager typeManager, Table icebergTable, Optional snapshotId, ExecutorService executor) { this.icebergTable = requireNonNull(icebergTable, "icebergTable is null"); this.typeManager = requireNonNull(typeManager, "typeManager is null"); @@ -152,6 +154,7 @@ public FilesTable(SchemaTableName tableName, TypeManager typeManager, Table iceb tableMetadata = new ConnectorTableMetadata(requireNonNull(tableName, "tableName is null"), columns.build()); this.snapshotId = requireNonNull(snapshotId, "snapshotId is null"); + this.executor = requireNonNull(executor, "executor is null"); } @Override @@ -180,7 +183,8 @@ public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, Connect TableScan tableScan = createMetadataTableInstance(icebergTable, FILES) .newScan() .useSnapshot(snapshotId.get()) - .includeColumnStats(); + .includeColumnStats() + .planWith(executor); Map columnNameToPosition = mapWithIndex(tableScan.schema().columns().stream(), (column, position) -> immutableEntry(column.name(), Long.valueOf(position).intValue())) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index 06a97bf87d0c..2a6af35d5c1d 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -200,6 +200,7 @@ import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -405,6 +406,7 @@ public class IcebergMetadata private final Optional metastoreFactory; private final boolean addFilesProcedureEnabled; private final Predicate allowedExtraProperties; + private final ExecutorService executor; private final Map> tableStatisticsCache = new ConcurrentHashMap<>(); @@ -420,7 +422,8 @@ public IcebergMetadata( TableStatisticsWriter tableStatisticsWriter, Optional metastoreFactory, boolean addFilesProcedureEnabled, - Predicate allowedExtraProperties) + Predicate allowedExtraProperties, + ExecutorService executor) { this.typeManager = requireNonNull(typeManager, "typeManager is null"); this.trinoCatalogHandle = requireNonNull(trinoCatalogHandle, "trinoCatalogHandle is null"); @@ -431,6 +434,7 @@ public IcebergMetadata( this.metastoreFactory = requireNonNull(metastoreFactory, "metastoreFactory is null"); this.addFilesProcedureEnabled = addFilesProcedureEnabled; this.allowedExtraProperties = requireNonNull(allowedExtraProperties, "allowedExtraProperties is null"); + this.executor = requireNonNull(executor, "executor is null"); } @Override @@ -690,14 +694,14 @@ private Optional getRawSystemTable(ConnectorSession session, Schema return switch (tableType) { case DATA, MATERIALIZED_VIEW_STORAGE -> throw new VerifyException("Unexpected table type: " + tableType); // Handled above. case HISTORY -> Optional.of(new HistoryTable(tableName, table)); - case METADATA_LOG_ENTRIES -> Optional.of(new MetadataLogEntriesTable(tableName, table)); - case SNAPSHOTS -> Optional.of(new SnapshotsTable(tableName, typeManager, table)); - case PARTITIONS -> Optional.of(new PartitionsTable(tableName, typeManager, table, getCurrentSnapshotId(table))); - case ALL_MANIFESTS -> Optional.of(new AllManifestsTable(tableName, table)); + case METADATA_LOG_ENTRIES -> Optional.of(new MetadataLogEntriesTable(tableName, table, executor)); + case SNAPSHOTS -> Optional.of(new SnapshotsTable(tableName, typeManager, table, executor)); + case PARTITIONS -> Optional.of(new PartitionsTable(tableName, typeManager, table, getCurrentSnapshotId(table), executor)); + case ALL_MANIFESTS -> Optional.of(new AllManifestsTable(tableName, table, executor)); case MANIFESTS -> Optional.of(new ManifestsTable(tableName, table, getCurrentSnapshotId(table))); - case FILES -> Optional.of(new FilesTable(tableName, typeManager, table, getCurrentSnapshotId(table))); + case FILES -> Optional.of(new FilesTable(tableName, typeManager, table, getCurrentSnapshotId(table), executor)); case PROPERTIES -> Optional.of(new PropertiesTable(tableName, table)); - case REFS -> Optional.of(new RefsTable(tableName, table)); + case REFS -> Optional.of(new RefsTable(tableName, table, executor)); }; } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadataFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadataFactory.java index 91bf8e518eb2..023952f6e780 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadataFactory.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadataFactory.java @@ -25,6 +25,7 @@ import io.trino.spi.type.TypeManager; import java.util.Optional; +import java.util.concurrent.ExecutorService; import java.util.function.Predicate; import static java.util.Objects.requireNonNull; @@ -40,6 +41,7 @@ public class IcebergMetadataFactory private final Optional metastoreFactory; private final boolean addFilesProcedureEnabled; private final Predicate allowedExtraProperties; + private final ExecutorService executor; @Inject public IcebergMetadataFactory( @@ -50,6 +52,7 @@ public IcebergMetadataFactory( IcebergFileSystemFactory fileSystemFactory, TableStatisticsWriter tableStatisticsWriter, @RawHiveMetastoreFactory Optional metastoreFactory, + @ForIcebergScanPlanning ExecutorService executor, IcebergConfig config) { this.typeManager = requireNonNull(typeManager, "typeManager is null"); @@ -59,6 +62,7 @@ public IcebergMetadataFactory( this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null"); this.tableStatisticsWriter = requireNonNull(tableStatisticsWriter, "tableStatisticsWriter is null"); this.metastoreFactory = requireNonNull(metastoreFactory, "metastoreFactory is null"); + this.executor = requireNonNull(executor, "executor is null"); this.addFilesProcedureEnabled = config.isAddFilesProcedureEnabled(); if (config.getAllowedExtraProperties().equals(ImmutableList.of("*"))) { this.allowedExtraProperties = _ -> true; @@ -79,6 +83,7 @@ public IcebergMetadata create(ConnectorIdentity identity) tableStatisticsWriter, metastoreFactory, addFilesProcedureEnabled, - allowedExtraProperties); + allowedExtraProperties, + executor); } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/MetadataLogEntriesTable.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/MetadataLogEntriesTable.java index 871555bba1bf..fd6de0b40bb6 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/MetadataLogEntriesTable.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/MetadataLogEntriesTable.java @@ -21,6 +21,8 @@ import io.trino.spi.type.TimeZoneKey; import org.apache.iceberg.Table; +import java.util.concurrent.ExecutorService; + import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.IntegerType.INTEGER; import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS; @@ -38,12 +40,13 @@ public class MetadataLogEntriesTable private static final String LATEST_SCHEMA_ID_COLUMN_NAME = "latest_schema_id"; private static final String LATEST_SEQUENCE_NUMBER_COLUMN_NAME = "latest_sequence_number"; - public MetadataLogEntriesTable(SchemaTableName tableName, Table icebergTable) + public MetadataLogEntriesTable(SchemaTableName tableName, Table icebergTable, ExecutorService executor) { super( requireNonNull(icebergTable, "icebergTable is null"), createConnectorTableMetadata(requireNonNull(tableName, "tableName is null")), - METADATA_LOG_ENTRIES); + METADATA_LOG_ENTRIES, + executor); } private static ConnectorTableMetadata createConnectorTableMetadata(SchemaTableName tableName) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/PartitionsTable.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/PartitionsTable.java index d2ff9dcc783d..e421d5334337 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/PartitionsTable.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/PartitionsTable.java @@ -46,6 +46,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ExecutorService; import java.util.stream.Stream; import static com.google.common.collect.ImmutableList.toImmutableList; @@ -75,8 +76,9 @@ public class PartitionsTable private final List columnMetricTypes; private final List resultTypes; private final ConnectorTableMetadata connectorTableMetadata; + private final ExecutorService executor; - public PartitionsTable(SchemaTableName tableName, TypeManager typeManager, Table icebergTable, Optional snapshotId) + public PartitionsTable(SchemaTableName tableName, TypeManager typeManager, Table icebergTable, Optional snapshotId, ExecutorService executor) { this.typeManager = requireNonNull(typeManager, "typeManager is null"); this.icebergTable = requireNonNull(icebergTable, "icebergTable is null"); @@ -120,6 +122,7 @@ public PartitionsTable(SchemaTableName tableName, TypeManager typeManager, Table .map(ColumnMetadata::getType) .collect(toImmutableList()); this.connectorTableMetadata = new ConnectorTableMetadata(tableName, columnMetadata); + this.executor = requireNonNull(executor, "executor is null"); } @Override @@ -202,7 +205,8 @@ public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, Connect } TableScan tableScan = icebergTable.newScan() .useSnapshot(snapshotId.get()) - .includeColumnStats(); + .includeColumnStats() + .planWith(executor); // TODO make the cursor lazy return buildRecordCursor(getStatisticsByPartition(tableScan)); } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/RefsTable.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/RefsTable.java index 222838daa4d7..38f105281560 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/RefsTable.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/RefsTable.java @@ -22,6 +22,7 @@ import org.apache.iceberg.Table; import java.util.List; +import java.util.concurrent.ExecutorService; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.IntegerType.INTEGER; @@ -41,12 +42,13 @@ public class RefsTable .add(new ColumnMetadata("max_snapshot_age_in_ms", BIGINT)) .build(); - public RefsTable(SchemaTableName tableName, Table icebergTable) + public RefsTable(SchemaTableName tableName, Table icebergTable, ExecutorService executor) { super( requireNonNull(icebergTable, "icebergTable is null"), new ConnectorTableMetadata(requireNonNull(tableName, "tableName is null"), COLUMNS), - REFS); + REFS, + executor); } @Override diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/SnapshotsTable.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/SnapshotsTable.java index 4d50af358b6a..74204e6b4b2b 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/SnapshotsTable.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/SnapshotsTable.java @@ -24,6 +24,7 @@ import org.apache.iceberg.Table; import java.util.Map; +import java.util.concurrent.ExecutorService; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS; @@ -42,14 +43,15 @@ public class SnapshotsTable private static final String MANIFEST_LIST_COLUMN_NAME = "manifest_list"; private static final String SUMMARY_COLUMN_NAME = "summary"; - public SnapshotsTable(SchemaTableName tableName, TypeManager typeManager, Table icebergTable) + public SnapshotsTable(SchemaTableName tableName, TypeManager typeManager, Table icebergTable, ExecutorService executor) { super( requireNonNull(icebergTable, "icebergTable is null"), createConnectorTableMetadata( requireNonNull(tableName, "tableName is null"), requireNonNull(typeManager, "typeManager is null")), - SNAPSHOTS); + SNAPSHOTS, + executor); } private static ConnectorTableMetadata createConnectorTableMetadata(SchemaTableName tableName, TypeManager typeManager) diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/BaseTrinoCatalogTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/BaseTrinoCatalogTest.java index 2b1a1534c058..82254efdb85a 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/BaseTrinoCatalogTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/BaseTrinoCatalogTest.java @@ -48,6 +48,7 @@ import java.util.Optional; import java.util.UUID; +import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService; import static io.airlift.json.JsonCodec.jsonCodec; import static io.trino.metastore.TableInfo.ExtendedRelationType.TABLE; import static io.trino.metastore.TableInfo.ExtendedRelationType.TRINO_VIEW; @@ -120,7 +121,8 @@ public void testNonLowercaseNamespace() new TableStatisticsWriter(new NodeVersion("test-version")), Optional.empty(), false, - _ -> false); + _ -> false, + newDirectExecutorService()); assertThat(icebergMetadata.schemaExists(SESSION, namespace)).as("icebergMetadata.schemaExists(namespace)") .isFalse(); assertThat(icebergMetadata.schemaExists(SESSION, schema)).as("icebergMetadata.schemaExists(schema)") diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestTrinoGlueCatalog.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestTrinoGlueCatalog.java index 42b0ec53f06f..ca22eb99f0b8 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestTrinoGlueCatalog.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestTrinoGlueCatalog.java @@ -51,6 +51,7 @@ import java.util.Optional; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; +import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService; import static io.airlift.json.JsonCodec.jsonCodec; import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_FACTORY; import static io.trino.plugin.iceberg.IcebergFileFormat.PARQUET; @@ -139,7 +140,8 @@ public void testNonLowercaseGlueDatabase() new TableStatisticsWriter(new NodeVersion("test-version")), Optional.empty(), false, - _ -> false); + _ -> false, + newDirectExecutorService()); assertThat(icebergMetadata.schemaExists(SESSION, databaseName)).as("icebergMetadata.schemaExists(databaseName)") .isFalse(); assertThat(icebergMetadata.schemaExists(SESSION, trinoSchemaName)).as("icebergMetadata.schemaExists(trinoSchemaName)") diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/nessie/TestTrinoNessieCatalog.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/nessie/TestTrinoNessieCatalog.java index 38328611e8d4..2046ec69b861 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/nessie/TestTrinoNessieCatalog.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/nessie/TestTrinoNessieCatalog.java @@ -46,6 +46,7 @@ import java.util.Map; import java.util.Optional; +import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService; import static io.airlift.json.JsonCodec.jsonCodec; import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT; import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_STATS; @@ -189,7 +190,8 @@ public void testNonLowercaseNamespace() new TableStatisticsWriter(new NodeVersion("test-version")), Optional.empty(), false, - _ -> false); + _ -> false, + newDirectExecutorService()); assertThat(icebergMetadata.schemaExists(SESSION, namespace)).as("icebergMetadata.schemaExists(namespace)") .isTrue(); assertThat(icebergMetadata.schemaExists(SESSION, schema)).as("icebergMetadata.schemaExists(schema)") diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestTrinoRestCatalog.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestTrinoRestCatalog.java index 3a2240105c30..6e75f3aa8b78 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestTrinoRestCatalog.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestTrinoRestCatalog.java @@ -45,6 +45,7 @@ import java.util.Map; import java.util.Optional; +import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService; import static io.airlift.json.JsonCodec.jsonCodec; import static io.trino.metastore.TableInfo.ExtendedRelationType.OTHER_VIEW; import static io.trino.plugin.iceberg.catalog.rest.IcebergRestCatalogConfig.SessionType.NONE; @@ -127,7 +128,8 @@ public void testNonLowercaseNamespace() new TableStatisticsWriter(new NodeVersion("test-version")), Optional.empty(), false, - _ -> false); + _ -> false, + newDirectExecutorService()); assertThat(icebergMetadata.schemaExists(SESSION, namespace)).as("icebergMetadata.schemaExists(namespace)") .isTrue(); assertThat(icebergMetadata.schemaExists(SESSION, schema)).as("icebergMetadata.schemaExists(schema)") diff --git a/plugin/trino-iceberg/src/test/java/org/apache/iceberg/snowflake/TestTrinoSnowflakeCatalog.java b/plugin/trino-iceberg/src/test/java/org/apache/iceberg/snowflake/TestTrinoSnowflakeCatalog.java index 9f3e0d7ef609..d445512ed2ae 100644 --- a/plugin/trino-iceberg/src/test/java/org/apache/iceberg/snowflake/TestTrinoSnowflakeCatalog.java +++ b/plugin/trino-iceberg/src/test/java/org/apache/iceberg/snowflake/TestTrinoSnowflakeCatalog.java @@ -58,6 +58,7 @@ import java.util.Map; import java.util.Optional; +import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService; import static io.airlift.json.JsonCodec.jsonCodec; import static io.trino.plugin.iceberg.catalog.snowflake.TestIcebergSnowflakeCatalogConnectorSmokeTest.S3_ACCESS_KEY; import static io.trino.plugin.iceberg.catalog.snowflake.TestIcebergSnowflakeCatalogConnectorSmokeTest.S3_REGION; @@ -223,7 +224,8 @@ public void testNonLowercaseNamespace() new TableStatisticsWriter(new NodeVersion("test-version")), Optional.empty(), false, - _ -> false); + _ -> false, + newDirectExecutorService()); assertThat(icebergMetadata.schemaExists(SESSION, namespace)).as("icebergMetadata.schemaExists(namespace)") .isTrue(); assertThat(icebergMetadata.schemaExists(SESSION, schema)).as("icebergMetadata.schemaExists(schema)")