Skip to content

Commit 6601959

Browse files
authored
Adds prefix filtering for table URLs (apache#18780)
## Which issue does this PR close? This is a follow-on PR spurred by this comment chain: - apache#18146 (comment) This work is associated with: - apache#17211 ## Rationale for this change The implementation prior to merging apache#18146 was capable of only listing files under a specific prefix when the known prefixes could be matched to filters. This PR re-introduces that capability, alleviating the need to list and filter every file for a table when the filters match. ## What changes are included in this PR? - Adds the ability to list files backing a table URL optionally filtered by a path prefix - Reintroduces the ability for partitioned listing tables to only list prefixes that match an input filter - Adds tests for new functionality ## Are these changes tested? Yes. There is existing coverage on many of the changes, new tests have been added, and existing integration tests have been updated to show the change in behavior. ## Are there any user-facing changes? no ## cc @alamb
1 parent 45e9352 commit 6601959

File tree

3 files changed

+83
-14
lines changed

3 files changed

+83
-14
lines changed

datafusion/catalog-listing/src/helpers.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -381,8 +381,14 @@ pub async fn pruned_partition_list<'a>(
381381
file_extension: &'a str,
382382
partition_cols: &'a [(String, DataType)],
383383
) -> Result<BoxStream<'a, Result<PartitionedFile>>> {
384+
let prefix = if !partition_cols.is_empty() {
385+
evaluate_partition_prefix(partition_cols, filters)
386+
} else {
387+
None
388+
};
389+
384390
let objects = table_path
385-
.list_all_files(ctx, store, file_extension)
391+
.list_prefixed_files(ctx, store, prefix, file_extension)
386392
.await?
387393
.try_filter(|object_meta| futures::future::ready(object_meta.size > 0));
388394

datafusion/core/tests/datasource/object_store_access.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ async fn query_partitioned_csv_file() {
166166
------- Object Store Request Summary -------
167167
RequestCountingObjectStore()
168168
Total Requests: 2
169-
- LIST prefix=data
169+
- LIST prefix=data/a=2
170170
- GET (opts) path=data/a=2/b=20/c=200/file_2.csv
171171
"
172172
);
@@ -220,7 +220,7 @@ async fn query_partitioned_csv_file() {
220220
------- Object Store Request Summary -------
221221
RequestCountingObjectStore()
222222
Total Requests: 2
223-
- LIST prefix=data
223+
- LIST prefix=data/a=2/b=20
224224
- GET (opts) path=data/a=2/b=20/c=200/file_2.csv
225225
"
226226
);

datafusion/datasource/src/url.rs

Lines changed: 74 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -233,27 +233,37 @@ impl ListingTableUrl {
233233
Some(stripped.split_terminator(DELIMITER))
234234
}
235235

236-
/// List all files identified by this [`ListingTableUrl`] for the provided `file_extension`
237-
pub async fn list_all_files<'a>(
236+
/// List all files identified by this [`ListingTableUrl`] for the provided `file_extension`,
237+
/// optionally filtering by a path prefix
238+
pub async fn list_prefixed_files<'a>(
238239
&'a self,
239240
ctx: &'a dyn Session,
240241
store: &'a dyn ObjectStore,
242+
prefix: Option<Path>,
241243
file_extension: &'a str,
242244
) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
243245
let exec_options = &ctx.config_options().execution;
244246
let ignore_subdirectory = exec_options.listing_table_ignore_subdirectory;
245247

248+
let prefix = if let Some(prefix) = prefix {
249+
let mut p = self.prefix.parts().collect::<Vec<_>>();
250+
p.extend(prefix.parts());
251+
Path::from_iter(p.into_iter())
252+
} else {
253+
self.prefix.clone()
254+
};
255+
246256
let list: BoxStream<'a, Result<ObjectMeta>> = if self.is_collection() {
247-
list_with_cache(ctx, store, &self.prefix).await?
257+
list_with_cache(ctx, store, &prefix).await?
248258
} else {
249-
match store.head(&self.prefix).await {
259+
match store.head(&prefix).await {
250260
Ok(meta) => futures::stream::once(async { Ok(meta) })
251261
.map_err(|e| DataFusionError::ObjectStore(Box::new(e)))
252262
.boxed(),
253263
// If the head command fails, it is likely that object doesn't exist.
254264
// Retry as though it were a prefix (aka a collection)
255265
Err(object_store::Error::NotFound { .. }) => {
256-
list_with_cache(ctx, store, &self.prefix).await?
266+
list_with_cache(ctx, store, &prefix).await?
257267
}
258268
Err(e) => return Err(e.into()),
259269
}
@@ -269,6 +279,17 @@ impl ListingTableUrl {
269279
.boxed())
270280
}
271281

282+
/// List all files identified by this [`ListingTableUrl`] for the provided `file_extension`
283+
pub async fn list_all_files<'a>(
284+
&'a self,
285+
ctx: &'a dyn Session,
286+
store: &'a dyn ObjectStore,
287+
file_extension: &'a str,
288+
) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
289+
self.list_prefixed_files(ctx, store, None, file_extension)
290+
.await
291+
}
292+
272293
/// Returns this [`ListingTableUrl`] as a string
273294
pub fn as_str(&self) -> &str {
274295
self.as_ref()
@@ -306,7 +327,7 @@ impl ListingTableUrl {
306327
async fn list_with_cache<'b>(
307328
ctx: &'b dyn Session,
308329
store: &'b dyn ObjectStore,
309-
prefix: &'b Path,
330+
prefix: &Path,
310331
) -> Result<BoxStream<'b, Result<ObjectMeta>>> {
311332
match ctx.runtime_env().cache_manager.get_list_files_cache() {
312333
None => Ok(store
@@ -701,6 +722,35 @@ mod tests {
701722
panic!("Expected PermissionDenied error");
702723
};
703724

725+
// Test prefix filtering with partition-style paths
726+
create_file(&store, "/data/a=1/file1.parquet").await;
727+
create_file(&store, "/data/a=1/b=100/file2.parquet").await;
728+
create_file(&store, "/data/a=2/b=200/file3.parquet").await;
729+
create_file(&store, "/data/a=2/b=200/file4.csv").await;
730+
731+
assert_eq!(
732+
list_prefixed_files("/data/", &store, Some(Path::from("a=1")), "parquet")
733+
.await?,
734+
vec!["data/a=1/b=100/file2.parquet", "data/a=1/file1.parquet"],
735+
);
736+
737+
assert_eq!(
738+
list_prefixed_files(
739+
"/data/",
740+
&store,
741+
Some(Path::from("a=1/b=100")),
742+
"parquet"
743+
)
744+
.await?,
745+
vec!["data/a=1/b=100/file2.parquet"],
746+
);
747+
748+
assert_eq!(
749+
list_prefixed_files("/data/", &store, Some(Path::from("a=2")), "parquet")
750+
.await?,
751+
vec!["data/a=2/b=200/file3.parquet"],
752+
);
753+
704754
Ok(())
705755
}
706756

@@ -712,27 +762,40 @@ mod tests {
712762
.expect("failed to create test file");
713763
}
714764

715-
/// Runs "list_all_files" and returns their paths
765+
/// Runs "list_prefixed_files" with no prefix to list all files and returns their paths
716766
///
717767
/// Panic's on error
718768
async fn list_all_files(
719769
url: &str,
720770
store: &dyn ObjectStore,
721771
file_extension: &str,
722772
) -> Result<Vec<String>> {
723-
try_list_all_files(url, store, file_extension).await
773+
try_list_prefixed_files(url, store, None, file_extension).await
774+
}
775+
776+
/// Runs "list_prefixed_files" and returns their paths
777+
///
778+
/// Panic's on error
779+
async fn list_prefixed_files(
780+
url: &str,
781+
store: &dyn ObjectStore,
782+
prefix: Option<Path>,
783+
file_extension: &str,
784+
) -> Result<Vec<String>> {
785+
try_list_prefixed_files(url, store, prefix, file_extension).await
724786
}
725787

726-
/// Runs "list_all_files" and returns their paths
727-
async fn try_list_all_files(
788+
/// Runs "list_prefixed_files" and returns their paths
789+
async fn try_list_prefixed_files(
728790
url: &str,
729791
store: &dyn ObjectStore,
792+
prefix: Option<Path>,
730793
file_extension: &str,
731794
) -> Result<Vec<String>> {
732795
let session = MockSession::new();
733796
let url = ListingTableUrl::parse(url)?;
734797
let files = url
735-
.list_all_files(&session, store, file_extension)
798+
.list_prefixed_files(&session, store, prefix, file_extension)
736799
.await?
737800
.try_collect::<Vec<_>>()
738801
.await?

0 commit comments

Comments
 (0)