Skip to content

Delta lake tests improvements #17622

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
May 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.testng.annotations.Test;

import static io.trino.plugin.deltalake.DeltaLakeQueryRunner.DELTA_CATALOG;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -35,9 +36,9 @@ public abstract class BaseDeltaLakeCompatibility
protected final String resourcePath;
protected HiveMinioDataLake hiveMinioDataLake;

public BaseDeltaLakeCompatibility(String bucketName, String resourcePath)
public BaseDeltaLakeCompatibility(String resourcePath)
{
this.bucketName = requireNonNull(bucketName);
this.bucketName = "compatibility-test-queries-" + randomNameSuffix();
this.resourcePath = requireNonNull(resourcePath);
}

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public class TestDeltaLakeCreateTableStatistics
extends AbstractTestQueryFramework
{
private static final String SCHEMA = "default";

private String bucketName;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@ public class TestDeltaLakeDatabricksCompatibility
{
public TestDeltaLakeDatabricksCompatibility()
{
super("databricks-test-queries", "io/trino/plugin/deltalake/testing/resources/databricks/");
super("io/trino/plugin/deltalake/testing/resources/databricks/");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ public class TestDeltaLakeDelete
extends AbstractTestQueryFramework
{
private static final String SCHEMA = "default";
private final String bucketName = "test-delta-lake-connector-test-" + randomNameSuffix();

private final String bucketName = "test-delta-lake-connector-test-" + randomNameSuffix();
private HiveMinioDataLake hiveMinioDataLake;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import static io.trino.plugin.deltalake.DeltaLakeQueryRunner.DELTA_CATALOG;
import static io.trino.spi.connector.Constraint.alwaysTrue;
import static io.trino.testing.DataProviders.toDataProvider;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static io.trino.tpch.TpchTable.LINE_ITEM;
import static io.trino.tpch.TpchTable.ORDERS;
import static java.lang.String.format;
Expand All @@ -65,16 +66,15 @@
public class TestDeltaLakeDynamicFiltering
extends AbstractTestQueryFramework
{
private static final String BUCKET_NAME = "delta-lake-test-dynamic-filtering";

private final String bucketName = "delta-lake-test-dynamic-filtering-" + randomNameSuffix();
private HiveMinioDataLake hiveMinioDataLake;

@Override
protected QueryRunner createQueryRunner()
throws Exception
{
verify(new DynamicFilterConfig().isEnableDynamicFiltering(), "this class assumes dynamic filtering is enabled by default");
hiveMinioDataLake = closeAfterClass(new HiveMinioDataLake(BUCKET_NAME));
hiveMinioDataLake = closeAfterClass(new HiveMinioDataLake(bucketName));
hiveMinioDataLake.start();

QueryRunner queryRunner = DeltaLakeQueryRunner.createS3DeltaLakeQueryRunner(
Expand All @@ -91,7 +91,7 @@ protected QueryRunner createQueryRunner()
DELTA_CATALOG,
"default",
tableName,
BUCKET_NAME));
bucketName));
});
return queryRunner;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,14 @@
public class TestDeltaLakeFlushMetadataCacheProcedure
extends AbstractTestQueryFramework
{
private static final String BUCKET_NAME = "delta-lake-test-flush-metadata-cache";

private final String bucketName = "delta-lake-test-flush-metadata-cache-" + randomNameSuffix();
private HiveMetastore metastore;

@Override
protected QueryRunner createQueryRunner()
throws Exception
{
HiveMinioDataLake hiveMinioDataLake = new HiveMinioDataLake(BUCKET_NAME, HIVE3_IMAGE);
HiveMinioDataLake hiveMinioDataLake = new HiveMinioDataLake(bucketName, HIVE3_IMAGE);
hiveMinioDataLake.start();
metastore = new BridgingHiveMetastore(
testingThriftHiveMetastoreBuilder()
Expand All @@ -67,7 +66,7 @@ public void tearDown()
@Test
public void testFlushMetadataCache()
{
assertUpdate("CREATE SCHEMA cached WITH (location = 's3://" + BUCKET_NAME + "/cached')");
assertUpdate("CREATE SCHEMA cached WITH (location = 's3://" + bucketName + "/cached')");
assertUpdate("CREATE TABLE cached.cached AS SELECT * FROM tpch.tiny.nation", 25);

// Verify that column cache is flushed
Expand Down Expand Up @@ -113,7 +112,7 @@ public void testFlushMetadataCacheAfterTableCreated()
String tableName = "flush_metadata_after_table_created";
String intermediateTableName = "test_flush_intermediate_" + randomNameSuffix();

String location = "s3://%s/%s".formatted(BUCKET_NAME, intermediateTableName);
String location = "s3://%s/%s".formatted(bucketName, intermediateTableName);
assertUpdate("CREATE TABLE " + intermediateTableName + " WITH (location = '" + location + "') AS TABLE tpch.tiny.region", 5);

// This may cause the connector to cache the fact that the table does not exist
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

public class TestDeltaLakeConnectorSmokeTest
/**
* Delta Lake connector smoke test exercising Hive metastore and MinIO storage.
*/
public class TestDeltaLakeMinioAndHmsConnectorSmokeTest
extends BaseDeltaLakeAwsConnectorSmokeTest
{
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapperProvider().get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@ public class TestDeltaLakeOssDeltaLakeCompatibility
{
public TestDeltaLakeOssDeltaLakeCompatibility()
{
super("ossdeltalake-test-queries", "io/trino/plugin/deltalake/testing/resources/ossdeltalake/");
super("io/trino/plugin/deltalake/testing/resources/ossdeltalake/");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,38 +15,70 @@

import com.google.common.collect.ImmutableMap;
import io.trino.Session;
import io.trino.plugin.hive.containers.HiveMinioDataLake;
import io.trino.plugin.tpch.TpchPlugin;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.DistributedQueryRunner;
import io.trino.testing.QueryRunner;
import io.trino.testing.containers.Minio;
import org.testng.annotations.Test;

import static io.trino.SystemSessionProperties.TASK_PARTITIONED_WRITER_COUNT;
import static io.trino.SystemSessionProperties.USE_PREFERRED_WRITE_PARTITIONING;
import static io.trino.plugin.base.util.Closables.closeAllSuppress;
import static io.trino.plugin.deltalake.DeltaLakeQueryRunner.DELTA_CATALOG;
import static io.trino.plugin.deltalake.DeltaLakeQueryRunner.createS3DeltaLakeQueryRunner;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static io.trino.testing.TestingSession.testSessionBuilder;
import static io.trino.testing.containers.Minio.MINIO_ACCESS_KEY;
import static io.trino.testing.containers.Minio.MINIO_SECRET_KEY;
import static java.lang.String.format;
import static java.util.UUID.randomUUID;

public class TestDeltaLakePreferredPartitioning
extends AbstractTestQueryFramework
{
private static final String TEST_BUCKET_NAME = "mock-delta-lake-bucket";
private static final int WRITE_PARTITIONING_TEST_PARTITIONS_COUNT = 101;

private final String bucketName = "mock-delta-lake-bucket-" + randomNameSuffix();
protected Minio minio;

@Override
protected QueryRunner createQueryRunner()
throws Exception
{
HiveMinioDataLake hiveMinioDataLake = closeAfterClass(new HiveMinioDataLake(TEST_BUCKET_NAME));
hiveMinioDataLake.start();
return createS3DeltaLakeQueryRunner(
DELTA_CATALOG,
"default",
ImmutableMap.of(
"delta.enable-non-concurrent-writes", "true",
"delta.max-partitions-per-writer", String.valueOf(WRITE_PARTITIONING_TEST_PARTITIONS_COUNT - 1)),
hiveMinioDataLake.getMinio().getMinioAddress(),
hiveMinioDataLake.getHiveHadoop());
minio = closeAfterClass(Minio.builder().build());
minio.start();
minio.createBucket(bucketName);

String schema = "default";
Session session = testSessionBuilder()
.setCatalog(DELTA_CATALOG)
.setSchema(schema)
.build();
DistributedQueryRunner queryRunner = DistributedQueryRunner.builder(session).build();
try {
queryRunner.installPlugin(new TpchPlugin());
queryRunner.createCatalog("tpch", "tpch");

queryRunner.installPlugin(new DeltaLakePlugin());
queryRunner.createCatalog(DELTA_CATALOG, DeltaLakeConnectorFactory.CONNECTOR_NAME, ImmutableMap.<String, String>builder()
.put("hive.metastore", "file")
.put("hive.metastore.catalog.dir", queryRunner.getCoordinator().getBaseDataDir().resolve("file-metastore").toString())
.put("hive.s3.aws-access-key", MINIO_ACCESS_KEY)
.put("hive.s3.aws-secret-key", MINIO_SECRET_KEY)
.put("hive.s3.endpoint", minio.getMinioAddress())
.put("hive.s3.path-style-access", "true")
.put("delta.enable-non-concurrent-writes", "true")
.put("delta.max-partitions-per-writer", String.valueOf(WRITE_PARTITIONING_TEST_PARTITIONS_COUNT - 1))
.buildOrThrow());

queryRunner.execute("CREATE SCHEMA " + schema + " WITH (location = 's3://" + bucketName + "/" + schema + "')");
}
catch (Throwable e) {
closeAllSuppress(e, queryRunner);
throw e;
}

return queryRunner;
}

@Test
Expand Down Expand Up @@ -119,9 +151,9 @@ private static String generateRandomTableName()
return "table_" + randomUUID().toString().replaceAll("-", "");
}

private static String getLocationForTable(String tableName)
private String getLocationForTable(String tableName)
{
return format("s3://%s/%s", TEST_BUCKET_NAME, tableName);
return format("s3://%s/%s", bucketName, tableName);
}

private Session withForcedPreferredPartitioning()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ public class TestDeltaLakeSharedHiveMetastoreWithViews
{
protected final String schema = "test_shared_schema_with_hive_views_" + randomNameSuffix();
private final String bucketName = "delta-lake-shared-hive-with-views-" + randomNameSuffix();

private HiveMinioDataLake hiveMinioDataLake;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@
public class TestPredicatePushdown
extends AbstractTestQueryFramework
{
private static final String BUCKET_NAME = "delta-test-pushdown";
private static final Path RESOURCE_PATH = Path.of("databricks/pushdown/");
private static final String TEST_SCHEMA = "default";

private final String bucketName = "delta-test-pushdown-" + randomNameSuffix();
/**
* This single-file Parquet table has known row groups. See the test
* resource {@code pushdown/custkey_15rowgroups/README.md} for details.
Expand All @@ -55,7 +55,7 @@ public class TestPredicatePushdown
protected QueryRunner createQueryRunner()
throws Exception
{
hiveMinioDataLake = closeAfterClass(new HiveMinioDataLake(BUCKET_NAME));
hiveMinioDataLake = closeAfterClass(new HiveMinioDataLake(bucketName));
hiveMinioDataLake.start();
return createS3DeltaLakeQueryRunner(
DELTA_CATALOG,
Expand Down Expand Up @@ -231,7 +231,7 @@ String register(String namePrefix)
hiveMinioDataLake.copyResources(RESOURCE_PATH.resolve(resourcePath).toString(), name);
getQueryRunner().execute(format(
"CALL system.register_table(CURRENT_SCHEMA, '%2$s', 's3://%1$s/%2$s')",
BUCKET_NAME,
bucketName,
name));
return name;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,13 @@ public class FileHiveMetastore
private static final Set<String> ADMIN_USERS = ImmutableSet.of("admin", "hive", "hdfs");

// 128 is equal to the max database name length of Thrift Hive metastore
private static final int MAX_DATABASE_NAME_LENGTH = 128;
private static final int MAX_NAME_LENGTH = 128;

private final String currentVersion;
private final VersionCompatibility versionCompatibility;
private final HdfsEnvironment hdfsEnvironment;
private final Path catalogDirectory;
private final boolean disableLocationChecks;
private final HdfsContext hdfsContext;
private final boolean hideDeltaLakeTables;
private final FileSystem metadataFileSystem;
Expand All @@ -166,6 +167,7 @@ public FileHiveMetastore(NodeVersion nodeVersion, HdfsEnvironment hdfsEnvironmen
this.versionCompatibility = requireNonNull(config.getVersionCompatibility(), "config.getVersionCompatibility() is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.catalogDirectory = new Path(requireNonNull(config.getCatalogDirectory(), "catalogDirectory is null"));
this.disableLocationChecks = config.isDisableLocationChecks();
this.hdfsContext = new HdfsContext(ConnectorIdentity.ofUser(config.getMetastoreUser()));
this.hideDeltaLakeTables = hideDeltaLakeTables;
try {
Expand Down Expand Up @@ -289,8 +291,15 @@ private Database getRequiredDatabase(String databaseName)

private void verifyDatabaseNameLength(String databaseName)
{
if (databaseName.length() > MAX_DATABASE_NAME_LENGTH) {
throw new TrinoException(NOT_SUPPORTED, format("Schema name must be shorter than or equal to '%s' characters but got '%s'", MAX_DATABASE_NAME_LENGTH, databaseName.length()));
if (databaseName.length() > MAX_NAME_LENGTH) {
throw new TrinoException(NOT_SUPPORTED, format("Schema name must be shorter than or equal to '%s' characters but got '%s'", MAX_NAME_LENGTH, databaseName.length()));
}
}

/**
 * Rejects table names that are longer than {@code MAX_NAME_LENGTH} characters.
 *
 * @param tableName the table name to validate
 * @throws TrinoException with {@code NOT_SUPPORTED} when the name exceeds the limit
 */
private void verifyTableNameLength(String tableName)
{
    int nameLength = tableName.length();
    // Names at or below the limit are accepted without further checks
    if (nameLength <= MAX_NAME_LENGTH) {
        return;
    }
    throw new TrinoException(NOT_SUPPORTED, format("Table name must be shorter than or equal to '%s' characters but got '%s'", MAX_NAME_LENGTH, nameLength));
}

Expand All @@ -312,6 +321,7 @@ public synchronized List<String> getAllDatabases()
@Override
public synchronized void createTable(Table table, PrincipalPrivileges principalPrivileges)
{
verifyTableNameLength(table.getTableName());
verifyDatabaseExists(table.getDatabaseName());
verifyTableNotExists(table.getDatabaseName(), table.getTableName());

Expand All @@ -322,7 +332,7 @@ public synchronized void createTable(Table table, PrincipalPrivileges principalP
checkArgument(table.getStorage().getLocation().isEmpty(), "Storage location for view must be empty");
}
else if (table.getTableType().equals(MANAGED_TABLE.name())) {
if (!(new Path(table.getStorage().getLocation()).toString().contains(tableMetadataDirectory.toString()))) {
if (!disableLocationChecks && !(new Path(table.getStorage().getLocation()).toString().contains(tableMetadataDirectory.toString()))) {
throw new TrinoException(HIVE_METASTORE_ERROR, "Table directory must be " + tableMetadataDirectory);
}
}
Expand Down Expand Up @@ -611,6 +621,7 @@ public synchronized void renameTable(String databaseName, String tableName, Stri
getRequiredDatabase(newDatabaseName);

// verify new table does not exist
verifyTableNameLength(newTableName);
verifyTableNotExists(newDatabaseName, newTableName);

Path oldPath = getTableMetadataDirectory(databaseName, tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public enum VersionCompatibility

private String catalogDirectory;
private VersionCompatibility versionCompatibility = NOT_SUPPORTED;
private boolean disableLocationChecks; // TODO this should probably be true by default, to align with well-behaving metastores other than HMS
private String metastoreUser = "presto";

@NotNull
Expand Down Expand Up @@ -63,6 +64,18 @@ public FileHiveMetastoreConfig setVersionCompatibility(VersionCompatibility vers
return this;
}

/**
 * Returns the current value of the {@code disableLocationChecks} setting.
 * When {@code true}, the file metastore skips its check that a managed
 * table's storage location contains the table metadata directory.
 */
public boolean isDisableLocationChecks()
{
return disableLocationChecks;
}

/**
 * Sets the {@code disableLocationChecks} setting, bound to the
 * {@code hive.metastore.disable-location-checks} configuration property.
 *
 * @param disableLocationChecks the new value of the setting
 * @return this config object, so calls can be chained
 */
@Config("hive.metastore.disable-location-checks")
public FileHiveMetastoreConfig setDisableLocationChecks(boolean disableLocationChecks)
{
this.disableLocationChecks = disableLocationChecks;
return this;
}

@NotNull
public String getMetastoreUser()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8780,13 +8780,13 @@ protected void verifySchemaNameLengthFailurePermissible(Throwable e)
protected OptionalInt maxTableNameLength()
{
// This value depends on metastore type
return OptionalInt.of(255);
return OptionalInt.of(128);
}

@Override
protected void verifyTableNameLengthFailurePermissible(Throwable e)
{
assertThat(e).hasMessageMatching("Failed to create directory.*|Could not rename table directory");
assertThat(e).hasMessageMatching("Table name must be shorter than or equal to '128' characters but got .*");
}

private Session withTimestampPrecision(Session session, HiveTimestampPrecision precision)
Expand Down
Loading