Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add with_tokio_runtime to HTTP stores #4040

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions object_store/src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ use crate::aws::checksum::Checksum;
use crate::aws::credential::{AwsCredential, CredentialExt, CredentialProvider};
use crate::aws::STRICT_PATH_ENCODE_SET;
use crate::client::pagination::stream_paginated;
use crate::client::retry::RetryExt;
use crate::client::retry::{ClientConfig, RetryExt};
use crate::multipart::UploadPart;
use crate::path::DELIMITER;
use crate::util::{format_http_range, format_prefix};
use crate::{
BoxStream, ClientOptions, ListResult, MultipartId, ObjectMeta, Path, Result,
RetryConfig, StreamExt,
StreamExt,
};
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
Expand Down Expand Up @@ -208,7 +208,7 @@ pub struct S3Config {
pub bucket: String,
pub bucket_endpoint: String,
pub credentials: Box<dyn CredentialProvider>,
pub retry_config: RetryConfig,
pub client_config: ClientConfig,
pub client_options: ClientOptions,
pub sign_payload: bool,
pub checksum: Option<Checksum>,
Expand Down Expand Up @@ -271,7 +271,7 @@ impl S3Client {
self.config.sign_payload,
None,
)
.send_retry(&self.config.retry_config)
.send_retry(&self.config.client_config)
.await
.context(GetRequestSnafu {
path: path.as_ref(),
Expand Down Expand Up @@ -317,7 +317,7 @@ impl S3Client {
self.config.sign_payload,
payload_sha256,
)
.send_retry(&self.config.retry_config)
.send_retry(&self.config.client_config)
.await
.context(PutRequestSnafu {
path: path.as_ref(),
Expand Down Expand Up @@ -345,7 +345,7 @@ impl S3Client {
self.config.sign_payload,
None,
)
.send_retry(&self.config.retry_config)
.send_retry(&self.config.client_config)
.await
.context(DeleteRequestSnafu {
path: path.as_ref(),
Expand All @@ -370,7 +370,7 @@ impl S3Client {
self.config.sign_payload,
None,
)
.send_retry(&self.config.retry_config)
.send_retry(&self.config.client_config)
.await
.context(CopyRequestSnafu {
path: from.as_ref(),
Expand Down Expand Up @@ -422,7 +422,7 @@ impl S3Client {
self.config.sign_payload,
None,
)
.send_retry(&self.config.retry_config)
.send_retry(&self.config.client_config)
.await
.context(ListRequestSnafu)?
.bytes()
Expand Down Expand Up @@ -476,7 +476,7 @@ impl S3Client {
self.config.sign_payload,
None,
)
.send_retry(&self.config.retry_config)
.send_retry(&self.config.client_config)
.await
.context(CreateMultipartRequestSnafu)?
.bytes()
Expand Down Expand Up @@ -521,7 +521,7 @@ impl S3Client {
self.config.sign_payload,
None,
)
.send_retry(&self.config.retry_config)
.send_retry(&self.config.client_config)
.await
.context(CompleteMultipartRequestSnafu)?;

Expand Down
36 changes: 18 additions & 18 deletions object_store/src/aws/credential.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
// under the License.

use crate::aws::STRICT_ENCODE_SET;
use crate::client::retry::RetryExt;
use crate::client::retry::{ClientConfig, RetryExt};
use crate::client::token::{TemporaryToken, TokenCache};
use crate::util::hmac_sha256;
use crate::{Result, RetryConfig};
use crate::Result;
use bytes::Buf;
use chrono::{DateTime, Utc};
use futures::future::BoxFuture;
Expand Down Expand Up @@ -315,7 +315,7 @@ impl CredentialProvider for StaticCredentialProvider {
pub struct InstanceCredentialProvider {
pub cache: TokenCache<Arc<AwsCredential>>,
pub client: Client,
pub retry_config: RetryConfig,
pub client_config: ClientConfig,
pub imdsv1_fallback: bool,
pub metadata_endpoint: String,
}
Expand All @@ -325,7 +325,7 @@ impl CredentialProvider for InstanceCredentialProvider {
Box::pin(self.cache.get_or_insert_with(|| {
instance_creds(
&self.client,
&self.retry_config,
&self.client_config,
&self.metadata_endpoint,
self.imdsv1_fallback,
)
Expand All @@ -348,15 +348,15 @@ pub struct WebIdentityProvider {
pub session_name: String,
pub endpoint: String,
pub client: Client,
pub retry_config: RetryConfig,
pub client_config: ClientConfig,
}

impl CredentialProvider for WebIdentityProvider {
fn get_credential(&self) -> BoxFuture<'_, Result<Arc<AwsCredential>>> {
Box::pin(self.cache.get_or_insert_with(|| {
web_identity(
&self.client,
&self.retry_config,
&self.client_config,
&self.token_path,
&self.role_arn,
&self.session_name,
Expand Down Expand Up @@ -392,7 +392,7 @@ impl From<InstanceCredentials> for AwsCredential {
/// <https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html#instance-metadata-security-credentials>
async fn instance_creds(
client: &Client,
retry_config: &RetryConfig,
config: &ClientConfig,
endpoint: &str,
imdsv1_fallback: bool,
) -> Result<TemporaryToken<Arc<AwsCredential>>, StdError> {
Expand All @@ -404,7 +404,7 @@ async fn instance_creds(
let token_result = client
.request(Method::PUT, token_url)
.header("X-aws-ec2-metadata-token-ttl-seconds", "600") // 10 minute TTL
.send_retry(retry_config)
.send_retry(config)
.await;

let token = match token_result {
Expand All @@ -425,7 +425,7 @@ async fn instance_creds(
role_request = role_request.header(AWS_EC2_METADATA_TOKEN_HEADER, token);
}

let role = role_request.send_retry(retry_config).await?.text().await?;
let role = role_request.send_retry(config).await?.text().await?;

let creds_url = format!("{endpoint}/{CREDENTIALS_PATH}/{role}");
let mut creds_request = client.request(Method::GET, creds_url);
Expand All @@ -434,7 +434,7 @@ async fn instance_creds(
}

let creds: InstanceCredentials =
creds_request.send_retry(retry_config).await?.json().await?;
creds_request.send_retry(config).await?.json().await?;

let now = Utc::now();
let ttl = (creds.expiration - now).to_std().unwrap_or_default();
Expand Down Expand Up @@ -478,7 +478,7 @@ impl From<AssumeRoleCredentials> for AwsCredential {
/// <https://docs.aws.amazon.com/eks/latest/userguide/iam-roles-for-service-accounts-technical-overview.html>
async fn web_identity(
client: &Client,
retry_config: &RetryConfig,
config: &ClientConfig,
token_path: &str,
role_arn: &str,
session_name: &str,
Expand All @@ -497,7 +497,7 @@ async fn web_identity(
("Version", "2011-06-15"),
("WebIdentityToken", &token),
])
.send_retry(retry_config)
.send_retry(config)
.await?
.bytes()
.await?;
Expand Down Expand Up @@ -709,7 +709,7 @@ mod tests {
// For example https://github.com/aws/amazon-ec2-metadata-mock
let endpoint = env::var("EC2_METADATA_ENDPOINT").unwrap();
let client = Client::new();
let retry_config = RetryConfig::default();
let config = ClientConfig::default();

// Verify only allows IMDSv2
let resp = client
Expand All @@ -724,7 +724,7 @@ mod tests {
"Ensure metadata endpoint is set to only allow IMDSv2"
);

let creds = instance_creds(&client, &retry_config, &endpoint, false)
let creds = instance_creds(&client, &config, &endpoint, false)
.await
.unwrap();

Expand All @@ -749,7 +749,7 @@ mod tests {

let endpoint = server.url();
let client = Client::new();
let retry_config = RetryConfig::default();
let config = ClientConfig::default();

// Test IMDSv2
server.push_fn(|req| {
Expand All @@ -775,7 +775,7 @@ mod tests {
Response::new(Body::from(r#"{"AccessKeyId":"KEYID","Code":"Success","Expiration":"2022-08-30T10:51:04Z","LastUpdated":"2022-08-30T10:21:04Z","SecretAccessKey":"SECRET","Token":"TOKEN","Type":"AWS-HMAC"}"#))
});

let creds = instance_creds(&client, &retry_config, endpoint, true)
let creds = instance_creds(&client, &config, endpoint, true)
.await
.unwrap();

Expand Down Expand Up @@ -808,7 +808,7 @@ mod tests {
Response::new(Body::from(r#"{"AccessKeyId":"KEYID","Code":"Success","Expiration":"2022-08-30T10:51:04Z","LastUpdated":"2022-08-30T10:21:04Z","SecretAccessKey":"SECRET","Token":"TOKEN","Type":"AWS-HMAC"}"#))
});

let creds = instance_creds(&client, &retry_config, endpoint, true)
let creds = instance_creds(&client, &config, endpoint, true)
.await
.unwrap();

Expand All @@ -825,7 +825,7 @@ mod tests {
);

// Should fail
instance_creds(&client, &retry_config, endpoint, false)
instance_creds(&client, &config, endpoint, false)
.await
.unwrap_err();
}
Expand Down
78 changes: 51 additions & 27 deletions object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use std::ops::Range;
use std::str::FromStr;
use std::sync::Arc;
use tokio::io::AsyncWrite;
use tokio::runtime::Handle;
use tracing::info;
use url::Url;

Expand All @@ -53,6 +54,7 @@ use crate::aws::credential::{
AwsCredential, CredentialProvider, InstanceCredentialProvider,
StaticCredentialProvider, WebIdentityProvider,
};
use crate::client::retry::ClientConfig;
use crate::multipart::{CloudMultiPartUpload, CloudMultiPartUploadImpl, UploadPart};
use crate::util::str_is_truthy;
use crate::{
Expand Down Expand Up @@ -414,8 +416,8 @@ pub struct AmazonS3Builder {
token: Option<String>,
/// Url
url: Option<String>,
/// Retry config
retry_config: RetryConfig,
/// Client config
client_config: ClientConfig,
/// When set to true, fallback to IMDSv1
imdsv1_fallback: bool,
/// When set to true, virtual hosted style request has to be used
Expand Down Expand Up @@ -897,7 +899,17 @@ impl AmazonS3Builder {

/// Set the retry configuration
pub fn with_retry(mut self, retry_config: RetryConfig) -> Self {
self.retry_config = retry_config;
self.client_config.retry = retry_config;
self
}

/// Set the tokio runtime to use to perform IO
///
/// This allows isolating IO into a dedicated [`Runtime`](tokio::runtime::Runtime) either
/// to ensure acceptable scheduling jitter in the presence of CPU-bound tasks, or to allow
/// using `object_store` outside of a tokio context
pub fn with_tokio_runtime(mut self, runtime: Handle) -> Self {
self.client_config.runtime = Some(runtime);
self
}

Expand Down Expand Up @@ -1025,7 +1037,7 @@ impl AmazonS3Builder {
role_arn,
endpoint,
client,
retry_config: self.retry_config.clone(),
client_config: self.client_config.clone(),
}) as _
}
_ => match self.profile {
Expand All @@ -1043,7 +1055,7 @@ impl AmazonS3Builder {
Box::new(InstanceCredentialProvider {
cache: Default::default(),
client: client_options.client()?,
retry_config: self.retry_config.clone(),
client_config: self.client_config.clone(),
imdsv1_fallback: self.imdsv1_fallback,
metadata_endpoint: self
.metadata_endpoint
Expand Down Expand Up @@ -1078,7 +1090,7 @@ impl AmazonS3Builder {
bucket,
bucket_endpoint,
credentials,
retry_config: self.retry_config,
client_config: self.client_config,
client_options: self.client_options,
sign_payload: !self.unsigned_payload,
checksum: self.checksum_algorithm,
Expand Down Expand Up @@ -1110,8 +1122,8 @@ fn profile_credentials(
mod tests {
use super::*;
use crate::tests::{
get_nonexistent_object, list_uses_directories_correctly, list_with_delimiter,
put_get_delete_list_opts, rename_and_copy, stream_get,
dedicated_tokio, get_nonexistent_object, list_uses_directories_correctly,
list_with_delimiter, put_get_delete_list_opts, rename_and_copy, stream_get,
};
use bytes::Bytes;
use std::collections::HashMap;
Expand Down Expand Up @@ -1388,30 +1400,42 @@ mod tests {
assert!(builder.is_err());
}

#[tokio::test]
async fn s3_test() {
let config = maybe_skip_integration!();
let is_local = matches!(&config.endpoint, Some(e) if e.starts_with("http://"));
let integration = config.build().unwrap();
#[test]
fn s3_test() {
let builder = maybe_skip_integration!();
let is_local = matches!(&builder.endpoint, Some(e) if e.starts_with("http://"));

let test = |integration| async move {
// Localstack doesn't support listing with spaces https://github.com/localstack/localstack/issues/6328
put_get_delete_list_opts(&integration, is_local).await;
list_uses_directories_correctly(&integration).await;
list_with_delimiter(&integration).await;
rename_and_copy(&integration).await;
stream_get(&integration).await;
};

let (handle, shutdown) = dedicated_tokio();

// Localstack doesn't support listing with spaces https://github.com/localstack/localstack/issues/6328
put_get_delete_list_opts(&integration, is_local).await;
list_uses_directories_correctly(&integration).await;
list_with_delimiter(&integration).await;
rename_and_copy(&integration).await;
stream_get(&integration).await;
let integration = builder.clone().build().unwrap();
handle.block_on(test(integration));

// run integration test with unsigned payload enabled
let config = maybe_skip_integration!().with_unsigned_payload(true);
let is_local = matches!(&config.endpoint, Some(e) if e.starts_with("http://"));
let integration = config.build().unwrap();
put_get_delete_list_opts(&integration, is_local).await;
let integration = builder.clone().with_unsigned_payload(true).build().unwrap();
handle.block_on(test(integration));

// run integration test with checksum set to sha256
let config = maybe_skip_integration!().with_checksum_algorithm(Checksum::SHA256);
let is_local = matches!(&config.endpoint, Some(e) if e.starts_with("http://"));
let integration = config.build().unwrap();
put_get_delete_list_opts(&integration, is_local).await;
let integration = builder
.clone()
.with_checksum_algorithm(Checksum::SHA256)
.build()
.unwrap();
handle.block_on(test(integration));

// run integration test without tokio runtime
let integration = builder.with_tokio_runtime(handle).build().unwrap();
futures::executor::block_on(test(integration));

shutdown();
}

#[tokio::test]
Expand Down
Loading