Skip to content

Commit

Permalink
feat: add custom dynamodb endpoint configuration (#2575)
Browse files Browse the repository at this point in the history
# Description

Add custom dynamodb endpoint configuration
Users can specify dynamodb endpoint in the map or set the environment
variable `AWS_ENDPOINT_URL_DYNAMODB` to override dynamodb client
endpoint.


# Related Issue(s)
[2498](#2498)

# Documentation

<!---
Share links to useful documentation
--->
  • Loading branch information
hnaoto authored Jun 7, 2024
1 parent f23c92d commit 0a44a0d
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 2 deletions.
52 changes: 51 additions & 1 deletion crates/aws/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,12 @@ impl DynamoDbLockClient {
lock_table_name: Option<String>,
billing_mode: Option<String>,
max_elapsed_request_time: Option<String>,
dynamodb_override_endpoint: Option<String>,
) -> Result<Self, DynamoDbConfigError> {
let dynamodb_client = aws_sdk_dynamodb::Client::new(sdk_config);
let dynamodb_sdk_config =
Self::create_dynamodb_sdk_config(sdk_config, dynamodb_override_endpoint);

let dynamodb_client = aws_sdk_dynamodb::Client::new(&dynamodb_sdk_config);

let lock_table_name = lock_table_name
.or_else(|| std::env::var(constants::LOCK_TABLE_KEY_NAME).ok())
Expand Down Expand Up @@ -177,6 +181,24 @@ impl DynamoDbLockClient {
config,
})
}
fn create_dynamodb_sdk_config(
sdk_config: &SdkConfig,
dynamodb_override_endpoint: Option<String>,
) -> SdkConfig {
/*
if dynamodb_override_endpoint exists/AWS_ENDPOINT_URL_DYNAMODB is specified by user
use dynamodb_override_endpoint to create dynamodb client
*/
let dynamodb_sdk_config = match dynamodb_override_endpoint {
Some(dynamodb_endpoint_url) => sdk_config
.to_owned()
.to_builder()
.endpoint_url(dynamodb_endpoint_url)
.build(),
None => sdk_config.to_owned(),
};
dynamodb_sdk_config
}

/// Create the lock table where DynamoDb stores the commit information for all delta tables.
///
Expand Down Expand Up @@ -663,6 +685,7 @@ fn extract_version_from_filename(name: &str) -> Option<i64> {
#[cfg(test)]
mod tests {
use super::*;
use aws_config::Region;
use object_store::memory::InMemory;
use serial_test::serial;

Expand Down Expand Up @@ -711,4 +734,31 @@ mod tests {
.unwrap();
assert_eq!(logstore.name(), "DefaultLogStore");
}

#[test]
#[serial]
fn test_create_dynamodb_sdk_config() {
let sdk_config = SdkConfig::builder()
.region(Region::from_static("eu-west-1"))
.endpoint_url("http://localhost:1234")
.build();
let dynamodb_sdk_config = DynamoDbLockClient::create_dynamodb_sdk_config(
&sdk_config,
Some("http://localhost:2345".to_string()),
);
assert_eq!(
dynamodb_sdk_config.endpoint_url(),
Some("http://localhost:2345"),
);
assert_eq!(
dynamodb_sdk_config.region().unwrap().to_string(),
"eu-west-1".to_string(),
);
let dynamodb_sdk_no_override_config =
DynamoDbLockClient::create_dynamodb_sdk_config(&sdk_config, None);
assert_eq!(
dynamodb_sdk_no_override_config.endpoint_url(),
Some("http://localhost:1234"),
);
}
}
1 change: 1 addition & 0 deletions crates/aws/src/logstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ impl S3DynamoDbLogStore {
.extra_opts
.get(constants::MAX_ELAPSED_REQUEST_TIME_KEY_NAME)
.cloned(),
s3_options.dynamodb_endpoint.clone(),
)
.map_err(|err| DeltaTableError::ObjectStore {
source: ObjectStoreError::Generic {
Expand Down
59 changes: 59 additions & 0 deletions crates/aws/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ impl ObjectStoreFactory for S3ObjectStoreFactory {
pub struct S3StorageOptions {
pub virtual_hosted_style_request: bool,
pub locking_provider: Option<String>,
pub dynamodb_endpoint: Option<String>,
pub s3_pool_idle_timeout: Duration,
pub sts_pool_idle_timeout: Duration,
pub s3_get_internal_server_error_retries: usize,
Expand All @@ -122,6 +123,7 @@ impl PartialEq for S3StorageOptions {
fn eq(&self, other: &Self) -> bool {
self.virtual_hosted_style_request == other.virtual_hosted_style_request
&& self.locking_provider == other.locking_provider
&& self.dynamodb_endpoint == other.dynamodb_endpoint
&& self.s3_pool_idle_timeout == other.s3_pool_idle_timeout
&& self.sts_pool_idle_timeout == other.sts_pool_idle_timeout
&& self.s3_get_internal_server_error_retries
Expand Down Expand Up @@ -219,6 +221,7 @@ impl S3StorageOptions {
Ok(Self {
virtual_hosted_style_request,
locking_provider: str_option(options, s3_constants::AWS_S3_LOCKING_PROVIDER),
dynamodb_endpoint: str_option(options, s3_constants::AWS_ENDPOINT_URL_DYNAMODB),
s3_pool_idle_timeout: Duration::from_secs(s3_pool_idle_timeout),
sts_pool_idle_timeout: Duration::from_secs(sts_pool_idle_timeout),
s3_get_internal_server_error_retries,
Expand Down Expand Up @@ -421,6 +424,10 @@ impl ObjectStore for S3StorageBackend {
pub mod s3_constants {
/// Custom S3 endpoint.
pub const AWS_ENDPOINT_URL: &str = "AWS_ENDPOINT_URL";
/// Custom DynamoDB endpoint.
/// If DynamoDB endpoint is not supplied, will use S3 endpoint (AWS_ENDPOINT_URL)
/// If it is supplied, this endpoint takes precedence over the global endpoint set in AWS_ENDPOINT_URL for DynamoDB
pub const AWS_ENDPOINT_URL_DYNAMODB: &str = "AWS_ENDPOINT_URL_DYNAMODB";
/// The AWS region.
pub const AWS_REGION: &str = "AWS_REGION";
/// The AWS profile.
Expand Down Expand Up @@ -490,6 +497,7 @@ pub mod s3_constants {
/// field of [crate::storage::s3::S3StorageOptions].
pub const S3_OPTS: &[&str] = &[
AWS_ENDPOINT_URL,
AWS_ENDPOINT_URL_DYNAMODB,
AWS_REGION,
AWS_PROFILE,
AWS_ACCESS_KEY_ID,
Expand Down Expand Up @@ -613,6 +621,7 @@ mod tests {
.build(),
virtual_hosted_style_request: false,
locking_provider: Some("dynamodb".to_string()),
dynamodb_endpoint: None,
s3_pool_idle_timeout: Duration::from_secs(15),
sts_pool_idle_timeout: Duration::from_secs(10),
s3_get_internal_server_error_retries: 10,
Expand Down Expand Up @@ -674,6 +683,51 @@ mod tests {
.build(),
virtual_hosted_style_request: true,
locking_provider: Some("another_locking_provider".to_string()),
dynamodb_endpoint: None,
s3_pool_idle_timeout: Duration::from_secs(1),
sts_pool_idle_timeout: Duration::from_secs(2),
s3_get_internal_server_error_retries: 3,
extra_opts: hashmap! {
s3_constants::AWS_S3_ADDRESSING_STYLE.to_string() => "virtual".to_string()
},
allow_unsafe_rename: false,
},
options
);
});
}

#[test]
#[serial]
fn storage_options_from_map_with_dynamodb_endpoint_test() {
ScopedEnv::run(|| {
clear_env_of_aws_keys();
let options = S3StorageOptions::from_map(&hashmap! {
s3_constants::AWS_ENDPOINT_URL.to_string() => "http://localhost:1234".to_string(),
s3_constants::AWS_ENDPOINT_URL_DYNAMODB.to_string() => "http://localhost:2345".to_string(),
s3_constants::AWS_REGION.to_string() => "us-west-2".to_string(),
s3_constants::AWS_PROFILE.to_string() => "default".to_string(),
s3_constants::AWS_S3_ADDRESSING_STYLE.to_string() => "virtual".to_string(),
s3_constants::AWS_S3_LOCKING_PROVIDER.to_string() => "another_locking_provider".to_string(),
s3_constants::AWS_S3_ASSUME_ROLE_ARN.to_string() => "arn:aws:iam::123456789012:role/another_role".to_string(),
s3_constants::AWS_S3_ROLE_SESSION_NAME.to_string() => "another_session_name".to_string(),
s3_constants::AWS_WEB_IDENTITY_TOKEN_FILE.to_string() => "another_token_file".to_string(),
s3_constants::AWS_S3_POOL_IDLE_TIMEOUT_SECONDS.to_string() => "1".to_string(),
s3_constants::AWS_STS_POOL_IDLE_TIMEOUT_SECONDS.to_string() => "2".to_string(),
s3_constants::AWS_S3_GET_INTERNAL_SERVER_ERROR_RETRIES.to_string() => "3".to_string(),
s3_constants::AWS_ACCESS_KEY_ID.to_string() => "test_id".to_string(),
s3_constants::AWS_SECRET_ACCESS_KEY.to_string() => "test_secret".to_string(),
}).unwrap();

assert_eq!(
S3StorageOptions {
sdk_config: SdkConfig::builder()
.endpoint_url("http://localhost:1234".to_string())
.region(Region::from_static("us-west-2"))
.build(),
virtual_hosted_style_request: true,
locking_provider: Some("another_locking_provider".to_string()),
dynamodb_endpoint: Some("http://localhost:2345".to_string()),
s3_pool_idle_timeout: Duration::from_secs(1),
sts_pool_idle_timeout: Duration::from_secs(2),
s3_get_internal_server_error_retries: 3,
Expand All @@ -693,6 +747,10 @@ mod tests {
ScopedEnv::run(|| {
clear_env_of_aws_keys();
std::env::set_var(s3_constants::AWS_ENDPOINT_URL, "http://localhost");
std::env::set_var(
s3_constants::AWS_ENDPOINT_URL_DYNAMODB,
"http://localhost:dynamodb",
);
std::env::set_var(s3_constants::AWS_REGION, "us-west-1");
std::env::set_var(s3_constants::AWS_PROFILE, "default");
std::env::set_var(s3_constants::AWS_ACCESS_KEY_ID, "wrong_key_id");
Expand Down Expand Up @@ -724,6 +782,7 @@ mod tests {
.build(),
virtual_hosted_style_request: false,
locking_provider: Some("dynamodb".to_string()),
dynamodb_endpoint: Some("http://localhost:dynamodb".to_string()),
s3_pool_idle_timeout: Duration::from_secs(1),
sts_pool_idle_timeout: Duration::from_secs(2),
s3_get_internal_server_error_retries: 3,
Expand Down
3 changes: 2 additions & 1 deletion crates/aws/tests/integration_s3_dynamodb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};

use aws_sdk_dynamodb::types::BillingMode;
use deltalake_aws::logstore::{RepairLogEntryResult, S3DynamoDbLogStore};
use deltalake_aws::storage::S3StorageOptions;
use deltalake_aws::storage::{s3_constants, S3StorageOptions};
use deltalake_aws::{CommitEntry, DynamoDbConfig, DynamoDbLockClient};
use deltalake_core::kernel::{Action, Add, DataType, PrimitiveType, StructField, StructType};
use deltalake_core::logstore::LogStore;
Expand Down Expand Up @@ -42,6 +42,7 @@ fn make_client() -> TestResult<DynamoDbLockClient> {
None,
None,
None,
None,
)?)
}

Expand Down

0 comments on commit 0a44a0d

Please sign in to comment.