-
-
Notifications
You must be signed in to change notification settings - Fork 2.3k
refactor: Add structure for dispatching iceberg to native scans #22405
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
scan_iceberg
until collect()
time@@ -241,56 +216,25 @@ impl SlicePushDown { | |||
mut unified_scan_args, | |||
predicate, | |||
scan_type, | |||
}, Some(state)) if predicate.is_none() && matches!(&*scan_type, FileScan::NDJson {.. }) => { | |||
unified_scan_args.pre_slice = Some(state.to_slice_enum()); | |||
}, Some(state)) if predicate.is_none() && match &*scan_type { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
drive-by de-duplicate the match blocks
@@ -56,7 +56,7 @@ def test_scan_iceberg_snapshot_id(self, iceberg_path: str) -> None: | |||
|
|||
def test_scan_iceberg_snapshot_id_not_found(self, iceberg_path: str) -> None: | |||
with pytest.raises(ValueError, match="Snapshot ID not found"): | |||
pl.scan_iceberg(iceberg_path, snapshot_id=1234567890) | |||
pl.scan_iceberg(iceberg_path, snapshot_id=1234567890).collect() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We currently eagerly query the dataset for the snapshot ID, but this PR defers all IO operations until collection time.
Note I had to change the exception type because we are going through the Rust, I plan to change it back after #22410.
A rebase away |
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #22405 +/- ##
==========================================
+ Coverage 81.05% 81.07% +0.01%
==========================================
Files 1643 1650 +7
Lines 231847 232316 +469
Branches 2720 2738 +18
==========================================
+ Hits 187935 188342 +407
- Misses 43269 43319 +50
- Partials 643 655 +12 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
Updated:
|
Introduces code structure that allows us to dispatch iceberg to native parquet scans. There is also a transparent fallback to using python for scanning as we do not support some datasets at the moment due to deletion vectors / schema evolution.
Note that the native scans are disabled by default as it can fail on datasets with type changes.
Review notes:
ExpandDatasets
optimization pass that calls the file resolution after slice/predicate pushdown.Doc: Native/python dispatch override
The dispatch can be manually overridden with the following controls:
reader_overrde: 'native' | 'pyiceberg'
parameter toscan_iceberg()
POLARS_ICEBERG_READER_OVERRIDE
in the environment('native' | 'pyiceberg')
:collect
ion of the query (i.e. it is not serialized)reader_overrde
was passed toscan_iceberg()
There will be log lines in
POLARS_VERBOSE
to verify this: