
Commit

[#3292] fix(spark-connector): passing Gravitino catalog properties to spark connector (#3358)

### What changes were proposed in this pull request?
1. Pass Gravitino catalog properties with the `spark.bypass.` prefix to the Spark connector.
2. Refactor the catalog properties transform logic.

### Why are the changes needed?

Fix: #3292

### Does this PR introduce _any_ user-facing change?
Yes, documentation is added.

### How was this patch tested?
1. Added unit tests.
2. Tested in a local environment.

Co-authored-by: FANNG <xiaojing@datastrato.com>
jerryshao and FANNG1 authored May 13, 2024
1 parent bb0a318 commit 7a6d045
Showing 12 changed files with 381 additions and 131 deletions.
3 changes: 3 additions & 0 deletions docs/apache-hive-catalog.md
@@ -42,6 +9 @@ The Hive catalog supports creating, updating, and deleting databases and tables

When you use Gravitino with Trino, you can pass Trino Hive connector configuration using the `trino.bypass.` prefix. For example, use `trino.bypass.hive.config.resources` to pass `hive.config.resources` to the Gravitino Hive catalog in the Trino runtime.

When you use Gravitino with Spark, you can pass Spark Hive connector configuration using the `spark.bypass.` prefix. For example, use `spark.bypass.hive.exec.dynamic.partition.mode` to pass `hive.exec.dynamic.partition.mode` to the Spark Hive connector in the Spark runtime.
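As an illustration only, the sketch below (the catalog property values are hypothetical) shows how a single Gravitino Hive catalog can carry prefixed properties for both runtimes, each engine picking up only the keys with its own prefix:

```java
import java.util.Map;

public class HiveCatalogBypassExample {
  public static void main(String[] args) {
    // Hypothetical Gravitino Hive catalog properties.
    Map<String, String> catalogProperties = Map.of(
        "metastore.uris", "thrift://hive-metastore:9083",
        // Consumed only by the Trino runtime, as hive.config.resources.
        "trino.bypass.hive.config.resources", "/etc/hive/conf/hive-site.xml",
        // Consumed only by the Spark connector, as hive.exec.dynamic.partition.mode.
        "spark.bypass.hive.exec.dynamic.partition.mode", "nonstrict");
    catalogProperties.forEach((key, value) -> System.out.println(key + " = " + value));
  }
}
```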


### Catalog operations

Refer to [Manage Relational Metadata Using Gravitino](./manage-relational-metadata-using-gravitino.md#catalog-operations) for more details.
3 changes: 3 additions & 0 deletions docs/lakehouse-iceberg-catalog.md
@@ -43,6 +9 @@ Any properties not defined by Gravitino with `gravitino.bypass.` prefix will pass

When you use Gravitino with Trino, you can pass Trino Iceberg connector configuration using the `trino.bypass.` prefix. For example, use `trino.bypass.iceberg.table-statistics-enabled` to pass `iceberg.table-statistics-enabled` to the Gravitino Iceberg catalog in the Trino runtime.

When you use Gravitino with Spark, you can pass Spark Iceberg connector configuration using the `spark.bypass.` prefix. For example, use `spark.bypass.io-impl` to pass `io-impl` to the Spark Iceberg connector in the Spark runtime.
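For illustration, a hedged sketch of one Gravitino Iceberg catalog carrying all three prefixes, each consumed by a different layer. The values, and the `clients` key used for the `gravitino.bypass.` example, are assumptions, not taken from this page:

```java
import java.util.Map;

public class IcebergCatalogPrefixExample {
  public static void main(String[] args) {
    // Hypothetical Gravitino Iceberg catalog properties.
    Map<String, String> catalogProperties = Map.of(
        "catalog-backend", "hive",
        "uri", "thrift://hive-metastore:9083",
        "warehouse", "hdfs://namenode:8020/user/iceberg/warehouse",
        // Passed by the Gravitino server to the underlying Iceberg catalog (assumed key).
        "gravitino.bypass.clients", "10",
        // Passed by the Trino runtime as iceberg.table-statistics-enabled.
        "trino.bypass.iceberg.table-statistics-enabled", "true",
        // Passed by the Spark connector as io-impl.
        "spark.bypass.io-impl", "org.apache.iceberg.aws.s3.S3FileIO");
    catalogProperties.forEach((key, value) -> System.out.println(key + " = " + value));
  }
}
```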


#### JDBC catalog

If you are using JDBC catalog, you must provide `jdbc-user`, `jdbc-password` and `jdbc-driver` to catalog properties.
16 changes: 16 additions & 0 deletions docs/spark-connector/spark-catalog-hive.md
@@ -51,3 +51,19 @@ INSERT OVERWRITE TABLE employees PARTITION(department='Marketing') VALUES (3, 'M

SELECT * FROM employees WHERE department = 'Engineering';
```


## Catalog properties

The Gravitino Spark connector transforms the following property names, defined in the catalog properties, into Spark Hive connector configuration.

| Property name in Gravitino catalog properties | Spark Hive connector configuration | Description | Since Version |
|-----------------------------------------------|------------------------------------|----------------------------|---------------|
| `metastore.uris` | `hive.metastore.uris` | Hive metastore uri address | 0.5.0 |

Gravitino catalog properties whose names carry the `spark.bypass.` prefix are passed to the Spark Hive connector with the prefix stripped. For example, use `spark.bypass.hive.exec.dynamic.partition.mode` to pass `hive.exec.dynamic.partition.mode` to the Spark Hive connector.
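As a rough sketch (all values are hypothetical), the renaming works as follows: `metastore.uris` is renamed per the table above, and any `spark.bypass.`-prefixed key is passed through with the prefix removed:

```java
import java.util.HashMap;
import java.util.Map;

public class HiveCatalogPropertyMappingExample {
  public static void main(String[] args) {
    // Hypothetical Gravitino Hive catalog properties.
    Map<String, String> gravitinoProperties = new HashMap<>();
    gravitinoProperties.put("metastore.uris", "thrift://hive-metastore:9083");
    gravitinoProperties.put("spark.bypass.hive.exec.dynamic.partition.mode", "nonstrict");

    // Spark Hive connector configuration expected from the properties above.
    Map<String, String> sparkHiveConfiguration = new HashMap<>();
    // Renamed according to the property-name table above.
    sparkHiveConfiguration.put("hive.metastore.uris", "thrift://hive-metastore:9083");
    // "spark.bypass." prefix stripped, the remainder passed through unchanged.
    sparkHiveConfiguration.put("hive.exec.dynamic.partition.mode", "nonstrict");
    System.out.println(sparkHiveConfiguration);
  }
}
```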


:::caution
When using the `spark-sql` shell client, you must explicitly set `spark.bypass.spark.sql.hive.metastore.jars` in the Gravitino Hive catalog properties. Replace the default `builtin` value with the appropriate setting for your setup.
:::
14 changes: 14 additions & 0 deletions docs/spark-connector/spark-catalog-iceberg.md
@@ -58,3 +58,17 @@ VALUES

SELECT * FROM employee WHERE date(hire_date) = '2021-01-01'
```

## Catalog properties

The Gravitino Spark connector transforms the following property names, defined in the catalog properties, into Spark Iceberg connector configuration.

| Gravitino catalog property name | Spark Iceberg connector configuration | Description | Since Version |
|---------------------------------|---------------------------------------|---------------------------|---------------|
| `catalog-backend` | `type` | Catalog backend type | 0.5.0 |
| `uri` | `uri` | Catalog backend uri | 0.5.0 |
| `warehouse` | `warehouse` | Catalog backend warehouse | 0.5.0 |
| `jdbc-user` | `jdbc.user` | JDBC user name | 0.5.0 |
| `jdbc-password` | `jdbc.password` | JDBC password | 0.5.0 |

Gravitino catalog properties whose names carry the `spark.bypass.` prefix are passed to the Spark Iceberg connector with the prefix stripped. For example, use `spark.bypass.io-impl` to pass `io-impl` to the Spark Iceberg connector.
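A minimal sketch (all values are hypothetical) of how a JDBC-backed Gravitino Iceberg catalog's properties would be renamed for the Spark Iceberg connector, following the table above:

```java
import java.util.HashMap;
import java.util.Map;

public class IcebergCatalogPropertyMappingExample {
  public static void main(String[] args) {
    // Hypothetical Gravitino Iceberg catalog properties (JDBC backend).
    Map<String, String> gravitinoProperties = new HashMap<>();
    gravitinoProperties.put("catalog-backend", "jdbc");
    gravitinoProperties.put("uri", "jdbc:postgresql://db:5432/iceberg");
    gravitinoProperties.put("warehouse", "hdfs://namenode:8020/user/iceberg/warehouse");
    gravitinoProperties.put("jdbc-user", "iceberg");
    gravitinoProperties.put("jdbc-password", "secret");
    gravitinoProperties.put("spark.bypass.io-impl", "org.apache.iceberg.aws.s3.S3FileIO");

    // Spark Iceberg connector configuration expected from the properties above.
    Map<String, String> sparkIcebergConfiguration = new HashMap<>();
    sparkIcebergConfiguration.put("type", "jdbc");
    sparkIcebergConfiguration.put("uri", "jdbc:postgresql://db:5432/iceberg");
    sparkIcebergConfiguration.put("warehouse", "hdfs://namenode:8020/user/iceberg/warehouse");
    sparkIcebergConfiguration.put("jdbc.user", "iceberg");
    sparkIcebergConfiguration.put("jdbc.password", "secret");
    // "spark.bypass." prefix stripped, the remainder passed through unchanged.
    sparkIcebergConfiguration.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO");
    System.out.println(sparkIcebergConfiguration);
  }
}
```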
@@ -5,11 +5,74 @@

package com.datastrato.gravitino.spark.connector;

import com.google.common.annotations.VisibleForTesting;
import java.util.HashMap;
import java.util.Map;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

/** Transform table properties between Gravitino and Spark. */
/** Interface for transforming properties between Gravitino and Spark. */
public interface PropertiesConverter {
@VisibleForTesting String SPARK_PROPERTY_PREFIX = "spark.bypass.";

/**
* Converts application-provided options and Gravitino catalog properties to Spark connector
* properties.
*
* <p>It provides the common implementation: extracting properties with the "spark.bypass."
* prefix and merging the user-provided options with the transformed properties.
*
* @param options Case-insensitive properties map provided by application configuration.
* @param properties Gravitino catalog properties.
* @return properties for the Spark connector.
*/
default Map<String, String> toSparkCatalogProperties(
CaseInsensitiveStringMap options, Map<String, String> properties) {
Map<String, String> all = new HashMap<>();
if (properties != null) {
properties.forEach(
(k, v) -> {
if (k.startsWith(SPARK_PROPERTY_PREFIX)) {
String newKey = k.substring(SPARK_PROPERTY_PREFIX.length());
all.put(newKey, v);
}
});
}

Map<String, String> transformedProperties = toSparkCatalogProperties(properties);
if (transformedProperties != null) {
all.putAll(transformedProperties);
}

if (options != null) {
all.putAll(options);
}
return all;
}

/**
* Transforms Gravitino catalog properties to Spark connector properties.
*
* <p>This method covers the catalog-specific transform logic; the common logic is implemented
* in the overloaded {@code toSparkCatalogProperties}.
*
* @param properties Gravitino catalog properties.
* @return properties for the Spark connector.
*/
Map<String, String> toSparkCatalogProperties(Map<String, String> properties);

/**
* Converts Spark table properties to Gravitino table properties.
*
* @param properties Spark table properties.
* @return Gravitino table properties.
*/
Map<String, String> toGravitinoTableProperties(Map<String, String> properties);

/**
* Converts Gravitino table properties to Spark table properties.
*
* @param properties Gravitino table properties.
* @return Spark table properties.
*/
Map<String, String> toSparkTableProperties(Map<String, String> properties);
}
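To make the merge order of the default method concrete, here is a hedged usage sketch. The converter class and all values are hypothetical, and the `PropertiesConverter` interface from this file is assumed to be on the classpath: bypass-prefixed catalog properties are added first, the catalog-specific transform second, and application-provided options last, so user options take precedence.

```java
import com.datastrato.gravitino.spark.connector.PropertiesConverter;
import java.util.HashMap;
import java.util.Map;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

// Hypothetical converter that only customizes the catalog-specific transform.
class ExamplePropertiesConverter implements PropertiesConverter {
  @Override
  public Map<String, String> toSparkCatalogProperties(Map<String, String> properties) {
    Map<String, String> transformed = new HashMap<>();
    if (properties != null && properties.containsKey("metastore.uris")) {
      transformed.put("hive.metastore.uris", properties.get("metastore.uris"));
    }
    return transformed;
  }

  @Override
  public Map<String, String> toGravitinoTableProperties(Map<String, String> properties) {
    return new HashMap<>(properties);
  }

  @Override
  public Map<String, String> toSparkTableProperties(Map<String, String> properties) {
    return new HashMap<>(properties);
  }

  public static void main(String[] args) {
    // Hypothetical Gravitino catalog properties.
    Map<String, String> catalogProperties = new HashMap<>();
    catalogProperties.put("metastore.uris", "thrift://hive-metastore:9083");
    catalogProperties.put("spark.bypass.hive.exec.dynamic.partition.mode", "nonstrict");

    // Application-provided options are merged last and therefore win over the bypass value.
    CaseInsensitiveStringMap options =
        new CaseInsensitiveStringMap(Map.of("hive.exec.dynamic.partition.mode", "strict"));

    Map<String, String> sparkProperties =
        new ExamplePropertiesConverter().toSparkCatalogProperties(options, catalogProperties);
    // The result contains hive.metastore.uris from the transform and
    // hive.exec.dynamic.partition.mode=strict, because the user option takes precedence.
    System.out.println(sparkProperties);
  }
}
```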
@@ -6,15 +6,11 @@
package com.datastrato.gravitino.spark.connector.hive;

import com.datastrato.gravitino.rel.Table;
import com.datastrato.gravitino.spark.connector.GravitinoSparkConfig;
import com.datastrato.gravitino.spark.connector.PropertiesConverter;
import com.datastrato.gravitino.spark.connector.SparkTransformConverter;
import com.datastrato.gravitino.spark.connector.catalog.BaseCatalog;
import com.datastrato.gravitino.spark.connector.table.SparkBaseTable;
import com.google.common.base.Preconditions;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
@@ -25,17 +21,9 @@ public class GravitinoHiveCatalog extends BaseCatalog {
@Override
protected TableCatalog createAndInitSparkCatalog(
String name, CaseInsensitiveStringMap options, Map<String, String> properties) {
Preconditions.checkArgument(properties != null, "Hive Catalog properties should not be null");
String metastoreUri = properties.get(GravitinoSparkConfig.GRAVITINO_HIVE_METASTORE_URI);
Preconditions.checkArgument(
StringUtils.isNotBlank(metastoreUri),
"Couldn't get "
+ GravitinoSparkConfig.GRAVITINO_HIVE_METASTORE_URI
+ " from hive catalog properties");

TableCatalog hiveCatalog = new HiveTableCatalog();
HashMap<String, String> all = new HashMap<>(options);
all.put(GravitinoSparkConfig.SPARK_HIVE_METASTORE_URI, metastoreUri);
Map<String, String> all =
getPropertiesConverter().toSparkCatalogProperties(options, properties);
hiveCatalog.initialize(name, new CaseInsensitiveStringMap(all));

return hiveCatalog;
@@ -54,7 +42,7 @@ protected SparkBaseTable createSparkTable(

@Override
protected PropertiesConverter getPropertiesConverter() {
return new HivePropertiesConverter();
return HivePropertiesConverter.getInstance();
}

@Override
@@ -6,18 +6,31 @@
package com.datastrato.gravitino.spark.connector.hive;

import com.datastrato.gravitino.catalog.hive.HiveTablePropertiesMetadata;
import com.datastrato.gravitino.spark.connector.GravitinoSparkConfig;
import com.datastrato.gravitino.spark.connector.PropertiesConverter;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.ws.rs.NotSupportedException;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.sql.connector.catalog.TableCatalog;

/** Transform hive catalog properties between Spark and Gravitino. */
public class HivePropertiesConverter implements PropertiesConverter {
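// Initialization-on-demand holder idiom: the singleton instance is created lazily and
// thread-safely when getInstance() first loads the holder class.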
public static class HivePropertiesConverterHolder {
private static final HivePropertiesConverter INSTANCE = new HivePropertiesConverter();
}

private HivePropertiesConverter() {}

public static HivePropertiesConverter getInstance() {
return HivePropertiesConverterHolder.INSTANCE;
}

// Transform Spark hive file format to Gravitino hive file format
static final Map<String, String> fileFormatMap =
@@ -48,6 +61,20 @@ public class HivePropertiesConverter implements PropertiesConverter {
HivePropertiesConstants.GRAVITINO_HIVE_TABLE_LOCATION,
HivePropertiesConstants.SPARK_HIVE_LOCATION);

@Override
public Map<String, String> toSparkCatalogProperties(Map<String, String> properties) {
Preconditions.checkArgument(properties != null, "Hive Catalog properties should not be null");
String metastoreUri = properties.get(GravitinoSparkConfig.GRAVITINO_HIVE_METASTORE_URI);
Preconditions.checkArgument(
StringUtils.isNotBlank(metastoreUri),
"Couldn't get "
+ GravitinoSparkConfig.GRAVITINO_HIVE_METASTORE_URI
+ " from hive catalog properties");
HashMap<String, String> all = new HashMap<>();
all.put(GravitinoSparkConfig.SPARK_HIVE_METASTORE_URI, metastoreUri);
return all;
}

/**
* CREATE TABLE xxx STORED AS PARQUET will save "hive.stored-as" = "PARQUET" in property.
*
@@ -10,11 +10,7 @@
import com.datastrato.gravitino.spark.connector.SparkTransformConverter;
import com.datastrato.gravitino.spark.connector.catalog.BaseCatalog;
import com.datastrato.gravitino.spark.connector.table.SparkBaseTable;
import com.google.common.base.Preconditions;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.iceberg.spark.SparkCatalog;
import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
@@ -36,32 +32,10 @@ public class GravitinoIcebergCatalog extends BaseCatalog implements FunctionCata
@Override
protected TableCatalog createAndInitSparkCatalog(
String name, CaseInsensitiveStringMap options, Map<String, String> properties) {
Preconditions.checkArgument(
properties != null, "Iceberg Catalog properties should not be null");

String catalogBackend =
properties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND);
Preconditions.checkArgument(
StringUtils.isNotBlank(catalogBackend), "Iceberg Catalog backend should not be empty.");

HashMap<String, String> all = new HashMap<>(options);

switch (catalogBackend.toLowerCase(Locale.ENGLISH)) {
case IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND_HIVE:
initHiveProperties(catalogBackend, properties, all);
break;
case IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND_JDBC:
initJdbcProperties(catalogBackend, properties, all);
break;
default:
// SparkCatalog does not support Memory type catalog
throw new IllegalArgumentException(
"Unsupported Iceberg Catalog backend: " + catalogBackend);
}

Map<String, String> all =
getPropertiesConverter().toSparkCatalogProperties(options, properties);
TableCatalog icebergCatalog = new SparkCatalog();
icebergCatalog.initialize(name, new CaseInsensitiveStringMap(all));

return icebergCatalog;
}

Expand All @@ -78,7 +52,7 @@ protected SparkBaseTable createSparkTable(

@Override
protected PropertiesConverter getPropertiesConverter() {
return new IcebergPropertiesConverter();
return IcebergPropertiesConverter.getInstance();
}

@Override
@@ -95,79 +69,4 @@ public Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceExce
public UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionException {
return ((SparkCatalog) sparkCatalog).loadFunction(ident);
}

private void initHiveProperties(
String catalogBackend,
Map<String, String> gravitinoProperties,
HashMap<String, String> icebergProperties) {
String metastoreUri =
gravitinoProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI);
Preconditions.checkArgument(
StringUtils.isNotBlank(metastoreUri),
"Couldn't get "
+ IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI
+ " from Iceberg Catalog properties");
String hiveWarehouse =
gravitinoProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE);
Preconditions.checkArgument(
StringUtils.isNotBlank(hiveWarehouse),
"Couldn't get "
+ IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE
+ " from Iceberg Catalog properties");
icebergProperties.put(
IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_TYPE,
catalogBackend.toLowerCase(Locale.ENGLISH));
icebergProperties.put(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI, metastoreUri);
icebergProperties.put(
IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE, hiveWarehouse);
}

private void initJdbcProperties(
String catalogBackend,
Map<String, String> gravitinoProperties,
HashMap<String, String> icebergProperties) {
String jdbcUri =
gravitinoProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI);
Preconditions.checkArgument(
StringUtils.isNotBlank(jdbcUri),
"Couldn't get "
+ IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI
+ " from Iceberg Catalog properties");
String jdbcWarehouse =
gravitinoProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE);
Preconditions.checkArgument(
StringUtils.isNotBlank(jdbcWarehouse),
"Couldn't get "
+ IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE
+ " from Iceberg Catalog properties");
String jdbcUser = gravitinoProperties.get(IcebergPropertiesConstants.GRAVITINO_JDBC_USER);
Preconditions.checkArgument(
StringUtils.isNotBlank(jdbcUser),
"Couldn't get "
+ IcebergPropertiesConstants.GRAVITINO_JDBC_USER
+ " from Iceberg Catalog properties");
String jdbcPassword =
gravitinoProperties.get(IcebergPropertiesConstants.GRAVITINO_JDBC_PASSWORD);
Preconditions.checkArgument(
StringUtils.isNotBlank(jdbcPassword),
"Couldn't get "
+ IcebergPropertiesConstants.GRAVITINO_JDBC_PASSWORD
+ " from Iceberg Catalog properties");
String jdbcDriver =
gravitinoProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_JDBC_DRIVER);
Preconditions.checkArgument(
StringUtils.isNotBlank(jdbcDriver),
"Couldn't get "
+ IcebergPropertiesConstants.GRAVITINO_ICEBERG_JDBC_DRIVER
+ " from Iceberg Catalog properties");
icebergProperties.put(
IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_TYPE,
catalogBackend.toLowerCase(Locale.ROOT));
icebergProperties.put(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI, jdbcUri);
icebergProperties.put(
IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE, jdbcWarehouse);
icebergProperties.put(IcebergPropertiesConstants.GRAVITINO_ICEBERG_JDBC_USER, jdbcUser);
icebergProperties.put(IcebergPropertiesConstants.GRAVITINO_ICEBERG_JDBC_PASSWORD, jdbcPassword);
icebergProperties.put(IcebergPropertiesConstants.GRAVITINO_ICEBERG_JDBC_DRIVER, jdbcDriver);
}
}
