Skip to content

Commit 7785aca

Browse files
committed
add ObjectStoreRegistry that normalizes env vars for resolve
Signed-off-by: Andrew Duffy <andrew@a10y.dev>
1 parent c5e7a33 commit 7785aca

File tree

7 files changed

+208
-11
lines changed

7 files changed

+208
-11
lines changed

vortex-python/src/dataset.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ use crate::arrow::ToPyArrow;
3232
use crate::error::PyVortexResult;
3333
use crate::expr::PyExpr;
3434
use crate::install_module;
35-
use crate::object_store_urls::ResolvedStore;
36-
use crate::object_store_urls::resolve_store;
35+
use crate::object_store::resolve::ResolvedStore;
36+
use crate::object_store::resolve::resolve_store;
3737

3838
pub(crate) fn init(py: Python, parent: &Bound<PyModule>) -> PyResult<()> {
3939
let m = PyModule::new(py, "dataset")?;

vortex-python/src/file.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ use crate::error::PyVortexResult;
3636
use crate::expr::PyExpr;
3737
use crate::install_module;
3838
use crate::iter::PyArrayIterator;
39-
use crate::object_store_urls::ResolvedStore;
40-
use crate::object_store_urls::resolve_store;
39+
use crate::object_store::resolve::ResolvedStore;
40+
use crate::object_store::resolve::resolve_store;
4141
use crate::scan::PyRepeatedScan;
4242

4343
pub(crate) fn init(py: Python, parent: &Bound<PyModule>) -> PyResult<()> {

vortex-python/src/io.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ use crate::error::PyVortexResult;
3636
use crate::expr::PyExpr;
3737
use crate::install_module;
3838
use crate::iter::PyArrayIterator;
39-
use crate::object_store_urls::ResolvedStore;
40-
use crate::object_store_urls::resolve_store;
39+
use crate::object_store::resolve::ResolvedStore;
40+
use crate::object_store::resolve::resolve_store;
4141

4242
pub(crate) fn init(py: Python, parent: &Bound<PyModule>) -> PyResult<()> {
4343
let m = PyModule::new(py, "io")?;

vortex-python/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ mod expr;
1717
mod file;
1818
mod io;
1919
mod iter;
20-
mod object_store_urls;
20+
mod object_store;
2121
mod python_repr;
2222
mod registry;
2323
pub mod scalar;
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
pub(crate) mod registry;
5+
pub(crate) mod resolve;
Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Apache Software Foundation (ASF)
3+
4+
//! This file is an adapted version of the `DefaultObjectStoreRegistry` from the object_store crate,
5+
//! but modified to resolve configurations out of environment variables case-insensitively. This
6+
//! is similar to how all the `Store::from_env` builders work for the various object stores.
7+
//!
8+
//! See also <https://github.com/apache/arrow-rs-object-store/issues/529>
9+
10+
#![allow(clippy::disallowed_types)]
11+
12+
use std::collections::HashMap;
13+
use std::sync::Arc;
14+
15+
use object_store::ObjectStore;
16+
use object_store::parse_url_opts;
17+
use object_store::path::Path;
18+
use object_store::path::PathPart;
19+
use object_store::registry::ObjectStoreRegistry;
20+
use parking_lot::RwLock;
21+
use url::Url;
22+
23+
#[derive(Debug, Default)]
24+
struct PathEntry {
25+
/// Store, if defined at this path
26+
store: Option<Arc<dyn ObjectStore>>,
27+
/// Child [`PathEntry`], keyed by the next path segment in their path
28+
children: HashMap<String, Self>,
29+
}
30+
31+
impl PathEntry {
32+
/// Lookup a store based on URL path
33+
///
34+
/// Returns the store and its path segment depth
35+
fn lookup(&self, to_resolve: &Url) -> Option<(&Arc<dyn ObjectStore>, usize)> {
36+
let mut current = self;
37+
let mut ret = self.store.as_ref().map(|store| (store, 0));
38+
let mut depth = 0;
39+
// Traverse the PathEntry tree to find the longest match
40+
for segment in path_segments(to_resolve.path()) {
41+
match current.children.get(segment) {
42+
Some(e) => {
43+
current = e;
44+
depth += 1;
45+
if let Some(store) = &current.store {
46+
ret = Some((store, depth))
47+
}
48+
}
49+
None => break,
50+
}
51+
}
52+
ret
53+
}
54+
}
55+
56+
/// An implementation of the [`ObjectStoreRegistry`] that normalizes environment variables
57+
/// before doing lookups.
58+
#[derive(Debug, Default)]
59+
pub(crate) struct Registry {
60+
/// Mapping from [`url_key`] to [`PathEntry`]
61+
map: RwLock<HashMap<String, PathEntry>>,
62+
}
63+
64+
impl ObjectStoreRegistry for Registry {
65+
fn register(&self, url: Url, store: Arc<dyn ObjectStore>) -> Option<Arc<dyn ObjectStore>> {
66+
let mut map = self.map.write();
67+
let key = url_key(&url);
68+
let mut entry = map.entry(key.to_string()).or_default();
69+
70+
for segment in path_segments(url.path()) {
71+
entry = entry.children.entry(segment.to_string()).or_default();
72+
}
73+
entry.store.replace(store)
74+
}
75+
76+
fn resolve(&self, to_resolve: &Url) -> object_store::Result<(Arc<dyn ObjectStore>, Path)> {
77+
let key = url_key(to_resolve);
78+
{
79+
let map = self.map.read();
80+
81+
if let Some((store, depth)) = map.get(key).and_then(|entry| entry.lookup(to_resolve)) {
82+
let path = path_suffix(to_resolve, depth)?;
83+
return Ok((Arc::clone(store), path));
84+
}
85+
}
86+
87+
let normalized_env = std::env::vars().map(|(k, v)| (k.to_ascii_lowercase(), v));
88+
89+
if let Ok((store, path)) = parse_url_opts(to_resolve, normalized_env) {
90+
let depth = num_segments(to_resolve.path()) - num_segments(path.as_ref());
91+
92+
let mut map = self.map.write();
93+
let mut entry = map.entry(key.to_string()).or_default();
94+
for segment in path_segments(to_resolve.path()).take(depth) {
95+
entry = entry.children.entry(segment.to_string()).or_default();
96+
}
97+
let store = Arc::clone(match &entry.store {
98+
None => entry.store.insert(Arc::from(store)),
99+
Some(x) => x, // Racing creation - use existing
100+
});
101+
102+
let path = path_suffix(to_resolve, depth)?;
103+
return Ok((store, path));
104+
}
105+
106+
Err(object_store::Error::Generic {
107+
store: "ObjectStoreRegistry",
108+
source: "URL could not be resolved".into(),
109+
})
110+
}
111+
}
112+
113+
/// Extracts the scheme and authority of a URL (components before the Path)
114+
fn url_key(url: &Url) -> &str {
115+
&url[..url::Position::AfterPort]
116+
}
117+
118+
/// Returns the non-empty segments of a path
119+
///
120+
/// Note: We don't use [`Url::path_segments`] as we only want non-empty paths
121+
fn path_segments(s: &str) -> impl Iterator<Item = &str> {
122+
s.split('/').filter(|x| !x.is_empty())
123+
}
124+
125+
/// Returns the number of non-empty path segments in a path
126+
fn num_segments(s: &str) -> usize {
127+
path_segments(s).count()
128+
}
129+
130+
/// Returns the path of `url` skipping the first `depth` segments
131+
fn path_suffix(url: &Url, depth: usize) -> Result<Path, object_store::Error> {
132+
let segments = path_segments(url.path()).skip(depth);
133+
let path = segments
134+
.map(PathPart::parse)
135+
.collect::<Result<_, _>>()
136+
.map_err(|e| object_store::Error::Generic {
137+
store: "ObjectStoreRegistry",
138+
source: Box::new(e),
139+
})?;
140+
Ok(path)
141+
}
142+
143+
#[cfg(test)]
144+
mod tests {
145+
use std::fmt::Write;
146+
147+
use object_store::registry::ObjectStoreRegistry;
148+
use url::Url;
149+
150+
use crate::object_store::registry::Registry;
151+
152+
fn with_var<F>(key: &str, value: &str, func: F)
153+
where
154+
F: FnOnce(),
155+
{
156+
let old_val = std::env::var(key).ok();
157+
158+
// SAFETY: these unit tests run single-threaded.
159+
unsafe { std::env::set_var(key, value) };
160+
161+
func();
162+
163+
// Set the variable back to its original value
164+
match old_val {
165+
None => {
166+
unsafe { std::env::remove_var(key) };
167+
}
168+
Some(val) => {
169+
unsafe { std::env::set_var(key, val) };
170+
}
171+
}
172+
}
173+
174+
#[test]
175+
#[allow(clippy::use_debug)]
176+
fn test_resolve_url() {
177+
with_var("AWS_REGION", "us-east-3", || {
178+
let registry = Registry::default();
179+
let (store, _) = registry
180+
.resolve(&Url::parse("s3://my-bucket/test").unwrap())
181+
.unwrap();
182+
183+
// NOTE(aduffy): object_store doesn't let us downcast stores, the only way to verify
184+
// that a configuration was added was to validate that it ends up in the Debug
185+
// output :/
186+
let mut debug_str = String::new();
187+
write!(&mut debug_str, "{store:?}").unwrap();
188+
189+
assert!(debug_str.contains("us-east-3"));
190+
});
191+
}
192+
}
Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,13 @@ use std::sync::LazyLock;
77

88
use object_store::ObjectStore;
99
use object_store::path::Path;
10-
use object_store::registry::DefaultObjectStoreRegistry;
1110
use object_store::registry::ObjectStoreRegistry;
1211
use url::Url;
1312
use vortex::error::VortexResult;
1413

15-
static REGISTRY: LazyLock<DefaultObjectStoreRegistry> =
16-
LazyLock::new(DefaultObjectStoreRegistry::new);
14+
use crate::object_store::registry::Registry;
15+
16+
static REGISTRY: LazyLock<Registry> = LazyLock::new(Registry::default);
1717

1818
/// Resolve a path to either a local file system path, or a registered object store.
1919
///
@@ -81,7 +81,7 @@ mod test {
8181
use object_store::local::LocalFileSystem;
8282
use object_store::path::Path;
8383

84-
use crate::object_store_urls::resolve_store;
84+
use crate::object_store::resolve::resolve_store;
8585

8686
#[test]
8787
fn test_resolve() {

0 commit comments

Comments
 (0)