[Refactor] refactor cloud cred and support cred-isolated cache key (S…
dirtysalt authored Sep 5, 2023
1 parent fc39fdc commit e4479f8
Showing 24 changed files with 334 additions and 165 deletions.
2 changes: 2 additions & 0 deletions be/src/fs/hdfs/hdfs_fs_cache.cpp
@@ -66,6 +66,8 @@ static Status create_hdfs_fs_handle(const std::string& namenode, const std::shar
const std::map<std::string, std::string> cloud_properties = get_cloud_properties(options);
if (!cloud_properties.empty()) {
for (const auto& cloud_property : cloud_properties) {
VLOG_FILE << "[xxx] cloud property: key = " << cloud_property.first.data()
<< ", value = " << cloud_property.second.data();
hdfsBuilderConfSetStr(hdfs_builder, cloud_property.first.data(), cloud_property.second.data());
}
}
@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.


package com.starrocks.connector.delta;

import com.google.common.base.Preconditions;
@@ -55,7 +54,7 @@ public DeltaLakeInternalMgr(String catalogName, Map<String, String> properties,

public IHiveMetastore createHiveMetastore() {
// TODO(stephen): Abstract the creator class to construct hive meta client
HiveMetaClient metaClient = HiveMetaClient.createHiveMetaClient(properties);
HiveMetaClient metaClient = HiveMetaClient.createHiveMetaClient(hdfsEnvironment, properties);
IHiveMetastore hiveMetastore = new HiveMetastore(metaClient, catalogName);
IHiveMetastore baseHiveMetastore;
if (!enableMetastoreCache) {
@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.


package com.starrocks.connector.hive;

import com.google.common.base.Preconditions;
@@ -348,7 +347,7 @@ public Map<String, Partition> getPartitionsByNames(String dbName, String tblName

private Map<HivePartitionName, Partition> loadPartitionsByNames(Iterable<? extends HivePartitionName> partitionNames) {
HivePartitionName hivePartitionName = Iterables.get(partitionNames, 0);
Map<String, Partition> partitionsByNames = metastore.getPartitionsByNames(
Map<String, Partition> partitionsByNames = metastore.getPartitionsByNames(
hivePartitionName.getDatabaseName(),
hivePartitionName.getTableName(),
Streams.stream(partitionNames).map(partitionName -> partitionName.getPartitionNames().get())
@@ -428,7 +427,7 @@ private Map<HivePartitionName, HivePartitionStats> loadPartitionsStatistics(
HivePartitionName hivePartitionName = Iterables.get(partitionNames, 0);
Table table = getTable(hivePartitionName.getDatabaseName(), hivePartitionName.getTableName());

Map<String, HivePartitionStats> partitionsStatistics = metastore.getPartitionStatistics(table,
Map<String, HivePartitionStats> partitionsStatistics = metastore.getPartitionStatistics(table,
Streams.stream(partitionNames).map(partitionName -> partitionName.getPartitionNames().get())
.collect(Collectors.toList()));

@@ -441,7 +440,7 @@ private Map<HivePartitionName, HivePartitionStats> loadPartitionsStatistics(

@Override
public List<HivePartitionName> refreshTable(String hiveDbName, String hiveTblName,
boolean onlyCachedPartitions) {
boolean onlyCachedPartitions) {
HiveTableName hiveTableName = HiveTableName.of(hiveDbName, hiveTblName);
tableNameLockMap.putIfAbsent(hiveTableName, hiveDbName + "_" + hiveTblName + "_lock");
String lockStr = tableNameLockMap.get(hiveTableName);
@@ -534,9 +533,9 @@ public List<HivePartitionName> refreshTableBackground(String hiveDbName, String
}

private <T> List<HivePartitionName> refreshPartitions(List<HivePartitionName> presentInCache,
List<String> partitionNamesInHMS,
Function<List<HivePartitionName>, Map<HivePartitionName, T>> reload,
LoadingCache<HivePartitionName, T> cache) {
List<String> partitionNamesInHMS,
Function<List<HivePartitionName>, Map<HivePartitionName, T>> reload,
LoadingCache<HivePartitionName, T> cache) {
List<HivePartitionName> needToRefresh = Lists.newArrayList();
List<HivePartitionName> needToInvalidate = Lists.newArrayList();
for (HivePartitionName name : presentInCache) {
@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.


package com.starrocks.connector.hive;

import com.google.common.base.Preconditions;
@@ -111,7 +110,7 @@ public void shutdown() {

public IHiveMetastore createHiveMetastore() {
// TODO(stephen): Abstract the creator class to construct hive meta client
HiveMetaClient metaClient = HiveMetaClient.createHiveMetaClient(properties);
HiveMetaClient metaClient = HiveMetaClient.createHiveMetaClient(hdfsEnvironment, properties);
IHiveMetastore hiveMetastore = new HiveMetastore(metaClient, catalogName);
IHiveMetastore baseHiveMetastore;
if (!enableMetastoreCache) {
@@ -204,4 +203,4 @@ public boolean isEnableBackgroundRefreshHiveMetadata() {
public MetastoreType getMetastoreType() {
return metastoreType;
}
}
}
@@ -12,11 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.


package com.starrocks.connector.hive;

import com.google.common.collect.Lists;
import com.starrocks.common.Config;
import com.starrocks.connector.HdfsEnvironment;
import com.starrocks.connector.exception.StarRocksConnectorException;
import com.starrocks.connector.hive.events.MetastoreNotificationFetchException;
import com.starrocks.connector.hive.glue.AWSCatalogMetastoreClient;
@@ -69,8 +69,9 @@ public HiveMetaClient(HiveConf conf) {
this.conf = conf;
}

public static HiveMetaClient createHiveMetaClient(Map<String, String> properties) {
public static HiveMetaClient createHiveMetaClient(HdfsEnvironment env, Map<String, String> properties) {
HiveConf conf = new HiveConf();
conf.addResource(env.getConfiguration());
properties.forEach(conf::set);
if (properties.containsKey(HIVE_METASTORE_URIS)) {
conf.set(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName(), properties.get(HIVE_METASTORE_URIS));
@@ -351,14 +352,18 @@ public Map<String, List<ColumnStatisticsObj>> getPartitionColumnStats(String dbN
* hive metastore is false. The hive metastore will throw StackOverFlow exception.
* We solve this problem by get partitions information multiple times.
* Each retry reduces the number of partitions fetched by half until only one partition is fetched at a time.
*
* @return Hive table partitions
* @throws StarRocksConnectorException If there is an exception with only one partition at a time when get partition,
* then we determine that there is a bug with the user's hive metastore.
* then we determine that there is a bug with the user's hive metastore.
*/
private List<Partition> getPartitionsWithRetry(String dbName, String tableName,
List<String> partNames, int retryNum) throws StarRocksConnectorException {
int subListSize = (int) Math.pow(2, retryNum);
int subListNum = partNames.size() / subListSize;
if (subListNum == 0) {
subListNum = 1;
}
List<List<String>> partNamesList = Lists.partition(partNames, subListNum);
List<Partition> partitions = Lists.newArrayList();

@@ -415,6 +420,7 @@ public NotificationEventResponse getNextNotification(long lastEventId,

static class ClassUtils {
private static final HashMap WRAPPER_TO_PRIMITIVE = new HashMap();

static {
WRAPPER_TO_PRIMITIVE.put(Boolean.class, Boolean.TYPE);
WRAPPER_TO_PRIMITIVE.put(Character.class, Character.TYPE);
@@ -452,4 +458,4 @@ public static Class<?> getWrapperType(Class<?> primitiveType) {
return (Class) WRAPPER_TO_PRIMITIVE.get(primitiveType);
}
}
}
}
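
The @@ -351,14 +352,18 @@ hunk above hardens getPartitionsWithRetry: each retry doubles subListSize, so the computed batch size halves, and the new guard keeps it from reaching zero (Guava's Lists.partition rejects a non-positive size). A minimal sketch of that arithmetic, where only Lists.partition is the real API and the wrapper names are illustrative:

// Sketch of the batching arithmetic used by getPartitionsWithRetry.
import com.google.common.collect.Lists;

import java.util.List;

class PartitionBatchingSketch {
    static List<List<String>> splitForRetry(List<String> partNames, int retryNum) {
        int subListSize = (int) Math.pow(2, retryNum);  // doubles on every retry
        int batchSize = partNames.size() / subListSize; // so each metastore call fetches half as many names
        if (batchSize == 0) {
            batchSize = 1; // new guard: Lists.partition(..., 0) would throw IllegalArgumentException
        }
        return Lists.partition(partNames, batchSize);   // consecutive sublists of at most batchSize names
    }
}

With retryNum = 0 everything is fetched in one call; once 2^retryNum exceeds the partition count, the guard pins the batch size at one partition per call, matching the javadoc above.
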
@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.


package com.starrocks.connector.hive;

import com.google.common.collect.ImmutableList;
@@ -320,4 +319,4 @@ public NotificationEventResponse getNextEventResponse(long lastSyncedEventId, St
"Last synced event id is " + lastSyncedEventId, e);
}
}
}
}
@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.


package com.starrocks.connector.hive;

import com.google.common.collect.Lists;
@@ -101,4 +100,4 @@ default void invalidatePartitionKeys(HivePartitionValue partitionValue) {
default long getCurrentEventId() {
return -1;
}
}
}
@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.


package com.starrocks.connector.hudi;

import com.google.common.base.Preconditions;
@@ -105,7 +104,7 @@ public void shutdown() {

public IHiveMetastore createHiveMetastore() {
// TODO(stephen): Abstract the creator class to construct hive meta client
HiveMetaClient metaClient = HiveMetaClient.createHiveMetaClient(properties);
HiveMetaClient metaClient = HiveMetaClient.createHiveMetaClient(hdfsEnvironment, properties);
IHiveMetastore hiveMetastore = new HiveMetastore(metaClient, catalogName);
IHiveMetastore baseHiveMetastore;
if (!enableMetastoreCache) {
@@ -34,4 +34,5 @@ public interface CloudConfiguration {

// Convert to the protobuf used by staros.
FileStoreInfo toFileStoreInfo();

}
@@ -18,7 +18,6 @@
* Mapping used config key in StarRocks
*/
public class CloudConfigurationConstants {

// Credential for AWS s3
public static final String AWS_S3_USE_AWS_SDK_DEFAULT_BEHAVIOR = "aws.s3.use_aws_sdk_default_behavior";
public static final String AWS_S3_USE_INSTANCE_PROFILE = "aws.s3.use_instance_profile";
@@ -46,7 +45,6 @@ public class CloudConfigurationConstants {
*/
public static final String AWS_S3_ENABLE_SSL = "aws.s3.enable_ssl";


public static final String AWS_GLUE_USE_AWS_SDK_DEFAULT_BEHAVIOR = "aws.glue.use_aws_sdk_default_behavior";
public static final String AWS_GLUE_USE_INSTANCE_PROFILE = "aws.glue.use_instance_profile";
public static final String AWS_GLUE_ACCESS_KEY = "aws.glue.access_key";
@@ -57,7 +55,6 @@ public class CloudConfigurationConstants {
public static final String AWS_GLUE_REGION = "aws.glue.region";
public static final String AWS_GLUE_ENDPOINT = "aws.glue.endpoint";


// Credential for Azure storage
// For Azure Blob Storage

@@ -83,21 +80,33 @@

// Credential for Google Cloud Platform (GCP)
// For Google Cloud Storage (GCS)
public static final String GCP_GCS_USE_COMPUTE_ENGINE_SERVICE_ACCOUNT = "gcp.gcs.use_compute_engine_service_account";
public static final String GCP_GCS_USE_COMPUTE_ENGINE_SERVICE_ACCOUNT =
"gcp.gcs.use_compute_engine_service_account";
public static final String GCP_GCS_SERVICE_ACCOUNT_EMAIL = "gcp.gcs.service_account_email";
public static final String GCP_GCS_SERVICE_ACCOUNT_PRIVATE_KEY = "gcp.gcs.service_account_private_key";
public static final String GCP_GCS_SERVICE_ACCOUNT_PRIVATE_KEY_ID = "gcp.gcs.service_account_private_key_id";
public static final String GCP_GCS_SERVICE_ACCOUNT_IMPERSONATION_SERVICE_ACCOUNT =
"gcp.gcs.impersonation_service_account";

// Credential for HDFS
// TODO: Refactor the name of parameters
public static final String HDFS_AUTHENTICATION = "hadoop.security.authentication";
public static final String HDFS_USER_NAME = "username";
public static final String HDFS_PASSWORD = "password";
public static final String HDFS_KERBEROS_PRINCIPAL = "kerberos_principal";
public static final String HDFS_KERBEROS_KEYTAB = "kerberos_keytab";
public static final String HDFS_KERBEROS_KEYTAB_CONTENT = "kerberos_keytab_content";
@Deprecated
public static final String HDFS_USERNAME_DEPRECATED = "username";
public static final String HDFS_USERNAME = "hadoop.username";
@Deprecated
public static final String HDFS_PASSWORD_DEPRECATED = "password";
public static final String HDFS_PASSWORD = "hadoop.password";
public static final String HDFS_KERBEROS_PRINCIPAL_DEPRECATED = "kerberos_principal";
public static final String HDFS_KERBEROS_PRINCIPAL = "hadoop.kerberos.principal";
@Deprecated
public static final String HDFS_KERBEROS_KEYTAB_DEPRECATED = "kerberos_keytab";
public static final String HADOOP_KERBEROS_KEYTAB = "hadoop.kerberos.keytab";

@Deprecated
public static final String HDFS_KERBEROS_KEYTAB_CONTENT_DEPRECATED = "kerberos_keytab_content";
public static final String HADOOP_KERBEROS_KEYTAB_CONTENT = "hadoop.kerberos.keytab_content";
public static final String HDFS_CONFIG_RESOURCES = "hadoop.config.resources";
public static final String HDFS_RUNTIME_JARS = "hadoop.runtime.jars";

// Credential for Aliyun OSS
public static final String ALIYUN_OSS_ACCESS_KEY = "aliyun.oss.access_key";
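
The hunk above moves the HDFS credential keys into the hadoop.* namespace while keeping the old spellings as @Deprecated aliases. How the provider consumes them is not part of this excerpt; a hedged sketch of the obvious fallback rule (prefer the new key, accept the old one), using a hypothetical resolve() helper, might look like this:

// Hypothetical helper, not from this commit: prefer the new "hadoop.*" key and
// fall back to the deprecated spelling when only the old one is present.
import java.util.Map;

class HdfsCredentialKeySketch {
    static String resolve(Map<String, String> properties, String newKey, String deprecatedKey) {
        String value = properties.get(newKey);      // e.g. HDFS_USERNAME = "hadoop.username"
        if (value == null || value.isEmpty()) {
            value = properties.get(deprecatedKey);  // e.g. HDFS_USERNAME_DEPRECATED = "username"
        }
        return value;
    }
}

For example, resolve(properties, "hadoop.username", "username") would keep catalogs created before the rename working.
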
@@ -40,7 +40,6 @@ public class CloudConfigurationFactory {
new AliyunCloudConfigurationProvider(),
new HDFSCloudConfigurationProvider());


public static CloudConfiguration buildCloudConfigurationForStorage(Map<String, String> properties) {
for (CloudConfigurationProvider factory : cloudConfigurationFactoryChain) {
CloudConfiguration cloudConfiguration = factory.build(properties);
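
buildCloudConfigurationForStorage walks the provider chain (which includes HDFSCloudConfigurationProvider) and returns the first CloudConfiguration a provider recognises. A rough usage sketch follows; whether the HDFS provider reads exactly the hadoop.* keys defined above is an assumption, and the wrapper class is illustrative:

// Rough usage sketch, assuming the catalog properties carry the HDFS keys from
// CloudConfigurationConstants; packages follow the imports visible in this diff.
import com.starrocks.credential.CloudConfiguration;
import com.starrocks.credential.CloudConfigurationFactory;
import org.apache.hadoop.conf.Configuration;

import java.util.Map;

class CloudConfigurationUsageSketch {
    static Configuration hadoopConfFor(Map<String, String> catalogProperties) {
        CloudConfiguration cloudConfiguration =
                CloudConfigurationFactory.buildCloudConfigurationForStorage(catalogProperties);
        Configuration conf = new Configuration();
        if (cloudConfiguration != null) {
            // Applies the credential plus any files listed under hadoop.config.resources.
            cloudConfiguration.applyToConfiguration(conf);
        }
        return conf;
    }
}
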
@@ -20,6 +20,6 @@ public enum CloudType {
AWS,
AZURE,
GCP,
HDFS,
ALIYUN
ALIYUN,
HDFS
}
@@ -35,7 +35,6 @@ public AzureCloudConfiguration(AzureStorageCloudCredential azureStorageCloudCred
@Override
public void toThrift(TCloudConfiguration tCloudConfiguration) {
tCloudConfiguration.setCloud_type(TCloudType.AZURE);

Map<String, String> properties = new HashMap<>();
azureStorageCloudCredential.toThrift(properties);
tCloudConfiguration.setCloud_properties_v2(properties);
@@ -14,38 +14,85 @@

package com.starrocks.credential.hdfs;

import autovalue.shaded.com.google.common.common.base.Preconditions;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.staros.proto.FileStoreInfo;
import com.starrocks.StarRocksFE;
import com.starrocks.credential.CloudConfiguration;
import com.starrocks.credential.CloudType;
import com.starrocks.thrift.TCloudConfiguration;
import com.starrocks.thrift.TCloudType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.HashMap;
import java.util.Map;

import static com.starrocks.credential.CloudConfigurationConstants.HDFS_CONFIG_RESOURCES;
import static com.starrocks.credential.CloudConfigurationConstants.HDFS_RUNTIME_JARS;

public class HDFSCloudConfiguration implements CloudConfiguration {
private static final Logger LOG = LogManager.getLogger(HDFSCloudConfiguration.class);

private final HDFSCloudCredential hdfsCloudCredential;
private String configResources;
private String runtimeJars;

private static final String CONFIG_RESOURCES_SEPERATOR = ",";

public HDFSCloudConfiguration(HDFSCloudCredential hdfsCloudCredential) {
Preconditions.checkNotNull(hdfsCloudCredential);
this.hdfsCloudCredential = hdfsCloudCredential;
}

public void setConfigResources(String configResources) {
this.configResources = configResources;
}

public void setRuntimeJars(String runtimeJars) {
this.runtimeJars = runtimeJars;
}

public HDFSCloudCredential getHdfsCloudCredential() {
return hdfsCloudCredential;
}

public void addConfigResourcesToConfiguration(String configResources, Configuration conf) {
if (Strings.isNullOrEmpty(configResources)) {
return;
}
String[] parts = configResources.split(CONFIG_RESOURCES_SEPERATOR);
for (String p : parts) {
Path path = new Path(StarRocksFE.STARROCKS_HOME_DIR + "/conf/", p);
LOG.debug(String.format("Add path '%s' to configuration", path.toString()));
conf.addResource(path);
}
}

@Override
public void toThrift(TCloudConfiguration tCloudConfiguration) {
// TODO
tCloudConfiguration.setCloud_type(TCloudType.HDFS);
Map<String, String> properties = new HashMap<>();
hdfsCloudCredential.toThrift(properties);
properties.put(HDFS_CONFIG_RESOURCES, configResources);
properties.put(HDFS_RUNTIME_JARS, runtimeJars);
tCloudConfiguration.setCloud_properties_v2(properties);
}

@Override
public void applyToConfiguration(Configuration configuration) {
// TODO
hdfsCloudCredential.applyToConfiguration(configuration);
addConfigResourcesToConfiguration(configResources, configuration);
}

@Override
public String getCredentialString() {
return hdfsCloudCredential.getCredentialString();
return "HDFSCloudConfiguration{" +
"configResources=" + configResources +
", runtimeJars=" + runtimeJars +
", credential=" + hdfsCloudCredential.getCredentialString() + "}";
}

@Override
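
getCredentialString() for HDFS now folds the credential, configResources and runtimeJars into a single string, which is presumably what the commit title's "cred-isolated cache key" is derived from. The backend-side key construction is not shown in this excerpt, so the composition below is purely illustrative:

// Hypothetical illustration of a credential-isolated cache key; the actual
// cache-key code touched by this commit is not reproduced here.
import com.starrocks.credential.CloudConfiguration;

class CredIsolatedCacheKeySketch {
    static String cacheKey(String namenode, CloudConfiguration cloudConfiguration) {
        // Same namenode but different credentials or config resources now yield
        // different keys, so cached handles are not shared across credentials.
        return namenode + "|" + cloudConfiguration.getCredentialString();
    }
}
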