Skip to content

Commit dd46ea3

Browse files
rebase
1 parent 5a05abf commit dd46ea3

File tree

25 files changed

+1082
-241
lines changed

25 files changed

+1082
-241
lines changed

crates/polars-plan/src/dsl/builder_dsl.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ impl DslBuilder {
3434
})?;
3535

3636
Ok(DslPlan::Scan {
37-
sources: ScanSources::Buffers(Arc::default()),
37+
sources: ScanSources::default(),
3838
file_info: Some(FileInfo {
3939
schema: schema.clone(),
4040
reader_schema: Some(either::Either::Right(schema)),
@@ -107,6 +107,25 @@ impl DslBuilder {
107107
.into())
108108
}
109109

110+
/// Creates a builder for a scan backed by a Python dataset object.
///
/// Wraps `dataset_object` in a `PythonDatasetProvider` and returns a `DslPlan::Scan`
/// whose `scan_type` is `FileScan::PythonDataset`, converted into a `DslBuilder`.
/// All other fields of the scan are left at their defaults / `None`.
///
/// Only compiled when the `python` feature is enabled.
#[cfg(feature = "python")]
111+
pub fn scan_python_dataset(
112+
dataset_object: polars_utils::python_function::PythonObject,
113+
) -> DslBuilder {
114+
use super::python_dataset::PythonDatasetProvider;
115+
116+
DslPlan::Scan {
117+
// No concrete sources are known at DSL construction time.
sources: ScanSources::default(),
118+
// Left unset here; the DSL-to-IR conversion in this commit builds the `FileInfo`
// for `PythonDataset` scans from the provider's `schema()`.
file_info: None,
119+
unified_scan_args: Default::default(),
120+
scan_type: Box::new(FileScan::PythonDataset {
121+
dataset_object: Arc::new(PythonDatasetProvider::new(dataset_object)),
122+
// Starts out empty (default value).
cached_ir: Default::default(),
123+
}),
124+
cached_ir: Default::default(),
125+
}
126+
.into()
127+
}
128+
110129
pub fn cache(self) -> Self {
111130
let input = Arc::new(self.0);
112131
let id = input.as_ref() as *const DslPlan as usize;

crates/polars-plan/src/dsl/file_scan.rs renamed to crates/polars-plan/src/dsl/file_scan/mod.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::hash::Hash;
2+
use std::sync::Mutex;
23

34
use polars_io::cloud::CloudOptions;
45
#[cfg(feature = "csv")]
@@ -17,6 +18,11 @@ use strum_macros::IntoStaticStr;
1718

1819
use super::*;
1920

21+
#[cfg(feature = "python")]
22+
pub mod python_dataset;
23+
#[cfg(feature = "python")]
24+
pub use python_dataset::{DATASET_PROVIDER_VTABLE, PythonDatasetProviderVTable};
25+
2026
bitflags::bitflags! {
2127
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
2228
pub struct ScanFlags : u32 {
@@ -48,6 +54,14 @@ pub enum FileScan {
4854
metadata: Option<Arc<arrow::io::ipc::read::FileMetadata>>,
4955
},
5056

57+
#[cfg(feature = "python")]
58+
PythonDataset {
59+
dataset_object: Arc<python_dataset::PythonDatasetProvider>,
60+
61+
#[cfg_attr(feature = "serde", serde(skip, default))]
62+
cached_ir: Arc<Mutex<Option<ExpandedDataset>>>,
63+
},
64+
5165
#[cfg_attr(feature = "serde", serde(skip))]
5266
Anonymous {
5367
options: Arc<AnonymousScanOptions>,
@@ -199,10 +213,20 @@ mod _file_scan_eq_hash {
199213
metadata: Option<usize>,
200214
},
201215

216+
#[cfg(feature = "python")]
217+
PythonDataset {
218+
dataset_object: usize,
219+
cached_ir: usize,
220+
},
221+
202222
Anonymous {
203223
options: &'a crate::dsl::AnonymousScanOptions,
204224
function: usize,
205225
},
226+
227+
/// Variant to ensure the lifetime is used regardless of feature gate combination.
228+
#[expect(unused)]
229+
Phantom(&'a ()),
206230
}
207231

208232
impl<'a> From<&'a FileScan> for FileScanEqHashWrap<'a> {
@@ -226,6 +250,15 @@ mod _file_scan_eq_hash {
226250
metadata: metadata.as_ref().map(arc_as_ptr),
227251
},
228252

253+
#[cfg(feature = "python")]
254+
FileScan::PythonDataset {
255+
dataset_object,
256+
cached_ir,
257+
} => FileScanEqHashWrap::PythonDataset {
258+
dataset_object: arc_as_ptr(dataset_object),
259+
cached_ir: arc_as_ptr(cached_ir),
260+
},
261+
229262
FileScan::Anonymous { options, function } => FileScanEqHashWrap::Anonymous {
230263
options,
231264
function: arc_as_ptr(function),
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
use std::fmt::Debug;
2+
use std::sync::OnceLock;
3+
4+
use polars_core::error::PolarsResult;
5+
use polars_core::schema::SchemaRef;
6+
use polars_utils::pl_str::PlSmallStr;
7+
use polars_utils::python_function::PythonObject;
8+
9+
use crate::dsl::DslPlan;
10+
11+
/// Global slot that `polars-python` fills in so that the dataset-provider implementation
12+
/// can live there, since the impls for converting from Python objects are in that crate.
///
/// The slot is a `OnceLock`, so it is empty until something sets it;
/// [`dataset_provider_vtable`] turns the empty case into an `Err`.
13+
pub static DATASET_PROVIDER_VTABLE: OnceLock<PythonDatasetProviderVTable> = OnceLock::new();
14+
15+
/// Table of function pointers stored in `DATASET_PROVIDER_VTABLE`.
///
/// Each entry receives the wrapped Python dataset object by reference and is called by the
/// `PythonDatasetProvider` method of the same name.
pub struct PythonDatasetProviderVTable {
16+
/// Called by `PythonDatasetProvider::reader_name`; returns a name string and cannot fail.
pub reader_name: fn(dataset_object: &PythonObject) -> PlSmallStr,
17+
18+
/// Called by `PythonDatasetProvider::schema`; returns the dataset's schema or an error.
pub schema: fn(dataset_object: &PythonObject) -> PolarsResult<SchemaRef>,
19+
20+
/// Called by `PythonDatasetProvider::to_dataset_scan`; produces a `DslPlan` for the dataset.
///
/// NOTE(review): `limit` and `projection` are presumably a row limit and a column selection
/// handed to the dataset; confirm against the `polars-python` implementation.
#[expect(clippy::type_complexity)]
21+
pub to_dataset_scan: fn(
22+
dataset_object: &PythonObject,
23+
limit: Option<usize>,
24+
projection: Option<&[PlSmallStr]>,
25+
) -> PolarsResult<DslPlan>,
26+
}
27+
28+
/// Returns the vtable stored in `DATASET_PROVIDER_VTABLE`.
///
/// # Errors
///
/// Returns the static message `"DATASET_PROVIDER_VTABLE not initialized"` when the
/// `OnceLock` has not been set yet.
pub fn dataset_provider_vtable() -> Result<&'static PythonDatasetProviderVTable, &'static str> {
29+
DATASET_PROVIDER_VTABLE
30+
.get()
31+
.ok_or("DATASET_PROVIDER_VTABLE not initialized")
32+
}
33+
34+
/// Wrapper around a Python dataset object whose operations are dispatched through
/// `DATASET_PROVIDER_VTABLE`.
///
/// Currently intended only for Iceberg support.
35+
#[derive(Debug)]
36+
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
37+
pub struct PythonDatasetProvider {
38+
/// The Python object passed as the first argument to every vtable function.
dataset_object: PythonObject,
39+
}
40+
41+
impl PythonDatasetProvider {
42+
/// Wraps `dataset_object` without touching the vtable.
pub fn new(dataset_object: PythonObject) -> Self {
43+
Self { dataset_object }
44+
}
45+
46+
/// Returns the name reported by the vtable's `reader_name` function for this dataset.
///
/// # Panics
///
/// Panics if `DATASET_PROVIDER_VTABLE` has not been initialized.
pub fn reader_name(&self) -> PlSmallStr {
47+
(dataset_provider_vtable().unwrap().reader_name)(&self.dataset_object)
48+
}
49+
50+
/// Returns the schema reported by the vtable's `schema` function for this dataset.
///
/// # Errors
///
/// Propagates whatever error the vtable function returns.
///
/// # Panics
///
/// Panics if `DATASET_PROVIDER_VTABLE` has not been initialized; an uninitialized
/// vtable is NOT reported through the `PolarsResult`.
pub fn schema(&self) -> PolarsResult<SchemaRef> {
51+
(dataset_provider_vtable().unwrap().schema)(&self.dataset_object)
52+
}
53+
54+
/// Asks the vtable's `to_dataset_scan` function for a `DslPlan`, forwarding `limit`
/// and `projection` unchanged.
///
/// # Errors
///
/// Propagates whatever error the vtable function returns.
///
/// # Panics
///
/// Panics if `DATASET_PROVIDER_VTABLE` has not been initialized; an uninitialized
/// vtable is NOT reported through the `PolarsResult`.
pub fn to_dataset_scan(
55+
&self,
56+
limit: Option<usize>,
57+
projection: Option<&[PlSmallStr]>,
58+
) -> PolarsResult<DslPlan> {
59+
(dataset_provider_vtable().unwrap().to_dataset_scan)(
60+
&self.dataset_object,
61+
limit,
62+
projection,
63+
)
64+
}
65+
}

crates/polars-plan/src/dsl/python_dsl/source.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ pub struct PythonOptionsDsl {
2323
}
2424

2525
impl PythonOptionsDsl {
26-
pub(crate) fn get_schema(&self) -> PolarsResult<SchemaRef> {
26+
pub fn get_schema(&self) -> PolarsResult<SchemaRef> {
2727
match self.schema_fn.as_ref().expect("should be set").as_ref() {
2828
Either::Left(func) => Python::with_gil(|py| {
2929
let schema = func

crates/polars-plan/src/plans/conversion/dsl_to_ir.rs

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,12 @@ pub fn to_alp_impl(lp: DslPlan, ctxt: &mut DslConversionContext) -> PolarsResult
180180
FileScan::NDJson { .. } => {
181181
sources.expand_paths(unified_scan_args, cloud_options)?
182182
},
183+
#[cfg(feature = "python")]
184+
FileScan::PythonDataset { .. } => {
185+
// There are a lot of places that short-circuit if the paths are empty,
186+
// so we just give a dummy path here.
187+
ScanSources::Paths(Arc::from(["dummy".into()]))
188+
},
183189
FileScan::Anonymous { .. } => sources,
184190
};
185191

@@ -244,6 +250,20 @@ pub fn to_alp_impl(lp: DslPlan, ctxt: &mut DslConversionContext) -> PolarsResult
244250
cloud_options,
245251
)
246252
.map_err(|e| e.context(failed_here!(ndjson scan)))?,
253+
#[cfg(feature = "python")]
254+
FileScan::PythonDataset { dataset_object, .. } => {
255+
if crate::dsl::DATASET_PROVIDER_VTABLE.get().is_none() {
256+
polars_bail!(ComputeError: "DATASET_PROVIDER_VTABLE (python) not initialized")
257+
}
258+
259+
let schema = dataset_object.schema()?;
260+
261+
FileInfo {
262+
schema: schema.clone(),
263+
reader_schema: Some(either::Either::Right(schema)),
264+
row_estimation: (None, usize::MAX),
265+
}
266+
},
247267
FileScan::Anonymous { .. } => {
248268
file_info.expect("FileInfo should be set for AnonymousScan")
249269
},
@@ -292,22 +312,6 @@ pub fn to_alp_impl(lp: DslPlan, ctxt: &mut DslConversionContext) -> PolarsResult
292312
file_info.update_schema_with_hive_schema(hive_schema);
293313
}
294314

295-
unified_scan_args.include_file_paths = unified_scan_args
296-
.include_file_paths
297-
.as_ref()
298-
.filter(|_| match &*scan_type {
299-
#[cfg(feature = "parquet")]
300-
FileScan::Parquet { .. } => true,
301-
#[cfg(feature = "ipc")]
302-
FileScan::Ipc { .. } => true,
303-
#[cfg(feature = "csv")]
304-
FileScan::Csv { .. } => true,
305-
#[cfg(feature = "json")]
306-
FileScan::NDJson { .. } => true,
307-
FileScan::Anonymous { .. } => false,
308-
})
309-
.cloned();
310-
311315
if let Some(ref file_path_col) = unified_scan_args.include_file_paths {
312316
let schema = Arc::make_mut(&mut file_info.schema);
313317

crates/polars-plan/src/plans/functions/count.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ pub fn count_rows(
5656
),
5757
#[cfg(feature = "json")]
5858
FileScan::NDJson { options } => count_rows_ndjson(sources, cloud_options),
59+
#[cfg(feature = "python")]
60+
FileScan::PythonDataset { .. } => unreachable!(),
5961
FileScan::Anonymous { .. } => {
6062
unreachable!()
6163
},

0 commit comments

Comments
 (0)