[apache#2541] feat(spark-connector): support basic DDL and DML operations to iceberg catalog (apache#2544)

### What changes were proposed in this pull request?

1. Support DDL operations for the Iceberg catalog (a catalog setup sketch follows below).
2. Support read and write operations on Iceberg tables.
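
For context, here is a minimal sketch (not code from this PR) of how an Iceberg catalog could be registered in Gravitino before Spark uses it, mirroring the `initMetalakeAndCatalogs` changes in `SparkEnvIT` further down. The helper name, the comment argument, the exact `createCatalog` overload, and the import locations are assumptions; the property constants and the `lakehouse-iceberg` provider string are taken from this diff.

```java
import com.datastrato.gravitino.Catalog;
import com.datastrato.gravitino.NameIdentifier;
import com.datastrato.gravitino.client.GravitinoClient;
import com.datastrato.gravitino.client.GravitinoMetalake;
import com.datastrato.gravitino.spark.connector.iceberg.IcebergPropertiesConstants;
import com.google.common.collect.Maps;
import java.util.Map;

public class IcebergCatalogSetupSketch {

  // Registers a "lakehouse-iceberg" catalog backed by a Hive metastore, mirroring
  // SparkEnvIT#initMetalakeAndCatalogs in this PR. The createCatalog overload used here
  // (ident, type, provider, comment, properties) is an assumption, not taken from the diff.
  static void createIcebergCatalog(
      GravitinoClient client,
      String metalakeName,
      String catalogName,
      String hiveMetastoreUri,
      String warehouse) {
    GravitinoMetalake metalake = client.loadMetalake(NameIdentifier.of(metalakeName));

    // Property keys come from IcebergPropertiesConstants, as added in this PR.
    Map<String, String> properties = Maps.newHashMap();
    properties.put(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND, "hive");
    properties.put(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI, hiveMetastoreUri);
    properties.put(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE, warehouse);

    metalake.createCatalog(
        NameIdentifier.of(metalakeName, catalogName),
        Catalog.Type.RELATIONAL,
        "lakehouse-iceberg",
        "Iceberg catalog for the Spark connector",
        properties);
  }
}
```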

### Why are the changes needed?

Support basic DDL and DML operations on Iceberg tables using Spark SQL.

Fix: apache#2541

### Does this PR introduce _any_ user-facing change?

Yes. Users can now use Spark SQL to run DDL statements and read/write data against Iceberg
tables, as sketched below.
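
A minimal usage sketch of the new flow, assuming a Gravitino catalog named `iceberg` created with the `lakehouse-iceberg` provider. The metalake name, database/table names, SQL statements, and the `spark.plugins` registration are illustrative assumptions; the `GravitinoSparkConfig` keys are the ones used by `SparkEnvIT` in this diff.

```java
import com.datastrato.gravitino.spark.connector.GravitinoSparkConfig;
import com.datastrato.gravitino.spark.connector.plugin.GravitinoSparkPlugin;
import org.apache.spark.sql.SparkSession;

public class IcebergSparkSqlSketch {
  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder()
            .appName("gravitino-iceberg-sketch")
            .master("local[*]")
            // Assumed wiring: register the Gravitino connector plugin via spark.plugins.
            .config("spark.plugins", GravitinoSparkPlugin.class.getName())
            .config(GravitinoSparkConfig.GRAVITINO_URI, "http://127.0.0.1:8090")
            .config(GravitinoSparkConfig.GRAVITINO_METALAKE, "test_metalake")
            .getOrCreate();

    // "iceberg" is the name of a Gravitino catalog backed by the lakehouse-iceberg provider.
    spark.sql("USE iceberg");
    spark.sql("CREATE DATABASE IF NOT EXISTS demo_db");
    spark.sql("USE demo_db");

    // DDL
    spark.sql(
        "CREATE TABLE demo_table (id INT COMMENT '', name STRING COMMENT '', age INT COMMENT '')");
    spark.sql("ALTER TABLE demo_table ADD COLUMNS (address STRING)");

    // DML: write, then read back
    spark.sql("INSERT INTO demo_table VALUES (1, 'gravitino', 30, 'hangzhou')");
    spark.sql("SELECT * FROM demo_table").show();

    spark.stop();
  }
}
```

The statements above mirror the patterns exercised by the new `SparkCommonIT` and `SparkIcebergCatalogIT` cases in this commit.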

### How was this patch tested?

New Iceberg ITs.
caican00 authored Apr 2, 2024
1 parent 4411459 commit 46ebaf6
Showing 12 changed files with 368 additions and 26 deletions.
@@ -314,6 +314,11 @@ public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmpty
*/
@Override
public NameIdentifier[] listTables(Namespace namespace) throws NoSuchSchemaException {
NameIdentifier schemaIdent = NameIdentifier.of(namespace.levels());
if (!schemaExists(schemaIdent)) {
throw new NoSuchSchemaException("Schema (database) does not exist %s", namespace);
}

try {
ListTablesResponse listTablesResponse =
icebergTableOps.listTable(IcebergTableOpsHelper.getIcebergNamespace(namespace));
1 change: 0 additions & 1 deletion integration-test/build.gradle.kts
@@ -30,7 +30,6 @@ dependencies {
testImplementation(project(":server"))
testImplementation(project(":server-common"))
testImplementation(project(":spark-connector")) {
exclude("org.apache.iceberg")
exclude("org.apache.hadoop", "hadoop-client-api")
exclude("org.apache.hadoop", "hadoop-client-runtime")
}
@@ -8,28 +8,33 @@
import com.datastrato.gravitino.integration.test.util.spark.SparkTableInfo.SparkColumnInfo;
import com.datastrato.gravitino.integration.test.util.spark.SparkTableInfoChecker;
import com.google.common.collect.ImmutableMap;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIf;
import org.junit.platform.commons.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class SparkCommonIT extends SparkEnvIT {
private static final Logger LOG = LoggerFactory.getLogger(SparkCommonIT.class);

// To generate test data for write&read table.
protected static final Map<DataType, String> typeConstant =
@@ -61,14 +66,36 @@ private static String getInsertWithPartitionSql(
// Whether supports [CLUSTERED BY col_name3 SORTED BY col_name INTO num_buckets BUCKETS]
protected abstract boolean supportsSparkSQLClusteredBy();

// Use a custom database not the original default database because SparkIT couldn't read&write
// data to tables in default database. The main reason is default database location is
protected abstract boolean supportsPartition();

// Use a custom database instead of the original default database because SparkCommonIT couldn't
// read & write data to tables in the default database. The main reason is that the default
// database location is determined by `hive.metastore.warehouse.dir` in hive-site.xml, which is a
// local HDFS address rather than the real HDFS address. Tables created under the default database
// get a location like hdfs://localhost:9000/xxx, which SparkCommonIT cannot read from or write
// to. Will switch to the default database once the spark connector supports the
// `ALTER DATABASE xx SET LOCATION` command.
@BeforeAll
void initDefaultDatabase() {
void initDefaultDatabase() throws IOException {
// In embedded mode, Derby acts as the backend database for the Hive metastore
// and creates a directory named metastore_db to store metadata,
// supporting only one connection at a time.
// Previously, only SparkHiveCatalogIT accessed Derby, so no exception occurred.
// Now that SparkIcebergCatalogIT runs alongside it, the exception `ERROR XSDB6: Another instance
// of Derby may have already booted the database
// {GRAVITINO_HOME}/integration-test/metastore_db` occurs when SparkIcebergCatalogIT is
// initialized after SparkHiveCatalogIT has executed.
// The root cause is that the lock file in the metastore_db directory is not cleaned up, so a new
// connection cannot be created; the directory is deleted here to ensure a new connection can be
// established.
File hiveLocalMetaStorePath = new File("metastore_db");
try {
if (hiveLocalMetaStorePath.exists()) {
FileUtils.deleteDirectory(hiveLocalMetaStorePath);
}
} catch (IOException e) {
LOG.error(String.format("delete director %s failed.", hiveLocalMetaStorePath), e);
throw e;
}
sql("USE " + getCatalogName());
createDatabaseIfNotExists(getDefaultDatabase());
}
@@ -79,6 +106,26 @@ void init() {
sql("USE " + getDefaultDatabase());
}

@AfterAll
void cleanUp() {
sql("USE " + getCatalogName());
getDatabases()
.forEach(database -> sql(String.format("DROP DATABASE IF EXISTS %s CASCADE", database)));
}

@Test
void testListTables() {
String tableName = "t_list";
dropTableIfExists(tableName);
Set<String> tableNames = listTableNames();
Assertions.assertFalse(tableNames.contains(tableName));
createSimpleTable(tableName);
tableNames = listTableNames();
Assertions.assertTrue(tableNames.contains(tableName));
Assertions.assertThrowsExactly(
NoSuchNamespaceException.class, () -> sql("SHOW TABLES IN nonexistent_schema"));
}

@Test
void testLoadCatalogs() {
Set<String> catalogs = getCatalogs();
@@ -89,20 +136,20 @@ void testLoadCatalogs() {
void testCreateAndLoadSchema() {
String testDatabaseName = "t_create1";
dropDatabaseIfExists(testDatabaseName);
sql("CREATE DATABASE " + testDatabaseName);
sql("CREATE DATABASE " + testDatabaseName + " WITH DBPROPERTIES (ID=001);");
Map<String, String> databaseMeta = getDatabaseMetadata(testDatabaseName);
Assertions.assertFalse(databaseMeta.containsKey("Comment"));
Assertions.assertTrue(databaseMeta.containsKey("Location"));
Assertions.assertEquals("datastrato", databaseMeta.get("Owner"));
String properties = databaseMeta.get("Properties");
Assertions.assertTrue(StringUtils.isBlank(properties));
Assertions.assertTrue(properties.contains("(ID,001)"));

testDatabaseName = "t_create2";
dropDatabaseIfExists(testDatabaseName);
String testDatabaseLocation = "/tmp/" + testDatabaseName;
sql(
String.format(
"CREATE DATABASE %s COMMENT 'comment' LOCATION '%s'\n" + " WITH DBPROPERTIES (ID=001);",
"CREATE DATABASE %s COMMENT 'comment' LOCATION '%s'\n" + " WITH DBPROPERTIES (ID=002);",
testDatabaseName, testDatabaseLocation));
databaseMeta = getDatabaseMetadata(testDatabaseName);
String comment = databaseMeta.get("Comment");
@@ -111,19 +158,22 @@ void testCreateAndLoadSchema() {
// underlying catalog may change /tmp/t_create2 to file:/tmp/t_create2
Assertions.assertTrue(databaseMeta.get("Location").contains(testDatabaseLocation));
properties = databaseMeta.get("Properties");
Assertions.assertEquals("((ID,001))", properties);
Assertions.assertTrue(properties.contains("(ID,002)"));
}

@Test
void testAlterSchema() {
String testDatabaseName = "t_alter";
dropDatabaseIfExists(testDatabaseName);
sql("CREATE DATABASE " + testDatabaseName);
sql("CREATE DATABASE " + testDatabaseName + " WITH DBPROPERTIES (ID=001);");
Assertions.assertTrue(
StringUtils.isBlank(getDatabaseMetadata(testDatabaseName).get("Properties")));
getDatabaseMetadata(testDatabaseName).get("Properties").contains("(ID,001)"));

sql(String.format("ALTER DATABASE %s SET DBPROPERTIES ('ID'='001')", testDatabaseName));
Assertions.assertEquals("((ID,001))", getDatabaseMetadata(testDatabaseName).get("Properties"));
sql(String.format("ALTER DATABASE %s SET DBPROPERTIES ('ID'='002')", testDatabaseName));
Assertions.assertFalse(
getDatabaseMetadata(testDatabaseName).get("Properties").contains("(ID,001)"));
Assertions.assertTrue(
getDatabaseMetadata(testDatabaseName).get("Properties").contains("(ID,002)"));

// Hive metastore doesn't support alter database location, therefore this test method
// doesn't verify ALTER DATABASE database_name SET LOCATION 'new_location'.
@@ -334,9 +384,9 @@ void testAlterTableUpdateColumnType() {
checkTableColumns(tableName, simpleTableColumns, getTableInfo(tableName));

sql(String.format("ALTER TABLE %S ADD COLUMNS (col1 int)", tableName));
sql(String.format("ALTER TABLE %S CHANGE COLUMN col1 col1 string", tableName));
sql(String.format("ALTER TABLE %S CHANGE COLUMN col1 col1 bigint", tableName));
ArrayList<SparkColumnInfo> updateColumns = new ArrayList<>(simpleTableColumns);
updateColumns.add(SparkColumnInfo.of("col1", DataTypes.StringType, null));
updateColumns.add(SparkColumnInfo.of("col1", DataTypes.LongType, null));
checkTableColumns(tableName, updateColumns, getTableInfo(tableName));
}

@@ -354,7 +404,7 @@ void testAlterTableRenameColumn() {
sql(String.format("ALTER TABLE %S ADD COLUMNS (col1 int)", tableName));
sql(
String.format(
"ALTER TABLE %S RENAME COLUMN %S TO %S", tableName, oldColumnName, newColumnName));
"ALTER TABLE %s RENAME COLUMN %s TO %s", tableName, oldColumnName, newColumnName));
ArrayList<SparkColumnInfo> renameColumns = new ArrayList<>(simpleTableColumns);
renameColumns.add(SparkColumnInfo.of(newColumnName, DataTypes.IntegerType, null));
checkTableColumns(tableName, renameColumns, getTableInfo(tableName));
@@ -373,7 +423,7 @@ void testUpdateColumnPosition() {

sql(
String.format(
"CREATE TABLE %s (id STRING COMMENT '', name STRING COMMENT '', age STRING COMMENT '') USING PARQUET",
"CREATE TABLE %s (id STRING COMMENT '', name STRING COMMENT '', age STRING COMMENT '')",
tableName));
checkTableColumns(tableName, simpleTableColumns, getTableInfo(tableName));

@@ -456,12 +506,13 @@ void testComplexType() {
}

@Test
@EnabledIf("supportsPartition")
void testCreateDatasourceFormatPartitionTable() {
String tableName = "datasource_partition_table";

dropTableIfExists(tableName);
String createTableSQL = getCreateSimpleTableString(tableName);
createTableSQL = createTableSQL + "USING PARQUET PARTITIONED BY (name, age)";
createTableSQL = createTableSQL + " USING PARQUET PARTITIONED BY (name, age)";
sql(createTableSQL);
SparkTableInfo tableInfo = getTableInfo(tableName);
SparkTableInfoChecker checker =
@@ -558,6 +609,7 @@ void testInsertTableAsSelect() {
}

@Test
@EnabledIf("supportsPartition")
void testInsertDatasourceFormatPartitionTableAsSelect() {
String tableName = "insert_select_partition_table";
String newTableName = "new_" + tableName;
@@ -12,6 +12,7 @@
import com.datastrato.gravitino.integration.test.container.HiveContainer;
import com.datastrato.gravitino.integration.test.util.spark.SparkUtilIT;
import com.datastrato.gravitino.spark.connector.GravitinoSparkConfig;
import com.datastrato.gravitino.spark.connector.iceberg.IcebergPropertiesConstants;
import com.datastrato.gravitino.spark.connector.plugin.GravitinoSparkPlugin;
import com.google.common.collect.Maps;
import java.io.IOException;
@@ -37,6 +38,7 @@ public abstract class SparkEnvIT extends SparkUtilIT {
private SparkSession sparkSession;
private String hiveMetastoreUri = "thrift://127.0.0.1:9083";
private String gravitinoUri = "http://127.0.0.1:8090";
private String warehouse;

protected abstract String getCatalogName();

@@ -79,8 +81,18 @@ private void initMetalakeAndCatalogs() {
client.createMetalake(NameIdentifier.of(metalakeName), "", Collections.emptyMap());
GravitinoMetalake metalake = client.loadMetalake(NameIdentifier.of(metalakeName));
Map<String, String> properties = Maps.newHashMap();
properties.put(GravitinoSparkConfig.GRAVITINO_HIVE_METASTORE_URI, hiveMetastoreUri);

switch (getProvider()) {
case "hive":
properties.put(GravitinoSparkConfig.GRAVITINO_HIVE_METASTORE_URI, hiveMetastoreUri);
break;
case "lakehouse-iceberg":
properties.put(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND, "hive");
properties.put(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE, warehouse);
properties.put(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI, hiveMetastoreUri);
break;
default:
throw new IllegalArgumentException("Unsupported provider: " + getProvider());
}
metalake.createCatalog(
NameIdentifier.of(metalakeName, getCatalogName()),
Catalog.Type.RELATIONAL,
@@ -102,6 +114,11 @@ private void initHiveEnv() {
"thrift://%s:%d",
containerSuite.getHiveContainer().getContainerIpAddress(),
HiveContainer.HIVE_METASTORE_PORT);
warehouse =
String.format(
"hdfs://%s:%d/user/hive/warehouse",
containerSuite.getHiveContainer().getContainerIpAddress(),
HiveContainer.HDFS_DEFAULTFS_PORT);
}

private void initHdfsFileSystem() {
@@ -129,12 +146,7 @@ private void initSparkEnv() {
.config(GravitinoSparkConfig.GRAVITINO_URI, gravitinoUri)
.config(GravitinoSparkConfig.GRAVITINO_METALAKE, metalakeName)
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.config(
"spark.sql.warehouse.dir",
String.format(
"hdfs://%s:%d/user/hive/warehouse",
containerSuite.getHiveContainer().getContainerIpAddress(),
HiveContainer.HDFS_DEFAULTFS_PORT))
.config("spark.sql.warehouse.dir", warehouse)
.enableHiveSupport()
.getOrCreate();
}
@@ -40,6 +40,11 @@ protected boolean supportsSparkSQLClusteredBy() {
return true;
}

@Override
protected boolean supportsPartition() {
return true;
}

@Test
public void testCreateHiveFormatPartitionTable() {
String tableName = "hive_partition_table";
@@ -0,0 +1,34 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.integration.test.spark.iceberg;

import com.datastrato.gravitino.integration.test.spark.SparkCommonIT;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInstance;

@Tag("gravitino-docker-it")
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class SparkIcebergCatalogIT extends SparkCommonIT {

@Override
protected String getCatalogName() {
return "iceberg";
}

@Override
protected String getProvider() {
return "lakehouse-iceberg";
}

@Override
protected boolean supportsSparkSQLClusteredBy() {
return false;
}

@Override
protected boolean supportsPartition() {
return false;
}
}
@@ -6,6 +6,7 @@
package com.datastrato.gravitino.spark.connector;

import com.datastrato.gravitino.spark.connector.hive.HiveAdaptor;
import com.datastrato.gravitino.spark.connector.iceberg.IcebergAdaptor;
import java.util.Locale;

/**
@@ -17,6 +18,8 @@ public static GravitinoCatalogAdaptor createGravitinoAdaptor(String provider) {
switch (provider.toLowerCase(Locale.ROOT)) {
case "hive":
return new HiveAdaptor();
case "lakehouse-iceberg":
return new IcebergAdaptor();
default:
throw new RuntimeException(String.format("Provider:%s is not supported yet", provider));
}
@@ -58,6 +58,7 @@ public void close() {
Preconditions.checkState(!isClosed, "Gravitino Catalog is already closed");
isClosed = true;
gravitinoClient.close();
gravitinoCatalogManager = null;
}

public Catalog getGravitinoCatalogInfo(String name) {