Skip to content

Add Spark Integration test with purge #1825

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions integration-tests/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ dependencies {
implementation("org.testcontainers:testcontainers")
implementation(libs.s3mock.testcontainers)

implementation(platform(libs.awssdk.bom))
implementation("software.amazon.awssdk:s3")

implementation("org.apache.iceberg:iceberg-spark-3.5_2.12")
implementation("org.apache.iceberg:iceberg-spark-extensions-3.5_2.12")
implementation("org.apache.spark:spark-sql_2.12:3.5.6") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
import software.amazon.awssdk.services.s3.model.S3Exception;

@ExtendWith(PolarisIntegrationTestExtension.class)
public abstract class PolarisSparkIntegrationTestBase {
Expand All @@ -64,6 +71,7 @@ public abstract class PolarisSparkIntegrationTestBase {
protected String externalCatalogName;

protected URI warehouseDir;
protected S3Client s3Client;

@BeforeAll
public static void setup() throws IOException {
Expand All @@ -84,6 +92,16 @@ public void before(
managementApi = client.managementApi(credentials);
catalogApi = client.catalogApi(credentials);

s3Client =
S3Client.builder()
.endpointOverride(URI.create(s3Container.getHttpEndpoint()))
.credentialsProvider(
StaticCredentialsProvider.create(
AwsBasicCredentials.create("accessKey", "secretKey")))
.region(Region.of("us-west-2"))
.forcePathStyle(true) // Required for S3Mock
.build();

warehouseDir = IntegrationTestsHelper.getTemporaryDirectory(tempDir).resolve("spark-warehouse");

catalogName = client.newEntityName("spark_catalog");
Expand Down Expand Up @@ -236,4 +254,15 @@ protected void cleanupCatalog(String catalogName) {
/** Executes the given SQL statement against the shared Spark session and returns its result. */
protected static Dataset<Row> onSpark(@Language("SQL") String sql) {
  Dataset<Row> result = spark.sql(sql);
  return result;
}

/**
 * Checks whether an object with the given key exists in the test bucket {@code my-bucket} by
 * issuing a HEAD request against the S3 mock.
 *
 * @param key the object key, without the {@code s3://my-bucket/} prefix
 * @return {@code true} if the HEAD request succeeds, {@code false} if the key does not exist
 */
protected boolean fileExists(String key) {
  try {
    s3Client.headObject(HeadObjectRequest.builder().bucket("my-bucket").key(key).build());
    return true;
  } catch (NoSuchKeyException e) {
    // HEAD on a missing key raises NoSuchKeyException; any other S3Exception propagates
    // naturally — no need to catch and rethrow it.
    return false;
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,33 @@ public void testCreateAndUpdateExternalTable() {
assertThat(rowCount).isEqualTo(4);
}

@Test
public void testPurgeTable() {
  onSpark("CREATE NAMESPACE ns1");
  onSpark("USE ns1");
  onSpark("CREATE TABLE tb1 (col1 integer, col2 string)");
  onSpark("INSERT INTO tb1 VALUES (1, 'a'), (2, 'b'), (3, 'c'), (5, 'e')");

  LoadTableResponse tableResponse = loadTable(catalogName, "ns1", "tb1");
  // Strip the bucket prefix so the metadata location can be used directly as an S3 key.
  String filePath = tableResponse.metadataLocation().replaceFirst("^s3://my-bucket/", "");
  assertThat(fileExists(filePath)).isTrue();

  // Drop table with purge
  onSpark("DROP TABLE tb1 purge");

  // Purging happens asynchronously; poll for up to ~5 seconds until the metadata file is gone.
  int attempt = 0;
  while (fileExists(filePath) && attempt < 5) {
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      // Restore the interrupt status before failing the test.
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
    attempt += 1;
  }
  assertThat(fileExists(filePath)).isFalse();
}

@Test
public void testCreateView() {
long namespaceCount = onSpark("SHOW NAMESPACES").count();
Expand Down
Loading