Skip to content

Commit f8ff82a

Browse files
authored
feat: Adds Instrumented Object Store Registry to datafusion-cli (#17953)
* feat: Adds Instrumented Object Store Registry to datafusion-cli - Adds a new Object Store Registry wrapper to datafusion-cli to support interacting with instrumented object store instances - Adds basic tests for the new registry wrapper * Adds doc comments to new public types and methods
1 parent 3b37ae0 commit f8ff82a

File tree

3 files changed

+81
-0
lines changed

3 files changed

+81
-0
lines changed

datafusion-cli/src/main.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,13 @@ use datafusion::execution::context::SessionConfig;
2727
use datafusion::execution::memory_pool::{
2828
FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool,
2929
};
30+
use datafusion::execution::object_store::DefaultObjectStoreRegistry;
3031
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
3132
use datafusion::logical_expr::ExplainFormat;
3233
use datafusion::prelude::SessionContext;
3334
use datafusion_cli::catalog::DynamicObjectStoreCatalog;
3435
use datafusion_cli::functions::{MetadataCacheFunc, ParquetMetadataFunc};
36+
use datafusion_cli::object_storage::instrumented::InstrumentedObjectStoreRegistry;
3537
use datafusion_cli::{
3638
exec,
3739
pool_type::PoolType,
@@ -206,6 +208,11 @@ async fn main_inner() -> Result<()> {
206208
rt_builder = rt_builder.with_disk_manager_builder(builder);
207209
}
208210

211+
let instrumented_registry = Arc::new(InstrumentedObjectStoreRegistry::new(Arc::new(
212+
DefaultObjectStoreRegistry::new(),
213+
)));
214+
rt_builder = rt_builder.with_object_store_registry(instrumented_registry.clone());
215+
209216
let runtime_env = rt_builder.build_arc()?;
210217

211218
// enable dynamic file query

datafusion-cli/src/object_storage.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
pub mod instrumented;
19+
1820
use async_trait::async_trait;
1921
use aws_config::BehaviorVersion;
2022
use aws_credential_types::provider::{
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::sync::Arc;
19+
20+
use datafusion::execution::object_store::ObjectStoreRegistry;
21+
use object_store::ObjectStore;
22+
use url::Url;
23+
24+
/// Provides access to wrapped [`ObjectStore`] instances that record requests for reporting
25+
#[derive(Debug)]
26+
pub struct InstrumentedObjectStoreRegistry {
27+
inner: Arc<dyn ObjectStoreRegistry>,
28+
}
29+
30+
impl InstrumentedObjectStoreRegistry {
31+
/// Returns a new [`InstrumentedObjectStoreRegistry`] that wraps the provided
32+
/// [`ObjectStoreRegistry`]
33+
pub fn new(registry: Arc<dyn ObjectStoreRegistry>) -> Self {
34+
Self { inner: registry }
35+
}
36+
}
37+
38+
impl ObjectStoreRegistry for InstrumentedObjectStoreRegistry {
39+
fn register_store(
40+
&self,
41+
url: &Url,
42+
store: Arc<dyn ObjectStore>,
43+
) -> Option<Arc<dyn ObjectStore>> {
44+
self.inner.register_store(url, store)
45+
}
46+
47+
fn get_store(&self, url: &Url) -> datafusion::common::Result<Arc<dyn ObjectStore>> {
48+
self.inner.get_store(url)
49+
}
50+
}
51+
52+
#[cfg(test)]
53+
mod tests {
54+
use datafusion::execution::object_store::DefaultObjectStoreRegistry;
55+
56+
use super::*;
57+
58+
#[test]
59+
fn instrumented_registry() {
60+
let reg = Arc::new(InstrumentedObjectStoreRegistry::new(Arc::new(
61+
DefaultObjectStoreRegistry::new(),
62+
)));
63+
let store = object_store::memory::InMemory::new();
64+
65+
let url = "mem://test".parse().unwrap();
66+
let registered = reg.register_store(&url, Arc::new(store));
67+
assert!(registered.is_none());
68+
69+
let fetched = reg.get_store(&url);
70+
assert!(fetched.is_ok())
71+
}
72+
}

0 commit comments

Comments
 (0)