diff --git a/docs/apache-hive-catalog.md b/docs/apache-hive-catalog.md
index ecf7a058e4b..ee05aba5bbc 100644
--- a/docs/apache-hive-catalog.md
+++ b/docs/apache-hive-catalog.md
@@ -42,6 +42,9 @@ The Hive catalog supports creating, updating, and deleting databases and tables
 
 When you use the Gravitino with Trino. You can pass the Trino Hive connector configuration using prefix `trino.bypass.`. For example, using `trino.bypass.hive.config.resources` to pass the `hive.config.resources` to the Gravitino Hive catalog in Trino runtime.
 
+When you use Gravitino with Spark, you can pass the Spark Hive connector configuration using the prefix `spark.bypass.`. For example, use `spark.bypass.hive.exec.dynamic.partition.mode` to pass `hive.exec.dynamic.partition.mode` to the Spark Hive connector at Spark runtime.
+
+
 ### Catalog operations
 
 Refer to [Manage Relational Metadata Using Gravitino](./manage-relational-metadata-using-gravitino.md#catalog-operations) for more details.
diff --git a/docs/lakehouse-iceberg-catalog.md b/docs/lakehouse-iceberg-catalog.md
index f33f32b97eb..a32bf3c330b 100644
--- a/docs/lakehouse-iceberg-catalog.md
+++ b/docs/lakehouse-iceberg-catalog.md
@@ -43,6 +43,9 @@ Any properties not defined by Gravitino with `gravitino.bypass.` prefix will pass
 
 When you use the Gravitino with Trino. You can pass the Trino Iceberg connector configuration using prefix `trino.bypass.`. For example, using `trino.bypass.iceberg.table-statistics-enabled` to pass the `iceberg.table-statistics-enabled` to the Gravitino Iceberg catalog in Trino runtime.
 
+When you use Gravitino with Spark, you can pass the Spark Iceberg connector configuration using the prefix `spark.bypass.`. For example, use `spark.bypass.io-impl` to pass `io-impl` to the Spark Iceberg connector at Spark runtime.
+
+
 #### JDBC catalog
 
 If you are using JDBC catalog, you must provide `jdbc-user`, `jdbc-password` and `jdbc-driver` to catalog properties.
diff --git a/docs/spark-connector/spark-catalog-hive.md b/docs/spark-connector/spark-catalog-hive.md
index 5e38056cac9..733a3bedefa 100644
--- a/docs/spark-connector/spark-catalog-hive.md
+++ b/docs/spark-connector/spark-catalog-hive.md
@@ -51,3 +51,19 @@ INSERT OVERWRITE TABLE employees PARTITION(department='Marketing') VALUES (3, 'M
 
 SELECT * FROM employees WHERE department = 'Engineering';
 ```
+
+
+## Catalog properties
+
+The Gravitino Spark connector transforms the following property names defined in catalog properties to Spark Hive connector configurations.
+
+| Property name in Gravitino catalog properties | Spark Hive connector configuration | Description                | Since Version |
+|-----------------------------------------------|------------------------------------|----------------------------|---------------|
+| `metastore.uris`                               | `hive.metastore.uris`              | Hive metastore URI address | 0.5.0         |
+
+Gravitino catalog property names with the prefix `spark.bypass.` are passed to the Spark Hive connector. For example, use `spark.bypass.hive.exec.dynamic.partition.mode` to pass `hive.exec.dynamic.partition.mode` to the Spark Hive connector.
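+
+A minimal illustrative example of Gravitino Hive catalog properties (the URI and values are placeholders for your environment):
+
+```text
+metastore.uris = thrift://hive-metastore:9083
+# Forwarded to the Spark Hive connector as hive.exec.dynamic.partition.mode
+spark.bypass.hive.exec.dynamic.partition.mode = nonstrict
+```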
+
+
+:::caution
+When using the `spark-sql` shell client, you must explicitly set `spark.bypass.spark.sql.hive.metastore.jars` in the Gravitino Hive catalog properties. Replace the default `builtin` value with the appropriate setting for your setup.
+:::
\ No newline at end of file
diff --git a/docs/spark-connector/spark-catalog-iceberg.md b/docs/spark-connector/spark-catalog-iceberg.md
index a50ed4fffe5..b18c14f937f 100644
--- a/docs/spark-connector/spark-catalog-iceberg.md
+++ b/docs/spark-connector/spark-catalog-iceberg.md
@@ -58,3 +58,17 @@ VALUES
 
 SELECT * FROM employee WHERE date(hire_date) = '2021-01-01'
 ```
+
+## Catalog properties
+
+The Gravitino Spark connector transforms the following property names defined in catalog properties to Spark Iceberg connector configurations.
+
+| Gravitino catalog property name | Spark Iceberg connector configuration | Description               | Since Version |
+|---------------------------------|---------------------------------------|---------------------------|---------------|
+| `catalog-backend`               | `type`                                | Catalog backend type      | 0.5.0         |
+| `uri`                           | `uri`                                 | Catalog backend URI       | 0.5.0         |
+| `warehouse`                     | `warehouse`                           | Catalog backend warehouse | 0.5.0         |
+| `jdbc-user`                     | `jdbc.user`                           | JDBC user name            | 0.5.0         |
+| `jdbc-password`                 | `jdbc.password`                       | JDBC password             | 0.5.0         |
+
+Gravitino catalog property names with the prefix `spark.bypass.` are passed to the Spark Iceberg connector. For example, use `spark.bypass.io-impl` to pass `io-impl` to the Spark Iceberg connector.
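+
+A minimal illustrative example of Gravitino Iceberg catalog properties with a Hive backend (the URIs and values are placeholders for your environment):
+
+```text
+catalog-backend = hive
+uri = thrift://hive-metastore:9083
+warehouse = hdfs://namenode:8020/user/hive/warehouse
+# Forwarded to the Spark Iceberg connector as io-impl
+spark.bypass.io-impl = org.apache.iceberg.hadoop.HadoopFileIO
+```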
diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/PropertiesConverter.java b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/PropertiesConverter.java
index fdcb916c41b..73722a05043 100644
--- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/PropertiesConverter.java
+++ b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/PropertiesConverter.java
@@ -5,11 +5,74 @@
 
 package com.datastrato.gravitino.spark.connector;
 
+import com.google.common.annotations.VisibleForTesting;
+import java.util.HashMap;
 import java.util.Map;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
-/** Transform table properties between Gravitino and Spark. */
+/** Interface for transforming properties between Gravitino and Spark. */
 public interface PropertiesConverter {
+  @VisibleForTesting String SPARK_PROPERTY_PREFIX = "spark.bypass.";
+
+  /**
+   * Converts application-provided properties and Gravitino catalog properties to Spark connector
+   * properties.
+   *
+   * <p>It provides the common implementation, including extracting properties with the
+   * "spark.bypass." prefix and merging user-provided options with the transformed properties.
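+   *
+   * <p>For example (illustrative values): given catalog properties {@code {"spark.bypass.foo" =
+   * "1"}} and user options {@code {"foo" = "2"}}, the bypassed entry first yields {@code "foo" =
+   * "1"}, the catalog-specific transformed properties are merged next, and the user options are
+   * applied last, so the final map contains {@code "foo" = "2"}.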
+   *
+   * @param options Case-insensitive properties map provided by application configuration.
+   * @param properties Gravitino catalog properties.
+   * @return properties for the Spark connector.
+   */
+  default Map<String, String> toSparkCatalogProperties(
+      CaseInsensitiveStringMap options, Map<String, String> properties) {
+    Map<String, String> all = new HashMap<>();
+    if (properties != null) {
+      properties.forEach(
+          (k, v) -> {
+            if (k.startsWith(SPARK_PROPERTY_PREFIX)) {
+              String newKey = k.substring(SPARK_PROPERTY_PREFIX.length());
+              all.put(newKey, v);
+            }
+          });
+    }
+
+    Map<String, String> transformedProperties = toSparkCatalogProperties(properties);
+    if (transformedProperties != null) {
+      all.putAll(transformedProperties);
+    }
+
+    if (options != null) {
+      all.putAll(options);
+    }
+    return all;
+  }
+
+  /**
+   * Transforms Gravitino catalog properties to Spark connector properties.
+   *
+   * <p>This method focuses on the catalog-specific transform logic; the common logic is
+   * implemented in {@code toSparkCatalogProperties(CaseInsensitiveStringMap, Map)}.
+   *
+   * @param properties Gravitino catalog properties.
+   * @return properties for the Spark connector.
+   */
+  Map<String, String> toSparkCatalogProperties(Map<String, String> properties);
+
+  /**
+   * Converts Spark table properties to Gravitino table properties.
+   *
+   * @param properties Spark table properties.
+   * @return Gravitino table properties.
+   */
   Map<String, String> toGravitinoTableProperties(Map<String, String> properties);
 
+  /**
+   * Converts Gravitino table properties to Spark table properties.
+   *
+   * @param properties Gravitino table properties.
+   * @return Spark table properties.
+   */
   Map<String, String> toSparkTableProperties(Map<String, String> properties);
 }
diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/GravitinoHiveCatalog.java b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/GravitinoHiveCatalog.java
index 6ffca1ff9f4..92400437983 100644
--- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/GravitinoHiveCatalog.java
+++ b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/GravitinoHiveCatalog.java
@@ -6,15 +6,11 @@
 package com.datastrato.gravitino.spark.connector.hive;
 
 import com.datastrato.gravitino.rel.Table;
-import com.datastrato.gravitino.spark.connector.GravitinoSparkConfig;
 import com.datastrato.gravitino.spark.connector.PropertiesConverter;
 import com.datastrato.gravitino.spark.connector.SparkTransformConverter;
 import com.datastrato.gravitino.spark.connector.catalog.BaseCatalog;
 import com.datastrato.gravitino.spark.connector.table.SparkBaseTable;
-import com.google.common.base.Preconditions;
-import java.util.HashMap;
 import java.util.Map;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
@@ -25,17 +21,9 @@ public class GravitinoHiveCatalog extends BaseCatalog {
   @Override
   protected TableCatalog createAndInitSparkCatalog(
       String name, CaseInsensitiveStringMap options, Map<String, String> properties) {
-    Preconditions.checkArgument(properties != null, "Hive Catalog properties should not be null");
-    String metastoreUri = properties.get(GravitinoSparkConfig.GRAVITINO_HIVE_METASTORE_URI);
-    Preconditions.checkArgument(
-        StringUtils.isNotBlank(metastoreUri),
-        "Couldn't get "
-            + GravitinoSparkConfig.GRAVITINO_HIVE_METASTORE_URI
-            + " from hive catalog properties");
-
     TableCatalog hiveCatalog = new HiveTableCatalog();
-    HashMap<String, String> all = new HashMap<>(options);
-    all.put(GravitinoSparkConfig.SPARK_HIVE_METASTORE_URI, metastoreUri);
+    Map<String, String> all =
+        getPropertiesConverter().toSparkCatalogProperties(options, properties);
     hiveCatalog.initialize(name, new CaseInsensitiveStringMap(all));
 
     return hiveCatalog;
@@ -54,7 +42,7 @@ protected SparkBaseTable createSparkTable(
 
   @Override
   protected PropertiesConverter getPropertiesConverter() {
-    return new HivePropertiesConverter();
+    return HivePropertiesConverter.getInstance();
   }
 
   @Override
diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java
index ce4c4f9f08a..4cdd9da70f5 100644
--- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java
+++ b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java
@@ -6,18 +6,31 @@
 package com.datastrato.gravitino.spark.connector.hive;
 
 import com.datastrato.gravitino.catalog.hive.HiveTablePropertiesMetadata;
+import com.datastrato.gravitino.spark.connector.GravitinoSparkConfig;
 import com.datastrato.gravitino.spark.connector.PropertiesConverter;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
+import java.util.HashMap;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 import javax.ws.rs.NotSupportedException;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
 
 /** Transform hive catalog properties between Spark and Gravitino. */
 public class HivePropertiesConverter implements PropertiesConverter {
+  public static class HivePropertiesConverterHolder {
+    private static final HivePropertiesConverter INSTANCE = new HivePropertiesConverter();
+  }
+
+  private HivePropertiesConverter() {}
+
+  public static HivePropertiesConverter getInstance() {
+    return HivePropertiesConverterHolder.INSTANCE;
+  }
 
   // Transform Spark hive file format to Gravitino hive file format
   static final Map<String, String> fileFormatMap =
@@ -48,6 +61,20 @@ public class HivePropertiesConverter implements PropertiesConverter {
           HivePropertiesConstants.GRAVITINO_HIVE_TABLE_LOCATION,
           HivePropertiesConstants.SPARK_HIVE_LOCATION);
 
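+  /**
+   * Converts Gravitino Hive catalog properties to Spark Hive connector properties, mapping {@code
+   * metastore.uris} to {@code hive.metastore.uris} and failing fast when the metastore URI is
+   * missing or blank.
+   *
+   * @param properties Gravitino Hive catalog properties.
+   * @return properties for the Spark Hive connector.
+   */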
+  @Override
+  public Map<String, String> toSparkCatalogProperties(Map<String, String> properties) {
+    Preconditions.checkArgument(properties != null, "Hive Catalog properties should not be null");
+    String metastoreUri = properties.get(GravitinoSparkConfig.GRAVITINO_HIVE_METASTORE_URI);
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(metastoreUri),
+        "Couldn't get "
+            + GravitinoSparkConfig.GRAVITINO_HIVE_METASTORE_URI
+            + " from hive catalog properties");
+    HashMap<String, String> all = new HashMap<>();
+    all.put(GravitinoSparkConfig.SPARK_HIVE_METASTORE_URI, metastoreUri);
+    return all;
+  }
+
   /**
    * CREATE TABLE xxx STORED AS PARQUET will save "hive.stored-as" = "PARQUET" in property.
    *
diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java
index f7a028cad7a..4ed21faee5b 100644
--- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java
+++ b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java
@@ -10,11 +10,7 @@
 import com.datastrato.gravitino.spark.connector.SparkTransformConverter;
 import com.datastrato.gravitino.spark.connector.catalog.BaseCatalog;
 import com.datastrato.gravitino.spark.connector.table.SparkBaseTable;
-import com.google.common.base.Preconditions;
-import java.util.HashMap;
-import java.util.Locale;
 import java.util.Map;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.iceberg.spark.SparkCatalog;
 import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
@@ -36,32 +32,10 @@ public class GravitinoIcebergCatalog extends BaseCatalog implements FunctionCata
 
   @Override
   protected TableCatalog createAndInitSparkCatalog(
       String name, CaseInsensitiveStringMap options, Map<String, String> properties) {
-    Preconditions.checkArgument(
-        properties != null, "Iceberg Catalog properties should not be null");
-
-    String catalogBackend =
-        properties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND);
-    Preconditions.checkArgument(
-        StringUtils.isNotBlank(catalogBackend), "Iceberg Catalog backend should not be empty.");
-
-    HashMap<String, String> all = new HashMap<>(options);
-
-    switch (catalogBackend.toLowerCase(Locale.ENGLISH)) {
-      case IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND_HIVE:
-        initHiveProperties(catalogBackend, properties, all);
-        break;
-      case IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND_JDBC:
-        initJdbcProperties(catalogBackend, properties, all);
-        break;
-      default:
-        // SparkCatalog does not support Memory type catalog
-        throw new IllegalArgumentException(
-            "Unsupported Iceberg Catalog backend: " + catalogBackend);
-    }
-
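+    // Backend-specific validation and property transformation are delegated to the
+    // IcebergPropertiesConverter singleton.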
+    Map<String, String> all =
+        getPropertiesConverter().toSparkCatalogProperties(options, properties);
     TableCatalog icebergCatalog = new SparkCatalog();
     icebergCatalog.initialize(name, new CaseInsensitiveStringMap(all));
-
     return icebergCatalog;
   }
 
@@ -78,7 +52,7 @@ protected SparkBaseTable createSparkTable(
 
   @Override
   protected PropertiesConverter getPropertiesConverter() {
-    return new IcebergPropertiesConverter();
+    return IcebergPropertiesConverter.getInstance();
   }
 
   @Override
@@ -95,79 +69,4 @@ public Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceExce
   public UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionException {
     return ((SparkCatalog) sparkCatalog).loadFunction(ident);
   }
-
-  private void initHiveProperties(
-      String catalogBackend,
-      Map<String, String> gravitinoProperties,
-      HashMap<String, String> icebergProperties) {
-    String metastoreUri =
-        gravitinoProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI);
-    Preconditions.checkArgument(
-        StringUtils.isNotBlank(metastoreUri),
-        "Couldn't get "
-            + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI
-            + " from Iceberg Catalog properties");
-    String hiveWarehouse =
-        gravitinoProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE);
-    Preconditions.checkArgument(
-        StringUtils.isNotBlank(hiveWarehouse),
-        "Couldn't get "
-            + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE
-            + " from Iceberg Catalog properties");
-    icebergProperties.put(
-        IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_TYPE,
-        catalogBackend.toLowerCase(Locale.ENGLISH));
-    icebergProperties.put(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI, metastoreUri);
-    icebergProperties.put(
-        IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE, hiveWarehouse);
-  }
-
-  private void initJdbcProperties(
-      String catalogBackend,
-      Map<String, String> gravitinoProperties,
-      HashMap<String, String> icebergProperties) {
-    String jdbcUri =
-        gravitinoProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI);
-    Preconditions.checkArgument(
-        StringUtils.isNotBlank(jdbcUri),
-        "Couldn't get "
-            + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI
-            + " from Iceberg Catalog properties");
-    String jdbcWarehouse =
-        gravitinoProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE);
-    Preconditions.checkArgument(
-        StringUtils.isNotBlank(jdbcWarehouse),
-        "Couldn't get "
-            + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE
-            + " from Iceberg Catalog properties");
-    String jdbcUser = gravitinoProperties.get(IcebergPropertiesConstants.GRAVITINO_JDBC_USER);
-    Preconditions.checkArgument(
-        StringUtils.isNotBlank(jdbcUser),
-        "Couldn't get "
-            + IcebergPropertiesConstants.GRAVITINO_JDBC_USER
-            + " from Iceberg Catalog properties");
-    String jdbcPassword =
-        gravitinoProperties.get(IcebergPropertiesConstants.GRAVITINO_JDBC_PASSWORD);
-    Preconditions.checkArgument(
-        StringUtils.isNotBlank(jdbcPassword),
-        "Couldn't get "
-            + IcebergPropertiesConstants.GRAVITINO_JDBC_PASSWORD
-            + " from Iceberg Catalog properties");
-    String jdbcDriver =
-        gravitinoProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_JDBC_DRIVER);
-    Preconditions.checkArgument(
-        StringUtils.isNotBlank(jdbcDriver),
-        "Couldn't get "
-            + IcebergPropertiesConstants.GRAVITINO_ICEBERG_JDBC_DRIVER
-            + " from Iceberg Catalog properties");
-    icebergProperties.put(
-        IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_TYPE,
-        catalogBackend.toLowerCase(Locale.ROOT));
-    icebergProperties.put(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI, jdbcUri);
-    icebergProperties.put(
-        IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE, jdbcWarehouse);
-    icebergProperties.put(IcebergPropertiesConstants.GRAVITINO_ICEBERG_JDBC_USER, jdbcUser);
-    icebergProperties.put(IcebergPropertiesConstants.GRAVITINO_ICEBERG_JDBC_PASSWORD, jdbcPassword);
-    icebergProperties.put(IcebergPropertiesConstants.GRAVITINO_ICEBERG_JDBC_DRIVER, jdbcDriver);
-  }
 }
diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConstants.java b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConstants.java
index d69964785ab..3f8591dec28 100644
--- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConstants.java
+++ b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConstants.java
@@ -7,34 +7,42 @@
 import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergCatalogPropertiesMetadata;
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
 
 public class IcebergPropertiesConstants {
-
   @VisibleForTesting
   public static final String GRAVITINO_ICEBERG_CATALOG_BACKEND =
       IcebergCatalogPropertiesMetadata.CATALOG_BACKEND_NAME;
 
+  static final String ICEBERG_CATALOG_TYPE = CatalogUtil.ICEBERG_CATALOG_TYPE;
+
   @VisibleForTesting
   public static final String GRAVITINO_ICEBERG_CATALOG_WAREHOUSE =
       IcebergCatalogPropertiesMetadata.WAREHOUSE;
 
+  static final String ICEBERG_CATALOG_WAREHOUSE = CatalogProperties.WAREHOUSE_LOCATION;
+
   @VisibleForTesting
   public static final String GRAVITINO_ICEBERG_CATALOG_URI = IcebergCatalogPropertiesMetadata.URI;
 
-  public static final String GRAVITINO_JDBC_USER =
+  static final String ICEBERG_CATALOG_URI = CatalogProperties.URI;
+
+  static final String GRAVITINO_ICEBERG_CATALOG_JDBC_USER =
       IcebergCatalogPropertiesMetadata.GRAVITINO_JDBC_USER;
-  public static final String GRAVITINO_ICEBERG_JDBC_USER =
+  static final String ICEBERG_CATALOG_JDBC_USER =
      IcebergCatalogPropertiesMetadata.ICEBERG_JDBC_USER;
-  public static final String GRAVITINO_JDBC_PASSWORD =
+
+  static final String GRAVITINO_ICEBERG_CATALOG_JDBC_PASSWORD =
       IcebergCatalogPropertiesMetadata.GRAVITINO_JDBC_PASSWORD;
-  public static final String GRAVITINO_ICEBERG_JDBC_PASSWORD =
+  static final String ICEBERG_CATALOG_JDBC_PASSWORD =
       IcebergCatalogPropertiesMetadata.ICEBERG_JDBC_PASSWORD;
-  public static final String GRAVITINO_ICEBERG_JDBC_DRIVER =
-      IcebergCatalogPropertiesMetadata.GRAVITINO_JDBC_DRIVER;
-  public static final String GRAVITINO_ICEBERG_CATALOG_TYPE = "type";
-  public static final String GRAVITINO_ICEBERG_CATALOG_BACKEND_HIVE = "hive";
-  public static final String GRAVITINO_ICEBERG_CATALOG_BACKEND_JDBC = "jdbc";
+  static final String GRAVITINO_ICEBERG_CATALOG_BACKEND_HIVE = "hive";
+  static final String ICEBERG_CATALOG_BACKEND_HIVE = CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE;
+
+  static final String GRAVITINO_ICEBERG_CATALOG_BACKEND_JDBC = "jdbc";
+  static final String ICEBERG_CATALOG_BACKEND_JDBC = "jdbc";
 
   private IcebergPropertiesConstants() {}
 }
diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConverter.java b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConverter.java
index f96107c814d..58c2a09c9d5 100644
--- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConverter.java
+++ b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConverter.java
@@ -6,11 +6,52 @@
 package com.datastrato.gravitino.spark.connector.iceberg;
 
 import com.datastrato.gravitino.spark.connector.PropertiesConverter;
+import com.google.common.base.Preconditions;
 import java.util.HashMap;
+import java.util.Locale;
 import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
 
 /** Transform Iceberg catalog properties between Spark and Gravitino. */
 public class IcebergPropertiesConverter implements PropertiesConverter {
+
+  public static class IcebergPropertiesConverterHolder {
+    private static final IcebergPropertiesConverter INSTANCE = new IcebergPropertiesConverter();
+  }
+
+  private IcebergPropertiesConverter() {}
+
+  public static IcebergPropertiesConverter getInstance() {
+    return IcebergPropertiesConverter.IcebergPropertiesConverterHolder.INSTANCE;
+  }
+
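+  /**
+   * Converts Gravitino Iceberg catalog properties to Spark Iceberg connector properties based on
+   * the configured catalog backend, currently either {@code hive} or {@code jdbc}.
+   *
+   * @param properties Gravitino Iceberg catalog properties.
+   * @return properties for the Spark Iceberg connector.
+   */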
+  @Override
+  public Map<String, String> toSparkCatalogProperties(Map<String, String> properties) {
+    Preconditions.checkArgument(
+        properties != null, "Iceberg Catalog properties should not be null");
+
+    String catalogBackend =
+        properties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND);
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(catalogBackend), "Iceberg Catalog backend should not be empty.");
+
+    HashMap<String, String> all = new HashMap<>();
+
+    switch (catalogBackend.toLowerCase(Locale.ROOT)) {
+      case IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND_HIVE:
+        initHiveProperties(properties, all);
+        break;
+      case IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND_JDBC:
+        initJdbcProperties(properties, all);
+        break;
+      default:
+        // SparkCatalog does not support Memory type catalog
+        throw new IllegalArgumentException(
+            "Unsupported Iceberg Catalog backend: " + catalogBackend);
+    }
+    return all;
+  }
+
   @Override
   public Map<String, String> toGravitinoTableProperties(Map<String, String> properties) {
     return new HashMap<>(properties);
@@ -20,4 +61,66 @@ public Map<String, String> toGravitinoTableProperties(Map<String, String> proper
   public Map<String, String> toSparkTableProperties(Map<String, String> properties) {
     return new HashMap<>(properties);
   }
+
+  private void initHiveProperties(
+      Map<String, String> gravitinoProperties, HashMap<String, String> icebergProperties) {
+    String metastoreUri =
+        gravitinoProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI);
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(metastoreUri),
+        "Couldn't get "
+            + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI
+            + " from Iceberg Catalog properties");
+    String hiveWarehouse =
+        gravitinoProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE);
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(hiveWarehouse),
+        "Couldn't get "
+            + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE
+            + " from Iceberg Catalog properties");
+    icebergProperties.put(
+        IcebergPropertiesConstants.ICEBERG_CATALOG_TYPE,
+        IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_HIVE);
+    icebergProperties.put(IcebergPropertiesConstants.ICEBERG_CATALOG_URI, metastoreUri);
+    icebergProperties.put(IcebergPropertiesConstants.ICEBERG_CATALOG_WAREHOUSE, hiveWarehouse);
+  }
+
+  private void initJdbcProperties(
+      Map<String, String> gravitinoProperties, HashMap<String, String> icebergProperties) {
+    String jdbcUri =
+        gravitinoProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI);
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(jdbcUri),
+        "Couldn't get "
+            + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI
+            + " from Iceberg Catalog properties");
+    String jdbcWarehouse =
+        gravitinoProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE);
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(jdbcWarehouse),
+        "Couldn't get "
+            + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE
+            + " from Iceberg Catalog properties");
+    String jdbcUser =
+        gravitinoProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_JDBC_USER);
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(jdbcUser),
+        "Couldn't get "
+            + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_JDBC_USER
+            + " from Iceberg Catalog properties");
+    String jdbcPassword =
+        gravitinoProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_JDBC_PASSWORD);
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(jdbcPassword),
+        "Couldn't get "
+            + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_JDBC_PASSWORD
+            + " from Iceberg Catalog properties");
+    icebergProperties.put(
+        IcebergPropertiesConstants.ICEBERG_CATALOG_TYPE,
+        IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_JDBC);
+    icebergProperties.put(IcebergPropertiesConstants.ICEBERG_CATALOG_URI, jdbcUri);
+    icebergProperties.put(IcebergPropertiesConstants.ICEBERG_CATALOG_WAREHOUSE, jdbcWarehouse);
+    icebergProperties.put(IcebergPropertiesConstants.ICEBERG_CATALOG_JDBC_USER, jdbcUser);
+    icebergProperties.put(IcebergPropertiesConstants.ICEBERG_CATALOG_JDBC_PASSWORD, jdbcPassword);
+  }
 }
diff --git a/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/hive/TestHivePropertiesConverter.java b/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/hive/TestHivePropertiesConverter.java
index 297c6d8bd99..f7c09b677f9 100644
--- a/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/hive/TestHivePropertiesConverter.java
+++ b/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/hive/TestHivePropertiesConverter.java
@@ -5,10 +5,13 @@
 
 package com.datastrato.gravitino.spark.connector.hive;
 
+import com.datastrato.gravitino.spark.connector.GravitinoSparkConfig;
+import com.datastrato.gravitino.spark.connector.PropertiesConverter;
 import com.google.common.collect.ImmutableMap;
 import java.util.Map;
 import javax.ws.rs.NotSupportedException;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
@@ -16,7 +19,8 @@
 
 @TestInstance(Lifecycle.PER_CLASS)
 public class TestHivePropertiesConverter {
-  HivePropertiesConverter hivePropertiesConverter = new HivePropertiesConverter();
+  private final HivePropertiesConverter hivePropertiesConverter =
+      HivePropertiesConverter.getInstance();
 
   @Test
   void testTableFormat() {
@@ -139,4 +143,54 @@ void testOptionProperties() {
     Assertions.assertEquals(
         ImmutableMap.of(TableCatalog.OPTION_PREFIX + "a", "1", "b", "2"), properties);
   }
+
+  @Test
+  void testCatalogProperties() {
+    CaseInsensitiveStringMap caseInsensitiveStringMap =
+        new CaseInsensitiveStringMap(ImmutableMap.of("option-key", "option-value"));
+    Map<String, String> properties =
+        hivePropertiesConverter.toSparkCatalogProperties(
+            caseInsensitiveStringMap,
+            ImmutableMap.of(
+                GravitinoSparkConfig.GRAVITINO_HIVE_METASTORE_URI,
+                "hive-uri",
+                PropertiesConverter.SPARK_PROPERTY_PREFIX + "bypass-key",
+                "bypass-value",
+                "key1",
+                "value1"));
+    Assertions.assertEquals(
+        ImmutableMap.of(
+            GravitinoSparkConfig.SPARK_HIVE_METASTORE_URI,
+            "hive-uri",
+            "option-key",
+            "option-value",
+            "bypass-key",
+            "bypass-value"),
+        properties);
+
+    // test overwrite
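+    // User-provided options should override bypassed catalog properties on the same key.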
+    caseInsensitiveStringMap =
+        new CaseInsensitiveStringMap(
+            ImmutableMap.of(
+                "bypass-key",
+                "overwrite-value",
+                GravitinoSparkConfig.SPARK_HIVE_METASTORE_URI,
+                "hive-uri2"));
+    properties =
+        hivePropertiesConverter.toSparkCatalogProperties(
+            caseInsensitiveStringMap,
+            ImmutableMap.of(
+                GravitinoSparkConfig.GRAVITINO_HIVE_METASTORE_URI,
+                "hive-uri",
+                PropertiesConverter.SPARK_PROPERTY_PREFIX + "bypass-key",
+                "bypass-value"));
+
+    Assertions.assertEquals(
+        ImmutableMap.of(
+            GravitinoSparkConfig.SPARK_HIVE_METASTORE_URI,
+            "hive-uri2",
+            "bypass-key",
+            "overwrite-value"),
+        properties);
+  }
 }
diff --git a/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/iceberg/TestIcebergPropertiesConverter.java b/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/iceberg/TestIcebergPropertiesConverter.java
new file mode 100644
index 00000000000..549fb503db5
--- /dev/null
+++ b/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/iceberg/TestIcebergPropertiesConverter.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2024 Datastrato Pvt Ltd.
+ * This software is licensed under the Apache License version 2.
+ */
+
+package com.datastrato.gravitino.spark.connector.iceberg;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestIcebergPropertiesConverter {
+  private final IcebergPropertiesConverter icebergPropertiesConverter =
+      IcebergPropertiesConverter.getInstance();
+
+  @Test
+  void testCatalogPropertiesWithHiveBackend() {
+    Map<String, String> properties =
+        icebergPropertiesConverter.toSparkCatalogProperties(
+            ImmutableMap.of(
+                IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND,
+                IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND_HIVE,
+                IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI,
+                "hive-uri",
+                IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE,
+                "hive-warehouse",
+                "key1",
+                "value1"));
+    Assertions.assertEquals(
+        ImmutableMap.of(
+            IcebergPropertiesConstants.ICEBERG_CATALOG_TYPE,
+            IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_HIVE,
+            IcebergPropertiesConstants.ICEBERG_CATALOG_URI,
+            "hive-uri",
+            IcebergPropertiesConstants.ICEBERG_CATALOG_WAREHOUSE,
+            "hive-warehouse"),
+        properties);
+  }
+
+  @Test
+  void testCatalogPropertiesWithJdbcBackend() {
+    Map<String, String> properties =
+        icebergPropertiesConverter.toSparkCatalogProperties(
+            ImmutableMap.of(
+                IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND,
+                IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_JDBC,
+                IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI,
+                "jdbc-uri",
+                IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE,
+                "jdbc-warehouse",
+                IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_JDBC_USER,
+                "user",
+                IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_JDBC_PASSWORD,
+                "passwd",
+                "key1",
+                "value1"));
+    Assertions.assertEquals(
+        ImmutableMap.of(
+            IcebergPropertiesConstants.ICEBERG_CATALOG_TYPE,
+            IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_JDBC,
+            IcebergPropertiesConstants.ICEBERG_CATALOG_URI,
+            "jdbc-uri",
+            IcebergPropertiesConstants.ICEBERG_CATALOG_WAREHOUSE,
+            "jdbc-warehouse",
+            IcebergPropertiesConstants.ICEBERG_CATALOG_JDBC_USER,
+            "user",
+            IcebergPropertiesConstants.ICEBERG_CATALOG_JDBC_PASSWORD,
+            "passwd"),
+        properties);
+  }
+}