Skip to content

Commit 8527565

Browse files
committed
feat: spawn io with spawn service
Signed-off-by: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com>
1 parent 3d2bbef commit 8527565

File tree

18 files changed

+162
-116
lines changed

18 files changed

+162
-116
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ arrow-ord = { version = "55" }
4343
arrow-row = { version = "55" }
4444
arrow-schema = { version = "55" }
4545
arrow-select = { version = "55" }
46-
object_store = { version = "0.12.0" }
46+
object_store = { version = "0.12.1" }
4747
parquet = { version = "55" }
4848

4949
# datafusion

crates/aws/src/storage.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ use deltalake_core::{DeltaResult, DeltaTableError, ObjectStoreError, Path};
1818
use futures::stream::BoxStream;
1919
use futures::Future;
2020
use object_store::aws::AmazonS3;
21+
use object_store::client::SpawnedReqwestConnector;
2122
use object_store::RetryConfig;
23+
use tokio::runtime::Handle;
2224
use tracing::log::*;
2325
use url::Url;
2426

@@ -39,13 +41,19 @@ impl ObjectStoreFactory for S3ObjectStoreFactory {
3941
url: &Url,
4042
storage_options: &HashMap<String, String>,
4143
retry: &RetryConfig,
44+
handle: Option<Handle>,
4245
) -> DeltaResult<(ObjectStoreRef, Path)> {
4346
let options = self.with_env_s3(storage_options);
4447

4548
// All S3-likes should start their builder the same way
4649
let mut builder = AmazonS3Builder::new()
4750
.with_url(url.to_string())
4851
.with_retry(retry.clone());
52+
53+
if let Some(handle) = handle {
54+
builder = builder.with_http_connector(SpawnedReqwestConnector::new(handle));
55+
}
56+
4957
for (key, value) in options.iter() {
5058
if let Ok(key) = AmazonS3ConfigKey::from_str(&key.to_ascii_lowercase()) {
5159
builder = builder.with_config(key, value.clone());

crates/aws/tests/integration_s3_dynamodb.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,8 @@ async fn test_create_s3_table() -> TestResult<()> {
107107
deltalake_aws::constants::AWS_FORCE_CREDENTIAL_LOAD.into() => "true".into(),
108108
deltalake_aws::constants::AWS_ENDPOINT_URL.into() => "http://localhost:4566".into(),
109109
};
110-
let log_store = logstore_for(Url::parse(&table_uri)?, storage_options, None)?;
110+
let storage_config = StorageConfig::parse_options(storage_options)?;
111+
let log_store = logstore_for(Url::parse(&table_uri)?, storage_config)?;
111112

112113
let payload = PutPayload::from_static(b"test-drivin");
113114
let _put = log_store

crates/azure/src/lib.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@ use deltalake_core::logstore::{
88
};
99
use deltalake_core::{DeltaResult, DeltaTableError, Path};
1010
use object_store::azure::{AzureConfigKey, MicrosoftAzureBuilder};
11+
use object_store::client::SpawnedReqwestConnector;
1112
use object_store::{ObjectStoreScheme, RetryConfig};
13+
use tokio::runtime::Handle;
1214
use url::Url;
1315

1416
mod config;
@@ -40,12 +42,18 @@ impl ObjectStoreFactory for AzureFactory {
4042
url: &Url,
4143
options: &HashMap<String, String>,
4244
retry: &RetryConfig,
45+
handle: Option<Handle>,
4346
) -> DeltaResult<(ObjectStoreRef, Path)> {
4447
let config = config::AzureConfigHelper::try_new(options.as_azure_options())?.build()?;
4548

4649
let mut builder = MicrosoftAzureBuilder::new()
4750
.with_url(url.to_string())
4851
.with_retry(retry.clone());
52+
53+
if let Some(handle) = handle {
54+
builder = builder.with_http_connector(SpawnedReqwestConnector::new(handle));
55+
}
56+
4957
for (key, value) in config.iter() {
5058
builder = builder.with_config(*key, value.clone());
5159
}

crates/catalog-unity/src/lib.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use std::collections::HashMap;
1717
use std::future::Future;
1818
use std::str::FromStr;
1919
use std::sync::Arc;
20+
use tokio::runtime::Handle;
2021

2122
use crate::credential::{
2223
AzureCliCredential, ClientSecretOAuthProvider, CredentialProvider, WorkspaceOAuthProvider,
@@ -840,6 +841,7 @@ impl ObjectStoreFactory for UnityCatalogFactory {
840841
table_uri: &Url,
841842
options: &HashMap<String, String>,
842843
_retry: &RetryConfig,
844+
handle: Option<Handle>,
843845
) -> DeltaResult<(ObjectStoreRef, Path)> {
844846
let (table_path, temp_creds) = UnityCatalogBuilder::execute_uc_future(
845847
UnityCatalogBuilder::get_uc_location_and_token(table_uri.as_str()),
@@ -850,8 +852,11 @@ impl ObjectStoreFactory for UnityCatalogFactory {
850852

851853
// TODO(roeap): we should not have to go through the table here.
852854
// ideally we just create the right storage ...
853-
let mut builder =
854-
DeltaTableBuilder::from_uri(&table_path).with_io_runtime(IORuntime::default());
855+
let mut builder = DeltaTableBuilder::from_uri(&table_path);
856+
857+
if let Some(handle) = handle {
858+
builder = builder.with_io_runtime(IORuntime::RT(handle));
859+
}
855860
if !storage_options.is_empty() {
856861
builder = builder.with_storage_options(storage_options.clone());
857862
}

crates/core/src/logstore/config.rs

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,9 @@ use std::collections::HashMap;
1010

1111
use ::object_store::RetryConfig;
1212
use object_store::{path::Path, prefix::PrefixStore, ObjectStore, ObjectStoreScheme};
13-
use tokio::runtime::Handle;
1413

15-
use super::storage::runtime::RuntimeConfig;
1614
use super::storage::LimitConfig;
15+
use super::{storage::runtime::RuntimeConfig, IORuntime};
1716
use crate::{DeltaResult, DeltaTableError};
1817

1918
pub trait TryUpdateKey: Default {
@@ -91,8 +90,9 @@ where
9190
pub struct StorageConfig {
9291
/// Runtime configuration.
9392
///
94-
/// Configuration to set up a dedicated IO runtime to execute IO related operations.
95-
pub runtime: Option<RuntimeConfig>,
93+
/// Configuration to set up a dedicated IO runtime to execute IO related operations or
94+
/// dedicated handle.
95+
pub runtime: Option<IORuntime>,
9696

9797
pub retry: ::object_store::RetryConfig,
9898

@@ -119,21 +119,12 @@ impl StorageConfig {
119119
/// Depending on the configuration, the following layers may be added:
120120
/// - Retry layer: Adds retry logic to the object store.
121121
/// - Limit layer: Limits the number of concurrent requests to the object store.
122-
/// - Runtime layer: Executes IO related operations on a dedicated runtime.
123122
pub fn decorate_store<T: ObjectStore + Clone>(
124123
&self,
125124
store: T,
126125
table_root: &url::Url,
127-
handle: Option<Handle>,
128126
) -> DeltaResult<Box<dyn ObjectStore>> {
129-
let inner = if let Some(runtime) = &self.runtime {
130-
Box::new(runtime.decorate(store, handle)) as Box<dyn ObjectStore>
131-
} else {
132-
Box::new(store) as Box<dyn ObjectStore>
133-
};
134-
135-
let inner = Self::decorate_prefix(inner, table_root)?;
136-
127+
let inner = Self::decorate_prefix(store, table_root)?;
137128
Ok(inner)
138129
}
139130

@@ -169,7 +160,9 @@ where
169160
};
170161

171162
let result = ParseResult::<RuntimeConfig>::from_iter(&config.raw);
172-
config.runtime = (!result.is_default).then_some(result.config);
163+
if let Some(runtime_config) = (!result.is_default).then_some(result.config) {
164+
config.runtime = Some(IORuntime::Config(runtime_config));
165+
};
173166

174167
let result = ParseResult::<LimitConfig>::from_iter(result.unparsed);
175168
config.limit = (!result.is_default).then_some(result.config);
@@ -218,7 +211,7 @@ impl StorageConfig {
218211
let (runtime, remainder): (RuntimeConfig, _) = try_parse_impl(&props.raw)?;
219212
// NOTE: we only want to assign an actual runtime config we consumed an option
220213
if props.raw.len() > remainder.len() {
221-
props.runtime = Some(runtime);
214+
props.runtime = Some(IORuntime::Config(runtime));
222215
}
223216

224217
let result = ParseResult::<LimitConfig>::from_iter(remainder);
@@ -235,6 +228,12 @@ impl StorageConfig {
235228
props.unknown_properties = remainder;
236229
Ok(props)
237230
}
231+
232+
// Provide an IO Runtime directly
233+
pub fn with_io_runtime(mut self, rt: IORuntime) -> Self {
234+
self.runtime = Some(rt);
235+
self
236+
}
238237
}
239238

240239
pub(super) fn try_parse_impl<T, K, V, I>(options: I) -> DeltaResult<(T, HashMap<String, String>)>

crates/core/src/logstore/factories.rs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,12 @@ use std::{
44
};
55

66
use dashmap::DashMap;
7-
use object_store::path::Path;
87
use object_store::RetryConfig;
8+
use object_store::{path::Path, DynObjectStore};
9+
use tokio::runtime::Handle;
910
use url::Url;
1011

11-
use super::{default_logstore, LogStore, ObjectStoreRef, StorageConfig};
12+
use super::{default_logstore, DeltaIOStorageBackend, LogStore, ObjectStoreRef, StorageConfig};
1213
use crate::{DeltaResult, DeltaTableError};
1314

1415
/// Factory registry to manage [`ObjectStoreFactory`] instances
@@ -29,6 +30,7 @@ pub trait ObjectStoreFactory: Send + Sync {
2930
url: &Url,
3031
options: &HashMap<String, String>,
3132
retry: &RetryConfig,
33+
handle: Option<Handle>,
3234
) -> DeltaResult<(ObjectStoreRef, Path)>;
3335
}
3436

@@ -41,8 +43,14 @@ impl ObjectStoreFactory for DefaultObjectStoreFactory {
4143
url: &Url,
4244
options: &HashMap<String, String>,
4345
_retry: &RetryConfig,
46+
handle: Option<Handle>,
4447
) -> DeltaResult<(ObjectStoreRef, Path)> {
45-
default_parse_url_opts(url, options)
48+
let (mut store, path) = default_parse_url_opts(url, options)?;
49+
50+
if let Some(handle) = handle {
51+
store = Arc::new(DeltaIOStorageBackend::new(store, handle)) as Arc<DynObjectStore>;
52+
}
53+
Ok((store, path))
4654
}
4755
}
4856

@@ -85,8 +93,8 @@ where
8593
let storage_config = StorageConfig::parse_options(options)?;
8694
if let Some(factory) = object_store_factories().get(&scheme) {
8795
let (store, _prefix) =
88-
factory.parse_url_opts(url, &storage_config.raw, &storage_config.retry)?;
89-
let store = storage_config.decorate_store(store, url, None)?;
96+
factory.parse_url_opts(url, &storage_config.raw, &storage_config.retry, None)?;
97+
let store = storage_config.decorate_store(store, url)?;
9098
Ok(Arc::new(store))
9199
} else {
92100
Err(DeltaTableError::InvalidTableLocation(url.clone().into()))

0 commit comments

Comments
 (0)