-
Notifications
You must be signed in to change notification settings - Fork 0
251: feat(rust/sedona-datasource): Implement generic RecordBatchReader-based format #8
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
WalkthroughIntroduces a new Rust crate Changes
✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
| let mut self_clone = self.clone(); | ||
| Ok(Box::pin(async move { | ||
| self_clone.args.src.meta.replace(file_meta.object_meta); | ||
| self_clone.args.src.range = file_meta.range; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bug: Range-based repartitioning always errors on open
The ExternalFileOpener::open method returns an error when file_meta.range.is_some(), then assigns file_meta.range to self_clone.args.src.range. This logic is broken: the assignment at line 370 always assigns None since any Some value triggers the error at line 362. This prevents SupportsRepartition::ByRange from working, even though the codebase implements range-based repartitioning in the repartitioned method. When repartitioning is enabled, files with ranges will always error instead of being processed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
value:good-but-wont-fix; category:bug; feedback:The Bugbot AI reviewer is correct that the check whether partitioning is supported seems to be a copy from some other file (the error message suggests so). The check should be removed to allow opening partitioned files by range.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Review completed. 2 suggestions posted.
Comment augment review to trigger a new review.
| let statistics = &self.projected_statistics; | ||
| Ok(statistics | ||
| .clone() | ||
| .expect("projected_statistics must be set")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
statistics() will panic if projected_statistics hasn’t been set; consider handling the unset case (e.g., return unknown stats) instead of panicking, as callers may query statistics before with_statistics() is applied.
🤖 Was this useful? React with 👍 or 👎
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
value:useful; category:bug; feedback:The Augment AI reviewer is correct that aborting the application (due to a panic) should be avoided. It would be better to return Option instead.
| _conf: FileSinkConfig, | ||
| _order_requirements: Option<LexRequirement>, | ||
| ) -> Result<Arc<dyn ExecutionPlan>> { | ||
| not_impl_err!("writing not yet supported for SimpleSedonaFormat") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The error text mentions "SimpleSedonaFormat", which doesn’t match this type; aligning the message with the current format name will avoid confusion for users. (Also applies to the "SimpleOpener" message below.)
🤖 Was this useful? React with 👍 or 👎
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
value:useful; category:bug; feedback:The Augment AI reviewer is correct that the error message seems to be copied from somewhere else. It should be corrected to prevent misleading the developer who may have to debug the issue.
Pull Request Review: sedona-datasource crateOverviewThis PR introduces a new sedona-datasource crate that provides a generic RecordBatchReader-backed DataFusion file format. The implementation creates a bridge between custom file formats and DataFusion ecosystem through the ExternalFormatSpec trait. Code Quality and Best PracticesStrengths
Potential Bugs and IssuesCritical Issues1. expect() panic in production code (format.rs:310-314) 2. Wrong struct name in error message (format.rs:361-365) 3. Inconsistent error message (format.rs:192) Medium Priority Issues4. Heuristic URL resolution (spec.rs:170-192) 5. Missing path validation (provider.rs:52-54) Performance ConsiderationsGood practices: Concurrent schema inference with buffered(), efficient Arc usage, deterministic schema sorting. Potential optimizations: Filter pushdown always returns PushedDown::No (intentional but consider opt-in), statistics cloning on every call. Security ConcernsNo major security issues found. Good input validation and error handling (except expect()). Proper object store integration. Recommendations: Consider size limits or timeouts for file operations. Test CoverageExcellent test coverage including format registration, glob patterns, projections, filters, listing tables, options, and error cases. Suggested additions: Concurrent access tests, large file tests, malformed data tests, empty file tests, range partitioning tests, statistics inference tests. Additional ObservationsGood code organization with clear separation of concerns. Appropriate dependencies. Consider whether External is the best naming prefix vs Custom or Generic. SummaryWell-designed and well-implemented feature with high quality code and good test coverage. Must fix before merge:
Recommended improvements:
Overall: Strong work! Once critical issues are addressed, this will be a solid addition to the codebase. |
|
Findings
Questions
Change Summary
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (6)
Cargo.toml(1 hunks)rust/sedona-datasource/Cargo.toml(1 hunks)rust/sedona-datasource/src/format.rs(1 hunks)rust/sedona-datasource/src/lib.rs(1 hunks)rust/sedona-datasource/src/provider.rs(1 hunks)rust/sedona-datasource/src/spec.rs(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (4)
rust/sedona-datasource/src/lib.rs (1)
sedona-cli/src/command.rs (1)
format(186-186)
rust/sedona-datasource/src/provider.rs (1)
rust/sedona-datasource/src/format.rs (3)
new(57-59)new(96-98)new(212-222)
rust/sedona-datasource/src/format.rs (2)
rust/sedona-datasource/src/spec.rs (5)
infer_schema(45-45)infer_stats(94-96)extension(55-57)with_options(63-66)open_reader(51-52)rust/sedona-datasource/src/provider.rs (1)
external_listing_table(36-76)
rust/sedona-datasource/src/spec.rs (1)
rust/sedona-datasource/src/format.rs (7)
infer_schema(122-155)infer_schema(473-481)open_reader(492-525)extension(453-455)with_options(457-471)infer_stats(157-175)infer_stats(483-490)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: claude-review
- GitHub Check: codex
| fn get_ext_with_compression( | ||
| &self, | ||
| _file_compression_type: &FileCompressionType, | ||
| ) -> Result<String> { | ||
| not_impl_err!("extension with compression type") | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Implement get_ext_with_compression for uncompressed reads
DataFusion’s FileFormat contract expects formats to return a valid suffix when asked for the extension under UNCOMPRESSED; custom formats upstream do this and only error once a real compression codec is requested. Returning not_impl_err! even for the default path means callers that just need the plain extension will now fail. Please return self.get_ext() for FileCompressionType::UNCOMPRESSED and keep erroring for codecs you truly don’t support. (apache.googlesource.com)
fn get_ext_with_compression(
&self,
_file_compression_type: &FileCompressionType,
) -> Result<String> {
- not_impl_err!("extension with compression type")
+ match _file_compression_type {
+ FileCompressionType::UNCOMPRESSED => Ok(self.get_ext()),
+ other => not_impl_err!(format!(
+ "compression type {other:?} is not supported for {}",
+ self.get_ext()
+ )),
+ }
}🤖 Prompt for AI Agents
In rust/sedona-datasource/src/format.rs around lines 111 to 116, the
get_ext_with_compression currently always returns not_impl_err! which breaks
callers that request the uncompressed extension; change the method to check if
_file_compression_type is FileCompressionType::UNCOMPRESSED and in that case
return self.get_ext(), otherwise continue returning an error for unsupported
compression codecs so existing unsupported-codec behavior remains unchanged.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
value:useful; category:bug; feedback:The CodeRabbit AI reviewer is correct! The UNCOMPESSED case could be supported by delegating to get_ext() method. This would prevent an error if the file is not really compressed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@martin-augment Thanks! Agreed—handling UNCOMPRESSED by delegating to get_ext() avoids unnecessary failures while keeping unsupported codecs as NotImplemented. Here’s the minimal change and focused tests.
Rust patch (rust/sedona-datasource/src/format.rs):
fn get_ext_with_compression(
&self,
file_compression_type: &FileCompressionType,
) -> Result<String> {
match file_compression_type {
FileCompressionType::UNCOMPRESSED => Ok(self.get_ext()),
other => not_impl_err!(
"compression type {:?} is not supported for {}",
other,
self.get_ext()
),
}
}Unit tests (append to rust/sedona-datasource/src/format.rs):
#[cfg(test)]
mod compression_ext_tests {
use super::*;
use std::{collections::HashMap, sync::Arc};
use arrow_array::RecordBatchReader;
use arrow_schema::Schema;
use async_trait::async_trait;
use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
use datafusion_common::{not_impl_err, Result, DataFusionError};
struct DummySpec;
#[async_trait]
impl crate::spec::ExternalFormatSpec for DummySpec {
async fn infer_schema(&self, _location: &crate::spec::Object) -> Result<Schema> {
Ok(Schema::empty())
}
async fn open_reader(
&self,
_args: &crate::spec::OpenReaderArgs,
) -> Result<Box<dyn RecordBatchReader + Send>> {
not_impl_err!("not used in this test")
}
fn extension(&self) -> &str {
"dummy"
}
fn with_options(
&self,
_options: &HashMap<String, String>,
) -> Result<Arc<dyn crate::spec::ExternalFormatSpec>> {
Ok(Arc::new(DummySpec))
}
}
#[test]
fn get_ext_with_compression_uncompressed_returns_plain_ext() {
let format = ExternalFileFormat::new(Arc::new(DummySpec));
let ext = format
.get_ext_with_compression(&FileCompressionType::UNCOMPRESSED)
.expect("should return plain extension");
assert_eq!(ext, "dummy".to_string());
}
#[test]
fn get_ext_with_compression_unsupported_codec_errors() {
let format = ExternalFileFormat::new(Arc::new(DummySpec));
let err = format
.get_ext_with_compression(&FileCompressionType::GZIP)
.unwrap_err();
assert!(matches!(err, DataFusionError::NotImplemented(_)));
}
}Notes:
- Kept unsupported codecs as NotImplemented.
- Used not_impl_err! with format arguments (no nested format! to avoid macro misuse).
If you want, I can open a follow-up issue to track potential support for specific codecs (e.g., GZIP/ZSTD).
value:useful; category:bug; feedback:The Claude AI reviewer is correct that aborting the application (due to a panic) should be avoided. It would be better to return |
value:useful; category:bug; feedback:The Claude AI reviewer is correct that the error message seems to be copied from somewhere else. It should be corrected to prevent misleading the developer who may have to debug the issue. |
value:useful; category:bug; feedback:The Claude AI reviewer is correct that the error message seems to be copied from somewhere else. It should be corrected to prevent misleading the developer who may have to debug the issue. |
value:good-but-wont-fix; category:bug; feedback:The Claude AI reviewer is correct that the way the url is extracted/detected is very fragile. But the author has documented that in a comment above the problematic line. |
value:good-but-wont-fix; category:bug; feedback:The Codex AI reviewer is correct that the check whether partitioning is supported seems to be a copy from some other file (the error message suggests so). The check should be removed to allow opening partitioned files by range. |
251: To review by AI
Note
Introduce a new
sedona-datasourcecrate providing a generic RecordBatchReader-backed DataFusion file format and listing table utilities, plus tests and workspace wiring.rust/sedona-datasourceExternalFormatSpectrait to describe external formats (schema inference, open reader, options, repartition support, stats).ExternalFormatFactory/ExternalFileFormatbridging to DataFusionFileFormat, handling schema/stats inference, projections, filter capture, and optional repartitioning.ExternalFileSourceandExternalFileOpenerto create readers per file/partition.provider::external_listing_tableto buildListingTablefrom URLs with optional extension checks and table options.Object,OpenReaderArgs,SupportsRepartition) and URL resolution logic.EchoSpecexample covering direct paths, globs, projections/filters, listing tables, options, and error cases.rust/sedona-datasourceto workspace members; updatesCargo.lockand dependencies accordingly.Written by Cursor Bugbot for commit e28d215. This will update automatically on new commits. Configure here.