Skip to content

Commit

Permalink
Move Iceberg catalog property constants from GravitinoSparkConfig into a new IcebergPropertiesConstants class; fix `jdbcPasswrod` typo and rename the `sparkCatalog` constructor parameter to `sparkIcebergCatalog`.
Browse files Browse the repository at this point in the history
  • Loading branch information
caican00 committed Mar 31, 2024
1 parent a337e14 commit a2e8efd
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.datastrato.gravitino.integration.test.container.HiveContainer;
import com.datastrato.gravitino.integration.test.util.spark.SparkUtilIT;
import com.datastrato.gravitino.spark.connector.GravitinoSparkConfig;
import com.datastrato.gravitino.spark.connector.iceberg.IcebergPropertiesConstants;
import com.datastrato.gravitino.spark.connector.plugin.GravitinoSparkPlugin;
import com.google.common.collect.Maps;
import java.io.IOException;
Expand Down Expand Up @@ -85,9 +86,9 @@ private void initMetalakeAndCatalogs() {
properties.put(GravitinoSparkConfig.GRAVITINO_HIVE_METASTORE_URI, hiveMetastoreUri);
break;
case "lakehouse-iceberg":
properties.put(GravitinoSparkConfig.LAKEHOUSE_ICEBERG_CATALOG_BACKEND, "hive");
properties.put(GravitinoSparkConfig.LAKEHOUSE_ICEBERG_CATALOG_WAREHOUSE, warehouse);
properties.put(GravitinoSparkConfig.LAKEHOUSE_ICEBERG_CATALOG_URI, hiveMetastoreUri);
properties.put(IcebergPropertiesConstants.LAKEHOUSE_ICEBERG_CATALOG_BACKEND, "hive");
properties.put(IcebergPropertiesConstants.LAKEHOUSE_ICEBERG_CATALOG_WAREHOUSE, warehouse);
properties.put(IcebergPropertiesConstants.LAKEHOUSE_ICEBERG_CATALOG_URI, hiveMetastoreUri);
break;
default:
throw new IllegalArgumentException("Unsupported provider: " + getProvider());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,5 @@ public class GravitinoSparkConfig {
// Gravitino-side configuration key for the Hive metastore URI.
public static final String GRAVITINO_HIVE_METASTORE_URI = "metastore.uris";
// Spark-side configuration key for the Hive metastore URI.
public static final String SPARK_HIVE_METASTORE_URI = "hive.metastore.uris";

// Constants-only holder; not instantiable.
private GravitinoSparkConfig() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import com.datastrato.gravitino.rel.Table;
import com.datastrato.gravitino.spark.connector.GravitinoCatalogAdaptor;
import com.datastrato.gravitino.spark.connector.GravitinoSparkConfig;
import com.datastrato.gravitino.spark.connector.PropertiesConverter;
import com.datastrato.gravitino.spark.connector.table.SparkBaseTable;
import com.google.common.base.Preconditions;
Expand All @@ -28,71 +27,76 @@ private void initHiveProperties(
Map<String, String> gravitinoProperties,
HashMap<String, String> icebergProperties) {
String metastoreUri =
gravitinoProperties.get(GravitinoSparkConfig.LAKEHOUSE_ICEBERG_CATALOG_URI);
gravitinoProperties.get(IcebergPropertiesConstants.LAKEHOUSE_ICEBERG_CATALOG_URI);
Preconditions.checkArgument(
StringUtils.isNotBlank(metastoreUri),
"Couldn't get "
+ GravitinoSparkConfig.LAKEHOUSE_ICEBERG_CATALOG_URI
+ " from iceberg catalog properties");
+ IcebergPropertiesConstants.LAKEHOUSE_ICEBERG_CATALOG_URI
+ " from Iceberg Catalog properties");
String hiveWarehouse =
gravitinoProperties.get(GravitinoSparkConfig.LAKEHOUSE_ICEBERG_CATALOG_WAREHOUSE);
gravitinoProperties.get(IcebergPropertiesConstants.LAKEHOUSE_ICEBERG_CATALOG_WAREHOUSE);
Preconditions.checkArgument(
StringUtils.isNotBlank(hiveWarehouse),
"Couldn't get "
+ GravitinoSparkConfig.LAKEHOUSE_ICEBERG_CATALOG_WAREHOUSE
+ " from iceberg catalog properties");
+ IcebergPropertiesConstants.LAKEHOUSE_ICEBERG_CATALOG_WAREHOUSE
+ " from Iceberg Catalog properties");
icebergProperties.put(
GravitinoSparkConfig.LAKEHOUSE_ICEBERG_CATALOG_TYPE,
IcebergPropertiesConstants.LAKEHOUSE_ICEBERG_CATALOG_TYPE,
catalogBackend.toLowerCase(Locale.ENGLISH));
icebergProperties.put(GravitinoSparkConfig.LAKEHOUSE_ICEBERG_CATALOG_URI, metastoreUri);
icebergProperties.put(GravitinoSparkConfig.LAKEHOUSE_ICEBERG_CATALOG_WAREHOUSE, hiveWarehouse);
icebergProperties.put(IcebergPropertiesConstants.LAKEHOUSE_ICEBERG_CATALOG_URI, metastoreUri);
icebergProperties.put(
IcebergPropertiesConstants.LAKEHOUSE_ICEBERG_CATALOG_WAREHOUSE, hiveWarehouse);
}

private void initJdbcProperties(
String catalogBackend,
Map<String, String> gravitinoProperties,
HashMap<String, String> icebergProperties) {
String jdbcUri = gravitinoProperties.get(GravitinoSparkConfig.LAKEHOUSE_ICEBERG_CATALOG_URI);
String jdbcUri =
gravitinoProperties.get(IcebergPropertiesConstants.LAKEHOUSE_ICEBERG_CATALOG_URI);
Preconditions.checkArgument(
StringUtils.isNotBlank(jdbcUri),
"Couldn't get "
+ GravitinoSparkConfig.LAKEHOUSE_ICEBERG_CATALOG_URI
+ " from iceberg catalog properties");
+ IcebergPropertiesConstants.LAKEHOUSE_ICEBERG_CATALOG_URI
+ " from Iceberg Catalog properties");
String jdbcWarehouse =
gravitinoProperties.get(GravitinoSparkConfig.LAKEHOUSE_ICEBERG_CATALOG_WAREHOUSE);
gravitinoProperties.get(IcebergPropertiesConstants.LAKEHOUSE_ICEBERG_CATALOG_WAREHOUSE);
Preconditions.checkArgument(
StringUtils.isNotBlank(jdbcWarehouse),
"Couldn't get "
+ GravitinoSparkConfig.LAKEHOUSE_ICEBERG_CATALOG_WAREHOUSE
+ " from iceberg catalog properties");
String jdbcUser = gravitinoProperties.get(GravitinoSparkConfig.GRAVITINO_JDBC_USER);
+ IcebergPropertiesConstants.LAKEHOUSE_ICEBERG_CATALOG_WAREHOUSE
+ " from Iceberg Catalog properties");
String jdbcUser = gravitinoProperties.get(IcebergPropertiesConstants.GRAVITINO_JDBC_USER);
Preconditions.checkArgument(
StringUtils.isNotBlank(jdbcUser),
"Couldn't get "
+ GravitinoSparkConfig.GRAVITINO_JDBC_USER
+ " from iceberg catalog properties");
String jdbcPasswrod = gravitinoProperties.get(GravitinoSparkConfig.GRAVITINO_JDBC_PASSWORD);
+ IcebergPropertiesConstants.GRAVITINO_JDBC_USER
+ " from Iceberg Catalog properties");
String jdbcPassword =
gravitinoProperties.get(IcebergPropertiesConstants.GRAVITINO_JDBC_PASSWORD);
Preconditions.checkArgument(
StringUtils.isNotBlank(jdbcPasswrod),
StringUtils.isNotBlank(jdbcPassword),
"Couldn't get "
+ GravitinoSparkConfig.GRAVITINO_JDBC_PASSWORD
+ " from iceberg catalog properties");
+ IcebergPropertiesConstants.GRAVITINO_JDBC_PASSWORD
+ " from Iceberg Catalog properties");
String jdbcDriver =
gravitinoProperties.get(GravitinoSparkConfig.LAKEHOUSE_ICEBERG_CATALOG_JDBC_DRIVER);
gravitinoProperties.get(IcebergPropertiesConstants.LAKEHOUSE_ICEBERG_CATALOG_JDBC_DRIVER);
Preconditions.checkArgument(
StringUtils.isNotBlank(jdbcDriver),
"Couldn't get "
+ GravitinoSparkConfig.LAKEHOUSE_ICEBERG_CATALOG_JDBC_DRIVER
+ " from iceberg catalog properties");
+ IcebergPropertiesConstants.LAKEHOUSE_ICEBERG_CATALOG_JDBC_DRIVER
+ " from Iceberg Catalog properties");
icebergProperties.put(
GravitinoSparkConfig.LAKEHOUSE_ICEBERG_CATALOG_TYPE,
IcebergPropertiesConstants.LAKEHOUSE_ICEBERG_CATALOG_TYPE,
catalogBackend.toLowerCase(Locale.ROOT));
icebergProperties.put(GravitinoSparkConfig.LAKEHOUSE_ICEBERG_CATALOG_URI, jdbcUri);
icebergProperties.put(GravitinoSparkConfig.LAKEHOUSE_ICEBERG_CATALOG_WAREHOUSE, jdbcWarehouse);
icebergProperties.put(GravitinoSparkConfig.LAKEHOUSE_ICEBERG_CATALOG_JDBC_USER, jdbcUser);
icebergProperties.put(IcebergPropertiesConstants.LAKEHOUSE_ICEBERG_CATALOG_URI, jdbcUri);
icebergProperties.put(
IcebergPropertiesConstants.LAKEHOUSE_ICEBERG_CATALOG_WAREHOUSE, jdbcWarehouse);
icebergProperties.put(IcebergPropertiesConstants.LAKEHOUSE_ICEBERG_CATALOG_JDBC_USER, jdbcUser);
icebergProperties.put(
IcebergPropertiesConstants.LAKEHOUSE_ICEBERG_CATALOG_JDBC_PASSWORD, jdbcPassword);
icebergProperties.put(
GravitinoSparkConfig.LAKEHOUSE_ICEBERG_CATALOG_JDBC_PASSWORD, jdbcPasswrod);
icebergProperties.put(GravitinoSparkConfig.LAKEHOUSE_ICEBERG_CATALOG_JDBC_DRIVER, jdbcDriver);
IcebergPropertiesConstants.LAKEHOUSE_ICEBERG_CATALOG_JDBC_DRIVER, jdbcDriver);
}

@Override
Expand All @@ -115,17 +119,18 @@ public TableCatalog createAndInitSparkCatalog(
Preconditions.checkArgument(
properties != null, "Iceberg Catalog properties should not be null");

String catalogBackend = properties.get(GravitinoSparkConfig.LAKEHOUSE_ICEBERG_CATALOG_BACKEND);
String catalogBackend =
properties.get(IcebergPropertiesConstants.LAKEHOUSE_ICEBERG_CATALOG_BACKEND);
Preconditions.checkArgument(
StringUtils.isNotBlank(catalogBackend), "Iceberg Catalog backend should not be empty.");

HashMap<String, String> all = new HashMap<>(options);

switch (catalogBackend.toLowerCase(Locale.ENGLISH)) {
case GravitinoSparkConfig.LAKEHOUSE_ICEBERG_CATALOG_BACKEND_HIVE:
case IcebergPropertiesConstants.LAKEHOUSE_ICEBERG_CATALOG_BACKEND_HIVE:
initHiveProperties(catalogBackend, properties, all);
break;
case GravitinoSparkConfig.LAKEHOUSE_ICEBERG_CATALOG_BACKEND_JDBC:
case IcebergPropertiesConstants.LAKEHOUSE_ICEBERG_CATALOG_BACKEND_JDBC:
initJdbcProperties(catalogBackend, properties, all);
break;
default:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.gravitino.spark.connector.iceberg;

import com.google.common.annotations.VisibleForTesting;

/**
 * Property-key constants used when translating Gravitino Iceberg catalog properties
 * into the options handed to the Spark Iceberg catalog.
 *
 * <p>Most values are aliases of keys declared in {@code IcebergCatalogPropertiesMetadata},
 * keeping the Spark connector in sync with the catalog module's property names.
 */
public class IcebergPropertiesConstants {

// Gravitino key naming the Iceberg catalog backend (e.g. "hive" or "jdbc").
@VisibleForTesting
public static final String LAKEHOUSE_ICEBERG_CATALOG_BACKEND =
IcebergCatalogPropertiesMetadata.CATALOG_BACKEND;

// Warehouse root location for the Iceberg catalog.
@VisibleForTesting
public static final String LAKEHOUSE_ICEBERG_CATALOG_WAREHOUSE =
IcebergCatalogPropertiesMetadata.WAREHOUSE;

// Catalog URI (Hive metastore URI or JDBC URI, depending on the backend).
@VisibleForTesting
public static final String LAKEHOUSE_ICEBERG_CATALOG_URI = IcebergCatalogPropertiesMetadata.URI;

// JDBC credential/driver keys: the GRAVITINO_* names are the Gravitino-side keys,
// the LAKEHOUSE_ICEBERG_CATALOG_JDBC_* names are the Iceberg-side keys they map to.
public static final String GRAVITINO_JDBC_USER =
IcebergCatalogPropertiesMetadata.GRAVITINO_JDBC_USER;
public static final String LAKEHOUSE_ICEBERG_CATALOG_JDBC_USER =
IcebergCatalogPropertiesMetadata.ICEBERG_JDBC_USER;
public static final String GRAVITINO_JDBC_PASSWORD =
IcebergCatalogPropertiesMetadata.GRAVITINO_JDBC_PASSWORD;
public static final String LAKEHOUSE_ICEBERG_CATALOG_JDBC_PASSWORD =
IcebergCatalogPropertiesMetadata.ICEBERG_JDBC_PASSWORD;
public static final String LAKEHOUSE_ICEBERG_CATALOG_JDBC_DRIVER =
IcebergCatalogPropertiesMetadata.GRAVITINO_JDBC_DRIVER;

// Literal option keys/values passed to Spark; "type" presumably matches the Spark
// Iceberg catalog's backend-type option — NOTE(review): confirm against Iceberg docs.
public static final String LAKEHOUSE_ICEBERG_CATALOG_TYPE = "type";
public static final String LAKEHOUSE_ICEBERG_CATALOG_BACKEND_HIVE = "hive";
public static final String LAKEHOUSE_ICEBERG_CATALOG_BACKEND_JDBC = "jdbc";

// Constants-only holder; not instantiable.
private IcebergPropertiesConstants() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ public class SparkIcebergTable extends SparkBaseTable {
/**
 * Builds a Spark-facing table wrapping a Gravitino Iceberg table, delegating to
 * {@code SparkBaseTable}.
 *
 * @param identifier Spark identifier of the table
 * @param gravitinoTable backing Gravitino table metadata
 * @param sparkIcebergCatalog the Spark Iceberg catalog that owns this table
 * @param propertiesConverter converts properties between Gravitino and Spark forms
 */
public SparkIcebergTable(
    Identifier identifier,
    Table gravitinoTable,
    TableCatalog sparkIcebergCatalog,
    PropertiesConverter propertiesConverter) {
  super(identifier, gravitinoTable, sparkIcebergCatalog, propertiesConverter);
}
}

0 comments on commit a2e8efd

Please sign in to comment.