diff --git a/LICENSE b/LICENSE index a0e27b20680..8a2f2fac8a0 100644 --- a/LICENSE +++ b/LICENSE @@ -221,6 +221,7 @@ Apache Iceberg ./api/src/main/java/com/datastrato/gravitino/exceptions/RESTException.java + ./catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/CachedClientPool.java ./catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveClientPool.java ./catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/dyn/DynConstructors.java ./catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/dyn/DynFields.java @@ -228,6 +229,7 @@ ./catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/miniHMS/MiniHiveMetastore.java ./catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/miniHMS/MiniHiveMetastoreService.java ./catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/miniHMS/ScriptRunner.java + ./catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/TestCachedClientPool.java ./catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/web/IcebergExceptionMapper.java ./clients/client-java/src/main/java/com/datastrato/gravitino/client/HTTPClient.java ./clients/client-java/src/main/java/com/datastrato/gravitino/client/RESTClient.java diff --git a/LICENSE.bin b/LICENSE.bin index 493f3b98718..2cede6220f4 100644 --- a/LICENSE.bin +++ b/LICENSE.bin @@ -327,6 +327,7 @@ Okio J2ObjC SQLite JDBC Driver + Immutables This product bundles various third-party components also under the Apache Software Foundation License 1.1 diff --git a/catalogs/catalog-hive/build.gradle.kts b/catalogs/catalog-hive/build.gradle.kts index d349b2792ea..9c49b92a11c 100644 --- a/catalogs/catalog-hive/build.gradle.kts +++ b/catalogs/catalog-hive/build.gradle.kts @@ -17,6 +17,9 @@ dependencies { compileOnly(libs.lombok) annotationProcessor(libs.lombok) + compileOnly(libs.immutables.value) + annotationProcessor(libs.immutables.value) + implementation(libs.hive2.metastore) { exclude("org.apache.hbase") exclude("org.apache.hadoop", "hadoop-yarn-server-resourcemanager") @@ -68,6 +71,7 @@ dependencies { implementation(libs.slf4j.api) implementation(libs.guava) + implementation(libs.caffeine) testImplementation(libs.junit.jupiter.api) testRuntimeOnly(libs.junit.jupiter.engine) diff --git a/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/CachedClientPool.java b/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/CachedClientPool.java new file mode 100644 index 00000000000..07d58647a7c --- /dev/null +++ b/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/CachedClientPool.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datastrato.gravitino.catalog.hive; + +import com.datastrato.gravitino.utils.ClientPool; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.Scheduler; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.List; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.thrift.TException; +import org.immutables.value.Value; + +/** + * Referred from Apache Iceberg's CachedClientPool implementation + * hive-metastore/src/main/java/org/apache/iceberg/hive/CachedClientPool.java + * + *

ClientPoolCache is used for every HiveCatalog, I changed the type of `clientPoolCache` from + * static variable to variable. I change cache key from user and configuration options to the + * username. + * + *

A ClientPool that caches the underlying HiveClientPool instances. + */ +public class CachedClientPool implements ClientPool { + + private final Cache clientPoolCache; + + private final Configuration conf; + private final int clientPoolSize; + + CachedClientPool(int clientPoolSize, Configuration conf, long evictionInterval) { + this.conf = conf; + this.clientPoolSize = clientPoolSize; + // Since Caffeine does not ensure that removalListener will be involved after expiration + // We use a scheduler with one thread to clean up expired clients. + this.clientPoolCache = + Caffeine.newBuilder() + .expireAfterAccess(evictionInterval, TimeUnit.MILLISECONDS) + .removalListener((ignored, value, cause) -> ((HiveClientPool) value).close()) + .scheduler( + Scheduler.forScheduledExecutorService( + new ScheduledThreadPoolExecutor(1, newDaemonThreadFactory()))) + .build(); + } + + @VisibleForTesting + HiveClientPool clientPool() { + Key key = extractKey(); + return clientPoolCache.get(key, k -> new HiveClientPool(clientPoolSize, conf)); + } + + @VisibleForTesting + Cache clientPoolCache() { + return clientPoolCache; + } + + @Override + public R run(Action action) + throws TException, InterruptedException { + return clientPool().run(action); + } + + @Override + public R run(Action action, boolean retry) + throws TException, InterruptedException { + return clientPool().run(action, retry); + } + + @VisibleForTesting + static Key extractKey() { + List elements = Lists.newArrayList(); + try { + elements.add(UserGroupInformation.getCurrentUser().getUserName()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + + return Key.of(elements); + } + + @Value.Immutable + abstract static class Key { + + abstract List elements(); + + private static Key of(Iterable elements) { + return ImmutableKey.builder().elements(elements).build(); + } + } + + @Value.Immutable + abstract static class ConfElement { + abstract String key(); + + @Nullable + abstract String value(); + + static ConfElement of(String key, String value) { + return ImmutableConfElement.builder().key(key).value(value).build(); + } + } + + private static ThreadFactory newDaemonThreadFactory() { + return new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("hive-metastore-cleaner" + "-%d") + .build(); + } + + public void close() { + clientPoolCache.invalidateAll(); + } +} diff --git a/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveCatalog.java b/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveCatalog.java index a12df50c663..8e12ba0beb3 100644 --- a/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveCatalog.java +++ b/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveCatalog.java @@ -6,9 +6,11 @@ import com.datastrato.gravitino.catalog.BaseCatalog; import com.datastrato.gravitino.catalog.CatalogOperations; +import com.datastrato.gravitino.catalog.ProxyPlugin; import com.datastrato.gravitino.rel.SupportsSchemas; import com.datastrato.gravitino.rel.TableCatalog; import java.util.Map; +import java.util.Optional; /** Implementation of a Hive catalog in Gravitino. */ public class HiveCatalog extends BaseCatalog { @@ -43,7 +45,7 @@ protected CatalogOperations newOps(Map config) { */ @Override public SupportsSchemas asSchemas() { - return (HiveCatalogOperations) ops(); + return (SupportsSchemas) ops(); } /** @@ -53,6 +55,18 @@ public SupportsSchemas asSchemas() { */ @Override public TableCatalog asTableCatalog() { - return (HiveCatalogOperations) ops(); + return (TableCatalog) ops(); + } + + @Override + protected Optional newProxyPlugin(Map config) { + boolean impersonationEnabled = + (boolean) + new HiveCatalogPropertiesMeta() + .getOrDefault(config, HiveCatalogPropertiesMeta.IMPERSONATION_ENABLE); + if (!impersonationEnabled) { + return Optional.empty(); + } + return Optional.of(new HiveProxyPlugin()); } } diff --git a/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveCatalogOperations.java b/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveCatalogOperations.java index 4a6a9fa8211..209b06515ad 100644 --- a/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveCatalogOperations.java +++ b/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveCatalogOperations.java @@ -5,6 +5,7 @@ package com.datastrato.gravitino.catalog.hive; import static com.datastrato.gravitino.catalog.BaseCatalog.CATALOG_BYPASS_PREFIX; +import static com.datastrato.gravitino.catalog.hive.HiveCatalogPropertiesMeta.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS; import static com.datastrato.gravitino.catalog.hive.HiveCatalogPropertiesMeta.CLIENT_POOL_SIZE; import static com.datastrato.gravitino.catalog.hive.HiveCatalogPropertiesMeta.METASTORE_URIS; import static com.datastrato.gravitino.catalog.hive.HiveTable.SUPPORT_TABLE_TYPES; @@ -44,7 +45,6 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; -import java.io.IOException; import java.time.Instant; import java.util.Arrays; import java.util.List; @@ -72,7 +72,7 @@ public class HiveCatalogOperations implements CatalogOperations, SupportsSchemas public static final Logger LOG = LoggerFactory.getLogger(HiveCatalogOperations.class); - @VisibleForTesting HiveClientPool clientPool; + @VisibleForTesting CachedClientPool clientPool; @VisibleForTesting HiveConf hiveConf; @@ -136,7 +136,8 @@ public void initialize(Map conf) throws RuntimeException { mergeConfig.forEach(hadoopConf::set); hiveConf = new HiveConf(hadoopConf, HiveCatalogOperations.class); - this.clientPool = new HiveClientPool(getClientPoolSize(conf), hiveConf); + this.clientPool = + new CachedClientPool(getClientPoolSize(conf), hiveConf, getCacheEvictionInterval(conf)); } @VisibleForTesting @@ -144,6 +145,11 @@ int getClientPoolSize(Map conf) { return (int) catalogPropertiesMetadata.getOrDefault(conf, CLIENT_POOL_SIZE); } + long getCacheEvictionInterval(Map conf) { + return (long) + catalogPropertiesMetadata.getOrDefault(conf, CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS); + } + /** Closes the Hive catalog and releases the associated client pool. */ @Override public void close() { @@ -206,7 +212,7 @@ public HiveSchema createSchema( .withConf(hiveConf) .withAuditInfo( new AuditInfo.Builder() - .withCreator(currentUser()) + .withCreator(UserGroupInformation.getCurrentUser().getUserName()) .withCreateTime(Instant.now()) .build()) .build(); @@ -603,7 +609,7 @@ public Table createTable( .withSortOrders(sortOrders) .withAuditInfo( new AuditInfo.Builder() - .withCreator(currentUser()) + .withCreator(UserGroupInformation.getCurrentUser().getUserName()) .withCreateTime(Instant.now()) .build()) .withPartitioning(partitioning) @@ -941,23 +947,6 @@ private boolean dropHiveTable(NameIdentifier tableIdent, boolean deleteData, boo } } - // TODO. We should figure out a better way to get the current user from servlet container. - private static String currentUser() { - String username = null; - try { - username = UserGroupInformation.getCurrentUser().getShortUserName(); - } catch (IOException e) { - LOG.warn("Failed to get Hadoop user", e); - } - - if (username != null) { - return username; - } else { - LOG.warn("Hadoop user is null, defaulting to user.name"); - return System.getProperty("user.name"); - } - } - @Override public PropertiesMetadata tablePropertiesMetadata() throws UnsupportedOperationException { return tablePropertiesMetadata; diff --git a/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveCatalogPropertiesMeta.java b/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveCatalogPropertiesMeta.java index c0d4b0eae1f..aa22d05c538 100644 --- a/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveCatalogPropertiesMeta.java +++ b/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveCatalogPropertiesMeta.java @@ -9,6 +9,7 @@ import com.datastrato.gravitino.catalog.PropertyEntry; import com.google.common.collect.ImmutableMap; import java.util.Map; +import java.util.concurrent.TimeUnit; public class HiveCatalogPropertiesMeta extends BaseCatalogPropertiesMetadata { @@ -17,6 +18,16 @@ public class HiveCatalogPropertiesMeta extends BaseCatalogPropertiesMetadata { public static final String METASTORE_URIS = "metastore.uris"; + public static final String CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS = + "client.pool-cache.eviction-interval-ms"; + + public static final long DEFAULT_CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS = + TimeUnit.MINUTES.toMillis(5); + + public static final String IMPERSONATION_ENABLE = "impersonation-enable"; + + public static final boolean DEFAULT_IMPERSONATION_ENABLE = false; + private static final Map> HIVE_CATALOG_PROPERTY_ENTRIES = ImmutableMap.>builder() .put( @@ -31,6 +42,24 @@ public class HiveCatalogPropertiesMeta extends BaseCatalogPropertiesMetadata { true, DEFAULT_CLIENT_POOL_SIZE, false)) + .put( + CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS, + PropertyEntry.longOptionalPropertyEntry( + CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS, + "The cache pool eviction interval", + true, + DEFAULT_CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS, + false)) + .put( + IMPERSONATION_ENABLE, + PropertyEntry.booleanPropertyEntry( + IMPERSONATION_ENABLE, + "Enable user impersonation for Hive catalog", + false, + true, + DEFAULT_IMPERSONATION_ENABLE, + false, + false)) .putAll(BASIC_CATALOG_PROPERTY_ENTRIES) .build(); diff --git a/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveProxyPlugin.java b/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveProxyPlugin.java new file mode 100644 index 00000000000..403e3f7b03d --- /dev/null +++ b/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveProxyPlugin.java @@ -0,0 +1,51 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.catalog.hive; + +import com.datastrato.gravitino.catalog.ProxyPlugin; +import com.datastrato.gravitino.utils.Executable; +import com.datastrato.gravitino.utils.PrincipalUtils; +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.UndeclaredThrowableException; +import java.security.Principal; +import java.security.PrivilegedActionException; +import java.security.PrivilegedExceptionAction; +import java.util.Map; +import org.apache.hadoop.security.UserGroupInformation; + +class HiveProxyPlugin implements ProxyPlugin { + + private final UserGroupInformation currentUser; + + HiveProxyPlugin() { + try { + currentUser = UserGroupInformation.getCurrentUser(); + } catch (IOException ioe) { + throw new IllegalStateException("Fail to init HiveCatalogProxyPlugin"); + } + } + + @Override + public Object doAs( + Principal principal, Executable action, Map properties) + throws Throwable { + try { + UserGroupInformation proxyUser = + UserGroupInformation.createProxyUser( + PrincipalUtils.getCurrentPrincipal().getName(), currentUser); + return proxyUser.doAs((PrivilegedExceptionAction) action::execute); + } catch (UndeclaredThrowableException e) { + Throwable innerException = e.getCause(); + if (innerException instanceof PrivilegedActionException) { + throw innerException.getCause(); + } else if (innerException instanceof InvocationTargetException) { + throw innerException.getCause(); + } else { + throw innerException; + } + } + } +} diff --git a/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/TestCachedClientPool.java b/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/TestCachedClientPool.java new file mode 100644 index 00000000000..9ef81c56906 --- /dev/null +++ b/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/TestCachedClientPool.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.datastrato.gravitino.catalog.hive; + +import com.datastrato.gravitino.catalog.hive.miniHMS.MiniHiveMetastoreService; +import java.security.PrivilegedAction; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +// Referred from Apache Iceberg's TestCachedClientPool implementation +// hive-metastore/src/test/java/org/apache/iceberg/hive/TestCachedClientPool.java +public class TestCachedClientPool extends MiniHiveMetastoreService { + @Test + public void testClientPoolCleaner() throws InterruptedException { + CachedClientPool clientPool = new CachedClientPool(1, hiveConf, 5000); + HiveClientPool clientPool1 = clientPool.clientPool(); + HiveClientPool cachedClientPool = + clientPool.clientPoolCache().getIfPresent(CachedClientPool.extractKey()); + Assertions.assertSame(clientPool1, cachedClientPool); + TimeUnit.MILLISECONDS.sleep(5000 - TimeUnit.SECONDS.toMillis(2)); + HiveClientPool clientPool2 = clientPool.clientPool(); + Assertions.assertSame(clientPool2, clientPool1); + TimeUnit.MILLISECONDS.sleep(5000 + TimeUnit.SECONDS.toMillis(5)); + Assertions.assertNull(clientPool.clientPoolCache().getIfPresent(CachedClientPool.extractKey())); + + // The client has been really closed. + Assertions.assertTrue(clientPool1.isClosed()); + Assertions.assertTrue(clientPool2.isClosed()); + } + + @Test + public void testCacheKey() throws Exception { + UserGroupInformation current = UserGroupInformation.getCurrentUser(); + UserGroupInformation foo1 = UserGroupInformation.createProxyUser("foo", current); + UserGroupInformation foo2 = UserGroupInformation.createProxyUser("foo", current); + UserGroupInformation bar = UserGroupInformation.createProxyUser("bar", current); + CachedClientPool.Key key1 = + foo1.doAs((PrivilegedAction) CachedClientPool::extractKey); + CachedClientPool.Key key2 = + foo2.doAs((PrivilegedAction) CachedClientPool::extractKey); + CachedClientPool.Key key3 = + bar.doAs((PrivilegedAction) CachedClientPool::extractKey); + Assertions.assertEquals(key1, key2); + Assertions.assertNotEquals(key1, key3); + } +} diff --git a/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/TestHiveCatalogOperations.java b/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/TestHiveCatalogOperations.java index 92cd1bf6da3..3ab148c4234 100644 --- a/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/TestHiveCatalogOperations.java +++ b/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/TestHiveCatalogOperations.java @@ -6,7 +6,9 @@ package com.datastrato.gravitino.catalog.hive; import static com.datastrato.gravitino.catalog.BaseCatalog.CATALOG_BYPASS_PREFIX; +import static com.datastrato.gravitino.catalog.hive.HiveCatalogPropertiesMeta.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS; import static com.datastrato.gravitino.catalog.hive.HiveCatalogPropertiesMeta.CLIENT_POOL_SIZE; +import static com.datastrato.gravitino.catalog.hive.HiveCatalogPropertiesMeta.IMPERSONATION_ENABLE; import static com.datastrato.gravitino.catalog.hive.HiveCatalogPropertiesMeta.METASTORE_URIS; import com.datastrato.gravitino.Catalog; @@ -61,14 +63,18 @@ void testPropertyMeta() { Map> propertyEntryMap = hiveCatalogOperations.catalogPropertiesMetadata().propertyEntries(); - Assertions.assertEquals(5, propertyEntryMap.size()); + Assertions.assertEquals(7, propertyEntryMap.size()); Assertions.assertTrue(propertyEntryMap.containsKey(METASTORE_URIS)); Assertions.assertTrue(propertyEntryMap.containsKey(Catalog.PROPERTY_PACKAGE)); Assertions.assertTrue(propertyEntryMap.containsKey(CLIENT_POOL_SIZE)); + Assertions.assertTrue(propertyEntryMap.containsKey(IMPERSONATION_ENABLE)); Assertions.assertTrue(propertyEntryMap.get(METASTORE_URIS).isRequired()); Assertions.assertFalse(propertyEntryMap.get(Catalog.PROPERTY_PACKAGE).isRequired()); Assertions.assertFalse(propertyEntryMap.get(CLIENT_POOL_SIZE).isRequired()); + Assertions.assertFalse( + propertyEntryMap.get(CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS).isRequired()); + Assertions.assertFalse(propertyEntryMap.get(IMPERSONATION_ENABLE).isRequired()); } @Test diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/BaseCatalog.java b/core/src/main/java/com/datastrato/gravitino/catalog/BaseCatalog.java index 7610c9e0f9a..51cae7a5249 100644 --- a/core/src/main/java/com/datastrato/gravitino/catalog/BaseCatalog.java +++ b/core/src/main/java/com/datastrato/gravitino/catalog/BaseCatalog.java @@ -11,6 +11,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Maps; import java.util.Map; +import java.util.Optional; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -87,7 +88,14 @@ public CatalogOperations ops() { if (ops == null) { Preconditions.checkArgument( entity != null && conf != null, "entity and conf must be set before calling ops()"); - ops = newOps(conf); + CatalogOperations newOps = newOps(conf); + ops = + newProxyPlugin(conf) + .map( + proxyPlugin -> { + return asProxyOps(newOps, proxyPlugin); + }) + .orElse(newOps); } } } @@ -172,4 +180,12 @@ public Audit auditInfo() { Preconditions.checkArgument(entity != null, "entity is not set"); return entity.auditInfo(); } + + protected CatalogOperations asProxyOps(CatalogOperations ops, ProxyPlugin plugin) { + return OperationsProxy.createProxy(ops, plugin); + } + + protected Optional newProxyPlugin(Map config) { + return Optional.empty(); + } } diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/OperationsProxy.java b/core/src/main/java/com/datastrato/gravitino/catalog/OperationsProxy.java new file mode 100644 index 00000000000..49a00eca5f8 --- /dev/null +++ b/core/src/main/java/com/datastrato/gravitino/catalog/OperationsProxy.java @@ -0,0 +1,44 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.catalog; + +import com.datastrato.gravitino.utils.PrincipalUtils; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.util.Collections; + +/** Proxy wrapper on an operation class to execute operations by impersonating given user */ +public class OperationsProxy implements InvocationHandler { + + private final ProxyPlugin plugin; + private final T ops; + + private OperationsProxy(ProxyPlugin plugin, T ops) { + this.plugin = plugin; + this.ops = ops; + } + + public static T createProxy(T ops, ProxyPlugin plugin) { + if (!(ops instanceof CatalogOperations)) { + throw new IllegalArgumentException("Method only supports the type of CatalogOperations"); + } + return createProxyInternal(ops, plugin, ops.getClass().getInterfaces()); + } + + private static T createProxyInternal(T ops, ProxyPlugin plugin, Class[] interfaces) { + return (T) + Proxy.newProxyInstance( + ops.getClass().getClassLoader(), interfaces, new OperationsProxy(plugin, ops)); + } + + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + return plugin.doAs( + PrincipalUtils.getCurrentPrincipal(), + () -> method.invoke(ops, args), + Collections.emptyMap()); + } +} diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/PropertyEntry.java b/core/src/main/java/com/datastrato/gravitino/catalog/PropertyEntry.java index d26370d4640..9c4151e53ea 100644 --- a/core/src/main/java/com/datastrato/gravitino/catalog/PropertyEntry.java +++ b/core/src/main/java/com/datastrato/gravitino/catalog/PropertyEntry.java @@ -178,6 +178,28 @@ public static PropertyEntry stringPropertyEntry( .build(); } + public static PropertyEntry longPropertyEntry( + String name, + String description, + boolean required, + boolean immutable, + long defaultValue, + boolean hidden, + boolean reserved) { + return new Builder() + .withName(name) + .withDescription(description) + .withRequired(required) + .withImmutable(immutable) + .withJavaType(Long.class) + .withDefaultValue(defaultValue) + .withDecoder(Long::parseLong) + .withEncoder(String::valueOf) + .withHidden(hidden) + .withReserved(reserved) + .build(); + } + public static PropertyEntry integerPropertyEntry( String name, String description, @@ -239,7 +261,7 @@ public static PropertyEntry stringRequiredPropertyEntry( public static PropertyEntry stringOptionalPropertyEntry( String name, String description, boolean immutable, String defaultValue, boolean hidden) { - return stringPropertyEntry(name, description, false, immutable, null, hidden, false); + return stringPropertyEntry(name, description, false, immutable, defaultValue, hidden, false); } public static PropertyEntry integerOptionalPropertyEntry( @@ -247,6 +269,11 @@ public static PropertyEntry integerOptionalPropertyEntry( return integerPropertyEntry(name, description, false, immutable, defaultValue, hidden, false); } + public static PropertyEntry longOptionalPropertyEntry( + String name, String description, boolean immutable, long defaultValue, boolean hidden) { + return longPropertyEntry(name, description, false, immutable, defaultValue, hidden, false); + } + public static PropertyEntry stringImmutablePropertyEntry( String name, String description, diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/ProxyPlugin.java b/core/src/main/java/com/datastrato/gravitino/catalog/ProxyPlugin.java new file mode 100644 index 00000000000..ceaf959c5dd --- /dev/null +++ b/core/src/main/java/com/datastrato/gravitino/catalog/ProxyPlugin.java @@ -0,0 +1,24 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.catalog; + +import com.datastrato.gravitino.utils.Executable; +import java.security.Principal; +import java.util.Map; + +/** The catalog can implement their own ProxyPlugin to execute operations by given user. */ +public interface ProxyPlugin { + + /** + * @param principal The given principal to execute the action + * @param action A method need to be executed. + * @param properties The properties which be used when execute the action. + * @return The return value of action. + * @throws Throwable The throwable object which the action throws. + */ + Object doAs( + Principal principal, Executable action, Map properties) + throws Throwable; +} diff --git a/docs/apache-hive-catalog.md b/docs/apache-hive-catalog.md index f7371e357d0..a3e784ff8e8 100644 --- a/docs/apache-hive-catalog.md +++ b/docs/apache-hive-catalog.md @@ -28,11 +28,13 @@ The Hive catalog supports creating, updating, and deleting databases and tables ### Catalog properties -| Property Name | Description | Default Value | Required | Since Version | -|---------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------|----------|---------------| -| `metastore.uris` | The Hive metastore service URIs, separate multiple addresses with commas. Such as `thrift://127.0.0.1:9083` | (none) | Yes | 0.2.0 | -| `client.pool-size` | The maximum number of Hive metastore clients in the pool for Gravitino. | 1 | No | 0.2.0 | -| `gravitino.bypass.` | Property name with this prefix passed down to the underlying HMS client for use. Such as `gravitino.bypass.hive.metastore.failure.retries = 3` indicate 3 times of retries upon failure of Thrift metastore calls | (none) | No | 0.2.0 | +| Property Name | Description | Default Value | Required | Since Version | +|------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------|----------|---------------| +| `metastore.uris` | The Hive metastore service URIs, separate multiple addresses with commas. Such as `thrift://127.0.0.1:9083` | (none) | Yes | 0.2.0 | +| `client.pool-size` | The maximum number of Hive metastore clients in the pool for Gravitino. | 1 | No | 0.2.0 | +| `gravitino.bypass.` | Property name with this prefix passed down to the underlying HMS client for use. Such as `gravitino.bypass.hive.metastore.failure.retries = 3` indicate 3 times of retries upon failure of Thrift metastore calls | (none) | No | 0.2.0 | +| `client.pool-cache.eviction-interval-ms` | The cache pool eviction interval. | 300000 | No | 0.4.0 | +| `impersonation-enable` | Enable user impersonation for Hive catalog. | false | No | 0.4.0 | ### Catalog operations diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 1ef25a74b2b..986001b3dc6 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -41,6 +41,7 @@ prometheus = "0.16.0" jsqlparser = "4.2" mysql = "8.0.23" postgresql = "42.6.0" +immutables-value = "2.10.0" protobuf-plugin = "0.9.2" spotless-plugin = '6.11.0' @@ -134,6 +135,7 @@ prometheus-servlet = { group = "io.prometheus", name = "simpleclient_servlet", v jsqlparser = { group = "com.github.jsqlparser", name = "jsqlparser", version.ref = "jsqlparser" } mysql-driver = { group = "mysql", name = "mysql-connector-java", version.ref = "mysql" } postgresql-driver = { group = "org.postgresql", name = "postgresql", version.ref = "postgresql" } +immutables-value = { module = "org.immutables:value", version.ref = "immutables-value" } commons-cli = { group = "commons-cli", name = "commons-cli", version.ref = "commons-cli" } [bundles] diff --git a/integration-test/build.gradle.kts b/integration-test/build.gradle.kts index 40a16b60452..57f1d4e0e20 100644 --- a/integration-test/build.gradle.kts +++ b/integration-test/build.gradle.kts @@ -284,7 +284,7 @@ tasks.test { } // Gravitino CI Docker image - environment("GRAVITINO_CI_HIVE_DOCKER_IMAGE", "datastrato/gravitino-ci-hive:0.1.7") + environment("GRAVITINO_CI_HIVE_DOCKER_IMAGE", "datastrato/gravitino-ci-hive:0.1.8") environment("GRAVITINO_CI_TRINO_DOCKER_IMAGE", "datastrato/gravitino-ci-trino:0.1.3") val testMode = project.properties["testMode"] as? String ?: "embedded" diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/catalog/hive/ProxyCatalogHiveIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/catalog/hive/ProxyCatalogHiveIT.java new file mode 100644 index 00000000000..e0e10988ac0 --- /dev/null +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/catalog/hive/ProxyCatalogHiveIT.java @@ -0,0 +1,262 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.integration.test.catalog.hive; + +import static com.datastrato.gravitino.catalog.hive.HiveCatalogPropertiesMeta.IMPERSONATION_ENABLE; +import static com.datastrato.gravitino.catalog.hive.HiveCatalogPropertiesMeta.METASTORE_URIS; +import static com.datastrato.gravitino.server.GravitinoServer.WEBSERVER_CONF_PREFIX; + +import com.datastrato.gravitino.Catalog; +import com.datastrato.gravitino.NameIdentifier; +import com.datastrato.gravitino.auth.AuthenticatorType; +import com.datastrato.gravitino.catalog.hive.HiveClientPool; +import com.datastrato.gravitino.client.GravitinoClient; +import com.datastrato.gravitino.client.GravitinoMetaLake; +import com.datastrato.gravitino.dto.rel.partitioning.Partitioning; +import com.datastrato.gravitino.integration.test.container.ContainerSuite; +import com.datastrato.gravitino.integration.test.container.HiveContainer; +import com.datastrato.gravitino.integration.test.util.AbstractIT; +import com.datastrato.gravitino.integration.test.util.GravitinoITUtils; +import com.datastrato.gravitino.rel.Column; +import com.datastrato.gravitino.rel.Table; +import com.datastrato.gravitino.rel.types.Types; +import com.datastrato.gravitino.server.auth.OAuthConfig; +import com.datastrato.gravitino.server.web.JettyServerConfig; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import java.lang.reflect.Field; +import java.util.Collections; +import java.util.Map; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.Database; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +public class ProxyCatalogHiveIT extends AbstractIT { + + public static final String METALAKE_NAME = + GravitinoITUtils.genRandomName("ProxyCatalogHiveIT_metalake"); + public static final String CATALOG_NAME = GravitinoITUtils.genRandomName("CatalogHiveIT_catalog"); + public static final String SCHEMA_PREFIX = "ProxyCatalogHiveIT_schema"; + public static final String TABLE_PREFIX = "ProxyCatalogHiveIT_table"; + private static final String PROVIDER = "hive"; + private static final String EXPECT_USER = "datastrato"; + private static final String HADOOP_USER_NAME = "HADOOP_USER_NAME"; + + private static final ContainerSuite containerSuite = ContainerSuite.getInstance(); + private static GravitinoMetaLake metalake; + private static Catalog catalog; + private static HiveClientPool hiveClientPool; + + private static String HIVE_METASTORE_URIS; + private static FileSystem hdfs; + private static String originHadoopUser; + private static GravitinoClient anotherClient; + private static Catalog anotherCatalog; + + @BeforeAll + public static void startIntegrationTest() throws Exception { + originHadoopUser = System.getenv(HADOOP_USER_NAME); + setEnv(HADOOP_USER_NAME, null); + + System.setProperty("user.name", "datastrato"); + + Map configs = Maps.newHashMap(); + configs.put(OAuthConfig.AUTHENTICATOR.getKey(), AuthenticatorType.SIMPLE.name().toLowerCase()); + registerCustomConfigs(configs); + AbstractIT.startIntegrationTest(); + containerSuite.startHiveContainer(); + HIVE_METASTORE_URIS = + String.format( + "thrift://%s:%d", + containerSuite.getHiveContainer().getContainerIpAddress(), + HiveContainer.HIVE_METASTORE_PORT); + HiveConf hiveConf = new HiveConf(); + hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, HIVE_METASTORE_URIS); + + // Check if hive client can connect to hive metastore + hiveClientPool = new HiveClientPool(1, hiveConf); + + Configuration conf = new Configuration(); + conf.set( + "fs.defaultFS", + String.format( + "hdfs://%s:%d", + containerSuite.getHiveContainer().getContainerIpAddress(), + HiveContainer.HDFS_DEFAULTFS_PORT)); + hdfs = FileSystem.get(conf); + JettyServerConfig jettyServerConfig = + JettyServerConfig.fromConfig(serverConfig, WEBSERVER_CONF_PREFIX); + + String uri = "http://" + jettyServerConfig.getHost() + ":" + jettyServerConfig.getHttpPort(); + System.setProperty("user.name", "test"); + anotherClient = GravitinoClient.builder(uri).withSimpleAuth().build(); + createMetalake(); + createCatalog(); + loadCatalogWithAnotherClient(); + } + + @AfterAll + public static void stop() throws Exception { + setEnv(HADOOP_USER_NAME, originHadoopUser); + anotherClient.close(); + } + + @Test + public void testOperateSchema() throws Exception { + // create schema normally using user datastrato + String schemaName = GravitinoITUtils.genRandomName(SCHEMA_PREFIX); + String anotherSchemaName = GravitinoITUtils.genRandomName(SCHEMA_PREFIX); + NameIdentifier ident = NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, schemaName); + NameIdentifier anotherIdent = NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, anotherSchemaName); + Map properties = Maps.newHashMap(); + properties.put("key1", "val1"); + properties.put("key2", "val2"); + properties.put( + "location", + String.format( + "hdfs://%s:%d/user/hive/warehouse/%s.db", + containerSuite.getHiveContainer().getContainerIpAddress(), + HiveContainer.HDFS_DEFAULTFS_PORT, + schemaName.toLowerCase())); + String comment = "comment"; + + catalog.asSchemas().createSchema(ident, comment, properties); + Database db = hiveClientPool.run(client -> client.getDatabase(schemaName)); + Assertions.assertEquals(EXPECT_USER, db.getOwnerName()); + Assertions.assertEquals( + EXPECT_USER, hdfs.getFileStatus(new Path(db.getLocationUri())).getOwner()); + + // create schema with exception using the system user + properties.put( + "location", + String.format( + "hdfs://%s:%d/user/hive/warehouse/%s.db", + containerSuite.getHiveContainer().getContainerIpAddress(), + HiveContainer.HDFS_DEFAULTFS_PORT, + anotherSchemaName.toLowerCase())); + Exception e = + Assertions.assertThrows( + RuntimeException.class, + () -> anotherCatalog.asSchemas().createSchema(anotherIdent, comment, properties)); + Assertions.assertTrue(e.getMessage().contains("AccessControlException Permission denied")); + } + + @Test + public void testOperateTable() throws Exception { + // create table normally using user datastrato + Column[] columns = createColumns(); + String schemaName = GravitinoITUtils.genRandomName(SCHEMA_PREFIX); + String tableName = GravitinoITUtils.genRandomName(TABLE_PREFIX); + String anotherTableName = GravitinoITUtils.genRandomName(TABLE_PREFIX); + NameIdentifier ident = NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, schemaName); + NameIdentifier nameIdentifier = + NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, schemaName, tableName); + NameIdentifier anotherNameIdentifier = + NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, schemaName, anotherTableName); + Map properties = Maps.newHashMap(); + properties.put("key1", "val1"); + properties.put("key2", "val2"); + properties.put( + "location", + String.format( + "hdfs://%s:%d/user/hive/warehouse/%s.db", + containerSuite.getHiveContainer().getContainerIpAddress(), + HiveContainer.HDFS_DEFAULTFS_PORT, + schemaName.toLowerCase())); + String comment = "comment"; + catalog.asSchemas().createSchema(ident, comment, properties); + Table createdTable = + catalog + .asTableCatalog() + .createTable( + nameIdentifier, + columns, + comment, + ImmutableMap.of(), + Partitioning.EMPTY_PARTITIONING); + String location = createdTable.properties().get("location"); + Assertions.assertEquals(EXPECT_USER, hdfs.getFileStatus(new Path(location)).getOwner()); + org.apache.hadoop.hive.metastore.api.Table hiveTab = + hiveClientPool.run(client -> client.getTable(schemaName, tableName)); + Assertions.assertEquals(EXPECT_USER, hiveTab.getOwner()); + + // create table with exception with system user + Exception e = + Assertions.assertThrows( + RuntimeException.class, + () -> + anotherCatalog + .asTableCatalog() + .createTable( + anotherNameIdentifier, + columns, + comment, + ImmutableMap.of(), + Partitioning.EMPTY_PARTITIONING)); + Assertions.assertTrue(e.getMessage().contains("AccessControlException Permission denied")); + } + + private Column[] createColumns() { + Column col1 = Column.of("col1", Types.ByteType.get(), "col_1_comment"); + Column col2 = Column.of("col2", Types.DateType.get(), "col_2_comment"); + Column col3 = Column.of("col3", Types.StringType.get(), "col_3_comment"); + return new Column[] {col1, col2, col3}; + } + + private static void createMetalake() { + GravitinoMetaLake[] gravitinoMetaLakes = client.listMetalakes(); + Assertions.assertEquals(0, gravitinoMetaLakes.length); + + GravitinoMetaLake createdMetalake = + client.createMetalake(NameIdentifier.of(METALAKE_NAME), "comment", Collections.emptyMap()); + GravitinoMetaLake loadMetalake = client.loadMetalake(NameIdentifier.of(METALAKE_NAME)); + Assertions.assertEquals(createdMetalake, loadMetalake); + + metalake = loadMetalake; + } + + private static void createCatalog() { + Map properties = Maps.newHashMap(); + properties.put(METASTORE_URIS, HIVE_METASTORE_URIS); + properties.put(IMPERSONATION_ENABLE, "true"); + + metalake.createCatalog( + NameIdentifier.of(METALAKE_NAME, CATALOG_NAME), + Catalog.Type.RELATIONAL, + PROVIDER, + "comment", + properties); + + catalog = metalake.loadCatalog(NameIdentifier.of(METALAKE_NAME, CATALOG_NAME)); + } + + private static void loadCatalogWithAnotherClient() { + GravitinoMetaLake metaLake = anotherClient.loadMetalake(NameIdentifier.of(METALAKE_NAME)); + anotherCatalog = metaLake.loadCatalog(NameIdentifier.of(METALAKE_NAME, CATALOG_NAME)); + } + + public static void setEnv(String key, String value) { + try { + Map env = System.getenv(); + Class cl = env.getClass(); + Field field = cl.getDeclaredField("m"); + field.setAccessible(true); + Map writableEnv = (Map) field.get(env); + if (value == null) { + writableEnv.remove(key); + } else { + writableEnv.put(key, value); + } + } catch (Exception e) { + throw new IllegalStateException("Failed to set environment variable", e); + } + } +} diff --git a/integration-test/trino-it/docker-compose.yaml b/integration-test/trino-it/docker-compose.yaml index 293efd1209c..19fc71ca64a 100644 --- a/integration-test/trino-it/docker-compose.yaml +++ b/integration-test/trino-it/docker-compose.yaml @@ -6,7 +6,7 @@ version: '3.0' services: hive: - image: datastrato/gravitino-ci-hive:0.1.7 + image: datastrato/gravitino-ci-hive:0.1.8 networks: - trino-net container_name: trino-ci-hive