Skip to content

Commit

Permalink
[BugFix] Fix deltalake glue catalog (#31839)
Browse files Browse the repository at this point in the history
Signed-off-by: Letian Jiang <letian.jiang@outlook.com>
(cherry picked from commit 5f9c1ac)
  • Loading branch information
letian-jiang authored and mergify[bot] committed Oct 8, 2023
1 parent a01bebc commit fb0bae7
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@

package com.starrocks.connector.delta;

import com.google.common.base.Preconditions;
import com.starrocks.common.util.Util;
import com.starrocks.connector.Connector;
import com.starrocks.connector.ConnectorContext;
import com.starrocks.connector.ConnectorMetadata;
Expand Down Expand Up @@ -47,14 +45,6 @@ public DeltaLakeConnector(ConnectorContext context) {
this.internalMgr = new DeltaLakeInternalMgr(catalogName, properties, hdfsEnvironment);
this.metadataFactory = createMetadataFactory();
// TODO extract to ConnectorConfigFactory
validate();
onCreate();
}

public void validate() {
String hiveMetastoreUris = Preconditions.checkNotNull(properties.get(HIVE_METASTORE_URIS),
"%s must be set in properties when creating hive catalog", HIVE_METASTORE_URIS);
Util.validateMetastoreUris(hiveMetastoreUris);
}

@Override
Expand All @@ -68,15 +58,12 @@ private DeltaLakeMetadataFactory createMetadataFactory() {
catalogName,
metastore,
internalMgr.getHiveMetastoreConf(),
properties.get(HIVE_METASTORE_URIS),
properties,
internalMgr.getHdfsEnvironment(),
internalMgr.getMetastoreType()
);
}

public void onCreate() {
}

public CloudConfiguration getCloudConfiguration() {
return this.cloudConfiguration;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package com.starrocks.connector.delta;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.starrocks.common.util.Util;
import com.starrocks.connector.HdfsEnvironment;
Expand All @@ -25,31 +26,44 @@
import com.starrocks.connector.hive.HiveMetaClient;
import com.starrocks.connector.hive.HiveMetastore;
import com.starrocks.connector.hive.IHiveMetastore;
import com.starrocks.sql.analyzer.SemanticException;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static com.starrocks.connector.delta.DeltaLakeConnector.HIVE_METASTORE_URIS;
import static com.starrocks.connector.hive.HiveConnector.HIVE_METASTORE_TYPE;
import static com.starrocks.connector.hive.HiveConnector.HIVE_METASTORE_URIS;

public class DeltaLakeInternalMgr {
public static final List<String> SUPPORTED_METASTORE_TYPE = ImmutableList.of("hive", "glue");
private final String catalogName;
private final Map<String, String> properties;
private final HdfsEnvironment hdfsEnvironment;
private final boolean enableMetastoreCache;
private final CachingHiveMetastoreConf hmsConf;
private ExecutorService refreshHiveMetastoreExecutor;
private MetastoreType metastoreType = MetastoreType.HMS;
private final MetastoreType metastoreType;

public DeltaLakeInternalMgr(String catalogName, Map<String, String> properties, HdfsEnvironment hdfsEnvironment) {
this.catalogName = catalogName;
this.properties = properties;
this.enableMetastoreCache = Boolean.parseBoolean(properties.getOrDefault("enable_metastore_cache", "false"));
this.hmsConf = new CachingHiveMetastoreConf(properties, "delta lake");
this.hdfsEnvironment = hdfsEnvironment;
String hiveMetastoreUris = Preconditions.checkNotNull(properties.get(HIVE_METASTORE_URIS),
"%s must be set in properties when creating hive catalog", HIVE_METASTORE_URIS);
Util.validateMetastoreUris(hiveMetastoreUris);

String hiveMetastoreType = properties.getOrDefault(HIVE_METASTORE_TYPE, "hive").toLowerCase();
if (!SUPPORTED_METASTORE_TYPE.contains(hiveMetastoreType)) {
throw new SemanticException("hive metastore type [%s] is not supported", hiveMetastoreType);
}

if (hiveMetastoreType.equals("hive")) {
String hiveMetastoreUris = Preconditions.checkNotNull(properties.get(HIVE_METASTORE_URIS),
"%s must be set in properties when creating catalog of hive-metastore", HIVE_METASTORE_URIS);
Util.validateMetastoreUris(hiveMetastoreUris);
}
this.metastoreType = MetastoreType.get(hiveMetastoreType);
}

public IHiveMetastore createHiveMetastore() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import com.starrocks.connector.hive.IHiveMetastore;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;

import java.util.Map;

import static com.starrocks.connector.delta.DeltaLakeConnector.HIVE_METASTORE_URIS;
import static com.starrocks.connector.hive.CachingHiveMetastore.createQueryLevelInstance;

public class DeltaLakeMetadataFactory {
Expand All @@ -32,12 +35,16 @@ public class DeltaLakeMetadataFactory {
private final MetastoreType metastoreType;

public DeltaLakeMetadataFactory(String catalogName, IHiveMetastore metastore, CachingHiveMetastoreConf hmsConf,
String uri, HdfsEnvironment hdfsEnvironment, MetastoreType metastoreType) {
Map<String, String> properties, HdfsEnvironment hdfsEnvironment,
MetastoreType metastoreType) {
this.catalogName = catalogName;
this.metastore = metastore;
this.perQueryMetastoreMaxNum = hmsConf.getPerQueryCacheMaxNum();
this.hdfsEnvironment = hdfsEnvironment;
this.hdfsEnvironment.getConfiguration().set(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName(), uri);
if (properties.containsKey(HIVE_METASTORE_URIS)) {
this.hdfsEnvironment.getConfiguration().set(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName(),
properties.get(HIVE_METASTORE_URIS));
}
this.metastoreType = metastoreType;
}

Expand Down

0 comments on commit fb0bae7

Please sign in to comment.